You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2018/10/22 19:15:35 UTC

nifi git commit: NIFI-5721 Fixing connection handling in MQTT processors

Repository: nifi
Updated Branches:
  refs/heads/master 5561c29ed -> ebaaf5797


NIFI-5721 Fixing connection handling in MQTT processors

This closes #3096.

Signed-off-by: Aldrin Piri <al...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ebaaf579
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ebaaf579
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ebaaf579

Branch: refs/heads/master
Commit: ebaaf5797e94d7177633cf4e85f0fbeada0765ba
Parents: 5561c29
Author: Bryan Bende <bb...@apache.org>
Authored: Thu Oct 18 17:02:42 2018 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Mon Oct 22 15:13:14 2018 -0400

----------------------------------------------------------------------
 .../nifi/processors/mqtt/ConsumeMQTT.java       | 142 ++++++++-----------
 .../nifi/processors/mqtt/PublishMQTT.java       | 126 +++++++---------
 .../mqtt/common/AbstractMQTTProcessor.java      |  91 ++++++------
 .../nifi/processors/mqtt/TestConsumeMQTT.java   |   2 +-
 .../nifi/processors/mqtt/TestPublishMQTT.java   |   2 +-
 .../mqtt/common/TestConsumeMqttCommon.java      |  25 ++--
 .../mqtt/integration/TestConsumeMQTT.java       |   2 +-
 .../mqtt/integration/TestConsumeMqttSSL.java    |   2 +-
 .../TestPublishAndSubscribeMqttIntegration.java |   2 +-
 9 files changed, 181 insertions(+), 213 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/ebaaf579/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
index dbfbcbb..f0cba72 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
@@ -20,35 +20,36 @@ package org.apache.nifi.processors.mqtt;
 import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.io.OutputStreamCallback;
-
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
 import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
 import org.eclipse.paho.client.mqttv3.MqttCallback;
 import org.eclipse.paho.client.mqttv3.MqttException;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
 
+import java.io.IOException;
+import java.io.OutputStream;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
@@ -57,8 +58,6 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.LinkedBlockingQueue;
-import java.io.OutputStream;
-import java.io.IOException;
 import java.util.concurrent.atomic.AtomicBoolean;
 
 import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
@@ -73,7 +72,7 @@ import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VAL
 
 @Tags({"subscribe", "MQTT", "IOT", "consume", "listen"})
 @InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
-@TriggerSerially // we want to have a consistent mapping between clientID and MQTT connection
+@TriggerSerially
 @CapabilityDescription("Subscribes to a topic and receives messages from an MQTT broker")
 @SeeAlso({PublishMQTT.class})
 @WritesAttributes({
@@ -83,7 +82,7 @@ import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VAL
     @WritesAttribute(attribute=IS_DUPLICATE_ATTRIBUTE_KEY, description="Whether or not this message might be a duplicate of one which has already been received."),
     @WritesAttribute(attribute=IS_RETAINED_ATTRIBUTE_KEY, description="Whether or not this message was from a current publisher, or was \"retained\" by the server as the last message published " +
             "on the topic.")})
-public class ConsumeMQTT extends AbstractMQTTProcessor {
+public class ConsumeMQTT extends AbstractMQTTProcessor  implements MqttCallback {
 
     public final static String BROKER_ATTRIBUTE_KEY =  "mqtt.broker";
     public final static String TOPIC_ATTRIBUTE_KEY =  "mqtt.topic";
@@ -119,7 +118,6 @@ public class ConsumeMQTT extends AbstractMQTTProcessor {
             .build();
 
 
-    private static int DISCONNECT_TIMEOUT = 5000;
     private volatile long maxQueueSize;
 
     private volatile int qos;
@@ -205,29 +203,19 @@ public class ConsumeMQTT extends AbstractMQTTProcessor {
     }
 
     @OnScheduled
-    public void onScheduled(final ProcessContext context) throws IOException, ClassNotFoundException {
+    public void onScheduled(final ProcessContext context) {
+        super.onScheduled(context);
         qos = context.getProperty(PROP_QOS).asInteger();
         maxQueueSize = context.getProperty(PROP_MAX_QUEUE_SIZE).asLong();
         topicFilter = context.getProperty(PROP_TOPIC_FILTER).getValue();
-
-        buildClient(context);
         scheduled.set(true);
     }
 
     @OnUnscheduled
     public void onUnscheduled(final ProcessContext context) {
         scheduled.set(false);
-
-        mqttClientConnectLock.writeLock().lock();
-        try {
-            if(isConnected()) {
-                mqttClient.disconnect(DISCONNECT_TIMEOUT);
-                logger.info("Disconnected the MQTT client.");
-            }
-        } catch(MqttException me) {
-            logger.error("Failed when disconnecting the MQTT client.", me);
-        } finally {
-            mqttClientConnectLock.writeLock().unlock();
+        synchronized (this) {
+            super.onStopped();
         }
     }
 
@@ -249,14 +237,12 @@ public class ConsumeMQTT extends AbstractMQTTProcessor {
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        if (mqttQueue.isEmpty() && !isConnected() && scheduled.get()){
-            logger.info("Queue is empty and client is not connected. Attempting to reconnect.");
-
-            try {
-                reconnect();
-            } catch (MqttException e) {
-                logger.error("Connection to " + broker + " lost (or was never connected) and ontrigger connect failed. Yielding processor", e);
-                context.yield();
+        final boolean isScheduled = scheduled.get();
+        if (!isConnected() && isScheduled){
+            synchronized (this) {
+                if (!isConnected()) {
+                    initializeClient(context);
+                }
             }
         }
 
@@ -267,6 +253,27 @@ public class ConsumeMQTT extends AbstractMQTTProcessor {
         transferQueue(session);
     }
 
+    private void initializeClient(ProcessContext context) {
+        // NOTE: This method is called when isConnected returns false which can happen when the client is null, or when it is
+        // non-null but not connected, so we need to handle each case and only create a new client when it is null
+        try {
+            if (mqttClient == null) {
+                logger.debug("Creating client");
+                mqttClient = createMqttClient(broker, clientID, persistence);
+                mqttClient.setCallback(this);
+            }
+
+            if (!mqttClient.isConnected()) {
+                logger.debug("Connecting client");
+                mqttClient.connect(connOpts);
+                mqttClient.subscribe(topicFilter, qos);
+            }
+        } catch (MqttException e) {
+            logger.error("Connection to {} lost (or was never connected) and connection failed. Yielding processor", new Object[]{broker}, e);
+            context.yield();
+        }
+    }
+
     private void transferQueue(ProcessSession session){
         while (!mqttQueue.isEmpty()) {
             FlowFile messageFlowfile = session.create();
@@ -303,56 +310,33 @@ public class ConsumeMQTT extends AbstractMQTTProcessor {
         }
     }
 
-    private class ConsumeMQTTCallback implements MqttCallback {
-
-        @Override
-        public void connectionLost(Throwable cause) {
-            logger.warn("Connection to " + broker + " lost", cause);
-            try {
-                reconnect();
-            } catch (MqttException e) {
-                logger.error("Connection to " + broker + " lost and callback re-connect failed.");
-            }
-        }
-
-        @Override
-        public void messageArrived(String topic, MqttMessage message) throws Exception {
-            if (logger.isDebugEnabled()) {
-                byte[] payload = message.getPayload();
-                String text = new String(payload, "UTF-8");
-                if (StringUtils.isAsciiPrintable(text)) {
-                    logger.debug("Message arrived from topic {}. Payload: {}", new Object[] {topic, text});
-                } else {
-                    logger.debug("Message arrived from topic {}. Binary value of size {}", new Object[] {topic, payload.length});
-                }
-            }
+    @Override
+    public void connectionLost(Throwable cause) {
+        logger.error("Connection to {} lost due to: {}", new Object[]{broker, cause.getMessage()}, cause);
+    }
 
-            if (mqttQueue.size() >= maxQueueSize){
-                throw new IllegalStateException("The subscriber queue is full, cannot receive another message until the processor is scheduled to run.");
+    @Override
+    public void messageArrived(String topic, MqttMessage message) throws Exception {
+        if (logger.isDebugEnabled()) {
+            byte[] payload = message.getPayload();
+            String text = new String(payload, "UTF-8");
+            if (StringUtils.isAsciiPrintable(text)) {
+                logger.debug("Message arrived from topic {}. Payload: {}", new Object[] {topic, text});
             } else {
-                mqttQueue.add(new MQTTQueueMessage(topic, message));
+                logger.debug("Message arrived from topic {}. Binary value of size {}", new Object[] {topic, payload.length});
             }
         }
 
-        @Override
-        public void deliveryComplete(IMqttDeliveryToken token) {
-            logger.warn("Received MQTT 'delivery complete' message to subscriber:"+ token);
+        if (mqttQueue.size() >= maxQueueSize){
+            throw new IllegalStateException("The subscriber queue is full, cannot receive another message until the processor is scheduled to run.");
+        } else {
+            mqttQueue.add(new MQTTQueueMessage(topic, message));
         }
     }
 
-    private void reconnect() throws MqttException {
-        mqttClientConnectLock.writeLock().lock();
-        try {
-            if (!mqttClient.isConnected()) {
-                setAndConnectClient(new ConsumeMQTTCallback());
-                mqttClient.subscribe(topicFilter, qos);
-            }
-        } finally {
-            mqttClientConnectLock.writeLock().unlock();
-        }
+    @Override
+    public void deliveryComplete(IMqttDeliveryToken token) {
+        logger.warn("Received MQTT 'delivery complete' message to subscriber: " + token);
     }
 
-    private boolean isConnected(){
-        return (mqttClient != null && mqttClient.isConnected());
-    }
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ebaaf579/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
index c1f1a68..0db7615 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
@@ -17,43 +17,42 @@
 
 package org.apache.nifi.processors.mqtt;
 
-import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.expression.AttributeExpression;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.StopWatch;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
 import org.eclipse.paho.client.mqttv3.MqttCallback;
 import org.eclipse.paho.client.mqttv3.MqttException;
 import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
 
+import java.io.IOException;
+import java.io.InputStream;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Set;
-import java.io.InputStream;
-import java.io.IOException;
 import java.util.concurrent.TimeUnit;
 
 @SupportsBatching
@@ -62,7 +61,7 @@ import java.util.concurrent.TimeUnit;
 @CapabilityDescription("Publishes a message to an MQTT topic")
 @SeeAlso({ConsumeMQTT.class})
 @SystemResourceConsideration(resource = SystemResource.MEMORY)
-public class PublishMQTT extends AbstractMQTTProcessor {
+public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
 
     public static final PropertyDescriptor PROP_TOPIC = new PropertyDescriptor.Builder()
             .name("Topic")
@@ -132,21 +131,13 @@ public class PublishMQTT extends AbstractMQTTProcessor {
 
     @OnScheduled
     public void onScheduled(final ProcessContext context) {
-        buildClient(context);
+        super.onScheduled(context);
     }
 
     @OnStopped
-    public void onStop(final ProcessContext context) {
-        mqttClientConnectLock.writeLock().lock();
-        try {
-            if (mqttClient != null && mqttClient.isConnected()) {
-                mqttClient.disconnect();
-                logger.info("Disconnected the MQTT client.");
-            }
-        } catch(MqttException me) {
-            logger.error("Failed when disconnecting the MQTT client.", me);
-        } finally {
-            mqttClientConnectLock.writeLock().unlock();
+    public void onStopped(final ProcessContext context) {
+        synchronized (this) {
+            super.onStopped();
         }
     }
 
@@ -157,15 +148,11 @@ public class PublishMQTT extends AbstractMQTTProcessor {
             return;
         }
 
-        if(mqttClient == null || !mqttClient.isConnected()){
-            logger.info("Was disconnected from client or was never connected, attempting to connect.");
-            try {
-                reconnect();
-            } catch (MqttException e) {
-                context.yield();
-                session.transfer(flowfile, REL_FAILURE);
-                logger.error("MQTT client is disconnected and re-connecting failed. Transferring FlowFile to fail and yielding", e);
-                return;
+        if (!isConnected()){
+            synchronized (this) {
+                if (!isConnected()) {
+                    initializeClient(context);
+                }
             }
         }
 
@@ -194,17 +181,12 @@ public class PublishMQTT extends AbstractMQTTProcessor {
         mqttMessage.setRetained(context.getProperty(PROP_RETAIN).evaluateAttributeExpressions(flowfile).asBoolean());
 
         try {
-            mqttClientConnectLock.readLock().lock();
             final StopWatch stopWatch = new StopWatch(true);
-            try {
-                /*
-                 * Underlying method waits for the message to publish (according to set QoS), so it executes synchronously:
-                 *     MqttClient.java:361 aClient.publish(topic, message, null, null).waitForCompletion(getTimeToWait());
-                 */
-                mqttClient.publish(topic, mqttMessage);
-            } finally {
-                mqttClientConnectLock.readLock().unlock();
-            }
+            /*
+             * Underlying method waits for the message to publish (according to set QoS), so it executes synchronously:
+             *     MqttClient.java:361 aClient.publish(topic, message, null, null).waitForCompletion(getTimeToWait());
+             */
+            mqttClient.publish(topic, mqttMessage);
 
             session.getProvenanceReporter().send(flowfile, broker, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
             session.transfer(flowfile, REL_SUCCESS);
@@ -214,40 +196,40 @@ public class PublishMQTT extends AbstractMQTTProcessor {
         }
     }
 
-    private class PublishMQTTCallback  implements MqttCallback {
-
-        @Override
-        public void connectionLost(Throwable cause) {
-            logger.warn("Connection to " + broker + " lost", cause);
-            try {
-                reconnect();
-            } catch (MqttException e) {
-                logger.error("Connection to " + broker + " lost and re-connect failed");
+    private void initializeClient(ProcessContext context) {
+        // NOTE: This method is called when isConnected returns false which can happen when the client is null, or when it is
+        // non-null but not connected, so we need to handle each case and only create a new client when it is null
+        try {
+            if (mqttClient == null) {
+                logger.debug("Creating client");
+                mqttClient = createMqttClient(broker, clientID, persistence);
+                mqttClient.setCallback(this);
             }
-        }
 
-        @Override
-        public void messageArrived(String topic, MqttMessage message) throws Exception {
-            logger.error("Message arrived to a PublishMQTT processor { topic:'" + topic +"; payload:"+ Arrays.toString(message.getPayload())+"}");
+            if (!mqttClient.isConnected()) {
+                logger.debug("Connecting client");
+                mqttClient.connect(connOpts);
+            }
+        } catch (MqttException e) {
+            logger.error("Connection to {} lost (or was never connected) and connection failed. Yielding processor", new Object[]{broker}, e);
+            context.yield();
         }
+    }
 
-        @Override
-        public void deliveryComplete(IMqttDeliveryToken token) {
-            // Client.publish waits for message to be delivered so this token will always have a null message and is useless in this application.
-            logger.trace("Received 'delivery complete' message from broker for:" + token.toString());
-        }
+    @Override
+    public void connectionLost(Throwable cause) {
+        logger.error("Connection to {} lost due to: {}", new Object[]{broker, cause.getMessage()}, cause);
     }
 
+    @Override
+    public void messageArrived(String topic, MqttMessage message) throws Exception {
+        logger.error("Message arrived to a PublishMQTT processor { topic:'" + topic +"; payload:"+ Arrays.toString(message.getPayload())+"}");
+    }
 
-    private void reconnect() throws MqttException {
-        mqttClientConnectLock.writeLock().lock();
-        try {
-            if (!mqttClient.isConnected()) {
-                setAndConnectClient(new PublishMQTTCallback());
-                getLogger().info("Connecting to broker: " + broker);
-            }
-        } finally {
-            mqttClientConnectLock.writeLock().unlock();
-        }
+    @Override
+    public void deliveryComplete(IMqttDeliveryToken token) {
+        // Client.publish waits for message to be delivered so this token will always have a null message and is useless in this application.
+        logger.trace("Received 'delivery complete' message from broker for:" + token.toString());
     }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ebaaf579/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
index 733c240..ceb206e 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
@@ -32,7 +32,6 @@ import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.ssl.SSLContextService;
 import org.eclipse.paho.client.mqttv3.IMqttClient;
-import org.eclipse.paho.client.mqttv3.MqttCallback;
 import org.eclipse.paho.client.mqttv3.MqttClient;
 import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
 import org.eclipse.paho.client.mqttv3.MqttException;
@@ -44,8 +43,6 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Properties;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
 import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_TRUE;
@@ -58,9 +55,10 @@ import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VAL
 
 public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProcessor {
 
+    public static int DISCONNECT_TIMEOUT = 5000;
+
     protected ComponentLog logger;
     protected IMqttClient mqttClient;
-    protected final ReadWriteLock mqttClientConnectLock = new ReentrantReadWriteLock(true);
     protected volatile String broker;
     protected volatile String clientID;
     protected MqttConnectOptions connOpts;
@@ -296,51 +294,56 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
         return  properties;
     }
 
-    protected void buildClient(ProcessContext context){
-        try {
-            broker = context.getProperty(PROP_BROKER_URI).getValue();
-            clientID = context.getProperty(PROP_CLIENTID).getValue();
-
-            connOpts = new MqttConnectOptions();
-            connOpts.setCleanSession(context.getProperty(PROP_CLEAN_SESSION).asBoolean());
-            connOpts.setKeepAliveInterval(context.getProperty(PROP_KEEP_ALIVE_INTERVAL).asInteger());
-            connOpts.setMqttVersion(context.getProperty(PROP_MQTT_VERSION).asInteger());
-            connOpts.setConnectionTimeout(context.getProperty(PROP_CONN_TIMEOUT).asInteger());
-
-            PropertyValue sslProp = context.getProperty(PROP_SSL_CONTEXT_SERVICE);
-            if (sslProp.isSet()) {
-                Properties sslProps = transformSSLContextService((SSLContextService) sslProp.asControllerService());
-                connOpts.setSSLProperties(sslProps);
-            }
+    protected void onScheduled(final ProcessContext context){
+        broker = context.getProperty(PROP_BROKER_URI).getValue();
+        clientID = context.getProperty(PROP_CLIENTID).getValue();
 
-            PropertyValue lastWillTopicProp = context.getProperty(PROP_LAST_WILL_TOPIC);
-            if (lastWillTopicProp.isSet()){
-                String lastWillMessage = context.getProperty(PROP_LAST_WILL_MESSAGE).getValue();
-                PropertyValue lastWillRetain = context.getProperty(PROP_LAST_WILL_RETAIN);
-                Integer lastWillQOS = context.getProperty(PROP_LAST_WILL_QOS).asInteger();
-                connOpts.setWill(lastWillTopicProp.getValue(), lastWillMessage.getBytes(), lastWillQOS, lastWillRetain.isSet() ? lastWillRetain.asBoolean() : false);
-            }
+        connOpts = new MqttConnectOptions();
+        connOpts.setCleanSession(context.getProperty(PROP_CLEAN_SESSION).asBoolean());
+        connOpts.setKeepAliveInterval(context.getProperty(PROP_KEEP_ALIVE_INTERVAL).asInteger());
+        connOpts.setMqttVersion(context.getProperty(PROP_MQTT_VERSION).asInteger());
+        connOpts.setConnectionTimeout(context.getProperty(PROP_CONN_TIMEOUT).asInteger());
 
+        PropertyValue sslProp = context.getProperty(PROP_SSL_CONTEXT_SERVICE);
+        if (sslProp.isSet()) {
+            Properties sslProps = transformSSLContextService((SSLContextService) sslProp.asControllerService());
+            connOpts.setSSLProperties(sslProps);
+        }
 
-            PropertyValue usernameProp = context.getProperty(PROP_USERNAME);
-            if(usernameProp.isSet()) {
-                connOpts.setUserName(usernameProp.getValue());
-                connOpts.setPassword(context.getProperty(PROP_PASSWORD).getValue().toCharArray());
-            }
+        PropertyValue lastWillTopicProp = context.getProperty(PROP_LAST_WILL_TOPIC);
+        if (lastWillTopicProp.isSet()){
+            String lastWillMessage = context.getProperty(PROP_LAST_WILL_MESSAGE).getValue();
+            PropertyValue lastWillRetain = context.getProperty(PROP_LAST_WILL_RETAIN);
+            Integer lastWillQOS = context.getProperty(PROP_LAST_WILL_QOS).asInteger();
+            connOpts.setWill(lastWillTopicProp.getValue(), lastWillMessage.getBytes(), lastWillQOS, lastWillRetain.isSet() ? lastWillRetain.asBoolean() : false);
+        }
 
-            mqttClientConnectLock.writeLock().lock();
-            try{
-                mqttClient = getMqttClient(broker, clientID, persistence);
 
-            } finally {
-                mqttClientConnectLock.writeLock().unlock();
-            }
+        PropertyValue usernameProp = context.getProperty(PROP_USERNAME);
+        if(usernameProp.isSet()) {
+            connOpts.setUserName(usernameProp.getValue());
+            connOpts.setPassword(context.getProperty(PROP_PASSWORD).getValue().toCharArray());
+        }
+    }
+
+    protected void onStopped() {
+        try {
+            logger.info("Disconnecting client");
+            mqttClient.disconnect(DISCONNECT_TIMEOUT);
         } catch(MqttException me) {
-            logger.error("Failed to initialize the connection to the  " + me.getMessage());
+            logger.error("Error disconnecting MQTT client due to {}", new Object[]{me.getMessage()}, me);
+        }
+
+        try {
+            logger.info("Closing client");
+            mqttClient.close();
+            mqttClient = null;
+        } catch (MqttException me) {
+            logger.error("Error closing MQTT client due to {}", new Object[]{me.getMessage()}, me);
         }
     }
 
-    protected IMqttClient getMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException {
+    protected IMqttClient createMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException {
         return new MqttClient(broker, clientID, persistence);
     }
 
@@ -363,10 +366,8 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
 
     public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
 
-    // Caller should obtain the necessary lock
-    protected void setAndConnectClient(MqttCallback mqttCallback) throws MqttException {
-        mqttClient = getMqttClient(broker, clientID, persistence);
-        mqttClient.setCallback(mqttCallback);
-        mqttClient.connect(connOpts);
+    protected boolean isConnected(){
+        return (mqttClient != null && mqttClient.isConnected());
     }
+
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ebaaf579/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
index 144cd63..7d02804 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
@@ -54,7 +54,7 @@ public class TestConsumeMQTT extends TestConsumeMqttCommon {
         }
 
         @Override
-        public IMqttClient getMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException {
+        public IMqttClient createMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException {
             mqttTestClient =  new MqttTestClient(broker, clientID, MqttTestClient.ConnectType.Subscriber);
             return mqttTestClient;
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ebaaf579/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
index cdbc67f..9916408 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
@@ -56,7 +56,7 @@ public class TestPublishMQTT extends TestPublishMqttCommon {
         }
 
         @Override
-        public IMqttClient getMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException {
+        public IMqttClient createMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException {
             mqttTestClient =  new MqttTestClient(broker, clientID, MqttTestClient.ConnectType.Publisher);
             return mqttTestClient;
         }

http://git-wip-us.apache.org/repos/asf/nifi/blob/ebaaf579/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
index a9159ad..71b6814 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
@@ -20,6 +20,7 @@ package org.apache.nifi.processors.mqtt.common;
 import io.moquette.proto.messages.AbstractMessage;
 import io.moquette.proto.messages.PublishMessage;
 import io.moquette.server.Server;
+import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processors.mqtt.ConsumeMQTT;
 import org.apache.nifi.provenance.ProvenanceEventRecord;
@@ -79,7 +80,7 @@ public abstract class TestConsumeMqttCommon {
 
         ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
         consumeMQTT.onScheduled(testRunner.getProcessContext());
-        reconnect(consumeMQTT);
+        reconnect(consumeMQTT, testRunner.getProcessContext());
 
         Thread.sleep(PUBLISH_WAIT_MS);
 
@@ -121,7 +122,7 @@ public abstract class TestConsumeMqttCommon {
 
         ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
         consumeMQTT.onScheduled(testRunner.getProcessContext());
-        reconnect(consumeMQTT);
+        reconnect(consumeMQTT, testRunner.getProcessContext());
 
         Thread.sleep(PUBLISH_WAIT_MS);
 
@@ -139,7 +140,7 @@ public abstract class TestConsumeMqttCommon {
         internalPublish(testMessage);
 
         consumeMQTT.onScheduled(testRunner.getProcessContext());
-        reconnect(consumeMQTT);
+        reconnect(consumeMQTT, testRunner.getProcessContext());
 
         Thread.sleep(PUBLISH_WAIT_MS);
 
@@ -170,7 +171,7 @@ public abstract class TestConsumeMqttCommon {
 
         ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
         consumeMQTT.onScheduled(testRunner.getProcessContext());
-        reconnect(consumeMQTT);
+        reconnect(consumeMQTT, testRunner.getProcessContext());
 
         Thread.sleep(PUBLISH_WAIT_MS);
 
@@ -211,7 +212,7 @@ public abstract class TestConsumeMqttCommon {
 
         ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
         consumeMQTT.onScheduled(testRunner.getProcessContext());
-        reconnect(consumeMQTT);
+        reconnect(consumeMQTT, testRunner.getProcessContext());
 
         Thread.sleep(PUBLISH_WAIT_MS);
 
@@ -229,7 +230,7 @@ public abstract class TestConsumeMqttCommon {
         internalPublish(testMessage);
 
         consumeMQTT.onScheduled(testRunner.getProcessContext());
-        reconnect(consumeMQTT);
+        reconnect(consumeMQTT, testRunner.getProcessContext());
 
         Thread.sleep(PUBLISH_WAIT_MS);
 
@@ -260,7 +261,7 @@ public abstract class TestConsumeMqttCommon {
 
         ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
         consumeMQTT.onScheduled(testRunner.getProcessContext());
-        reconnect(consumeMQTT);
+        reconnect(consumeMQTT, testRunner.getProcessContext());
 
         Thread.sleep(PUBLISH_WAIT_MS);
 
@@ -308,7 +309,7 @@ public abstract class TestConsumeMqttCommon {
 
         ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
         consumeMQTT.onScheduled(testRunner.getProcessContext());
-        reconnect(consumeMQTT);
+        reconnect(consumeMQTT, testRunner.getProcessContext());
 
         Thread.sleep(PUBLISH_WAIT_MS);
 
@@ -354,7 +355,7 @@ public abstract class TestConsumeMqttCommon {
 
         ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
         consumeMQTT.onScheduled(testRunner.getProcessContext());
-        reconnect(consumeMQTT);
+        reconnect(consumeMQTT, testRunner.getProcessContext());
 
         Thread.sleep(PUBLISH_WAIT_MS);
 
@@ -396,10 +397,10 @@ public abstract class TestConsumeMqttCommon {
     }
 
 
-    public static void reconnect(ConsumeMQTT processor) throws NoSuchFieldException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
-        Method method = ConsumeMQTT.class.getDeclaredMethod("reconnect");
+    public static void reconnect(ConsumeMQTT processor, ProcessContext context) throws NoSuchFieldException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
+        Method method = ConsumeMQTT.class.getDeclaredMethod("initializeClient", ProcessContext.class);
         method.setAccessible(true);
-        method.invoke(processor);
+        method.invoke(processor, context);
     }
 
     public static BlockingQueue<MQTTQueueMessage> getMqttQueue(ConsumeMQTT consumeMQTT) throws IllegalAccessException, NoSuchFieldException {

http://git-wip-us.apache.org/repos/asf/nifi/blob/ebaaf579/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMQTT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMQTT.java
index 759bf96..d7ed0e0 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMQTT.java
@@ -107,7 +107,7 @@ public class TestConsumeMQTT extends TestConsumeMqttCommon {
 
         ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
         consumeMQTT.onScheduled(testRunner.getProcessContext());
-        reconnect(consumeMQTT);
+        reconnect(consumeMQTT, testRunner.getProcessContext());
 
         Thread.sleep(PUBLISH_WAIT_MS);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/ebaaf579/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMqttSSL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMqttSSL.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMqttSSL.java
index ccb0eb7..65319d8 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMqttSSL.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMqttSSL.java
@@ -122,7 +122,7 @@ public class TestConsumeMqttSSL extends TestConsumeMqttCommon {
 
         ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
         consumeMQTT.onScheduled(testRunner.getProcessContext());
-        reconnect(consumeMQTT);
+        reconnect(consumeMQTT, testRunner.getProcessContext());
 
         Thread.sleep(PUBLISH_WAIT_MS);
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/ebaaf579/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishAndSubscribeMqttIntegration.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishAndSubscribeMqttIntegration.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishAndSubscribeMqttIntegration.java
index a97ac98..dc09ce1 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishAndSubscribeMqttIntegration.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishAndSubscribeMqttIntegration.java
@@ -128,7 +128,7 @@ public class TestPublishAndSubscribeMqttIntegration {
 
         ConsumeMQTT consumeMQTT = (ConsumeMQTT) testSubscribeRunner.getProcessor();
         consumeMQTT.onScheduled(testSubscribeRunner.getProcessContext());
-        reconnect(consumeMQTT);
+        reconnect(consumeMQTT, testSubscribeRunner.getProcessContext());
     }
 
     private void subscribeVerify(){