You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/01/15 15:15:59 UTC
[pulsar] Diff for: [GitHub] sijie merged pull request #3337: Bug fixes/Improvement for notify pending receive method
diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 648afb891e..c3d934da59 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -910,27 +910,44 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf heade
* @param message
*/
void notifyPendingReceivedCallback(final Message<T> message, Exception exception) {
- if (!pendingReceives.isEmpty()) {
- // fetch receivedCallback from queue
- CompletableFuture<Message<T>> receivedFuture = pendingReceives.poll();
- if (exception == null) {
- checkNotNull(message, "received message can't be null");
- if (receivedFuture != null) {
- if (conf.getReceiverQueueSize() == 0) {
- // return message to receivedCallback
- receivedFuture.complete(message);
- } else {
- // increase permits for available message-queue
- Message<T> interceptMsg = beforeConsume(message);
- messageProcessed(interceptMsg);
- // return message to receivedCallback
- listenerExecutor.execute(() -> receivedFuture.complete(interceptMsg));
- }
- }
- } else {
- listenerExecutor.execute(() -> receivedFuture.completeExceptionally(exception));
- }
+ if (pendingReceives.isEmpty()) {
+ return;
+ }
+
+ // fetch receivedCallback from queue
+ final CompletableFuture<Message<T>> receivedFuture = pendingReceives.poll();
+ if (receivedFuture == null) {
+ return;
+ }
+
+ if (exception != null) {
+ listenerExecutor.execute(() -> receivedFuture.completeExceptionally(exception));
+ return;
+ }
+
+ if (message == null) {
+ IllegalStateException e = new IllegalStateException("received message can't be null");
+ listenerExecutor.execute(() -> receivedFuture.completeExceptionally(e));
+ return;
}
+
+ if (conf.getReceiverQueueSize() == 0) {
+ // call interceptor and complete received callback
+ interceptAndComplete(message, receivedFuture);
+ return;
+ }
+
+ // increase permits for available message-queue
+ messageProcessed(message);
+ // call interceptor and complete received callback
+ interceptAndComplete(message, receivedFuture);
+ }
+
+ private void interceptAndComplete(final Message<T> message, final CompletableFuture<Message<T>> receivedFuture) {
+ // call proper interceptor
+ final Message<T> interceptMessage = beforeConsume(message);
+ // return message to receivedCallback
+ listenerExecutor.execute(() -> receivedFuture.complete(interceptMessage));
}
private void triggerZeroQueueSizeListener(final Message<T> message) {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
new file mode 100644
index 0000000000..657b4f436a
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.mockito.Mockito.*;
+
+public class ConsumerImplTest {
+
+ private final ExecutorService executorService = Executors.newSingleThreadExecutor();
+ private ConsumerImpl<ConsumerImpl> consumer;
+ private ConsumerConfigurationData consumerConf;
+
+ @BeforeMethod
+ public void setUp() {
+ consumerConf = new ConsumerConfigurationData<>();
+ ClientConfigurationData clientConf = new ClientConfigurationData();
+ PulsarClientImpl client = mock(PulsarClientImpl.class);
+ CompletableFuture<ClientCnx> clientCnxFuture = new CompletableFuture<>();
+ CompletableFuture<Consumer<ConsumerImpl>> subscribeFuture = new CompletableFuture<>();
+ String topic = "non-persistent://tenant/ns1/my-topic";
+
+ // Mock connection for grabCnx()
+ when(client.getConnection(anyString())).thenReturn(clientCnxFuture);
+ clientConf.setOperationTimeoutMs(100);
+ clientConf.setStatsIntervalSeconds(0);
+ when(client.getConfiguration()).thenReturn(clientConf);
+
+ consumerConf.setSubscriptionName("test-sub");
+ consumer = new ConsumerImpl<ConsumerImpl>(client, topic, consumerConf,
+ executorService, -1, subscribeFuture, null, null);
+ }
+
+ @Test(invocationTimeOut = 1000)
+ public void testNotifyPendingReceivedCallback_EmptyQueueNotThrowsException() {
+ consumer.notifyPendingReceivedCallback(null, null);
+ }
+
+ @Test(invocationTimeOut = 1000)
+ public void testNotifyPendingReceivedCallback_CompleteWithException() {
+ CompletableFuture<Message<ConsumerImpl>> receiveFuture = new CompletableFuture<>();
+ consumer.pendingReceives.add(receiveFuture);
+ Exception exception = new PulsarClientException.InvalidMessageException("some random exception");
+ consumer.notifyPendingReceivedCallback(null, exception);
+
+ try {
+ receiveFuture.join();
+ } catch (CompletionException e) {
+ // Completion exception must be the same we provided at calling time
+ Assert.assertEquals(e.getCause(), exception);
+ }
+
+ Assert.assertTrue(receiveFuture.isCompletedExceptionally());
+ }
+
+ @Test(invocationTimeOut = 1000)
+ public void testNotifyPendingReceivedCallback_CompleteWithExceptionWhenMessageIsNull() {
+ CompletableFuture<Message<ConsumerImpl>> receiveFuture = new CompletableFuture<>();
+ consumer.pendingReceives.add(receiveFuture);
+ consumer.notifyPendingReceivedCallback(null, null);
+
+ try {
+ receiveFuture.join();
+ } catch (CompletionException e) {
+ Assert.assertEquals("received message can't be null", e.getCause().getMessage());
+ }
+
+ Assert.assertTrue(receiveFuture.isCompletedExceptionally());
+ }
+
+ @Test(invocationTimeOut = 1000)
+ public void testNotifyPendingReceivedCallback_InterceptorsWorksWithPrefetchDisabled() {
+ CompletableFuture<Message<ConsumerImpl>> receiveFuture = new CompletableFuture<>();
+ MessageImpl message = mock(MessageImpl.class);
+ ConsumerImpl<ConsumerImpl> spy = spy(consumer);
+
+ consumer.pendingReceives.add(receiveFuture);
+ consumerConf.setReceiverQueueSize(0);
+ doReturn(message).when(spy).beforeConsume(any());
+ spy.notifyPendingReceivedCallback(message, null);
+ Message<ConsumerImpl> receivedMessage = receiveFuture.join();
+
+ verify(spy, times(1)).beforeConsume(message);
+ Assert.assertTrue(receiveFuture.isDone());
+ Assert.assertFalse(receiveFuture.isCompletedExceptionally());
+ Assert.assertEquals(receivedMessage, message);
+ }
+
+ @Test(invocationTimeOut = 1000)
+ public void testNotifyPendingReceivedCallback_WorkNormally() {
+ CompletableFuture<Message<ConsumerImpl>> receiveFuture = new CompletableFuture<>();
+ MessageImpl message = mock(MessageImpl.class);
+ ConsumerImpl<ConsumerImpl> spy = spy(consumer);
+
+ consumer.pendingReceives.add(receiveFuture);
+ doReturn(message).when(spy).beforeConsume(any());
+ doNothing().when(spy).messageProcessed(message);
+ spy.notifyPendingReceivedCallback(message, null);
+ Message<ConsumerImpl> receivedMessage = receiveFuture.join();
+
+ verify(spy, times(1)).beforeConsume(message);
+ verify(spy, times(1)).messageProcessed(message);
+ Assert.assertTrue(receiveFuture.isDone());
+ Assert.assertFalse(receiveFuture.isCompletedExceptionally());
+ Assert.assertEquals(receivedMessage, message);
+ }
+}
With regards,
Apache Git Services