You are viewing a plain text version of this content. The canonical link to the original was not preserved in this plain text version.
Posted to commits@pulsar.apache.org by GitBox <gi...@apache.org> on 2019/01/15 15:15:59 UTC

[pulsar] Diff for: [GitHub] sijie merged pull request #3337: Bug fixes/Improvement for notify pending receive method

diff --git a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
index 648afb891e..c3d934da59 100644
--- a/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
+++ b/pulsar-client/src/main/java/org/apache/pulsar/client/impl/ConsumerImpl.java
@@ -910,27 +910,44 @@ void messageReceived(MessageIdData messageId, int redeliveryCount, ByteBuf heade
      * @param message
      */
     void notifyPendingReceivedCallback(final Message<T> message, Exception exception) {
-        if (!pendingReceives.isEmpty()) {
-            // fetch receivedCallback from queue
-            CompletableFuture<Message<T>> receivedFuture = pendingReceives.poll();
-            if (exception == null) {
-                checkNotNull(message, "received message can't be null");
-                if (receivedFuture != null) {
-                    if (conf.getReceiverQueueSize() == 0) {
-                        // return message to receivedCallback
-                        receivedFuture.complete(message);
-                    } else {
-                        // increase permits for available message-queue
-                        Message<T> interceptMsg = beforeConsume(message);
-                        messageProcessed(interceptMsg);
-                        // return message to receivedCallback
-                        listenerExecutor.execute(() -> receivedFuture.complete(interceptMsg));
-                    }
-                }
-            } else {
-                listenerExecutor.execute(() -> receivedFuture.completeExceptionally(exception));
-            }
+        if (pendingReceives.isEmpty()) {
+            return;
+        }
+
+        // fetch receivedCallback from queue
+        final CompletableFuture<Message<T>> receivedFuture = pendingReceives.poll();
+        if (receivedFuture == null) {
+            return;
+        }
+
+        if (exception != null) {
+            listenerExecutor.execute(() -> receivedFuture.completeExceptionally(exception));
+            return;
+        }
+
+        if (message == null) {
+            IllegalStateException e = new IllegalStateException("received message can't be null");
+            listenerExecutor.execute(() -> receivedFuture.completeExceptionally(e));
+            return;
         }
+
+        if (conf.getReceiverQueueSize() == 0) {
+            // call interceptor and complete received callback
+            interceptAndComplete(message, receivedFuture);
+            return;
+        }
+
+        // increase permits for available message-queue
+        messageProcessed(message);
+        // call interceptor and complete received callback
+        interceptAndComplete(message, receivedFuture);
+    }
+
+    private void interceptAndComplete(final Message<T> message, final CompletableFuture<Message<T>> receivedFuture) {
+        // call proper interceptor
+        final Message<T> interceptMessage = beforeConsume(message);
+        // return message to receivedCallback
+        listenerExecutor.execute(() -> receivedFuture.complete(interceptMessage));
     }
 
     private void triggerZeroQueueSizeListener(final Message<T> message) {
diff --git a/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
new file mode 100644
index 0000000000..657b4f436a
--- /dev/null
+++ b/pulsar-client/src/test/java/org/apache/pulsar/client/impl/ConsumerImplTest.java
@@ -0,0 +1,136 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.pulsar.client.impl;
+
+import org.apache.pulsar.client.api.Consumer;
+import org.apache.pulsar.client.api.Message;
+import org.apache.pulsar.client.api.PulsarClientException;
+import org.apache.pulsar.client.impl.conf.ClientConfigurationData;
+import org.apache.pulsar.client.impl.conf.ConsumerConfigurationData;
+import org.testng.Assert;
+import org.testng.annotations.BeforeMethod;
+import org.testng.annotations.Test;
+
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.CompletionException;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import static org.mockito.Mockito.*;
+
+public class ConsumerImplTest {
+
+    private final ExecutorService executorService = Executors.newSingleThreadExecutor();
+    private ConsumerImpl<ConsumerImpl> consumer;
+    private ConsumerConfigurationData consumerConf;
+
+    @BeforeMethod
+    public void setUp() {
+        consumerConf = new ConsumerConfigurationData<>();
+        ClientConfigurationData clientConf = new ClientConfigurationData();
+        PulsarClientImpl client = mock(PulsarClientImpl.class);
+        CompletableFuture<ClientCnx> clientCnxFuture = new CompletableFuture<>();
+        CompletableFuture<Consumer<ConsumerImpl>> subscribeFuture = new CompletableFuture<>();
+        String topic = "non-persistent://tenant/ns1/my-topic";
+
+        // Mock connection for grabCnx()
+        when(client.getConnection(anyString())).thenReturn(clientCnxFuture);
+        clientConf.setOperationTimeoutMs(100);
+        clientConf.setStatsIntervalSeconds(0);
+        when(client.getConfiguration()).thenReturn(clientConf);
+
+        consumerConf.setSubscriptionName("test-sub");
+        consumer = new ConsumerImpl<ConsumerImpl>(client, topic, consumerConf,
+                executorService, -1, subscribeFuture, null, null);
+    }
+
+    @Test(invocationTimeOut = 1000)
+    public void testNotifyPendingReceivedCallback_EmptyQueueNotThrowsException() {
+        consumer.notifyPendingReceivedCallback(null, null);
+    }
+
+    @Test(invocationTimeOut = 1000)
+    public void testNotifyPendingReceivedCallback_CompleteWithException() {
+        CompletableFuture<Message<ConsumerImpl>> receiveFuture = new CompletableFuture<>();
+        consumer.pendingReceives.add(receiveFuture);
+        Exception exception = new PulsarClientException.InvalidMessageException("some random exception");
+        consumer.notifyPendingReceivedCallback(null, exception);
+
+        try {
+            receiveFuture.join();
+        } catch (CompletionException e) {
+            // Completion exception must be the same we provided at calling time
+            Assert.assertEquals(e.getCause(), exception);
+        }
+
+        Assert.assertTrue(receiveFuture.isCompletedExceptionally());
+    }
+
+    @Test(invocationTimeOut = 1000)
+    public void testNotifyPendingReceivedCallback_CompleteWithExceptionWhenMessageIsNull() {
+        CompletableFuture<Message<ConsumerImpl>> receiveFuture = new CompletableFuture<>();
+        consumer.pendingReceives.add(receiveFuture);
+        consumer.notifyPendingReceivedCallback(null, null);
+
+        try {
+            receiveFuture.join();
+        } catch (CompletionException e) {
+            Assert.assertEquals("received message can't be null", e.getCause().getMessage());
+        }
+
+        Assert.assertTrue(receiveFuture.isCompletedExceptionally());
+    }
+
+    @Test(invocationTimeOut = 1000)
+    public void testNotifyPendingReceivedCallback_InterceptorsWorksWithPrefetchDisabled() {
+        CompletableFuture<Message<ConsumerImpl>> receiveFuture = new CompletableFuture<>();
+        MessageImpl message = mock(MessageImpl.class);
+        ConsumerImpl<ConsumerImpl> spy = spy(consumer);
+
+        consumer.pendingReceives.add(receiveFuture);
+        consumerConf.setReceiverQueueSize(0);
+        doReturn(message).when(spy).beforeConsume(any());
+        spy.notifyPendingReceivedCallback(message, null);
+        Message<ConsumerImpl> receivedMessage = receiveFuture.join();
+
+        verify(spy, times(1)).beforeConsume(message);
+        Assert.assertTrue(receiveFuture.isDone());
+        Assert.assertFalse(receiveFuture.isCompletedExceptionally());
+        Assert.assertEquals(receivedMessage, message);
+    }
+
+    @Test(invocationTimeOut = 1000)
+    public void testNotifyPendingReceivedCallback_WorkNormally() {
+        CompletableFuture<Message<ConsumerImpl>> receiveFuture = new CompletableFuture<>();
+        MessageImpl message = mock(MessageImpl.class);
+        ConsumerImpl<ConsumerImpl> spy = spy(consumer);
+
+        consumer.pendingReceives.add(receiveFuture);
+        doReturn(message).when(spy).beforeConsume(any());
+        doNothing().when(spy).messageProcessed(message);
+        spy.notifyPendingReceivedCallback(message, null);
+        Message<ConsumerImpl> receivedMessage = receiveFuture.join();
+
+        verify(spy, times(1)).beforeConsume(message);
+        verify(spy, times(1)).messageProcessed(message);
+        Assert.assertTrue(receiveFuture.isDone());
+        Assert.assertFalse(receiveFuture.isCompletedExceptionally());
+        Assert.assertEquals(receivedMessage, message);
+    }
+}


With regards,
Apache Git Services