You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2018/10/22 19:15:35 UTC
nifi git commit: NIFI-5721 Fixing connection handling in MQTT processors
Repository: nifi
Updated Branches:
refs/heads/master 5561c29ed -> ebaaf5797
NIFI-5721 Fixing connection handling in MQTT processors
This closes #3096.
Signed-off-by: Aldrin Piri <al...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/ebaaf579
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/ebaaf579
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/ebaaf579
Branch: refs/heads/master
Commit: ebaaf5797e94d7177633cf4e85f0fbeada0765ba
Parents: 5561c29
Author: Bryan Bende <bb...@apache.org>
Authored: Thu Oct 18 17:02:42 2018 -0400
Committer: Aldrin Piri <al...@apache.org>
Committed: Mon Oct 22 15:13:14 2018 -0400
----------------------------------------------------------------------
.../nifi/processors/mqtt/ConsumeMQTT.java | 142 ++++++++-----------
.../nifi/processors/mqtt/PublishMQTT.java | 126 +++++++---------
.../mqtt/common/AbstractMQTTProcessor.java | 91 ++++++------
.../nifi/processors/mqtt/TestConsumeMQTT.java | 2 +-
.../nifi/processors/mqtt/TestPublishMQTT.java | 2 +-
.../mqtt/common/TestConsumeMqttCommon.java | 25 ++--
.../mqtt/integration/TestConsumeMQTT.java | 2 +-
.../mqtt/integration/TestConsumeMqttSSL.java | 2 +-
.../TestPublishAndSubscribeMqttIntegration.java | 2 +-
9 files changed, 181 insertions(+), 213 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/ebaaf579/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
index dbfbcbb..f0cba72 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
@@ -20,35 +20,36 @@ package org.apache.nifi.processors.mqtt;
import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.TriggerSerially;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
+import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.annotation.behavior.WritesAttribute;
-import org.apache.nifi.annotation.behavior.WritesAttributes;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.flowfile.attributes.CoreAttributes;
import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.io.OutputStreamCallback;
-
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import java.io.IOException;
+import java.io.OutputStream;
import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
@@ -57,8 +58,6 @@ import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.LinkedBlockingQueue;
-import java.io.OutputStream;
-import java.io.IOException;
import java.util.concurrent.atomic.AtomicBoolean;
import static org.apache.nifi.processors.mqtt.ConsumeMQTT.BROKER_ATTRIBUTE_KEY;
@@ -73,7 +72,7 @@ import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VAL
@Tags({"subscribe", "MQTT", "IOT", "consume", "listen"})
@InputRequirement(InputRequirement.Requirement.INPUT_FORBIDDEN)
-@TriggerSerially // we want to have a consistent mapping between clientID and MQTT connection
+@TriggerSerially
@CapabilityDescription("Subscribes to a topic and receives messages from an MQTT broker")
@SeeAlso({PublishMQTT.class})
@WritesAttributes({
@@ -83,7 +82,7 @@ import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VAL
@WritesAttribute(attribute=IS_DUPLICATE_ATTRIBUTE_KEY, description="Whether or not this message might be a duplicate of one which has already been received."),
@WritesAttribute(attribute=IS_RETAINED_ATTRIBUTE_KEY, description="Whether or not this message was from a current publisher, or was \"retained\" by the server as the last message published " +
"on the topic.")})
-public class ConsumeMQTT extends AbstractMQTTProcessor {
+public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
public final static String BROKER_ATTRIBUTE_KEY = "mqtt.broker";
public final static String TOPIC_ATTRIBUTE_KEY = "mqtt.topic";
@@ -119,7 +118,6 @@ public class ConsumeMQTT extends AbstractMQTTProcessor {
.build();
- private static int DISCONNECT_TIMEOUT = 5000;
private volatile long maxQueueSize;
private volatile int qos;
@@ -205,29 +203,19 @@ public class ConsumeMQTT extends AbstractMQTTProcessor {
}
@OnScheduled
- public void onScheduled(final ProcessContext context) throws IOException, ClassNotFoundException {
+ public void onScheduled(final ProcessContext context) {
+ super.onScheduled(context);
qos = context.getProperty(PROP_QOS).asInteger();
maxQueueSize = context.getProperty(PROP_MAX_QUEUE_SIZE).asLong();
topicFilter = context.getProperty(PROP_TOPIC_FILTER).getValue();
-
- buildClient(context);
scheduled.set(true);
}
@OnUnscheduled
public void onUnscheduled(final ProcessContext context) {
scheduled.set(false);
-
- mqttClientConnectLock.writeLock().lock();
- try {
- if(isConnected()) {
- mqttClient.disconnect(DISCONNECT_TIMEOUT);
- logger.info("Disconnected the MQTT client.");
- }
- } catch(MqttException me) {
- logger.error("Failed when disconnecting the MQTT client.", me);
- } finally {
- mqttClientConnectLock.writeLock().unlock();
+ synchronized (this) {
+ super.onStopped();
}
}
@@ -249,14 +237,12 @@ public class ConsumeMQTT extends AbstractMQTTProcessor {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
- if (mqttQueue.isEmpty() && !isConnected() && scheduled.get()){
- logger.info("Queue is empty and client is not connected. Attempting to reconnect.");
-
- try {
- reconnect();
- } catch (MqttException e) {
- logger.error("Connection to " + broker + " lost (or was never connected) and ontrigger connect failed. Yielding processor", e);
- context.yield();
+ final boolean isScheduled = scheduled.get();
+ if (!isConnected() && isScheduled){
+ synchronized (this) {
+ if (!isConnected()) {
+ initializeClient(context);
+ }
}
}
@@ -267,6 +253,27 @@ public class ConsumeMQTT extends AbstractMQTTProcessor {
transferQueue(session);
}
+ private void initializeClient(ProcessContext context) {
+ // NOTE: This method is called when isConnected returns false which can happen when the client is null, or when it is
+ // non-null but not connected, so we need to handle each case and only create a new client when it is null
+ try {
+ if (mqttClient == null) {
+ logger.debug("Creating client");
+ mqttClient = createMqttClient(broker, clientID, persistence);
+ mqttClient.setCallback(this);
+ }
+
+ if (!mqttClient.isConnected()) {
+ logger.debug("Connecting client");
+ mqttClient.connect(connOpts);
+ mqttClient.subscribe(topicFilter, qos);
+ }
+ } catch (MqttException e) {
+ logger.error("Connection to {} lost (or was never connected) and connection failed. Yielding processor", new Object[]{broker}, e);
+ context.yield();
+ }
+ }
+
private void transferQueue(ProcessSession session){
while (!mqttQueue.isEmpty()) {
FlowFile messageFlowfile = session.create();
@@ -303,56 +310,33 @@ public class ConsumeMQTT extends AbstractMQTTProcessor {
}
}
- private class ConsumeMQTTCallback implements MqttCallback {
-
- @Override
- public void connectionLost(Throwable cause) {
- logger.warn("Connection to " + broker + " lost", cause);
- try {
- reconnect();
- } catch (MqttException e) {
- logger.error("Connection to " + broker + " lost and callback re-connect failed.");
- }
- }
-
- @Override
- public void messageArrived(String topic, MqttMessage message) throws Exception {
- if (logger.isDebugEnabled()) {
- byte[] payload = message.getPayload();
- String text = new String(payload, "UTF-8");
- if (StringUtils.isAsciiPrintable(text)) {
- logger.debug("Message arrived from topic {}. Payload: {}", new Object[] {topic, text});
- } else {
- logger.debug("Message arrived from topic {}. Binary value of size {}", new Object[] {topic, payload.length});
- }
- }
+ @Override
+ public void connectionLost(Throwable cause) {
+ logger.error("Connection to {} lost due to: {}", new Object[]{broker, cause.getMessage()}, cause);
+ }
- if (mqttQueue.size() >= maxQueueSize){
- throw new IllegalStateException("The subscriber queue is full, cannot receive another message until the processor is scheduled to run.");
+ @Override
+ public void messageArrived(String topic, MqttMessage message) throws Exception {
+ if (logger.isDebugEnabled()) {
+ byte[] payload = message.getPayload();
+ String text = new String(payload, "UTF-8");
+ if (StringUtils.isAsciiPrintable(text)) {
+ logger.debug("Message arrived from topic {}. Payload: {}", new Object[] {topic, text});
} else {
- mqttQueue.add(new MQTTQueueMessage(topic, message));
+ logger.debug("Message arrived from topic {}. Binary value of size {}", new Object[] {topic, payload.length});
}
}
- @Override
- public void deliveryComplete(IMqttDeliveryToken token) {
- logger.warn("Received MQTT 'delivery complete' message to subscriber:"+ token);
+ if (mqttQueue.size() >= maxQueueSize){
+ throw new IllegalStateException("The subscriber queue is full, cannot receive another message until the processor is scheduled to run.");
+ } else {
+ mqttQueue.add(new MQTTQueueMessage(topic, message));
}
}
- private void reconnect() throws MqttException {
- mqttClientConnectLock.writeLock().lock();
- try {
- if (!mqttClient.isConnected()) {
- setAndConnectClient(new ConsumeMQTTCallback());
- mqttClient.subscribe(topicFilter, qos);
- }
- } finally {
- mqttClientConnectLock.writeLock().unlock();
- }
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken token) {
+ logger.warn("Received MQTT 'delivery complete' message to subscriber: " + token);
}
- private boolean isConnected(){
- return (mqttClient != null && mqttClient.isConnected());
- }
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/ebaaf579/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
index c1f1a68..0db7615 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
@@ -17,43 +17,42 @@
package org.apache.nifi.processors.mqtt;
-import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.behavior.SystemResource;
+import org.apache.nifi.annotation.behavior.SystemResourceConsideration;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.expression.AttributeExpression;
import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
-import org.apache.nifi.annotation.documentation.CapabilityDescription;
-import org.apache.nifi.annotation.documentation.SeeAlso;
-import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.io.InputStreamCallback;
-import org.apache.nifi.annotation.behavior.InputRequirement;
-import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
-
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttException;
import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import java.io.IOException;
+import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
import java.util.List;
import java.util.Set;
-import java.io.InputStream;
-import java.io.IOException;
import java.util.concurrent.TimeUnit;
@SupportsBatching
@@ -62,7 +61,7 @@ import java.util.concurrent.TimeUnit;
@CapabilityDescription("Publishes a message to an MQTT topic")
@SeeAlso({ConsumeMQTT.class})
@SystemResourceConsideration(resource = SystemResource.MEMORY)
-public class PublishMQTT extends AbstractMQTTProcessor {
+public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
public static final PropertyDescriptor PROP_TOPIC = new PropertyDescriptor.Builder()
.name("Topic")
@@ -132,21 +131,13 @@ public class PublishMQTT extends AbstractMQTTProcessor {
@OnScheduled
public void onScheduled(final ProcessContext context) {
- buildClient(context);
+ super.onScheduled(context);
}
@OnStopped
- public void onStop(final ProcessContext context) {
- mqttClientConnectLock.writeLock().lock();
- try {
- if (mqttClient != null && mqttClient.isConnected()) {
- mqttClient.disconnect();
- logger.info("Disconnected the MQTT client.");
- }
- } catch(MqttException me) {
- logger.error("Failed when disconnecting the MQTT client.", me);
- } finally {
- mqttClientConnectLock.writeLock().unlock();
+ public void onStopped(final ProcessContext context) {
+ synchronized (this) {
+ super.onStopped();
}
}
@@ -157,15 +148,11 @@ public class PublishMQTT extends AbstractMQTTProcessor {
return;
}
- if(mqttClient == null || !mqttClient.isConnected()){
- logger.info("Was disconnected from client or was never connected, attempting to connect.");
- try {
- reconnect();
- } catch (MqttException e) {
- context.yield();
- session.transfer(flowfile, REL_FAILURE);
- logger.error("MQTT client is disconnected and re-connecting failed. Transferring FlowFile to fail and yielding", e);
- return;
+ if (!isConnected()){
+ synchronized (this) {
+ if (!isConnected()) {
+ initializeClient(context);
+ }
}
}
@@ -194,17 +181,12 @@ public class PublishMQTT extends AbstractMQTTProcessor {
mqttMessage.setRetained(context.getProperty(PROP_RETAIN).evaluateAttributeExpressions(flowfile).asBoolean());
try {
- mqttClientConnectLock.readLock().lock();
final StopWatch stopWatch = new StopWatch(true);
- try {
- /*
- * Underlying method waits for the message to publish (according to set QoS), so it executes synchronously:
- * MqttClient.java:361 aClient.publish(topic, message, null, null).waitForCompletion(getTimeToWait());
- */
- mqttClient.publish(topic, mqttMessage);
- } finally {
- mqttClientConnectLock.readLock().unlock();
- }
+ /*
+ * Underlying method waits for the message to publish (according to set QoS), so it executes synchronously:
+ * MqttClient.java:361 aClient.publish(topic, message, null, null).waitForCompletion(getTimeToWait());
+ */
+ mqttClient.publish(topic, mqttMessage);
session.getProvenanceReporter().send(flowfile, broker, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowfile, REL_SUCCESS);
@@ -214,40 +196,40 @@ public class PublishMQTT extends AbstractMQTTProcessor {
}
}
- private class PublishMQTTCallback implements MqttCallback {
-
- @Override
- public void connectionLost(Throwable cause) {
- logger.warn("Connection to " + broker + " lost", cause);
- try {
- reconnect();
- } catch (MqttException e) {
- logger.error("Connection to " + broker + " lost and re-connect failed");
+ private void initializeClient(ProcessContext context) {
+ // NOTE: This method is called when isConnected returns false which can happen when the client is null, or when it is
+ // non-null but not connected, so we need to handle each case and only create a new client when it is null
+ try {
+ if (mqttClient == null) {
+ logger.debug("Creating client");
+ mqttClient = createMqttClient(broker, clientID, persistence);
+ mqttClient.setCallback(this);
}
- }
- @Override
- public void messageArrived(String topic, MqttMessage message) throws Exception {
- logger.error("Message arrived to a PublishMQTT processor { topic:'" + topic +"; payload:"+ Arrays.toString(message.getPayload())+"}");
+ if (!mqttClient.isConnected()) {
+ logger.debug("Connecting client");
+ mqttClient.connect(connOpts);
+ }
+ } catch (MqttException e) {
+ logger.error("Connection to {} lost (or was never connected) and connection failed. Yielding processor", new Object[]{broker}, e);
+ context.yield();
}
+ }
- @Override
- public void deliveryComplete(IMqttDeliveryToken token) {
- // Client.publish waits for message to be delivered so this token will always have a null message and is useless in this application.
- logger.trace("Received 'delivery complete' message from broker for:" + token.toString());
- }
+ @Override
+ public void connectionLost(Throwable cause) {
+ logger.error("Connection to {} lost due to: {}", new Object[]{broker, cause.getMessage()}, cause);
}
+ @Override
+ public void messageArrived(String topic, MqttMessage message) throws Exception {
+ logger.error("Message arrived to a PublishMQTT processor { topic:'" + topic +"; payload:"+ Arrays.toString(message.getPayload())+"}");
+ }
- private void reconnect() throws MqttException {
- mqttClientConnectLock.writeLock().lock();
- try {
- if (!mqttClient.isConnected()) {
- setAndConnectClient(new PublishMQTTCallback());
- getLogger().info("Connecting to broker: " + broker);
- }
- } finally {
- mqttClientConnectLock.writeLock().unlock();
- }
+ @Override
+ public void deliveryComplete(IMqttDeliveryToken token) {
+ // Client.publish waits for message to be delivered so this token will always have a null message and is useless in this application.
+ logger.trace("Received 'delivery complete' message from broker for:" + token.toString());
}
+
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/ebaaf579/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
index 733c240..ceb206e 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
@@ -32,7 +32,6 @@ import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.ssl.SSLContextService;
import org.eclipse.paho.client.mqttv3.IMqttClient;
-import org.eclipse.paho.client.mqttv3.MqttCallback;
import org.eclipse.paho.client.mqttv3.MqttClient;
import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
import org.eclipse.paho.client.mqttv3.MqttException;
@@ -44,8 +43,6 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Properties;
-import java.util.concurrent.locks.ReadWriteLock;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_TRUE;
@@ -58,9 +55,10 @@ import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VAL
public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProcessor {
+ public static int DISCONNECT_TIMEOUT = 5000;
+
protected ComponentLog logger;
protected IMqttClient mqttClient;
- protected final ReadWriteLock mqttClientConnectLock = new ReentrantReadWriteLock(true);
protected volatile String broker;
protected volatile String clientID;
protected MqttConnectOptions connOpts;
@@ -296,51 +294,56 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
return properties;
}
- protected void buildClient(ProcessContext context){
- try {
- broker = context.getProperty(PROP_BROKER_URI).getValue();
- clientID = context.getProperty(PROP_CLIENTID).getValue();
-
- connOpts = new MqttConnectOptions();
- connOpts.setCleanSession(context.getProperty(PROP_CLEAN_SESSION).asBoolean());
- connOpts.setKeepAliveInterval(context.getProperty(PROP_KEEP_ALIVE_INTERVAL).asInteger());
- connOpts.setMqttVersion(context.getProperty(PROP_MQTT_VERSION).asInteger());
- connOpts.setConnectionTimeout(context.getProperty(PROP_CONN_TIMEOUT).asInteger());
-
- PropertyValue sslProp = context.getProperty(PROP_SSL_CONTEXT_SERVICE);
- if (sslProp.isSet()) {
- Properties sslProps = transformSSLContextService((SSLContextService) sslProp.asControllerService());
- connOpts.setSSLProperties(sslProps);
- }
+ protected void onScheduled(final ProcessContext context){
+ broker = context.getProperty(PROP_BROKER_URI).getValue();
+ clientID = context.getProperty(PROP_CLIENTID).getValue();
- PropertyValue lastWillTopicProp = context.getProperty(PROP_LAST_WILL_TOPIC);
- if (lastWillTopicProp.isSet()){
- String lastWillMessage = context.getProperty(PROP_LAST_WILL_MESSAGE).getValue();
- PropertyValue lastWillRetain = context.getProperty(PROP_LAST_WILL_RETAIN);
- Integer lastWillQOS = context.getProperty(PROP_LAST_WILL_QOS).asInteger();
- connOpts.setWill(lastWillTopicProp.getValue(), lastWillMessage.getBytes(), lastWillQOS, lastWillRetain.isSet() ? lastWillRetain.asBoolean() : false);
- }
+ connOpts = new MqttConnectOptions();
+ connOpts.setCleanSession(context.getProperty(PROP_CLEAN_SESSION).asBoolean());
+ connOpts.setKeepAliveInterval(context.getProperty(PROP_KEEP_ALIVE_INTERVAL).asInteger());
+ connOpts.setMqttVersion(context.getProperty(PROP_MQTT_VERSION).asInteger());
+ connOpts.setConnectionTimeout(context.getProperty(PROP_CONN_TIMEOUT).asInteger());
+ PropertyValue sslProp = context.getProperty(PROP_SSL_CONTEXT_SERVICE);
+ if (sslProp.isSet()) {
+ Properties sslProps = transformSSLContextService((SSLContextService) sslProp.asControllerService());
+ connOpts.setSSLProperties(sslProps);
+ }
- PropertyValue usernameProp = context.getProperty(PROP_USERNAME);
- if(usernameProp.isSet()) {
- connOpts.setUserName(usernameProp.getValue());
- connOpts.setPassword(context.getProperty(PROP_PASSWORD).getValue().toCharArray());
- }
+ PropertyValue lastWillTopicProp = context.getProperty(PROP_LAST_WILL_TOPIC);
+ if (lastWillTopicProp.isSet()){
+ String lastWillMessage = context.getProperty(PROP_LAST_WILL_MESSAGE).getValue();
+ PropertyValue lastWillRetain = context.getProperty(PROP_LAST_WILL_RETAIN);
+ Integer lastWillQOS = context.getProperty(PROP_LAST_WILL_QOS).asInteger();
+ connOpts.setWill(lastWillTopicProp.getValue(), lastWillMessage.getBytes(), lastWillQOS, lastWillRetain.isSet() ? lastWillRetain.asBoolean() : false);
+ }
- mqttClientConnectLock.writeLock().lock();
- try{
- mqttClient = getMqttClient(broker, clientID, persistence);
- } finally {
- mqttClientConnectLock.writeLock().unlock();
- }
+ PropertyValue usernameProp = context.getProperty(PROP_USERNAME);
+ if(usernameProp.isSet()) {
+ connOpts.setUserName(usernameProp.getValue());
+ connOpts.setPassword(context.getProperty(PROP_PASSWORD).getValue().toCharArray());
+ }
+ }
+
+ protected void onStopped() {
+ try {
+ logger.info("Disconnecting client");
+ mqttClient.disconnect(DISCONNECT_TIMEOUT);
} catch(MqttException me) {
- logger.error("Failed to initialize the connection to the " + me.getMessage());
+ logger.error("Error disconnecting MQTT client due to {}", new Object[]{me.getMessage()}, me);
+ }
+
+ try {
+ logger.info("Closing client");
+ mqttClient.close();
+ mqttClient = null;
+ } catch (MqttException me) {
+ logger.error("Error closing MQTT client due to {}", new Object[]{me.getMessage()}, me);
}
}
- protected IMqttClient getMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException {
+ protected IMqttClient createMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException {
return new MqttClient(broker, clientID, persistence);
}
@@ -363,10 +366,8 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
- // Caller should obtain the necessary lock
- protected void setAndConnectClient(MqttCallback mqttCallback) throws MqttException {
- mqttClient = getMqttClient(broker, clientID, persistence);
- mqttClient.setCallback(mqttCallback);
- mqttClient.connect(connOpts);
+ protected boolean isConnected(){
+ return (mqttClient != null && mqttClient.isConnected());
}
+
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/ebaaf579/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
index 144cd63..7d02804 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
@@ -54,7 +54,7 @@ public class TestConsumeMQTT extends TestConsumeMqttCommon {
}
@Override
- public IMqttClient getMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException {
+ public IMqttClient createMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException {
mqttTestClient = new MqttTestClient(broker, clientID, MqttTestClient.ConnectType.Subscriber);
return mqttTestClient;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/ebaaf579/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
index cdbc67f..9916408 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
@@ -56,7 +56,7 @@ public class TestPublishMQTT extends TestPublishMqttCommon {
}
@Override
- public IMqttClient getMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException {
+ public IMqttClient createMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException {
mqttTestClient = new MqttTestClient(broker, clientID, MqttTestClient.ConnectType.Publisher);
return mqttTestClient;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/ebaaf579/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
index a9159ad..71b6814 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
@@ -20,6 +20,7 @@ package org.apache.nifi.processors.mqtt.common;
import io.moquette.proto.messages.AbstractMessage;
import io.moquette.proto.messages.PublishMessage;
import io.moquette.server.Server;
+import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processors.mqtt.ConsumeMQTT;
import org.apache.nifi.provenance.ProvenanceEventRecord;
@@ -79,7 +80,7 @@ public abstract class TestConsumeMqttCommon {
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
- reconnect(consumeMQTT);
+ reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
@@ -121,7 +122,7 @@ public abstract class TestConsumeMqttCommon {
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
- reconnect(consumeMQTT);
+ reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
@@ -139,7 +140,7 @@ public abstract class TestConsumeMqttCommon {
internalPublish(testMessage);
consumeMQTT.onScheduled(testRunner.getProcessContext());
- reconnect(consumeMQTT);
+ reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
@@ -170,7 +171,7 @@ public abstract class TestConsumeMqttCommon {
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
- reconnect(consumeMQTT);
+ reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
@@ -211,7 +212,7 @@ public abstract class TestConsumeMqttCommon {
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
- reconnect(consumeMQTT);
+ reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
@@ -229,7 +230,7 @@ public abstract class TestConsumeMqttCommon {
internalPublish(testMessage);
consumeMQTT.onScheduled(testRunner.getProcessContext());
- reconnect(consumeMQTT);
+ reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
@@ -260,7 +261,7 @@ public abstract class TestConsumeMqttCommon {
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
- reconnect(consumeMQTT);
+ reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
@@ -308,7 +309,7 @@ public abstract class TestConsumeMqttCommon {
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
- reconnect(consumeMQTT);
+ reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
@@ -354,7 +355,7 @@ public abstract class TestConsumeMqttCommon {
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
- reconnect(consumeMQTT);
+ reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
@@ -396,10 +397,10 @@ public abstract class TestConsumeMqttCommon {
}
- public static void reconnect(ConsumeMQTT processor) throws NoSuchFieldException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
- Method method = ConsumeMQTT.class.getDeclaredMethod("reconnect");
+ public static void reconnect(ConsumeMQTT processor, ProcessContext context) throws NoSuchFieldException, IllegalAccessException, NoSuchMethodException, InvocationTargetException {
+ Method method = ConsumeMQTT.class.getDeclaredMethod("initializeClient", ProcessContext.class);
method.setAccessible(true);
- method.invoke(processor);
+ method.invoke(processor, context);
}
public static BlockingQueue<MQTTQueueMessage> getMqttQueue(ConsumeMQTT consumeMQTT) throws IllegalAccessException, NoSuchFieldException {
http://git-wip-us.apache.org/repos/asf/nifi/blob/ebaaf579/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMQTT.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMQTT.java
index 759bf96..d7ed0e0 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMQTT.java
@@ -107,7 +107,7 @@ public class TestConsumeMQTT extends TestConsumeMqttCommon {
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
- reconnect(consumeMQTT);
+ reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
http://git-wip-us.apache.org/repos/asf/nifi/blob/ebaaf579/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMqttSSL.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMqttSSL.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMqttSSL.java
index ccb0eb7..65319d8 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMqttSSL.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestConsumeMqttSSL.java
@@ -122,7 +122,7 @@ public class TestConsumeMqttSSL extends TestConsumeMqttCommon {
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
- reconnect(consumeMQTT);
+ reconnect(consumeMQTT, testRunner.getProcessContext());
Thread.sleep(PUBLISH_WAIT_MS);
http://git-wip-us.apache.org/repos/asf/nifi/blob/ebaaf579/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishAndSubscribeMqttIntegration.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishAndSubscribeMqttIntegration.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishAndSubscribeMqttIntegration.java
index a97ac98..dc09ce1 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishAndSubscribeMqttIntegration.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/integration/TestPublishAndSubscribeMqttIntegration.java
@@ -128,7 +128,7 @@ public class TestPublishAndSubscribeMqttIntegration {
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testSubscribeRunner.getProcessor();
consumeMQTT.onScheduled(testSubscribeRunner.getProcessContext());
- reconnect(consumeMQTT);
+ reconnect(consumeMQTT, testSubscribeRunner.getProcessContext());
}
private void subscribeVerify(){