You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2020/08/03 16:43:57 UTC

[nifi] branch main updated: NIFI-6312: Improved connection handling in AMQP processors Disable connection automatic recovery which can lead to uncontrolled/stale threads. Handle the recovery in the processors instead. Use poisoning in case of errors, then discarding and recreating the poisoned consumer/publisher. NIFI-6312: Use conventional exception handling instead of poisoning Use component logger in workers. Remove basicNack()/basicReject() calls as they are not needed because all unacknowledged messages will be [...]

This is an automated email from the ASF dual-hosted git repository.

joewitt pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 7b4cce9  NIFI-6312: Improved connection handling in AMQP processors Disable connection automatic recovery which can lead to uncontrolled/stale threads. Handle the recovery in the processors instead. Use poisoning in case of errors, then discarding and recreating the poisoned consumer/publisher. NIFI-6312: Use conventional exception handling instead of poisoning Use component logger in workers. Remove basicNack()/basicReject() calls as they are not needed because all unacknowledge [...]
7b4cce9 is described below

commit 7b4cce9e219b86e6c6ade357d57a5f9b87c5c70e
Author: Peter Turcsanyi <tu...@apache.org>
AuthorDate: Tue Jul 14 22:13:16 2020 +0200

    NIFI-6312: Improved connection handling in AMQP processors
    Disable connection automatic recovery which can lead to uncontrolled/stale threads. Handle the recovery in the processors instead.
    Use poisoning in case of errors, then discarding and recreating the poisoned consumer/publisher.
    NIFI-6312: Use conventional exception handling instead of poisoning
    Use component logger in workers.
    Remove basicNack()/basicReject() calls as they are not needed because all unacknowledged messages will be redelivered.
    NIFI-6312: Further improve exception handling and error logging.
    NIFI-6312: Fix consumer closing in previous commit
    NIFI-6312: Use custom executor with a single thread (no more is used by the processor)
    
    Reviewed by tamas palfy and simon bence
    
    Signed-off-by: Joe Witt <jo...@apache.org>
---
 .../apache/nifi/amqp/processors/AMQPConsumer.java  | 52 +++++++------
 .../apache/nifi/amqp/processors/AMQPException.java | 32 ++++++++
 .../apache/nifi/amqp/processors/AMQPPublisher.java | 40 +++++-----
 .../apache/nifi/amqp/processors/AMQPResource.java  | 29 +++++--
 .../amqp/processors/AMQPRollbackException.java     | 32 ++++++++
 .../apache/nifi/amqp/processors/AMQPWorker.java    | 25 +++---
 .../amqp/processors/AbstractAMQPProcessor.java     | 91 +++++++++++++++++-----
 .../apache/nifi/amqp/processors/ConsumeAMQP.java   | 35 ++++-----
 .../apache/nifi/amqp/processors/PublishAMQP.java   | 12 ++-
 .../nifi/amqp/processors/AMQPConsumerTest.java     | 30 ++++---
 .../nifi/amqp/processors/AMQPPublisherTest.java    |  4 +-
 .../nifi/amqp/processors/ConsumeAMQPTest.java      | 19 ++---
 .../nifi/amqp/processors/PublishAMQPTest.java      |  3 +-
 .../apache/nifi/amqp/processors/TestChannel.java   |  4 +
 14 files changed, 278 insertions(+), 130 deletions(-)

diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java
index 8872e0c..d2c47dc 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPConsumer.java
@@ -21,9 +21,8 @@ import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 
 import com.rabbitmq.client.AMQP.BasicProperties;
 import com.rabbitmq.client.Channel;
@@ -39,30 +38,27 @@ import com.rabbitmq.client.GetResponse;
  */
 final class AMQPConsumer extends AMQPWorker {
 
-    private final static Logger logger = LoggerFactory.getLogger(AMQPConsumer.class);
     private final String queueName;
     private final BlockingQueue<GetResponse> responseQueue;
     private final boolean autoAcknowledge;
     private final Consumer consumer;
 
-    private volatile boolean closed = false;
-
-
-    AMQPConsumer(final Connection connection, final String queueName, final boolean autoAcknowledge) throws IOException {
-        super(connection);
+    AMQPConsumer(final Connection connection, final String queueName, final boolean autoAcknowledge, ComponentLog processorLog) throws IOException {
+        super(connection, processorLog);
         this.validateStringProperty("queueName", queueName);
         this.queueName = queueName;
         this.autoAcknowledge = autoAcknowledge;
         this.responseQueue = new LinkedBlockingQueue<>(10);
 
-        logger.info("Successfully connected AMQPConsumer to " + connection.toString() + " and '" + queueName + "' queue");
+        processorLog.info("Successfully connected AMQPConsumer to " + connection.toString() + " and '" + queueName + "' queue");
 
         final Channel channel = getChannel();
         consumer = new DefaultConsumer(channel) {
             @Override
             public void handleDelivery(final String consumerTag, final Envelope envelope, final BasicProperties properties, final byte[] body) throws IOException {
-                if (!autoAcknowledge && closed) {
-                    channel.basicReject(envelope.getDeliveryTag(), true);
+                if (closed) {
+                    // simply discard the messages, all unacknowledged messages will be redelivered by the broker when the consumer connects again
+                    processorLog.info("Consumer is closed, discarding message (delivery tag: {}).", new Object[]{envelope.getDeliveryTag()});
                     return;
                 }
 
@@ -78,8 +74,8 @@ final class AMQPConsumer extends AMQPWorker {
     }
 
     // Visible for unit tests
-    protected Consumer getConsumer() {
-        return consumer;
+    int getResponseQueueSize() {
+        return responseQueue.size();
     }
 
     /**
@@ -96,26 +92,32 @@ final class AMQPConsumer extends AMQPWorker {
         return responseQueue.poll();
     }
 
-    public void acknowledge(final GetResponse response) throws IOException {
+    public void acknowledge(final GetResponse response) {
         if (autoAcknowledge) {
             return;
         }
 
-        getChannel().basicAck(response.getEnvelope().getDeliveryTag(), true);
+        try {
+            getChannel().basicAck(response.getEnvelope().getDeliveryTag(), true);
+        } catch (Exception e) {
+            throw new AMQPException("Failed to acknowledge message", e);
+        }
     }
 
     @Override
     public void close() throws TimeoutException, IOException {
-        closed = true;
-
-        GetResponse lastMessage = null;
-        GetResponse response;
-        while ((response = responseQueue.poll()) != null) {
-            lastMessage = response;
-        }
-
-        if (lastMessage != null) {
-            getChannel().basicNack(lastMessage.getEnvelope().getDeliveryTag(), true, true);
+        try {
+            super.close();
+        } finally {
+            try {
+                GetResponse response;
+                while ((response = responseQueue.poll()) != null) {
+                    // simply discard the messages, all unacknowledged messages will be redelivered by the broker when the consumer connects again
+                    processorLog.info("Consumer is closed, discarding message (delivery tag: {}).", new Object[]{response.getEnvelope().getDeliveryTag()});
+                }
+            } catch (Exception e) {
+                processorLog.error("Failed to drain response queue.");
+            }
         }
     }
 
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPException.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPException.java
new file mode 100644
index 0000000..18f8b34
--- /dev/null
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.amqp.processors;
+
+/**
+ * Exception to indicate an AMQP related error when the FlowFile should not be tried to process again but it should be sent to failure.
+ * AMQPException and AMQPRollbackException are not interchangeable because of the difference in the expected error handling.
+ */
+public class AMQPException extends RuntimeException {
+
+    public AMQPException(String message) {
+        super(message);
+    }
+
+    public AMQPException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java
index 553fc83..fed998c 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPPublisher.java
@@ -17,11 +17,12 @@
 package org.apache.nifi.amqp.processors;
 
 import java.io.IOException;
+import java.net.SocketException;
 
+import com.rabbitmq.client.AlreadyClosedException;
 import org.apache.nifi.logging.ComponentLog;
 
 import com.rabbitmq.client.AMQP.BasicProperties;
-import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
 import com.rabbitmq.client.ReturnListener;
 
@@ -31,7 +32,6 @@ import com.rabbitmq.client.ReturnListener;
  */
 final class AMQPPublisher extends AMQPWorker {
 
-    private final ComponentLog processLog;
     private final String connectionString;
 
     /**
@@ -39,11 +39,12 @@ final class AMQPPublisher extends AMQPWorker {
      *
      * @param connection instance of AMQP {@link Connection}
      */
-    AMQPPublisher(Connection connection, ComponentLog processLog) {
-        super(connection);
-        this.processLog = processLog;
+    AMQPPublisher(Connection connection, ComponentLog processorLog) {
+        super(connection, processorLog);
         getChannel().addReturnListener(new UndeliverableMessageLogger());
         this.connectionString = connection.toString();
+
+        processorLog.info("Successfully connected AMQPPublisher to " + this.connectionString);
     }
 
     /**
@@ -60,21 +61,21 @@ final class AMQPPublisher extends AMQPWorker {
     void publish(byte[] bytes, BasicProperties properties, String routingKey, String exchange) {
         this.validateStringProperty("routingKey", routingKey);
         exchange = exchange == null ? "" : exchange.trim();
-        if (exchange.length() == 0) {
-            processLog.info("The 'exchangeName' is not specified. Messages will be sent to default exchange");
-        }
-        processLog.info("Successfully connected AMQPPublisher to " + this.connectionString + " and '" + exchange
-                + "' exchange with '" + routingKey + "' as a routing key.");
 
-        final Channel channel = getChannel();
-        if (channel.isOpen()) {
-            try {
-                channel.basicPublish(exchange, routingKey, true, properties, bytes);
-            } catch (Exception e) {
-                throw new IllegalStateException("Failed to publish to Exchange '" + exchange + "' with Routing Key '" + routingKey + "'.", e);
+        if (processorLog.isDebugEnabled()) {
+            if (exchange.length() == 0) {
+                processorLog.debug("The 'exchangeName' is not specified. Messages will be sent to default exchange");
             }
-        } else {
-            throw new IllegalStateException("This instance of AMQPPublisher is invalid since its publishingChannel is closed");
+            processorLog.debug("Successfully connected AMQPPublisher to " + this.connectionString + " and '" + exchange
+                    + "' exchange with '" + routingKey + "' as a routing key.");
+        }
+
+        try {
+            getChannel().basicPublish(exchange, routingKey, true, properties, bytes);
+        } catch (AlreadyClosedException | SocketException e) {
+            throw new AMQPRollbackException("Failed to publish message because the AMQP connection is lost or has been closed", e);
+        } catch (Exception e) {
+            throw new AMQPException("Failed to publish message to Exchange '" + exchange + "' with Routing Key '" + routingKey + "'.", e);
         }
     }
 
@@ -100,8 +101,7 @@ final class AMQPPublisher extends AMQPWorker {
         public void handleReturn(int replyCode, String replyText, String exchangeName, String routingKey, BasicProperties properties, byte[] message) throws IOException {
             String logMessage = "Message destined for '" + exchangeName + "' exchange with '" + routingKey
                     + "' as routing key came back with replyCode=" + replyCode + " and replyText=" + replyText + ".";
-            processLog.warn(logMessage);
-            AMQPPublisher.this.processLog.warn(logMessage);
+            processorLog.warn(logMessage);
         }
     }
 }
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPResource.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPResource.java
index 2319e7a..96cea27 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPResource.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPResource.java
@@ -19,17 +19,19 @@ package org.apache.nifi.amqp.processors;
 
 import java.io.Closeable;
 import java.io.IOException;
-import java.util.concurrent.TimeoutException;
+import java.util.concurrent.ExecutorService;
 
 import com.rabbitmq.client.Connection;
 
 public class AMQPResource<T extends AMQPWorker> implements Closeable {
     private final Connection connection;
+    private final ExecutorService executor;
     private final T worker;
 
-    public AMQPResource(final Connection connection, final T worker) {
+    public AMQPResource(final Connection connection, final T worker, final ExecutorService executor) {
         this.connection = connection;
         this.worker = worker;
+        this.executor = executor;
     }
 
     public Connection getConnection() {
@@ -48,23 +50,40 @@ public class AMQPResource<T extends AMQPWorker> implements Closeable {
             worker.close();
         } catch (final IOException e) {
             ioe = e;
-        } catch (final TimeoutException e) {
+        } catch (final Exception e) {
             ioe = new IOException(e);
         }
 
         try {
-            connection.close();
+            if (connection.isOpen()) {
+                connection.close();
+            }
         } catch (final IOException e) {
             if (ioe == null) {
                 ioe = e;
             } else {
                 ioe.addSuppressed(e);
             }
+        } catch (final Exception e) {
+            if (ioe == null) {
+                ioe = new IOException(e);
+            } else {
+                ioe.addSuppressed(e);
+            }
+        }
+
+        try {
+            executor.shutdown();
+        } catch (final Exception e) {
+            if (ioe == null) {
+                ioe = new IOException(e);
+            } else {
+                ioe.addSuppressed(e);
+            }
         }
 
         if (ioe != null) {
             throw ioe;
         }
     }
-
 }
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPRollbackException.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPRollbackException.java
new file mode 100644
index 0000000..3324b40
--- /dev/null
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPRollbackException.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.amqp.processors;
+
+/**
+ * Exception to indicate an AMQP related error when the FlowFile should be tried to process again so the NiFi session should be rolled back.
+ * AMQPRollbackException and AMQPException are not interchangeable because of the difference in the expected error handling.
+ */
+public class AMQPRollbackException extends RuntimeException {
+
+    public AMQPRollbackException(String message) {
+        super(message);
+    }
+
+    public AMQPRollbackException(String message, Throwable cause) {
+        super(message, cause);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPWorker.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPWorker.java
index d17ea0d..fca0f50 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPWorker.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AMQPWorker.java
@@ -19,8 +19,7 @@ package org.apache.nifi.amqp.processors;
 import java.io.IOException;
 import java.util.concurrent.TimeoutException;
 
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import org.apache.nifi.logging.ComponentLog;
 
 import com.rabbitmq.client.Channel;
 import com.rabbitmq.client.Connection;
@@ -33,9 +32,10 @@ import com.rabbitmq.client.Connection;
  */
 abstract class AMQPWorker implements AutoCloseable {
 
-    private final static Logger logger = LoggerFactory.getLogger(AMQPWorker.class);
+    protected final ComponentLog processorLog;
+
     private final Channel channel;
-    private boolean closed = false;
+    protected volatile boolean closed = false;
 
     /**
      * Creates an instance of this worker initializing it with AMQP
@@ -44,13 +44,15 @@ abstract class AMQPWorker implements AutoCloseable {
      *
      * @param connection instance of {@link Connection}
      */
-    public AMQPWorker(final Connection connection) {
+    public AMQPWorker(final Connection connection, ComponentLog processorLog) {
+        this.processorLog = processorLog;
+
         validateConnection(connection);
 
         try {
             this.channel = connection.createChannel();
         } catch (IOException e) {
-            logger.error("Failed to create Channel for " + connection, e);
+            processorLog.error("Failed to create Channel for " + connection, e);
             throw new IllegalStateException(e);
         }
     }
@@ -59,18 +61,19 @@ abstract class AMQPWorker implements AutoCloseable {
         return channel;
     }
 
-
     @Override
     public void close() throws TimeoutException, IOException {
         if (closed) {
             return;
         }
 
-        if (logger.isDebugEnabled()) {
-            logger.debug("Closing AMQP channel for " + this.channel.getConnection().toString());
-        }
+        if (channel.isOpen()) {
+            if (processorLog.isDebugEnabled()) {
+                processorLog.debug("Closing AMQP channel for " + this.channel.getConnection().toString());
+            }
 
-        this.channel.close();
+            this.channel.close();
+        }
         closed = true;
     }
 
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
index 3344dc9..c947b7a 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
@@ -24,8 +24,14 @@ import java.util.Collection;
 import java.util.Collections;
 import java.util.List;
 import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
 import java.util.concurrent.LinkedBlockingQueue;
 import javax.net.ssl.SSLContext;
+
+import com.rabbitmq.client.impl.DefaultExceptionHandler;
+import org.apache.commons.lang3.concurrent.BasicThreadFactory;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
@@ -139,8 +145,12 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
         return propertyDescriptors;
     }
 
-    private final BlockingQueue<AMQPResource<T>> resourceQueue = new LinkedBlockingQueue<>();
+    private BlockingQueue<AMQPResource<T>> resourceQueue;
 
+    @OnScheduled
+    public void onScheduled(ProcessContext context) {
+        resourceQueue = new LinkedBlockingQueue<>(context.getMaxConcurrentTasks());
+    }
 
     @Override
     protected Collection<ValidationResult> customValidate(ValidationContext context) {
@@ -190,33 +200,49 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
     public final void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         AMQPResource<T> resource = resourceQueue.poll();
         if (resource == null) {
-            resource = createResource(context);
+            try {
+                resource = createResource(context);
+            } catch (Exception e) {
+                getLogger().error("Failed to initialize AMQP client", e);
+                context.yield();
+                return;
+            }
         }
 
         try {
             processResource(resource.getConnection(), resource.getWorker(), context, session);
-            resourceQueue.offer(resource);
-        } catch (final Exception e) {
-            try {
-                resource.close();
-            } catch (final Exception e2) {
-                e.addSuppressed(e2);
-            }
 
-            throw e;
+            if (!resourceQueue.offer(resource)) {
+                getLogger().info("Worker queue is full, closing AMQP client");
+                closeResource(resource);
+            }
+        } catch (AMQPException | AMQPRollbackException e) {
+            getLogger().error("AMQP failure, dropping the client", e);
+            context.yield();
+            closeResource(resource);
+        } catch (Exception e) {
+            getLogger().error("Processor failure", e);
+            context.yield();
         }
     }
 
 
     @OnStopped
     public void close() {
-        AMQPResource<T> resource;
-        while ((resource = resourceQueue.poll()) != null) {
-            try {
-                resource.close();
-            } catch (final Exception e) {
-                getLogger().warn("Failed to close AMQP Connection", e);
+        if (resourceQueue != null) {
+            AMQPResource<T> resource;
+            while ((resource = resourceQueue.poll()) != null) {
+                closeResource(resource);
             }
+            resourceQueue = null;
+        }
+    }
+
+    private void closeResource(AMQPResource<T> resource) {
+        try {
+            resource.close();
+        } catch (Exception e) {
+            getLogger().error("Failed to close AMQP Connection", e);
         }
     }
 
@@ -235,13 +261,28 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
 
 
     private AMQPResource<T> createResource(final ProcessContext context) {
-        final Connection connection = createConnection(context);
-        final T worker = createAMQPWorker(context, connection);
-        return new AMQPResource<>(connection, worker);
+        Connection connection = null;
+        try {
+            ExecutorService executor = Executors.newSingleThreadExecutor(new BasicThreadFactory.Builder()
+                    .namingPattern("AMQP Consumer: " + getIdentifier())
+                    .build());
+            connection = createConnection(context, executor);
+            T worker = createAMQPWorker(context, connection);
+            return new AMQPResource<>(connection, worker, executor);
+        } catch (Exception e) {
+            if (connection != null && connection.isOpen()) {
+                try {
+                    connection.close();
+                } catch (Exception closingEx) {
+                    getLogger().error("Failed to close AMQP Connection", closingEx);
+                }
+            }
+            throw e;
+        }
     }
 
 
-    protected Connection createConnection(ProcessContext context) {
+    protected Connection createConnection(ProcessContext context, ExecutorService executor) {
         final ConnectionFactory cf = new ConnectionFactory();
         cf.setHost(context.getProperty(HOST).evaluateAttributeExpressions().getValue());
         cf.setPort(Integer.parseInt(context.getProperty(PORT).evaluateAttributeExpressions().getValue()));
@@ -268,8 +309,16 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
             }
         }
 
+        cf.setAutomaticRecoveryEnabled(false);
+        cf.setExceptionHandler(new DefaultExceptionHandler() {
+            @Override
+            public void handleUnexpectedConnectionDriverException(Connection conn, Throwable exception) {
+                getLogger().error("Connection lost to server {}:{}.", new Object[]{conn.getAddress(), conn.getPort()}, exception);
+            }
+        });
+
         try {
-            Connection connection = cf.newConnection();
+            Connection connection = cf.newConnection(executor);
             return connection;
         } catch (Exception e) {
             throw new IllegalStateException("Failed to establish connection with AMQP Broker: " + cf.toString(), e);
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
index 3af4ee9..6c83a20 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/ConsumeAMQP.java
@@ -75,9 +75,12 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
         .build();
     static final PropertyDescriptor AUTO_ACKNOWLEDGE = new PropertyDescriptor.Builder()
         .name("auto.acknowledge")
-        .displayName("Auto-Acknowledge messages")
-        .description("If true, messages that are received will be auto-acknowledged by the AMQP Broker. "
-            + "This generally will provide better throughput but could result in messages being lost upon restart of NiFi")
+        .displayName("Auto-Acknowledge Messages")
+        .description(" If false (Non-Auto-Acknowledge), the messages will be acknowledged by the processor after transferring the FlowFiles to success and committing "
+            + "the NiFi session. Non-Auto-Acknowledge mode provides 'at-least-once' delivery semantics. "
+            + "If true (Auto-Acknowledge), messages that are delivered to the AMQP Client will be auto-acknowledged by the AMQP Broker just after sending them out. "
+            + "This generally will provide better throughput but will also result in messages being lost upon restart/crash of the AMQP Broker, NiFi or the processor. "
+            + "Auto-Acknowledge mode provides 'at-most-once' delivery semantics and it is recommended only if loosing messages is acceptable.")
         .allowableValues("true", "false")
         .defaultValue("false")
         .required(true)
@@ -85,8 +88,8 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
     static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
         .name("batch.size")
         .displayName("Batch Size")
-        .description("The maximum number of messages that should be pulled in a single session. Once this many messages have been received (or once no more messages are readily available), "
-            + "the messages received will be transferred to the 'success' relationship and the messages will be acknowledged with the AMQP Broker. Setting this value to a larger number "
+        .description("The maximum number of messages that should be processed in a single session. Once this many messages have been received (or once no more messages are readily available), "
+            + "the messages received will be transferred to the 'success' relationship and the messages will be acknowledged to the AMQP Broker. Setting this value to a larger number "
             + "could result in better performance, particularly for very small messages, but can also result in more messages being duplicated upon sudden restart of NiFi.")
         .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
         .expressionLanguageSupported(ExpressionLanguageScope.NONE)
@@ -124,6 +127,10 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
     protected void processResource(final Connection connection, final AMQPConsumer consumer, final ProcessContext context, final ProcessSession session) {
         GetResponse lastReceived = null;
 
+        if (!connection.isOpen() || !consumer.getChannel().isOpen()) {
+            throw new AMQPException("AMQP client has lost connection.");
+        }
+
         for (int i = 0; i < context.getProperty(BATCH_SIZE).asInteger(); i++) {
             final GetResponse response = consumer.consume();
             if (response == null) {
@@ -147,14 +154,9 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
             lastReceived = response;
         }
 
-        session.commit();
-
         if (lastReceived != null) {
-            try {
-                consumer.acknowledge(lastReceived);
-            } catch (IOException e) {
-                throw new ProcessException("Failed to acknowledge message", e);
-            }
+            session.commit();
+            consumer.acknowledge(lastReceived);
         }
     }
 
@@ -190,17 +192,10 @@ public class ConsumeAMQP extends AbstractAMQPProcessor<AMQPConsumer> {
         try {
             final String queueName = context.getProperty(QUEUE).getValue();
             final boolean autoAcknowledge = context.getProperty(AUTO_ACKNOWLEDGE).asBoolean();
-            final AMQPConsumer amqpConsumer = new AMQPConsumer(connection, queueName, autoAcknowledge);
+            final AMQPConsumer amqpConsumer = new AMQPConsumer(connection, queueName, autoAcknowledge, getLogger());
 
             return amqpConsumer;
         } catch (final IOException ioe) {
-            try {
-                connection.close();
-                getLogger().warn("Closed connection at port " + connection.getPort());
-            } catch (final IOException ioeClose) {
-                throw new ProcessException("Failed to close connection at port " + connection.getPort());
-            }
-
             throw new ProcessException("Failed to connect to AMQP Broker", ioe);
         }
     }
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java
index 4dbb7ed..1520d1b 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/PublishAMQP.java
@@ -155,12 +155,16 @@ public class PublishAMQP extends AbstractAMQPProcessor<AMQPPublisher> {
 
         try {
             publisher.publish(messageContent, amqpProperties, routingKey, exchange);
-            session.transfer(flowFile, REL_SUCCESS);
-            session.getProvenanceReporter().send(flowFile, connection.toString() + "/E:" + exchange + "/RK:" + routingKey);
-        } catch (Exception e) {
+        } catch (AMQPRollbackException e) {
+            session.rollback();
+            throw e;
+        } catch (AMQPException e) {
             session.transfer(session.penalize(flowFile), REL_FAILURE);
-            getLogger().error("Failed while sending message to AMQP via " + publisher, e);
+            throw e;
         }
+
+        session.transfer(flowFile, REL_SUCCESS);
+        session.getProvenanceReporter().send(flowFile, connection.toString() + "/E:" + exchange + "/RK:" + routingKey);
     }
 
 
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java
index 87278fd..8811053 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPConsumerTest.java
@@ -16,9 +16,10 @@
  */
 package org.apache.nifi.amqp.processors;
 
+import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
-import static org.junit.Assert.assertTrue;
+import static org.mockito.Mockito.mock;
 
 import java.io.IOException;
 import java.util.Arrays;
@@ -28,6 +29,8 @@ import java.util.List;
 import java.util.Map;
 import java.util.concurrent.TimeoutException;
 
+import org.apache.nifi.logging.ComponentLog;
+import org.junit.Before;
 import org.junit.Test;
 
 import com.rabbitmq.client.AMQP.BasicProperties;
@@ -36,41 +39,48 @@ import com.rabbitmq.client.GetResponse;
 
 public class AMQPConsumerTest {
 
+    private ComponentLog processorLog;
+
+    @Before
+    public void setUp() {
+        processorLog = mock(ComponentLog.class);
+    }
 
     @Test
-    public void testUnconsumedMessagesNacked() throws TimeoutException, IOException {
+    public void testResponseQueueDrained() throws TimeoutException, IOException {
         final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1", "queue2"));
         final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
 
         final TestConnection connection = new TestConnection(exchangeToRoutingKeymap, routingMap);
-        final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", true);
+        final AMQPConsumer consumer = new AMQPConsumer(connection, "queue1", true, processorLog);
         consumer.getChannel().basicPublish("myExchange", "key1", new BasicProperties(), new byte[0]);
 
         consumer.close();
-        assertTrue(((TestChannel) consumer.getChannel()).isNack(0));
+
+        assertEquals(0, consumer.getResponseQueueSize());
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void failOnNullConnection() throws IOException {
-        new AMQPConsumer(null, null, true);
+        new AMQPConsumer(null, null, true, processorLog);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void failOnNullQueueName() throws Exception {
         Connection conn = new TestConnection(null, null);
-        new AMQPConsumer(conn, null, true);
+        new AMQPConsumer(conn, null, true, processorLog);
     }
 
     @Test(expected = IllegalArgumentException.class)
     public void failOnEmptyQueueName() throws Exception {
         Connection conn = new TestConnection(null, null);
-        new AMQPConsumer(conn, " ", true);
+        new AMQPConsumer(conn, " ", true, processorLog);
     }
 
     @Test(expected = IOException.class)
     public void failOnNonExistingQueue() throws Exception {
         Connection conn = new TestConnection(null, null);
-        try (AMQPConsumer consumer = new AMQPConsumer(conn, "hello", true)) {
+        try (AMQPConsumer consumer = new AMQPConsumer(conn, "hello", true, processorLog)) {
             consumer.consume();
         }
     }
@@ -83,7 +93,7 @@ public class AMQPConsumerTest {
         exchangeToRoutingKeymap.put("", "queue1");
 
         Connection conn = new TestConnection(exchangeToRoutingKeymap, routingMap);
-        try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true)) {
+        try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, processorLog)) {
             GetResponse response = consumer.consume();
             assertNull(response);
         }
@@ -98,7 +108,7 @@ public class AMQPConsumerTest {
 
         Connection conn = new TestConnection(exchangeToRoutingKeymap, routingMap);
         conn.createChannel().basicPublish("myExchange", "key1", null, "hello Joe".getBytes());
-        try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true)) {
+        try (AMQPConsumer consumer = new AMQPConsumer(conn, "queue1", true, processorLog)) {
             GetResponse response = consumer.consume();
             assertNotNull(response);
         }
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java
index 51bd59f..db3acd0 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/AMQPPublisherTest.java
@@ -43,7 +43,7 @@ public class AMQPPublisherTest {
         new AMQPPublisher(null, null);
     }
 
-    @Test(expected = IllegalStateException.class)
+    @Test(expected = AMQPRollbackException.class)
     public void failPublishIfChannelClosed() throws Exception {
         Connection conn = new TestConnection(null, null);
         try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class))) {
@@ -52,7 +52,7 @@ public class AMQPPublisherTest {
         }
     }
 
-    @Test(expected = IllegalStateException.class)
+    @Test(expected = AMQPException.class)
     public void failPublishIfChannelFails() throws Exception {
         TestConnection conn = new TestConnection(null, null);
         try (AMQPPublisher sender = new AMQPPublisher(conn, mock(ComponentLog.class))) {
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
index 1a7fc0a..b82e1a9 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/ConsumeAMQPTest.java
@@ -26,6 +26,7 @@ import java.util.Arrays;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 import java.util.concurrent.TimeoutException;
 
 import org.apache.nifi.logging.ComponentLog;
@@ -36,9 +37,7 @@ import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
 import org.junit.Test;
 
-import com.rabbitmq.client.AMQP.BasicProperties;
 import com.rabbitmq.client.Connection;
-import com.rabbitmq.client.Envelope;
 import com.rabbitmq.client.MessageProperties;
 
 public class ConsumeAMQPTest {
@@ -106,7 +105,7 @@ public class ConsumeAMQPTest {
     }
 
     @Test
-    public void testMessagesRejectedOnStop() throws TimeoutException, IOException {
+    public void testConsumerStopped() throws TimeoutException, IOException {
         final Map<String, List<String>> routingMap = Collections.singletonMap("key1", Arrays.asList("queue1"));
         final Map<String, String> exchangeToRoutingKeymap = Collections.singletonMap("myExchange", "key1");
 
@@ -133,13 +132,11 @@ public class ConsumeAMQPTest {
             // A single cumulative ack should be used
             assertTrue(((TestChannel) connection.createChannel()).isAck(0));
 
-            // Messages 1 and 2 will have been delivered but on stop should be rejected. They will be rejected
-            // cumulatively, though, so only delivery Tag 2 will be nack'ed explicitly
-            assertTrue(((TestChannel) connection.createChannel()).isNack(2));
+            assertFalse(((TestChannel) connection.createChannel()).isAck(1));
+            assertFalse(((TestChannel) connection.createChannel()).isAck(2));
 
-            // Any newly delivered messages should also be immediately nack'ed.
-            proc.getAMQPWorker().getConsumer().handleDelivery("123", new Envelope(3, false, "myExchange", "key1"), new BasicProperties(), new byte[0]);
-            assertTrue(((TestChannel) connection.createChannel()).isNack(3));
+            assertFalse(connection.createChannel().isOpen());
+            assertFalse(connection.isOpen());
         }
     }
 
@@ -186,7 +183,7 @@ public class ConsumeAMQPTest {
                     throw new IllegalStateException("Consumer already created");
                 }
 
-                consumer = new AMQPConsumer(connection, context.getProperty(QUEUE).getValue(), context.getProperty(AUTO_ACKNOWLEDGE).asBoolean());
+                consumer = new AMQPConsumer(connection, context.getProperty(QUEUE).getValue(), context.getProperty(AUTO_ACKNOWLEDGE).asBoolean(), getLogger());
                 return consumer;
             } catch (IOException e) {
                 throw new ProcessException(e);
@@ -198,7 +195,7 @@ public class ConsumeAMQPTest {
         }
 
         @Override
-        protected Connection createConnection(ProcessContext context) {
+        protected Connection createConnection(ProcessContext context, ExecutorService executor) {
             return connection;
         }
     }
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
index 0464e8e..556d7b9 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/PublishAMQPTest.java
@@ -27,6 +27,7 @@ import java.util.Date;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
 
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.util.MockFlowFile;
@@ -135,7 +136,7 @@ public class PublishAMQPTest {
         }
 
         @Override
-        protected Connection createConnection(ProcessContext context) {
+        protected Connection createConnection(ProcessContext context, ExecutorService executor) {
             return connection;
         }
 
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java
index 1011f62..7eaceae 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/test/java/org/apache/nifi/amqp/processors/TestChannel.java
@@ -40,6 +40,7 @@ import com.rabbitmq.client.AMQP.Queue.PurgeOk;
 import com.rabbitmq.client.AMQP.Tx.CommitOk;
 import com.rabbitmq.client.AMQP.Tx.RollbackOk;
 import com.rabbitmq.client.AMQP.Tx.SelectOk;
+import com.rabbitmq.client.AlreadyClosedException;
 import com.rabbitmq.client.BuiltinExchangeType;
 import com.rabbitmq.client.CancelCallback;
 import com.rabbitmq.client.Channel;
@@ -237,6 +238,9 @@ class TestChannel implements Channel {
         if (this.corrupted) {
             throw new IOException("Channel is corrupted");
         }
+        if (!this.open) {
+            throw new AlreadyClosedException(new ShutdownSignalException(false, false, null, null));
+        }
 
         if (exchange.equals("")){ // default exchange; routingKey corresponds to a queue.
             BlockingQueue<GetResponse> messages = this.getMessageQueue(routingKey);