You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2021/02/24 08:05:06 UTC
[nifi] branch main updated: NIFI-8198: ConsumeAMQP detects if Queue
is deleted on server
This is an automated email from the ASF dual-hosted git repository.
pvillard pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new f101a2b NIFI-8198: ConsumeAMQP detects if Queue is deleted on server
f101a2b is described below
commit f101a2bba51d518f7eb1fb929c18999ec6334a9a
Author: Peter Turcsanyi <tu...@apache.org>
AuthorDate: Thu Feb 4 22:31:00 2021 +0100
NIFI-8198: ConsumeAMQP detects if Queue is deleted on server
Signed-off-by: Pierre Villard <pi...@gmail.com>
This closes #4805.
---
.../org/apache/nifi/amqp/processors/AMQPConsumer.java | 10 ++++++++++
.../apache/nifi/amqp/processors/AMQPConsumerTest.java | 17 +++++++++++++++++
.../org/apache/nifi/amqp/processors/TestChannel.java | 6 ++++--
3 files changed, 31 insertions(+), 2 deletions(-)
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java
index d2c47dc..8b47469 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java
@@ -68,6 +68,16 @@ final class AMQPConsumer extends AMQPWorker {
Thread.currentThread().interrupt();
}
}
+
+ @Override
+ public void handleCancel(String consumerTag) throws IOException {
+ processorLog.error("Consumer has been cancelled by the broker, eg. due to deleted queue.");
+ try {
+ close();
+ } catch (Exception e) {
+ processorLog.error("Failed to close consumer.", e);
+ }
+ }
};
channel.basicConsume(queueName, autoAcknowledge, consumer);
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java
index 8811053..323b0b1 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java
@@ -17,8 +17,10 @@
package org.apache.nifi.amqp.processors;
import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertFalse;
import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
+import static org.junit.Assert.assertTrue;
import static org.mockito.Mockito.mock;
import java.io.IOException;
@@ -60,6 +62,21 @@ public class AMQPConsumerTest {
assertEquals(0, consumer.getResponseQueueSize());
}
+ @Test
+ public void testConsumerHandlesCancelling() throws IOException {
+ final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
+ final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
+
+ final TestConnection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
+ final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", true, processorLog);
+
+ assertFalse(consumer.closed);
+
+ consumer.getChannel().basicCancel("queue1");
+
+ assertTrue(consumer.closed);
+ }
+
@Test(expected = IllegalArgumentException.class)
public void failOnNullConnection() throws IOException {
new AMQPConsumer(null, null, true, processorLog);
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java
index 7eaceae..c68ffee 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java
@@ -548,8 +548,10 @@ class TestChannel implements Channel {
@Override
public void basicCancel(String consumerTag) throws IOException {
- throw new UnsupportedOperationException("This method is not currently supported as it is not used by current API in testing");
-
+ // consumerMap is indexed by queue name so the passed in consumerTag parameter needs to be the name of the test queue
+ for (Consumer consumer: consumerMap.get(consumerTag)) {
+ consumer.handleCancel(consumerTag);
+ }
}
@Override