You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2022/08/08 02:26:26 UTC

[pulsar] branch branch-2.7 updated: Fix semaphore release duplicated in ProducerImpl (#16972)

This is an automated email from the ASF dual-hosted git repository.

zhangmingao pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git


The following commit(s) were added to refs/heads/branch-2.7 by this push:
     new 43a8436ca6c Fix semaphore release duplicated in ProducerImpl (#16972)
43a8436ca6c is described below

commit 43a8436ca6cd6604cd25174d7388390bcd5d6b12
Author: lixinyang <84...@users.noreply.github.com>
AuthorDate: Mon Aug 8 10:26:19 2022 +0800

    Fix semaphore release duplicated in ProducerImpl (#16972)
    
    Co-authored-by: nicklixinyang <ni...@didiglobal.com>
---
 .../pulsar/client/impl/ProducerSemaphoreTest.java  | 43 ++++++++++++++++++++++
 .../apache/pulsar/client/impl/ProducerImpl.java    |  3 --
 2 files changed, 43 insertions(+), 3 deletions(-)

diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
index 95d1c739db3..13d2ba6e13b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
@@ -18,8 +18,10 @@
  */
 package org.apache.pulsar.client.impl;
 
+import static org.mockito.ArgumentMatchers.any;
 import java.lang.reflect.Field;
 import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
 import lombok.Cleanup;
 import org.apache.pulsar.client.api.MessageId;
 import org.apache.pulsar.client.api.ProducerConsumerBase;
@@ -27,6 +29,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
 import org.apache.pulsar.client.api.Schema;
 import org.apache.pulsar.common.api.proto.PulsarApi;
 import org.apache.pulsar.common.util.FutureUtil;
+import org.mockito.Mockito;
 import org.testng.Assert;
 import org.testng.annotations.AfterMethod;
 import org.testng.annotations.BeforeMethod;
@@ -225,4 +228,44 @@ public class ProducerSemaphoreTest extends ProducerConsumerBase {
         FutureUtil.waitForAll(futures).get();
         Assert.assertEquals(producer.getSemaphore().availablePermits(), pendingQueueSize);
     }
+
+    @Test(timeOut = 10_000) // test-level timeout; the producer below uses a shorter 2 s send timeout
+    public void testBatchMessageSendTimeoutProducerSemaphoreRelease() throws Exception { // checks that all semaphore permits are available again after a batched send ends in a TimeoutException
+        final int pendingQueueSize = 10; // used both as maxPendingMessages and as the expected permit count in the final assertion
+        @Cleanup
+        ProducerImpl<byte[]> producer =
+                (ProducerImpl<byte[]>) pulsarClient.newProducer()
+                        .topic("testProducerSemaphoreRelease")
+                        .sendTimeout(2, TimeUnit.SECONDS)
+                        .maxPendingMessages(pendingQueueSize)
+                        .enableBatching(true)
+                        .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
+                        .batchingMaxBytes(15)
+                        .create();
+        this.stopBroker(); // NOTE(review): presumably keeps the sends below from being acknowledged so they run into the send timeout - confirm against the test base class
+        try {
+            ProducerImpl<byte[]> spyProducer = Mockito.spy(producer); // spy so that encryptMessage can be stubbed further down
+            // Make the pendingMessages not empty
+            spyProducer.newMessage().value("semaphore-test".getBytes(StandardCharsets.UTF_8)).sendAsync();
+            spyProducer.newMessage().value("semaphore-test".getBytes(StandardCharsets.UTF_8)).sendAsync();
+
+            Field batchMessageContainerField = ProducerImpl.class.getDeclaredField("batchMessageContainer"); // looked up reflectively by name and made accessible on the next line
+            batchMessageContainerField.setAccessible(true);
+            BatchMessageContainerImpl batchMessageContainer =
+                    (BatchMessageContainerImpl) batchMessageContainerField.get(spyProducer);
+            batchMessageContainer.setProducer(spyProducer); // point the container at the spy rather than the original producer
+            Mockito.doThrow(new PulsarClientException.CryptoException("crypto error")).when(spyProducer) // from here on, encryptMessage on the spy throws CryptoException
+                    .encryptMessage(any(), any());
+
+            try {
+                spyProducer.newMessage().value("memory-test".getBytes(StandardCharsets.UTF_8)).sendAsync().get(); // blocks until the send future completes
+            } catch (Exception e) {
+                throw PulsarClientException.unwrap(e); // rethrow as a PulsarClientException so the outer catch can match TimeoutException
+            }
+
+            throw new IllegalStateException("can not reach here"); // only reached if get() returned normally, which makes the test fail
+        } catch (PulsarClientException.TimeoutException ex) { // the only expected outcome; any other exception propagates and fails the test
+            Assert.assertEquals(producer.getSemaphore().availablePermits(), pendingQueueSize); // available permits must equal the configured maxPendingMessages
+        }
+    }
 }
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index f421bf1f3c2..517de0e1a34 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1672,10 +1672,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
                 for (OpSendMsg opSendMsg : opSendMsgs) {
                     processOpSendMsg(opSendMsg);
                 }
-            } catch (PulsarClientException e) {
-                semaphore.release(batchMessageContainer.getNumMessagesInBatch());
             } catch (Throwable t) {
-                semaphore.release(batchMessageContainer.getNumMessagesInBatch());
                 log.warn("[{}] [{}] error while create opSendMsg by batch message container", topic, producerName, t);
             }
         }