You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by zh...@apache.org on 2022/08/08 02:26:26 UTC
[pulsar] branch branch-2.7 updated: Fix semaphore release duplicated in ProducerImpl (#16972)
This is an automated email from the ASF dual-hosted git repository.
zhangmingao pushed a commit to branch branch-2.7
in repository https://gitbox.apache.org/repos/asf/pulsar.git
The following commit(s) were added to refs/heads/branch-2.7 by this push:
new 43a8436ca6c Fix semaphore release duplicated in ProducerImpl (#16972)
43a8436ca6c is described below
commit 43a8436ca6cd6604cd25174d7388390bcd5d6b12
Author: lixinyang <84...@users.noreply.github.com>
AuthorDate: Mon Aug 8 10:26:19 2022 +0800
Fix semaphore release duplicated in ProducerImpl (#16972)
Co-authored-by: nicklixinyang <ni...@didiglobal.com>
---
.../pulsar/client/impl/ProducerSemaphoreTest.java | 43 ++++++++++++++++++++++
.../apache/pulsar/client/impl/ProducerImpl.java | 3 --
2 files changed, 43 insertions(+), 3 deletions(-)
diff --git a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
index 95d1c739db3..13d2ba6e13b 100644
--- a/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
+++ b/pulsar-broker/src/test/java/org/apache/pulsar/client/impl/ProducerSemaphoreTest.java
@@ -18,8 +18,10 @@
*/
package org.apache.pulsar.client.impl;
+import static org.mockito.ArgumentMatchers.any;
import java.lang.reflect.Field;
import java.nio.charset.StandardCharsets;
+import java.util.concurrent.TimeUnit;
import lombok.Cleanup;
import org.apache.pulsar.client.api.MessageId;
import org.apache.pulsar.client.api.ProducerConsumerBase;
@@ -27,6 +29,7 @@ import org.apache.pulsar.client.api.PulsarClientException;
import org.apache.pulsar.client.api.Schema;
import org.apache.pulsar.common.api.proto.PulsarApi;
import org.apache.pulsar.common.util.FutureUtil;
+import org.mockito.Mockito;
import org.testng.Assert;
import org.testng.annotations.AfterMethod;
import org.testng.annotations.BeforeMethod;
@@ -225,4 +228,44 @@ public class ProducerSemaphoreTest extends ProducerConsumerBase {
FutureUtil.waitForAll(futures).get();
Assert.assertEquals(producer.getSemaphore().availablePermits(), pendingQueueSize);
}
+
+ @Test(timeOut = 10_000)
+ public void testBatchMessageSendTimeoutProducerSemaphoreRelease() throws Exception {
+ final int pendingQueueSize = 10;
+ @Cleanup
+ ProducerImpl<byte[]> producer =
+ (ProducerImpl<byte[]>) pulsarClient.newProducer()
+ .topic("testProducerSemaphoreRelease")
+ .sendTimeout(2, TimeUnit.SECONDS)
+ .maxPendingMessages(pendingQueueSize)
+ .enableBatching(true)
+ .batchingMaxPublishDelay(100, TimeUnit.MILLISECONDS)
+ .batchingMaxBytes(15)
+ .create();
+ this.stopBroker();
+ try {
+ ProducerImpl<byte[]> spyProducer = Mockito.spy(producer);
+ // Make the pendingMessages not empty
+ spyProducer.newMessage().value("semaphore-test".getBytes(StandardCharsets.UTF_8)).sendAsync();
+ spyProducer.newMessage().value("semaphore-test".getBytes(StandardCharsets.UTF_8)).sendAsync();
+
+ Field batchMessageContainerField = ProducerImpl.class.getDeclaredField("batchMessageContainer");
+ batchMessageContainerField.setAccessible(true);
+ BatchMessageContainerImpl batchMessageContainer =
+ (BatchMessageContainerImpl) batchMessageContainerField.get(spyProducer);
+ batchMessageContainer.setProducer(spyProducer);
+ Mockito.doThrow(new PulsarClientException.CryptoException("crypto error")).when(spyProducer)
+ .encryptMessage(any(), any());
+
+ try {
+ spyProducer.newMessage().value("memory-test".getBytes(StandardCharsets.UTF_8)).sendAsync().get();
+ } catch (Exception e) {
+ throw PulsarClientException.unwrap(e);
+ }
+
+ throw new IllegalStateException("can not reach here");
+ } catch (PulsarClientException.TimeoutException ex) {
+ Assert.assertEquals(producer.getSemaphore().availablePermits(), pendingQueueSize);
+ }
+ }
}
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
index f421bf1f3c2..517de0e1a34 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ProducerImpl.java
@@ -1672,10 +1672,7 @@ public class ProducerImpl<T> extends ProducerBase<T> implements TimerTask, Conne
for (OpSendMsg opSendMsg : opSendMsgs) {
processOpSendMsg(opSendMsg);
}
- } catch (PulsarClientException e) {
- semaphore.release(batchMessageContainer.getNumMessagesInBatch());
} catch (Throwable t) {
- semaphore.release(batchMessageContainer.getNumMessagesInBatch());
log.warn("[{}] [{}] error while create opSendMsg by batch message container", topic, producerName, t);
}
}