You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2021/02/24 08:05:06 UTC

[nifi] branch main updated: NIFI-8198: ConsumeAMQP detects if Queue is deleted on server

This is an automated email from the ASF dual-hosted git repository.

pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new f101a2b  NIFI-8198: ConsumeAMQP detects if Queue is deleted on server
f101a2b is described below

commit f101a2bba51d518f7eb1fb929c18999ec6334a9a
Author: Peter Turcsanyi <tu...@apache.org>
AuthorDate: Thu Feb 4 22:31:00 2021 +0100

    NIFI-8198: ConsumeAMQP detects if Queue is deleted on server
    
    Signed-off-by: Pierre Villard <pi...@gmail.com>
    
    This closes #4805.
---
 .../org/apache/nifi/amqp/processors/AMQPConsumer.java   | 10 ++++++++++
 .../apache/nifi/amqp/processors/AMQPConsumerTest.java   | 17 +++++++++++++++++
 .../org/apache/nifi/amqp/processors/TestChannel.java    |  6 ++++--
 3 files changed, 31 insertions(+), 2 deletions(-)

diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java
index d2c47dc..8b47469 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java
@@ -68,6 +68,16 @@ final class AMQPConsumer extends AMQPWorker {
                     Thread.currentThread().interrupt();
                 }
             }
+
+            @Override
+            public void handleCancel(String consumerTag) throws IOException { // consumerTag is unused: the cancel is only logged and the consumer closed
+                processorLog.error("Consumer has been cancelled by the broker, eg. due to deleted queue.");
+                try {
+                    close(); // NOTE(review): presumably the enclosing AMQPConsumer's close(); the new test checks consumer.closed after a cancel
+                } catch (Exception e) { // a failed close is logged rather than rethrown to the caller
+                    processorLog.error("Failed to close consumer.", e);
+                }
+            }
         };
 
         channel.basicConsume(queueName, autoAcknowledge, consumer);
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java
index 8811053..323b0b1 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java
@@ -17,8 +17,10 @@
 package org.apache.nifi.amqp.processors;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
 import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
@@ -60,6 +62,21 @@ public class AMQPConsumerTest {
         assertEquals(0, consumer.getResponseQueueSize());
     }
 
+    @Test
+    public void testConsumerHandlesCancelling() throws IOException { // a cancel delivered through the test channel must leave the consumer closed
+        final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+        final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
+
+        final TestConnection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
+        final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", true, processorLog);
+
+        assertFalse(consumer.closed); // precondition: not closed right after construction
+
+        consumer.getChannel().basicCancel("queue1"); // the argument is the queue name: TestChannel looks consumers up by queue name, not by a real consumer tag
+
+        assertTrue(consumer.closed); // the cancel callback is expected to have closed the consumer
+    }
+
     @Test(expected = IllegalArgumentException.class)
     public void failOnNullConnection() throws IOException {
         new AMQPConsumer(null, null, true, processorLog);
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java
index 7eaceae..c68ffee 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java
@@ -548,8 +548,10 @@ class TestChannel implements Channel {
 
     @Override
     public void basicCancel(String consumerTag) throws IOException {
-        throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
-
+        // consumerMap is keyed by queue name, so the consumerTag passed in must be the name of the test queue
+        for (Consumer consumer: consumerMap.get(consumerTag)) { // NOTE(review): the get() result is not null-checked; an unregistered name would presumably fail here - confirm against consumerMap's declaration
+            consumer.handleCancel(consumerTag); // notifies every consumer found under that name
+        }
     }
 
     @Override