You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by pe...@apache.org on 2022/08/08 10:29:07 UTC

[pulsar] branch branch-2.10 updated: Fix failed test introduced by cherry-pick, according to https://github.com/apache/pulsar/pull/16971/files\#diff-6e09b03081dd6077066c65b656ed10be6125d2b7fc9a12c9a78d391afbb6b081R72-R99

This is an automated email from the ASF dual-hosted git repository.

penghui pushed a commit to branch branch-2.10
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.10 by this push:
     new 6a4a2c5f707 Fix failed test introduced by cherry-pick, according to https://github.com/apache/pulsar/pull/16971/files\#diff-6e09b03081dd6077066c65b656ed10be6125d2b7fc9a12c9a78d391afbb6b081R72-R99
6a4a2c5f707 is described below

commit 6a4a2c5f707282dd1008644585b2fdd62fc2f44b
Author: penghui <pe...@apache.org>
AuthorDate: Mon Aug 8 18:28:56 2022 +0800

    Fix failed test introduced by cherry-pick, according to https://github.com/apache/pulsar/pull/16971/files\#diff-6e09b03081dd6077066c65b656ed10be6125d2b7fc9a12c9a78d391afbb6b081R72-R99
---
 .../pulsar/client/impl/ProducerSemaphoreTest.java  | 25 +++++++++++++++-------
 1 file changed, 17 insertions(+), 8 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
index 2bc81c48f9b..c01514a85e6 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
@@ -251,27 +251,36 @@ public class ProducerSemaphoreTest extends ProducerConsumerBase {
         ProducerImpl<byte[]> producer =
                 (ProducerImpl<byte[]>) pulsarClient.newProducer()
                         .topic("testProducerSemaphoreRelease")
-                        .sendTimeout(5, TimeUnit.SECONDS)
+                        .sendTimeout(2, TimeUnit.SECONDS)
                         .maxPendingMessages(pendingQueueSize)
                         .enableBatching(true)
-                        .batchingMaxPublishDelay(500, TimeUnit.MILLISECONDS)
-                        .batchingMaxBytes(12)
+                        .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
+                        .batchingMaxBytes(15)
                         .create();
         this.stopBroker();
         try {
             ProducerImpl<byte[]> spyProducer = Mockito.spy(producer);
-            Mockito.doThrow(new PulsarClientException.CryptoException("crypto error")).when(spyProducer)
-                    .encryptMessage(any(),any());
+            // Make the pendingMessages not empty
+            spyProducer.newMessage().value("semaphore-test".getBytes(StandardCharsets.UTF_8)).sendAsync();
+            spyProducer.newMessage().value("semaphore-test".getBytes(StandardCharsets.UTF_8)).sendAsync();
 
             Field batchMessageContainerField = ProducerImpl.class.getDeclaredField("batchMessageContainer");
             batchMessageContainerField.setAccessible(true);
-            BatchMessageContainerImpl batchMessageContainer = (BatchMessageContainerImpl) batchMessageContainerField.get(spyProducer);
+            BatchMessageContainerImpl batchMessageContainer =
+                    (BatchMessageContainerImpl) batchMessageContainerField.get(spyProducer);
             batchMessageContainer.setProducer(spyProducer);
-            spyProducer.send("semaphore-test".getBytes(StandardCharsets.UTF_8));
+            Mockito.doThrow(new PulsarClientException.CryptoException("crypto error")).when(spyProducer)
+                    .encryptMessage(any(), any());
+
+            try {
+                spyProducer.newMessage().value("memory-test".getBytes(StandardCharsets.UTF_8)).sendAsync().get();
+            } catch (Exception e) {
+                throw PulsarClientException.unwrap(e);
+            }
 
             throw new IllegalStateException("can not reach here");
         } catch (PulsarClientException.TimeoutException ex) {
-            Assert.assertEquals(producer.getSemaphore().get().availablePermits(), 10);
+            Assert.assertEquals(producer.getSemaphore().get().availablePermits(), pendingQueueSize);
         }
     }
 }