You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2022/08/30 08:32:52 UTC

[nifi] branch main updated: NIFI-10251 Add v5 protocol support for existing MQTT processors

This is an automated email from the ASF dual-hosted git repository.

turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/main by this push:
     new 4d4a5ca4be NIFI-10251 Add v5 protocol support for existing MQTT processors
4d4a5ca4be is described below

commit 4d4a5ca4be2f98e82ccf0b96290b27e0b06f277c
Author: Nandor Soma Abonyi <ab...@gmail.com>
AuthorDate: Thu Jul 7 17:54:11 2022 +0200

    NIFI-10251 Add v5 protocol support for existing MQTT processors
    
    This closes #6225.
    
    Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
 nifi-assembly/NOTICE                               | 299 ++++++++++++++++++++
 .../src/main/resources/META-INF/NOTICE             | 307 +++++++++++++++++++++
 .../nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml  |  29 +-
 .../apache/nifi/processors/mqtt/ConsumeMQTT.java   | 176 ++++++------
 .../apache/nifi/processors/mqtt/PublishMQTT.java   |  58 ++--
 .../mqtt/adapters/HiveMqV5ClientAdapter.java       | 200 ++++++++++++++
 .../mqtt/adapters/PahoMqttClientAdapter.java       | 191 +++++++++++++
 .../mqtt/common/AbstractMQTTProcessor.java         | 273 +++++++++---------
 .../{MQTTQueueMessage.java => MqttCallback.java}   |  42 +--
 .../nifi/processors/mqtt/common/MqttClient.java    |  69 +++++
 .../processors/mqtt/common/MqttClientFactory.java  |  37 +++
 .../mqtt/common/MqttClientProperties.java          | 164 +++++++++++
 .../nifi/processors/mqtt/common/MqttConstants.java |  19 +-
 .../{MQTTQueueMessage.java => MqttException.java}  |  39 +--
 ...TTQueueMessage.java => MqttProtocolScheme.java} |  43 +--
 .../nifi/processors/mqtt/common/MqttVersion.java   |  51 ++++
 ...TQueueMessage.java => ReceivedMqttMessage.java} |  33 +--
 ...TQueueMessage.java => StandardMqttMessage.java} |  33 +--
 .../nifi/processors/mqtt/TestConsumeMQTT.java      |  24 +-
 .../nifi/processors/mqtt/TestPublishMQTT.java      |  22 +-
 .../processors/mqtt/common/MqttTestClient.java     | 222 ++-------------
 .../mqtt/common/TestConsumeMqttCommon.java         |  24 +-
 22 files changed, 1654 insertions(+), 701 deletions(-)

diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE
index 242b7c4305..b62e10a256 100644
--- a/nifi-assembly/NOTICE
+++ b/nifi-assembly/NOTICE
@@ -1565,6 +1565,275 @@ The following binary components are provided under the Apache Software License v
           * http://tomcat.apache.org/native-doc/
           * https://svn.apache.org/repos/asf/tomcat/native/
 
+  (ASLv2) The Netty Project (4.1.77.Final)
+    The following NOTICE information applies:
+      netty/netty
+      Copyright 2014 The Netty Project
+    -------------------------------------------------------------------------------
+                                The Netty Project
+                                =================
+
+    Please visit the Netty web site for more information:
+
+      * https://netty.io/
+
+    Copyright 2014 The Netty Project
+
+    The Netty Project licenses this file to you under the Apache License,
+    version 2.0 (the "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at:
+
+      https://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+    License for the specific language governing permissions and limitations
+    under the License.
+
+    Also, please refer to each LICENSE.<component>.txt file, which is located in
+    the 'license' directory of the distribution file, for the license terms of the
+    components that this product depends on.
+
+    -------------------------------------------------------------------------------
+    This product contains the extensions to Java Collections Framework which has
+    been derived from the works by JSR-166 EG, Doug Lea, and Jason T. Greene:
+
+      * LICENSE:
+        * license/LICENSE.jsr166y.txt (Public Domain)
+      * HOMEPAGE:
+        * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/
+        * http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/
+
+    This product contains a modified version of Robert Harder's Public Domain
+    Base64 Encoder and Decoder, which can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.base64.txt (Public Domain)
+      * HOMEPAGE:
+        * http://iharder.sourceforge.net/current/java/base64/
+
+    This product contains a modified portion of 'Webbit', an event based
+    WebSocket and HTTP server, which can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.webbit.txt (BSD License)
+      * HOMEPAGE:
+        * https://github.com/joewalnes/webbit
+
+    This product contains a modified portion of 'SLF4J', a simple logging
+    facade for Java, which can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.slf4j.txt (MIT License)
+      * HOMEPAGE:
+        * https://www.slf4j.org/
+
+    This product contains a modified portion of 'Apache Harmony', an open source
+    Java SE, which can be obtained at:
+
+      * NOTICE:
+        * license/NOTICE.harmony.txt
+      * LICENSE:
+        * license/LICENSE.harmony.txt (Apache License 2.0)
+      * HOMEPAGE:
+        * https://archive.apache.org/dist/harmony/
+
+    This product contains a modified portion of 'jbzip2', a Java bzip2 compression
+    and decompression library written by Matthew J. Francis. It can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.jbzip2.txt (MIT License)
+      * HOMEPAGE:
+        * https://code.google.com/p/jbzip2/
+
+    This product contains a modified portion of 'libdivsufsort', a C API library to construct
+    the suffix array and the Burrows-Wheeler transformed string for any input string of
+    a constant-size alphabet written by Yuta Mori. It can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.libdivsufsort.txt (MIT License)
+      * HOMEPAGE:
+        * https://github.com/y-256/libdivsufsort
+
+    This product contains a modified portion of Nitsan Wakart's 'JCTools', Java Concurrency Tools for the JVM,
+     which can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.jctools.txt (ASL2 License)
+      * HOMEPAGE:
+        * https://github.com/JCTools/JCTools
+
+    This product optionally depends on 'JZlib', a re-implementation of zlib in
+    pure Java, which can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.jzlib.txt (BSD style License)
+      * HOMEPAGE:
+        * http://www.jcraft.com/jzlib/
+
+    This product optionally depends on 'Compress-LZF', a Java library for encoding and
+    decoding data in LZF format, written by Tatu Saloranta. It can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.compress-lzf.txt (Apache License 2.0)
+      * HOMEPAGE:
+        * https://github.com/ning/compress
+
+    This product optionally depends on 'lz4', a LZ4 Java compression
+    and decompression library written by Adrien Grand. It can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.lz4.txt (Apache License 2.0)
+      * HOMEPAGE:
+        * https://github.com/jpountz/lz4-java
+
+    This product optionally depends on 'lzma-java', a LZMA Java compression
+    and decompression library, which can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.lzma-java.txt (Apache License 2.0)
+      * HOMEPAGE:
+        * https://github.com/jponge/lzma-java
+
+    This product optionally depends on 'zstd-jni', a zstd-jni Java compression
+    and decompression library, which can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.zstd-jni.txt (Apache License 2.0)
+      * HOMEPAGE:
+        * https://github.com/luben/zstd-jni
+
+    This product contains a modified portion of 'jfastlz', a Java port of FastLZ compression
+    and decompression library written by William Kinney. It can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.jfastlz.txt (MIT License)
+      * HOMEPAGE:
+        * https://code.google.com/p/jfastlz/
+
+    This product contains a modified portion of and optionally depends on 'Protocol Buffers', Google's data
+    interchange format, which can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.protobuf.txt (New BSD License)
+      * HOMEPAGE:
+        * https://github.com/google/protobuf
+
+    This product optionally depends on 'Bouncy Castle Crypto APIs' to generate
+    a temporary self-signed X.509 certificate when the JVM does not provide the
+    equivalent functionality.  It can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.bouncycastle.txt (MIT License)
+      * HOMEPAGE:
+        * https://www.bouncycastle.org/
+
+    This product optionally depends on 'Snappy', a compression library produced
+    by Google Inc, which can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.snappy.txt (New BSD License)
+      * HOMEPAGE:
+        * https://github.com/google/snappy
+
+    This product optionally depends on 'JBoss Marshalling', an alternative Java
+    serialization API, which can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.jboss-marshalling.txt (Apache License 2.0)
+      * HOMEPAGE:
+        * https://github.com/jboss-remoting/jboss-marshalling
+
+    This product optionally depends on 'Caliper', Google's micro-
+    benchmarking framework, which can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.caliper.txt (Apache License 2.0)
+      * HOMEPAGE:
+        * https://github.com/google/caliper
+
+    This product optionally depends on 'Apache Commons Logging', a logging
+    framework, which can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.commons-logging.txt (Apache License 2.0)
+      * HOMEPAGE:
+        * https://commons.apache.org/logging/
+
+    This product optionally depends on 'Apache Log4J', a logging framework, which
+    can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.log4j.txt (Apache License 2.0)
+      * HOMEPAGE:
+        * https://logging.apache.org/log4j/
+
+    This product optionally depends on 'Aalto XML', an ultra-high performance
+    non-blocking XML processor, which can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.aalto-xml.txt (Apache License 2.0)
+      * HOMEPAGE:
+        * https://wiki.fasterxml.com/AaltoHome
+
+    This product contains a modified version of 'HPACK', a Java implementation of
+    the HTTP/2 HPACK algorithm written by Twitter. It can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.hpack.txt (Apache License 2.0)
+      * HOMEPAGE:
+        * https://github.com/twitter/hpack
+
+    This product contains a modified version of 'HPACK', a Java implementation of
+    the HTTP/2 HPACK algorithm written by Cory Benfield. It can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.hyper-hpack.txt (MIT License)
+      * HOMEPAGE:
+        * https://github.com/python-hyper/hpack/
+
+    This product contains a modified version of 'HPACK', a Java implementation of
+    the HTTP/2 HPACK algorithm written by Tatsuhiro Tsujikawa. It can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.nghttp2-hpack.txt (MIT License)
+      * HOMEPAGE:
+        * https://github.com/nghttp2/nghttp2/
+
+    This product contains a modified portion of 'Apache Commons Lang', a Java library
+    provides utilities for the java.lang API, which can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.commons-lang.txt (Apache License 2.0)
+      * HOMEPAGE:
+        * https://commons.apache.org/proper/commons-lang/
+
+
+    This product contains the Maven wrapper scripts from 'Maven Wrapper', that provides an easy way to ensure a user has everything necessary to run the Maven build.
+
+      * LICENSE:
+        * license/LICENSE.mvn-wrapper.txt (Apache License 2.0)
+      * HOMEPAGE:
+        * https://github.com/takari/maven-wrapper
+
+    This product contains the dnsinfo.h header file, that provides a way to retrieve the system DNS configuration on MacOS.
+    This private header is also used by Apple's open source
+     mDNSResponder (https://opensource.apple.com/tarballs/mDNSResponder/).
+
+     * LICENSE:
+        * license/LICENSE.dnsinfo.txt (Apple Public Source License 2.0)
+      * HOMEPAGE:
+        * https://www.opensource.apple.com/source/configd/configd-453.19/dnsinfo/dnsinfo.h
+
+    This product optionally depends on 'Brotli4j', Brotli compression and
+    decompression for Java., which can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.brotli4j.txt (Apache License 2.0)
+      * HOMEPAGE:
+        * https://github.com/hyperxpro/Brotli4j
+
   (ASLv2) Error Prone
     The following NOTICE information applies:
       Copyright 2017 Google Inc.
@@ -1958,6 +2227,36 @@ The following binary components are provided under the Apache Software License v
       Copyright (c) 2014-2019 Appsicle
       Copyright (c) 2019-2020 QuestDB
 
+  (ASLv2) HiveMQ MQTT Client
+    The following NOTICE information applies:
+      HiveMQ MQTT Client
+      Copyright 2018-2022 HiveMQ and the HiveMQ Community
+
+  (ASLv2) ReactiveX/RxJava
+    The following NOTICE information applies:
+      ReactiveX/RxJava
+      Copyright 2018-present RxJava Contributors
+
+  (ASLv2) Java Concurrency Tools Core Library (org.jctools:jctools-core)
+    The following NOTICE information applies:
+      JCTools/JCTools
+      Copyright
+
+  (ASLv2) JetBrains/java-annotations
+    The following NOTICE information applies:
+      JetBrains/java-annotations
+      Copyright 2000-2016 JetBrains s.r.o.
+
+  (ASLv2) google/dagger
+    The following NOTICE information applies:
+      google/dagger
+      Copyright 2012 The Dagger Authors
+
+  (ASLv2) atinject (javax.inject:javax.inject)
+    The following NOTICE information applies:
+      atinject
+      Copyright
+
 ************************
 Common Development and Distribution License 1.1
 ************************
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/src/main/resources/META-INF/NOTICE
index f95db89b8c..306617c48e 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/src/main/resources/META-INF/NOTICE
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/src/main/resources/META-INF/NOTICE
@@ -41,6 +41,305 @@ The following binary components are provided under the Apache Software License v
       in some artifacts (usually source distributions); but is always available
       from the source code management (SCM) system project uses.
 
+  (ASLv2) HiveMQ MQTT Client
+    The following NOTICE information applies:
+      HiveMQ MQTT Client
+      Copyright 2018-2022 HiveMQ and the HiveMQ Community
+
+  (ASLv2) ReactiveX/RxJava
+    The following NOTICE information applies:
+      ReactiveX/RxJava
+      Copyright 2018-present RxJava Contributors
+
+  (ASLv2) Java Concurrency Tools Core Library (org.jctools:jctools-core)
+    The following NOTICE information applies:
+      JCTools/JCTools
+      Copyright
+
+  (ASLv2) JetBrains/java-annotations
+    The following NOTICE information applies:
+      JetBrains/java-annotations
+      Copyright 2000-2016 JetBrains s.r.o.
+
+  (ASLv2) google/dagger
+    The following NOTICE information applies:
+      google/dagger
+      Copyright 2012 The Dagger Authors
+
+  (ASLv2) atinject (javax.inject:javax.inject)
+    The following NOTICE information applies:
+      atinject
+      Copyright
+
+  (ASLv2) The Netty Project
+    The following NOTICE information applies:
+      netty/netty
+      Copyright 2014 The Netty Project
+    -------------------------------------------------------------------------------
+                                The Netty Project
+                                =================
+
+    Please visit the Netty web site for more information:
+
+      * https://netty.io/
+
+    Copyright 2014 The Netty Project
+
+    The Netty Project licenses this file to you under the Apache License,
+    version 2.0 (the "License"); you may not use this file except in compliance
+    with the License. You may obtain a copy of the License at:
+
+      https://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+    WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+    License for the specific language governing permissions and limitations
+    under the License.
+
+    Also, please refer to each LICENSE.<component>.txt file, which is located in
+    the 'license' directory of the distribution file, for the license terms of the
+    components that this product depends on.
+
+    -------------------------------------------------------------------------------
+    This product contains the extensions to Java Collections Framework which has
+    been derived from the works by JSR-166 EG, Doug Lea, and Jason T. Greene:
+
+      * LICENSE:
+        * license/LICENSE.jsr166y.txt (Public Domain)
+      * HOMEPAGE:
+        * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/
+        * http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/
+
+    This product contains a modified version of Robert Harder's Public Domain
+    Base64 Encoder and Decoder, which can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.base64.txt (Public Domain)
+      * HOMEPAGE:
+        * http://iharder.sourceforge.net/current/java/base64/
+
+    This product contains a modified portion of 'Webbit', an event based
+    WebSocket and HTTP server, which can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.webbit.txt (BSD License)
+      * HOMEPAGE:
+        * https://github.com/joewalnes/webbit
+
+    This product contains a modified portion of 'SLF4J', a simple logging
+    facade for Java, which can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.slf4j.txt (MIT License)
+      * HOMEPAGE:
+        * https://www.slf4j.org/
+
+    This product contains a modified portion of 'Apache Harmony', an open source
+    Java SE, which can be obtained at:
+
+      * NOTICE:
+        * license/NOTICE.harmony.txt
+      * LICENSE:
+        * license/LICENSE.harmony.txt (Apache License 2.0)
+      * HOMEPAGE:
+        * https://archive.apache.org/dist/harmony/
+
+    This product contains a modified portion of 'jbzip2', a Java bzip2 compression
+    and decompression library written by Matthew J. Francis. It can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.jbzip2.txt (MIT License)
+      * HOMEPAGE:
+        * https://code.google.com/p/jbzip2/
+
+    This product contains a modified portion of 'libdivsufsort', a C API library to construct
+    the suffix array and the Burrows-Wheeler transformed string for any input string of
+    a constant-size alphabet written by Yuta Mori. It can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.libdivsufsort.txt (MIT License)
+      * HOMEPAGE:
+        * https://github.com/y-256/libdivsufsort
+
+    This product contains a modified portion of Nitsan Wakart's 'JCTools', Java Concurrency Tools for the JVM,
+     which can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.jctools.txt (ASL2 License)
+      * HOMEPAGE:
+        * https://github.com/JCTools/JCTools
+
+    This product optionally depends on 'JZlib', a re-implementation of zlib in
+    pure Java, which can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.jzlib.txt (BSD style License)
+      * HOMEPAGE:
+        * http://www.jcraft.com/jzlib/
+
+    This product optionally depends on 'Compress-LZF', a Java library for encoding and
+    decoding data in LZF format, written by Tatu Saloranta. It can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.compress-lzf.txt (Apache License 2.0)
+      * HOMEPAGE:
+        * https://github.com/ning/compress
+
+    This product optionally depends on 'lz4', a LZ4 Java compression
+    and decompression library written by Adrien Grand. It can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.lz4.txt (Apache License 2.0)
+      * HOMEPAGE:
+        * https://github.com/jpountz/lz4-java
+
+    This product optionally depends on 'lzma-java', a LZMA Java compression
+    and decompression library, which can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.lzma-java.txt (Apache License 2.0)
+      * HOMEPAGE:
+        * https://github.com/jponge/lzma-java
+
+    This product optionally depends on 'zstd-jni', a zstd-jni Java compression
+    and decompression library, which can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.zstd-jni.txt (Apache License 2.0)
+      * HOMEPAGE:
+        * https://github.com/luben/zstd-jni
+
+    This product contains a modified portion of 'jfastlz', a Java port of FastLZ compression
+    and decompression library written by William Kinney. It can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.jfastlz.txt (MIT License)
+      * HOMEPAGE:
+        * https://code.google.com/p/jfastlz/
+
+    This product contains a modified portion of and optionally depends on 'Protocol Buffers', Google's data
+    interchange format, which can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.protobuf.txt (New BSD License)
+      * HOMEPAGE:
+        * https://github.com/google/protobuf
+
+    This product optionally depends on 'Bouncy Castle Crypto APIs' to generate
+    a temporary self-signed X.509 certificate when the JVM does not provide the
+    equivalent functionality.  It can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.bouncycastle.txt (MIT License)
+      * HOMEPAGE:
+        * https://www.bouncycastle.org/
+
+    This product optionally depends on 'Snappy', a compression library produced
+    by Google Inc, which can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.snappy.txt (New BSD License)
+      * HOMEPAGE:
+        * https://github.com/google/snappy
+
+    This product optionally depends on 'JBoss Marshalling', an alternative Java
+    serialization API, which can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.jboss-marshalling.txt (Apache License 2.0)
+      * HOMEPAGE:
+        * https://github.com/jboss-remoting/jboss-marshalling
+
+    This product optionally depends on 'Caliper', Google's micro-
+    benchmarking framework, which can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.caliper.txt (Apache License 2.0)
+      * HOMEPAGE:
+        * https://github.com/google/caliper
+
+    This product optionally depends on 'Apache Commons Logging', a logging
+    framework, which can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.commons-logging.txt (Apache License 2.0)
+      * HOMEPAGE:
+        * https://commons.apache.org/logging/
+
+    This product optionally depends on 'Apache Log4J', a logging framework, which
+    can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.log4j.txt (Apache License 2.0)
+      * HOMEPAGE:
+        * https://logging.apache.org/log4j/
+
+    This product optionally depends on 'Aalto XML', an ultra-high performance
+    non-blocking XML processor, which can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.aalto-xml.txt (Apache License 2.0)
+      * HOMEPAGE:
+        * https://wiki.fasterxml.com/AaltoHome
+
+    This product contains a modified version of 'HPACK', a Java implementation of
+    the HTTP/2 HPACK algorithm written by Twitter. It can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.hpack.txt (Apache License 2.0)
+      * HOMEPAGE:
+        * https://github.com/twitter/hpack
+
+    This product contains a modified version of 'HPACK', a Java implementation of
+    the HTTP/2 HPACK algorithm written by Cory Benfield. It can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.hyper-hpack.txt (MIT License)
+      * HOMEPAGE:
+        * https://github.com/python-hyper/hpack/
+
+    This product contains a modified version of 'HPACK', a Java implementation of
+    the HTTP/2 HPACK algorithm written by Tatsuhiro Tsujikawa. It can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.nghttp2-hpack.txt (MIT License)
+      * HOMEPAGE:
+        * https://github.com/nghttp2/nghttp2/
+
+    This product contains a modified portion of 'Apache Commons Lang', a Java library
+    provides utilities for the java.lang API, which can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.commons-lang.txt (Apache License 2.0)
+      * HOMEPAGE:
+        * https://commons.apache.org/proper/commons-lang/
+
+
+    This product contains the Maven wrapper scripts from 'Maven Wrapper', that provides an easy way to ensure a user has everything necessary to run the Maven build.
+
+      * LICENSE:
+        * license/LICENSE.mvn-wrapper.txt (Apache License 2.0)
+      * HOMEPAGE:
+        * https://github.com/takari/maven-wrapper
+
+    This product contains the dnsinfo.h header file, that provides a way to retrieve the system DNS configuration on MacOS.
+    This private header is also used by Apple's open source
+     mDNSResponder (https://opensource.apple.com/tarballs/mDNSResponder/).
+
+     * LICENSE:
+        * license/LICENSE.dnsinfo.txt (Apple Public Source License 2.0)
+      * HOMEPAGE:
+        * https://www.opensource.apple.com/source/configd/configd-453.19/dnsinfo/dnsinfo.h
+
+    This product optionally depends on 'Brotli4j', Brotli compression and
+    decompression for Java., which can be obtained at:
+
+      * LICENSE:
+        * license/LICENSE.brotli4j.txt (Apache License 2.0)
+      * HOMEPAGE:
+        * https://github.com/hyperxpro/Brotli4j
+
 ************************
 Eclipse Public License 1.0
 ************************
@@ -48,3 +347,11 @@ Eclipse Public License 1.0
 The following binary components are provided under the Eclipse Public License 1.0.  See project link for details.
 
     (EPL 1.0) Eclipse Paho MQTT Client (org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0 - https://github.com/eclipse/paho.mqtt.java)
+
+*****************
+Public Domain
+*****************
+
+The following binary components are provided under the Creative Commons Zero license version 1.0.  See project link for details.
+
+    (CC0v1.0) Reactive Streams (org.reactivestreams:reactive-streams:jar:1.0.3 - http://www.reactive-streams.org/)
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml
index 2ed90145e7..3e9bed91ef 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml
@@ -48,6 +48,11 @@
             <groupId>org.apache.nifi</groupId>
             <artifactId>nifi-record</artifactId>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-security-utils</artifactId>
+            <version>1.18.0-SNAPSHOT</version>
+        </dependency>
 
         <!-- External dependencies -->
         <dependency>
@@ -55,6 +60,11 @@
             <artifactId>org.eclipse.paho.client.mqttv3</artifactId>
             <version>1.2.2</version>
         </dependency>
+        <dependency>
+            <groupId>com.hivemq</groupId>
+            <artifactId>hivemq-mqtt-client</artifactId>
+            <version>1.3.0</version>
+        </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
@@ -84,23 +94,4 @@
             <scope>test</scope>
         </dependency>
     </dependencies>
-
-
-    <build>
-        <plugins>
-            <plugin>
-                <groupId>org.apache.maven.plugins</groupId>
-                <artifactId>maven-surefire-plugin</artifactId>
-                <configuration>
-                    <excludes>
-                        <exclude>**/integration/TestConsumeMQTT.java</exclude>
-                        <exclude>**/integration/TestConsumeMqttSSL.java</exclude>
-                        <exclude>**/integration/TestPublishAndSubscribeMqttIntegration.java</exclude>
-                        <exclude>**/integration/TestPublishMQTT.java</exclude>
-                        <exclude>**/integration/TestPublishMqttSSL.java</exclude>
-                    </excludes>
-                </configuration>
-            </plugin>
-        </plugins>
-    </build>
 </project>
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
index 7c5a7ff49c..46d1452cea 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
@@ -42,10 +42,11 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.OutputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
-import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage;
+import org.apache.nifi.processors.mqtt.common.MqttCallback;
+import org.apache.nifi.processors.mqtt.common.MqttException;
+import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
 import org.apache.nifi.serialization.MalformedRecordException;
 import org.apache.nifi.serialization.RecordReader;
 import org.apache.nifi.serialization.RecordReaderFactory;
@@ -58,10 +59,6 @@ import org.apache.nifi.serialization.record.Record;
 import org.apache.nifi.serialization.record.RecordField;
 import org.apache.nifi.serialization.record.RecordFieldType;
 import org.apache.nifi.serialization.record.RecordSchema;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
-import org.eclipse.paho.client.mqttv3.MqttCallback;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
 
 import java.io.ByteArrayInputStream;
 import java.io.IOException;
@@ -209,7 +206,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
     private volatile String topicFilter;
     private final AtomicBoolean scheduled = new AtomicBoolean(false);
 
-    private volatile LinkedBlockingQueue<MQTTQueueMessage> mqttQueue;
+    private volatile LinkedBlockingQueue<ReceivedMqttMessage> mqttQueue;
 
     public static final Relationship REL_MESSAGE = new Relationship.Builder()
             .name("Message")
@@ -226,7 +223,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
     private static final List<PropertyDescriptor> descriptors;
     private static final Set<Relationship> relationships;
 
-    static{
+    static {
         final List<PropertyDescriptor> innerDescriptorsList = getAbstractPropertyDescriptors();
         innerDescriptorsList.add(PROP_GROUPID);
         innerDescriptorsList.add(PROP_TOPIC_FILTER);
@@ -238,7 +235,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
         innerDescriptorsList.add(MESSAGE_DEMARCATOR);
         descriptors = Collections.unmodifiableList(innerDescriptorsList);
 
-        final Set<Relationship> innerRelationshipsSet = new HashSet<Relationship>();
+        final Set<Relationship> innerRelationshipsSet = new HashSet<>();
         innerRelationshipsSet.add(REL_MESSAGE);
         innerRelationshipsSet.add(REL_PARSE_FAILURE);
         relationships = Collections.unmodifiableSet(innerRelationshipsSet);
@@ -249,15 +246,14 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
         // resize the receive buffer, but preserve data
         if (descriptor == PROP_MAX_QUEUE_SIZE) {
             // it's a mandatory integer, never null
-            int newSize = Integer.valueOf(newValue);
+            int newSize = Integer.parseInt(newValue);
             if (mqttQueue != null) {
                 int msgPending = mqttQueue.size();
                 if (msgPending > newSize) {
-                    logger.warn("New receive buffer size ({}) is smaller than the number of messages pending ({}), ignoring resize request. Processor will be invalid.",
-                            new Object[]{newSize, msgPending});
+                    logger.warn("New receive buffer size ({}) is smaller than the number of messages pending ({}), ignoring resize request. Processor will be invalid.", newSize, msgPending);
                     return;
                 }
-                LinkedBlockingQueue<MQTTQueueMessage> newBuffer = new LinkedBlockingQueue<>(newSize);
+                LinkedBlockingQueue<ReceivedMqttMessage> newBuffer = new LinkedBlockingQueue<>(newSize);
                 mqttQueue.drainTo(newBuffer);
                 mqttQueue = newBuffer;
             }
@@ -297,15 +293,15 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
 
         final boolean readerIsSet = context.getProperty(RECORD_READER).isSet();
         final boolean writerIsSet = context.getProperty(RECORD_WRITER).isSet();
-        if((readerIsSet && !writerIsSet) || (!readerIsSet && writerIsSet)) {
+        if ((readerIsSet && !writerIsSet) || (!readerIsSet && writerIsSet)) {
             results.add(new ValidationResult.Builder().subject("Reader and Writer").valid(false)
-                    .explanation("Both Record Reader and Writer must be set when used").build());
+                    .explanation("both Record Reader and Writer must be set when used.").build());
         }
 
         final boolean demarcatorIsSet = context.getProperty(MESSAGE_DEMARCATOR).isSet();
-        if(readerIsSet && demarcatorIsSet) {
+        if (readerIsSet && demarcatorIsSet) {
             results.add(new ValidationResult.Builder().subject("Reader and Writer").valid(false)
-                    .explanation("You cannot use both a demarcator and a Reader/Writer").build());
+                    .explanation("Message Demarcator and Record Reader/Writer cannot be used at the same time.").build());
         }
 
         return results;
@@ -346,17 +342,17 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
     public void onUnscheduled(final ProcessContext context) {
         scheduled.set(false);
         synchronized (this) {
-            super.onStopped();
+            stopClient();
         }
     }
 
 
     @OnStopped
-    public void onStopped(final ProcessContext context) throws IOException {
-        if(mqttQueue != null && !mqttQueue.isEmpty() && processSessionFactory != null) {
+    public void onStopped(final ProcessContext context) {
+        if (mqttQueue != null && !mqttQueue.isEmpty() && processSessionFactory != null) {
             logger.info("Finishing processing leftover messages");
             ProcessSession session = processSessionFactory.createSession();
-            if(context.getProperty(RECORD_READER).isSet()) {
+            if (context.getProperty(RECORD_READER).isSet()) {
                 transferQueueRecord(context, session);
             } else if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) {
                 transferQueueDemarcator(context, session);
@@ -364,7 +360,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
                 transferQueue(session);
             }
         } else {
-            if (mqttQueue!= null && !mqttQueue.isEmpty()){
+            if (mqttQueue != null && !mqttQueue.isEmpty()) {
                 throw new ProcessException("Stopping the processor but there is no ProcessSessionFactory stored and there are messages in the MQTT internal queue. Removing the processor now will " +
                         "clear the queue but will result in DATA LOSS. This is normally due to starting the processor, receiving messages and stopping before the onTrigger happens. The messages " +
                         "in the MQTT internal queue cannot finish processing until until the processor is triggered to run.");
@@ -375,7 +371,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
         final boolean isScheduled = scheduled.get();
-        if (!isConnected() && isScheduled){
+        if (!isConnected() && isScheduled) {
             synchronized (this) {
                 if (!isConnected()) {
                     initializeClient(context);
@@ -402,42 +398,27 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
         // non-null but not connected, so we need to handle each case and only create a new client when it is null
         try {
             if (mqttClient == null) {
-                logger.debug("Creating client");
-                mqttClient = createMqttClient(broker, clientID, persistence);
+                mqttClient = createMqttClient();
                 mqttClient.setCallback(this);
             }
 
             if (!mqttClient.isConnected()) {
-                logger.debug("Connecting client");
-                mqttClient.connect(connOpts);
+                mqttClient.connect();
                 mqttClient.subscribe(topicPrefix + topicFilter, qos);
             }
-        } catch (MqttException e) {
-            logger.error("Connection to {} lost (or was never connected) and connection failed. Yielding processor", new Object[]{broker}, e);
+        } catch (Exception e) {
+            logger.error("Connection to {} lost (or was never connected) and connection failed. Yielding processor", new Object[]{clientProperties.getBroker()}, e);
+            mqttClient = null; // prevent stucked processor when subscribe fails
             context.yield();
         }
     }
 
-    private void transferQueue(ProcessSession session){
+    private void transferQueue(ProcessSession session) {
         while (!mqttQueue.isEmpty()) {
-            final MQTTQueueMessage mqttMessage = mqttQueue.peek();
-            FlowFile messageFlowfile = session.create();
-
-            Map<String, String> attrs = new HashMap<>();
-            attrs.put(BROKER_ATTRIBUTE_KEY, broker);
-            attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
-            attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
-            attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isDuplicate()));
-            attrs.put(IS_RETAINED_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isRetained()));
-
-            messageFlowfile = session.putAllAttributes(messageFlowfile, attrs);
-
-            messageFlowfile = session.write(messageFlowfile, new OutputStreamCallback() {
-                @Override
-                public void process(final OutputStream out) throws IOException {
-                    out.write(mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload());
-                }
-            });
+            final ReceivedMqttMessage mqttMessage = mqttQueue.peek();
+
+            final FlowFile messageFlowfile = session.write(createFlowFileAndPopulateAttributes(session, mqttMessage),
+                    out -> out.write(mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload()));
 
             session.getProvenanceReporter().receive(messageFlowfile, getTransitUri(mqttMessage.getTopic()));
             session.transfer(messageFlowfile, REL_MESSAGE);
@@ -446,17 +427,16 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
         }
     }
 
-    private void transferQueueDemarcator(final ProcessContext context, final ProcessSession session){
+    private void transferQueueDemarcator(final ProcessContext context, final ProcessSession session) {
         final byte[] demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);
 
         FlowFile messageFlowfile = session.create();
-        session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker);
-
+        session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, clientProperties.getBroker());
 
         messageFlowfile = session.append(messageFlowfile, out -> {
             int i = 0;
             while (!mqttQueue.isEmpty() && i < MAX_MESSAGES_PER_FLOW_FILE) {
-                final MQTTQueueMessage mqttMessage = mqttQueue.poll();
+                final ReceivedMqttMessage mqttMessage = mqttQueue.poll();
                 out.write(mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload());
                 out.write(demarcator);
                 session.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false);
@@ -469,41 +449,40 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
         session.commitAsync();
     }
 
-    private void transferFailure(final ProcessSession session, final MQTTQueueMessage mqttMessage) {
+    private void transferFailure(final ProcessSession session, final ReceivedMqttMessage mqttMessage) {
+        final FlowFile messageFlowfile = session.write(createFlowFileAndPopulateAttributes(session, mqttMessage),
+                out -> out.write(mqttMessage.getPayload()));
+
+        session.getProvenanceReporter().receive(messageFlowfile, getTransitUri(mqttMessage.getTopic()));
+        session.transfer(messageFlowfile, REL_PARSE_FAILURE);
+        session.adjustCounter(COUNTER_PARSE_FAILURES, 1, false);
+    }
+
+    private FlowFile createFlowFileAndPopulateAttributes(ProcessSession session, ReceivedMqttMessage mqttMessage) {
         FlowFile messageFlowfile = session.create();
 
         Map<String, String> attrs = new HashMap<>();
-        attrs.put(BROKER_ATTRIBUTE_KEY, broker);
+        attrs.put(BROKER_ATTRIBUTE_KEY, clientProperties.getBroker());
         attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
         attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
         attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isDuplicate()));
         attrs.put(IS_RETAINED_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isRetained()));
 
         messageFlowfile = session.putAllAttributes(messageFlowfile, attrs);
-
-        messageFlowfile = session.write(messageFlowfile, new OutputStreamCallback() {
-            @Override
-            public void process(final OutputStream out) throws IOException {
-                out.write(mqttMessage.getPayload());
-            }
-        });
-
-        session.getProvenanceReporter().receive(messageFlowfile, getTransitUri(mqttMessage.getTopic()));
-        session.transfer(messageFlowfile, REL_PARSE_FAILURE);
-        session.adjustCounter(COUNTER_PARSE_FAILURES, 1, false);
+        return messageFlowfile;
     }
 
-    private void transferQueueRecord(final ProcessContext context, final ProcessSession session){
+    private void transferQueueRecord(final ProcessContext context, final ProcessSession session) {
         final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
         final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
 
         FlowFile flowFile = session.create();
-        session.putAttribute(flowFile, BROKER_ATTRIBUTE_KEY, broker);
+        session.putAttribute(flowFile, BROKER_ATTRIBUTE_KEY, clientProperties.getBroker());
 
         final Map<String, String> attributes = new HashMap<>();
         final AtomicInteger recordCount = new AtomicInteger();
 
-        final List<MQTTQueueMessage> doneList = new ArrayList<MQTTQueueMessage>();
+        final List<ReceivedMqttMessage> doneList = new ArrayList<>();
 
         RecordSetWriter writer = null;
         boolean isWriterInitialized = false;
@@ -511,8 +490,8 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
 
         try {
             while (!mqttQueue.isEmpty() && i < MAX_MESSAGES_PER_FLOW_FILE) {
-                final MQTTQueueMessage mqttMessage = mqttQueue.poll();
-                if(mqttMessage == null) {
+                final ReceivedMqttMessage mqttMessage = mqttQueue.poll();
+                if (mqttMessage == null) {
                     break;
                 }
 
@@ -533,16 +512,15 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
                         Record record;
                         while ((record = reader.nextRecord()) != null) {
 
-                            if(!isWriterInitialized) {
+                            if (!isWriterInitialized) {
                                 final RecordSchema recordSchema = record.getSchema();
                                 final OutputStream rawOut = session.write(flowFile);
 
                                 RecordSchema writeSchema;
                                 try {
                                     writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
-                                    if(context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
-                                        final List<RecordField> fields = new ArrayList<>();
-                                        fields.addAll(writeSchema.getFields());
+                                    if (context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
+                                        final List<RecordField> fields = new ArrayList<>(writeSchema.getFields());
 
                                         fields.add(new RecordField(TOPIC_FIELD_KEY, RecordFieldType.STRING.getDataType()));
                                         fields.add(new RecordField(QOS_FIELD_KEY, RecordFieldType.INT.getDataType()));
@@ -562,7 +540,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
                             }
 
                             try {
-                                if(context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
+                                if (context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
                                     record.setValue(TOPIC_FIELD_KEY, mqttMessage.getTopic());
                                     record.setValue(QOS_FIELD_KEY, mqttMessage.getQos());
                                     record.setValue(IS_RETAINED_FIELD_KEY, mqttMessage.isRetained());
@@ -583,16 +561,14 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
                     } catch (final IOException | MalformedRecordException | SchemaValidationException e) {
                         logger.error("Failed to write message, sending to the parse failure relationship", e);
                         transferFailure(session, mqttMessage);
-                        continue;
                     }
                 } catch (Exception e) {
                     logger.error("Failed to write message, sending to the parse failure relationship", e);
                     transferFailure(session, mqttMessage);
-                    continue;
                 }
             }
 
-            if(writer != null) {
+            if (writer != null) {
                 final WriteResult writeResult = writer.finishRecordSet();
                 attributes.put(RECORD_COUNT_KEY, String.valueOf(writeResult.getRecordCount()));
                 attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
@@ -605,26 +581,26 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
 
             // we try to add the messages back into the internal queue
             int numberOfMessages = 0;
-            for(MQTTQueueMessage done : doneList) {
+            for (ReceivedMqttMessage done : doneList) {
                 try {
                     mqttQueue.offer(done, 1, TimeUnit.SECONDS);
                 } catch (InterruptedException ex) {
                     numberOfMessages++;
-                    if(getLogger().isDebugEnabled()) {
+                    if (getLogger().isDebugEnabled()) {
                         logger.debug("Could not add message back into the internal queue, this could lead to data loss", ex);
                     }
                 }
             }
-            if(numberOfMessages > 0) {
-                logger.error("Could not add {} message(s) back into the internal queue, this could mean data loss", new Object[] {numberOfMessages});
+            if (numberOfMessages > 0) {
+                logger.error("Could not add {} message(s) back into the internal queue, this could mean data loss", numberOfMessages);
             }
 
-            throw new ProcessException("Could not process data received from the MQTT broker(s): " + broker, e);
+            throw new ProcessException("Could not process data received from the MQTT broker(s): " + clientProperties.getBroker(), e);
         } finally {
             closeWriter(writer);
         }
 
-        if(recordCount.get() == 0) {
+        if (recordCount.get() == 0) {
             session.remove(flowFile);
             return;
         }
@@ -635,7 +611,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
 
         final int count = recordCount.get();
         session.adjustCounter(COUNTER_RECORDS_PROCESSED, count, false);
-        getLogger().info("Successfully processed {} records for {}", new Object[] {count, flowFile});
+        getLogger().info("Successfully processed {} records for {}", count, flowFile);
     }
 
     private void closeWriter(final RecordSetWriter writer) {
@@ -649,8 +625,9 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
     }
 
     private String getTransitUri(String... appends) {
-        StringBuilder stringBuilder = new StringBuilder(brokerUri);
-        for(String append : appends) {
+        String broker = clientProperties.getBrokerUri().toString();
+        StringBuilder stringBuilder = new StringBuilder(broker.endsWith("/") ? broker : broker + "/");
+        for (String append : appends) {
             stringBuilder.append(append);
         }
         return stringBuilder.toString();
@@ -658,29 +635,34 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
 
     @Override
     public void connectionLost(Throwable cause) {
-        logger.error("Connection to {} lost due to: {}", new Object[]{broker, cause.getMessage()}, cause);
+        logger.error("Connection to {} lost", clientProperties.getBroker(), cause);
     }
 
     @Override
-    public void messageArrived(String topic, MqttMessage message) throws Exception {
+    public void messageArrived(ReceivedMqttMessage message) {
         if (logger.isDebugEnabled()) {
             byte[] payload = message.getPayload();
-            String text = new String(payload, "UTF-8");
+            String text = new String(payload, StandardCharsets.UTF_8);
             if (StringUtils.isAsciiPrintable(text)) {
-                logger.debug("Message arrived from topic {}. Payload: {}", new Object[] {topic, text});
+                logger.debug("Message arrived from topic {}. Payload: {}", message.getTopic(), text);
             } else {
-                logger.debug("Message arrived from topic {}. Binary value of size {}", new Object[] {topic, payload.length});
+                logger.debug("Message arrived from topic {}. Binary value of size {}", message.getTopic(), payload.length);
             }
         }
 
-        if(!mqttQueue.offer(new MQTTQueueMessage(topic, message), 1, TimeUnit.SECONDS)) {
-            throw new IllegalStateException("The subscriber queue is full, cannot receive another message until the processor is scheduled to run.");
+        try {
+            if (!mqttQueue.offer(message, 1, TimeUnit.SECONDS)) {
+                throw new IllegalStateException("The subscriber queue is full, cannot receive another message until the processor is scheduled to run.");
+            }
+        } catch (InterruptedException e) {
+            throw new MqttException("Failed to process message arrived from topic " + message.getTopic());
         }
     }
 
     @Override
-    public void deliveryComplete(IMqttDeliveryToken token) {
-        logger.warn("Received MQTT 'delivery complete' message to subscriber: " + token);
+    public void deliveryComplete(String token) {
+        // Unlikely situation. Api uses the same callback for publisher and consumer as well.
+        // That's why we have this log message here to indicate something really messy thing happened.
+        logger.error("Received MQTT 'delivery complete' message to subscriber. Token: [{}]", token);
     }
-
 }
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
index 0db7615d7e..2ea80b1ded 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
@@ -36,18 +36,15 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
+import org.apache.nifi.processors.mqtt.common.MqttCallback;
+import org.apache.nifi.processors.mqtt.common.MqttException;
+import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
+import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
 import org.apache.nifi.stream.io.StreamUtils;
 import org.apache.nifi.util.StopWatch;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
-import org.eclipse.paho.client.mqttv3.MqttCallback;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
 
-import java.io.IOException;
-import java.io.InputStream;
 import java.util.Arrays;
 import java.util.Collections;
 import java.util.HashSet;
@@ -137,18 +134,18 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
     @OnStopped
     public void onStopped(final ProcessContext context) {
         synchronized (this) {
-            super.onStopped();
+            stopClient();
         }
     }
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        FlowFile flowfile = session.get();
+        final FlowFile flowfile = session.get();
         if (flowfile == null) {
             return;
         }
 
-        if (!isConnected()){
+        if (!isConnected()) {
             synchronized (this) {
                 if (!isConnected()) {
                     initializeClient(context);
@@ -157,7 +154,7 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
         }
 
         // get the MQTT topic
-        String topic = context.getProperty(PROP_TOPIC).evaluateAttributeExpressions(flowfile).getValue();
+        final String topic = context.getProperty(PROP_TOPIC).evaluateAttributeExpressions(flowfile).getValue();
 
         if (topic == null || topic.isEmpty()) {
             logger.warn("Evaluation of the topic property returned null or evaluated to be empty, routing to failure");
@@ -167,18 +164,11 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
 
         // do the read
         final byte[] messageContent = new byte[(int) flowfile.getSize()];
-        session.read(flowfile, new InputStreamCallback() {
-            @Override
-            public void process(final InputStream in) throws IOException {
-                StreamUtils.fillBuffer(in, messageContent, true);
-            }
-        });
+        session.read(flowfile, in -> StreamUtils.fillBuffer(in, messageContent, true));
 
         int qos = context.getProperty(PROP_QOS).evaluateAttributeExpressions(flowfile).asInteger();
-        final MqttMessage mqttMessage = new MqttMessage(messageContent);
-        mqttMessage.setQos(qos);
-        mqttMessage.setPayload(messageContent);
-        mqttMessage.setRetained(context.getProperty(PROP_RETAIN).evaluateAttributeExpressions(flowfile).asBoolean());
+        boolean retained = context.getProperty(PROP_RETAIN).evaluateAttributeExpressions(flowfile).asBoolean();
+        final StandardMqttMessage mqttMessage = new StandardMqttMessage(messageContent, qos, retained);
 
         try {
             final StopWatch stopWatch = new StopWatch(true);
@@ -188,9 +178,9 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
              */
             mqttClient.publish(topic, mqttMessage);
 
-            session.getProvenanceReporter().send(flowfile, broker, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+            session.getProvenanceReporter().send(flowfile, clientProperties.getBroker(), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
             session.transfer(flowfile, REL_SUCCESS);
-        } catch(MqttException me) {
+        } catch (MqttException me) {
             logger.error("Failed to publish message.", me);
             session.transfer(flowfile, REL_FAILURE);
         }
@@ -201,35 +191,35 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
         // non-null but not connected, so we need to handle each case and only create a new client when it is null
         try {
             if (mqttClient == null) {
-                logger.debug("Creating client");
-                mqttClient = createMqttClient(broker, clientID, persistence);
+                mqttClient = createMqttClient();
                 mqttClient.setCallback(this);
             }
 
             if (!mqttClient.isConnected()) {
-                logger.debug("Connecting client");
-                mqttClient.connect(connOpts);
+                mqttClient.connect();
             }
-        } catch (MqttException e) {
-            logger.error("Connection to {} lost (or was never connected) and connection failed. Yielding processor", new Object[]{broker}, e);
+        } catch (Exception e) {
+            logger.error("Connection to {} lost (or was never connected) and connection failed. Yielding processor", clientProperties.getBroker(), e);
             context.yield();
         }
     }
 
     @Override
     public void connectionLost(Throwable cause) {
-        logger.error("Connection to {} lost due to: {}", new Object[]{broker, cause.getMessage()}, cause);
+        logger.error("Connection to {} lost", clientProperties.getBroker(), cause);
     }
 
     @Override
-    public void messageArrived(String topic, MqttMessage message) throws Exception {
-        logger.error("Message arrived to a PublishMQTT processor { topic:'" + topic +"; payload:"+ Arrays.toString(message.getPayload())+"}");
+    public void messageArrived(ReceivedMqttMessage message) {
+        // Unlikely situation. Api uses the same callback for publisher and consumer as well.
+        // That's why we have this log message here to indicate something really messy thing happened.
+        logger.error("Message arrived to a PublishMQTT processor { topic:'" + message.getTopic() + "; payload:" + Arrays.toString(message.getPayload()) + "}");
     }
 
     @Override
-    public void deliveryComplete(IMqttDeliveryToken token) {
+    public void deliveryComplete(String token) {
         // Client.publish waits for message to be delivered so this token will always have a null message and is useless in this application.
-        logger.trace("Received 'delivery complete' message from broker for:" + token.toString());
+        logger.trace("Received 'delivery complete' message from broker. Token: [{}]", token);
     }
 
 }
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/HiveMqV5ClientAdapter.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/HiveMqV5ClientAdapter.java
new file mode 100644
index 0000000000..7532411055
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/HiveMqV5ClientAdapter.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mqtt.adapters;
+
+import com.hivemq.client.mqtt.datatypes.MqttQos;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientBuilder;
+import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect;
+import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5ConnectBuilder;
+import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.mqtt.common.MqttCallback;
+import org.apache.nifi.processors.mqtt.common.MqttClient;
+import org.apache.nifi.processors.mqtt.common.MqttClientProperties;
+import org.apache.nifi.processors.mqtt.common.MqttException;
+import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
+import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
+import org.apache.nifi.security.util.KeyStoreUtils;
+import org.apache.nifi.security.util.TlsException;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.nifi.processors.mqtt.common.MqttProtocolScheme.SSL;
+import static org.apache.nifi.processors.mqtt.common.MqttProtocolScheme.WS;
+import static org.apache.nifi.processors.mqtt.common.MqttProtocolScheme.WSS;
+
+public class HiveMqV5ClientAdapter implements MqttClient {
+
+    private final Mqtt5BlockingClient mqtt5BlockingClient;
+    private final MqttClientProperties clientProperties;
+    private final ComponentLog logger;
+
+    private MqttCallback callback;
+
+    public HiveMqV5ClientAdapter(MqttClientProperties clientProperties, ComponentLog logger) throws TlsException {
+        this.mqtt5BlockingClient = createClient(clientProperties, logger);
+        this.clientProperties = clientProperties;
+        this.logger = logger;
+    }
+
+    @Override
+    public boolean isConnected() {
+        return mqtt5BlockingClient.getState().isConnected();
+    }
+
+    @Override
+    public void connect() {
+        logger.debug("Connecting to broker");
+
+        final Mqtt5ConnectBuilder connectBuilder = Mqtt5Connect.builder()
+                .keepAlive(clientProperties.getKeepAliveInterval());
+
+        final boolean cleanSession = clientProperties.isCleanSession();
+        connectBuilder.cleanStart(cleanSession);
+        if (!cleanSession) {
+            connectBuilder.sessionExpiryInterval(clientProperties.getSessionExpiryInterval());
+        }
+
+        final String lastWillTopic = clientProperties.getLastWillTopic();
+        if (lastWillTopic != null) {
+            connectBuilder.willPublish()
+                    .topic(lastWillTopic)
+                    .payload(clientProperties.getLastWillMessage().getBytes())
+                    .retain(clientProperties.getLastWillRetain())
+                    .qos(MqttQos.fromCode(clientProperties.getLastWillQos()))
+                    .applyWillPublish();
+        }
+
+        final String username = clientProperties.getUsername();
+        final String password = clientProperties.getPassword();
+        if (username != null && password != null) {
+            connectBuilder.simpleAuth()
+                    .username(clientProperties.getUsername())
+                    .password(password.getBytes(StandardCharsets.UTF_8))
+                    .applySimpleAuth();
+        }
+
+        final Mqtt5Connect mqtt5Connect = connectBuilder.build();
+        mqtt5BlockingClient.connect(mqtt5Connect);
+    }
+
+    @Override
+    public void disconnect() {
+        logger.debug("Disconnecting client");
+        // Currently it is not possible to set timeout for disconnect with HiveMQ Client.
+        mqtt5BlockingClient.disconnect();
+    }
+
+    @Override
+    public void close() {
+        // there is no paho's close equivalent in hivemq client
+    }
+
+    @Override
+    public void publish(String topic, StandardMqttMessage message) {
+        logger.debug("Publishing message to {} with QoS: {}", topic, message.getQos());
+
+        mqtt5BlockingClient.publishWith()
+                .topic(topic)
+                .payload(message.getPayload())
+                .retain(message.isRetained())
+                .qos(Objects.requireNonNull(MqttQos.fromCode(message.getQos())))
+                .send();
+    }
+
+    @Override
+    public void subscribe(String topicFilter, int qos) {
+        Objects.requireNonNull(callback, "callback should be set");
+
+        logger.debug("Subscribing to {} with QoS: {}", topicFilter, qos);
+
+        CompletableFuture<Mqtt5SubAck> futureAck = mqtt5BlockingClient.toAsync().subscribeWith()
+                .topicFilter(topicFilter)
+                .qos(Objects.requireNonNull(MqttQos.fromCode(qos)))
+                .callback(mqtt5Publish -> {
+                    final ReceivedMqttMessage receivedMessage = new ReceivedMqttMessage(
+                            mqtt5Publish.getPayloadAsBytes(),
+                            mqtt5Publish.getQos().getCode(),
+                            mqtt5Publish.isRetain(),
+                            mqtt5Publish.getTopic().toString());
+                    callback.messageArrived(receivedMessage);
+                })
+                .send();
+
+        // Setting "listener" callback is only possible with async client, though sending subscribe message
+        // should happen in a blocking way to make sure the processor is blocked until ack is not arrived.
+        try {
+            Mqtt5SubAck ack = futureAck.get(clientProperties.getConnectionTimeout(), TimeUnit.SECONDS);
+            logger.debug("Received mqtt5 subscribe ack: {}", ack);
+        } catch (Exception e) {
+            throw new MqttException("An error has occurred during sending subscribe message to broker", e);
+        }
+    }
+
+    @Override
+    public void setCallback(MqttCallback callback) {
+        this.callback = callback;
+    }
+
+    private static Mqtt5BlockingClient createClient(MqttClientProperties clientProperties, ComponentLog logger) throws TlsException {
+        logger.debug("Creating Mqtt v5 client");
+
+        Mqtt5ClientBuilder mqtt5ClientBuilder = Mqtt5Client.builder()
+                .identifier(clientProperties.getClientId())
+                .serverHost(clientProperties.getBrokerUri().getHost());
+
+        int port = clientProperties.getBrokerUri().getPort();
+        if (port != -1) {
+            mqtt5ClientBuilder.serverPort(port);
+        }
+
+        // default is tcp
+        if (WS.equals(clientProperties.getScheme()) || WSS.equals(clientProperties.getScheme())) {
+            mqtt5ClientBuilder.webSocketConfig().applyWebSocketConfig();
+        }
+
+        if (SSL.equals(clientProperties.getScheme())) {
+            if (clientProperties.getTlsConfiguration().getTruststorePath() != null) {
+                mqtt5ClientBuilder
+                        .sslConfig()
+                        .trustManagerFactory(KeyStoreUtils.loadTrustManagerFactory(
+                                clientProperties.getTlsConfiguration().getTruststorePath(),
+                                clientProperties.getTlsConfiguration().getTruststorePassword(),
+                                clientProperties.getTlsConfiguration().getTruststoreType().getType()))
+                        .applySslConfig();
+            }
+
+            if (clientProperties.getTlsConfiguration().getKeystorePath() != null) {
+                mqtt5ClientBuilder
+                        .sslConfig()
+                        .keyManagerFactory(KeyStoreUtils.loadKeyManagerFactory(
+                                clientProperties.getTlsConfiguration().getKeystorePath(),
+                                clientProperties.getTlsConfiguration().getKeystorePassword(),
+                                null,
+                                clientProperties.getTlsConfiguration().getKeystoreType().getType()))
+                        .applySslConfig();
+            }
+        }
+
+        return mqtt5ClientBuilder.buildBlocking();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/PahoMqttClientAdapter.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/PahoMqttClientAdapter.java
new file mode 100644
index 0000000000..90cdc5c1c8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/PahoMqttClientAdapter.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mqtt.adapters;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.mqtt.common.MqttCallback;
+import org.apache.nifi.processors.mqtt.common.MqttClient;
+import org.apache.nifi.processors.mqtt.common.MqttClientProperties;
+import org.apache.nifi.processors.mqtt.common.MqttException;
+import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
+import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
+import org.apache.nifi.security.util.TlsConfiguration;
+import org.eclipse.paho.client.mqttv3.IMqttClient;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+import java.util.Properties;
+
+public class PahoMqttClientAdapter implements MqttClient {
+
+    public static final int DISCONNECT_TIMEOUT = 5000;
+
+    private final IMqttClient client;
+    private final MqttClientProperties clientProperties;
+    private final ComponentLog logger;
+
+    public PahoMqttClientAdapter(MqttClientProperties clientProperties, ComponentLog logger) {
+        this.client = createClient(clientProperties, logger);
+        this.clientProperties = clientProperties;
+        this.logger = logger;
+    }
+
+    @Override
+    public boolean isConnected() {
+        return client.isConnected();
+    }
+
+    @Override
+    public void connect() {
+        logger.debug("Connecting to broker");
+
+        try {
+            final MqttConnectOptions connectOptions = new MqttConnectOptions();
+
+            connectOptions.setCleanSession(clientProperties.isCleanSession());
+            connectOptions.setKeepAliveInterval(clientProperties.getKeepAliveInterval());
+            connectOptions.setMqttVersion(clientProperties.getMqttVersion().getVersionCode());
+            connectOptions.setConnectionTimeout(clientProperties.getConnectionTimeout());
+
+            final TlsConfiguration tlsConfiguration = clientProperties.getTlsConfiguration();
+            if (tlsConfiguration != null) {
+                connectOptions.setSSLProperties(transformSSLContextService(tlsConfiguration));
+            }
+
+            final String lastWillTopic = clientProperties.getLastWillTopic();
+            if (lastWillTopic != null) {
+                boolean lastWillRetain = clientProperties.getLastWillRetain() != null && clientProperties.getLastWillRetain();
+                connectOptions.setWill(lastWillTopic, clientProperties.getLastWillMessage().getBytes(), clientProperties.getLastWillQos(), lastWillRetain);
+            }
+
+            final String username = clientProperties.getUsername();
+            if (username != null) {
+                connectOptions.setUserName(username);
+                connectOptions.setPassword(clientProperties.getPassword().toCharArray());
+            }
+
+            client.connect(connectOptions);
+        } catch (org.eclipse.paho.client.mqttv3.MqttException e) {
+            throw new MqttException("An error has occurred during connecting to broker", e);
+        }
+    }
+
+    @Override
+    public void disconnect() {
+        logger.debug("Disconnecting client with timeout: {}", DISCONNECT_TIMEOUT);
+
+        try {
+            client.disconnect(DISCONNECT_TIMEOUT);
+        } catch (org.eclipse.paho.client.mqttv3.MqttException e) {
+            throw new MqttException("An error has occurred during disconnecting client with timeout: " + DISCONNECT_TIMEOUT, e);
+        }
+    }
+
+    @Override
+    public void close() {
+        logger.debug("Closing client");
+
+        try {
+            client.close();
+        } catch (org.eclipse.paho.client.mqttv3.MqttException e) {
+            throw new MqttException("An error has occurred during closing client", e);
+        }
+    }
+
+    @Override
+    public void publish(String topic, StandardMqttMessage message) {
+        logger.debug("Publishing message to {} with QoS: {}", topic, message.getQos());
+
+        try {
+            client.publish(topic, message.getPayload(), message.getQos(), message.isRetained());
+        } catch (org.eclipse.paho.client.mqttv3.MqttException e) {
+            throw new MqttException("An error has occurred during publishing message to " + topic + " with QoS: " + message.getQos(), e);
+        }
+    }
+
+    @Override
+    public void subscribe(String topicFilter, int qos) {
+        logger.debug("Subscribing to {} with QoS: {}", topicFilter, qos);
+
+        try {
+            client.subscribe(topicFilter, qos);
+        } catch (org.eclipse.paho.client.mqttv3.MqttException e) {
+            throw new MqttException("An error has occurred during subscribing to " + topicFilter + " with QoS: " + qos, e);
+        }
+    }
+
+    @Override
+    public void setCallback(MqttCallback callback) {
+        client.setCallback(new org.eclipse.paho.client.mqttv3.MqttCallback() {
+            @Override
+            public void connectionLost(Throwable cause) {
+                callback.connectionLost(cause);
+            }
+
+            @Override
+            public void messageArrived(String topic, MqttMessage message) {
+                logger.debug("Message arrived with id: {}", message.getId());
+                final ReceivedMqttMessage receivedMessage = new ReceivedMqttMessage(message.getPayload(), message.getQos(), message.isRetained(), topic);
+                callback.messageArrived(receivedMessage);
+            }
+
+            @Override
+            public void deliveryComplete(IMqttDeliveryToken token) {
+                callback.deliveryComplete(token.toString());
+            }
+        });
+    }
+
+    public static Properties transformSSLContextService(TlsConfiguration tlsConfiguration) {
+        final Properties properties = new Properties();
+        if (tlsConfiguration.getProtocol() != null) {
+            properties.setProperty("com.ibm.ssl.protocol", tlsConfiguration.getProtocol());
+        }
+        if (tlsConfiguration.getKeystorePath() != null) {
+            properties.setProperty("com.ibm.ssl.keyStore", tlsConfiguration.getKeystorePath());
+        }
+        if (tlsConfiguration.getKeystorePassword() != null) {
+            properties.setProperty("com.ibm.ssl.keyStorePassword", tlsConfiguration.getKeystorePassword());
+        }
+        if (tlsConfiguration.getKeystoreType() != null) {
+            properties.setProperty("com.ibm.ssl.keyStoreType", tlsConfiguration.getKeystoreType().getType());
+        }
+        if (tlsConfiguration.getTruststorePath() != null) {
+            properties.setProperty("com.ibm.ssl.trustStore", tlsConfiguration.getTruststorePath());
+        }
+        if (tlsConfiguration.getTruststorePassword() != null) {
+            properties.setProperty("com.ibm.ssl.trustStorePassword", tlsConfiguration.getTruststorePassword());
+        }
+        if (tlsConfiguration.getTruststoreType() != null) {
+            properties.setProperty("com.ibm.ssl.trustStoreType", tlsConfiguration.getTruststoreType().getType());
+        }
+        return  properties;
+    }
+
+    private static org.eclipse.paho.client.mqttv3.MqttClient createClient(MqttClientProperties clientProperties, ComponentLog logger) {
+        logger.debug("Creating Mqtt v3 client");
+
+        try {
+            return new org.eclipse.paho.client.mqttv3.MqttClient(clientProperties.getBroker(), clientProperties.getClientId(), new MemoryPersistence());
+        } catch (org.eclipse.paho.client.mqttv3.MqttException e) {
+            throw new MqttException("An error has occurred during creating adapter for MQTT v3 client", e);
+        }
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
index 13bc624216..8b8c186360 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
@@ -31,25 +31,25 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessSessionFactory;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.security.util.TlsException;
 import org.apache.nifi.ssl.SSLContextService;
-import org.eclipse.paho.client.mqttv3.IMqttClient;
-import org.eclipse.paho.client.mqttv3.MqttClient;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.util.ArrayList;
+import java.util.Arrays;
 import java.util.Collection;
 import java.util.List;
-import java.util.Properties;
 import java.util.UUID;
+import java.util.concurrent.TimeUnit;
 
+import static org.apache.commons.lang3.EnumUtils.isValidEnumIgnoreCase;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
 import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
 import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_TRUE;
 import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_310;
 import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_311;
+import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_500;
 import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_AUTO;
 import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
 import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
@@ -57,63 +57,68 @@ import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VAL
 
 public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProcessor {
 
-    public static int DISCONNECT_TIMEOUT = 5000;
+    private static final String DEFAULT_SESSION_EXPIRY_INTERVAL = "24 hrs";
 
     protected ComponentLog logger;
-    protected IMqttClient mqttClient;
-    protected volatile String broker;
-    protected volatile String brokerUri;
-    protected volatile String clientID;
-    protected MqttConnectOptions connOpts;
-    protected MemoryPersistence persistence = new MemoryPersistence();
 
-    public ProcessSessionFactory processSessionFactory;
+    protected MqttClientProperties clientProperties;
 
-    public static final Validator QOS_VALIDATOR = new Validator() {
+    protected MqttClientFactory mqttClientFactory = new MqttClientFactory();
+    protected MqttClient mqttClient;
 
-        @Override
-        public ValidationResult validate(String subject, String input, ValidationContext context) {
-            Integer inputInt = Integer.parseInt(input);
-            if (inputInt < 0 || inputInt > 2) {
-                return new ValidationResult.Builder().subject(subject).valid(false).explanation("QoS must be an integer between 0 and 2").build();
-            }
-            return new ValidationResult.Builder().subject(subject).valid(true).build();
+    public ProcessSessionFactory processSessionFactory;
+
+    public static final Validator QOS_VALIDATOR = (subject, input, context) -> {
+        Integer inputInt = Integer.parseInt(input);
+        if (inputInt < 0 || inputInt > 2) {
+            return new ValidationResult.Builder().subject(subject).valid(false).explanation("QoS must be an integer between 0 and 2.").build();
         }
+        return new ValidationResult.Builder().subject(subject).valid(true).build();
     };
 
-    public static final Validator BROKER_VALIDATOR = new Validator() {
-
-        @Override
-        public ValidationResult validate(String subject, String input, ValidationContext context) {
-            try{
-                URI brokerURI = new URI(input);
-                if (!"".equals(brokerURI.getPath())) {
-                    return new ValidationResult.Builder().subject(subject).valid(false).explanation("the broker URI cannot have a path. It currently is:" + brokerURI.getPath()).build();
-                }
-                if (!("tcp".equals(brokerURI.getScheme()) || "ssl".equals(brokerURI.getScheme()) || "ws".equals(brokerURI.getScheme()) || "wss".equals(brokerURI.getScheme()))) {
-                    return new ValidationResult.Builder().subject(subject).valid(false).explanation("only the 'tcp', 'ssl', 'ws' and 'wss' schemes are supported.").build();
-                }
-            } catch (URISyntaxException e) {
-                return new ValidationResult.Builder().subject(subject).valid(false).explanation("it is not valid URI syntax.").build();
+    public static final Validator BROKER_VALIDATOR = (subject, input, context) -> {
+        try {
+            URI brokerURI = new URI(input);
+            if (!EMPTY.equals(brokerURI.getPath())) {
+                return new ValidationResult.Builder().subject(subject).valid(false).explanation("the broker URI cannot have a path. It currently is: " + brokerURI.getPath()).build();
             }
-            return new ValidationResult.Builder().subject(subject).valid(true).build();
+            if (!isValidEnumIgnoreCase(MqttProtocolScheme.class, brokerURI.getScheme())) {
+                return new ValidationResult.Builder().subject(subject).valid(false)
+                        .explanation("scheme is invalid. Supported schemes are: " + getSupportedSchemeList()).build();
+            }
+        } catch (URISyntaxException e) {
+            return new ValidationResult.Builder().subject(subject).valid(false).explanation("it is not valid URI syntax.").build();
         }
+        return new ValidationResult.Builder().subject(subject).valid(true).build();
     };
 
-    public static final Validator RETAIN_VALIDATOR = new Validator() {
-
-        @Override
-        public ValidationResult validate(String subject, String input, ValidationContext context) {
-            if("true".equalsIgnoreCase(input) || "false".equalsIgnoreCase(input)){
-                return new ValidationResult.Builder().subject(subject).valid(true).build();
-            } else{
-                return StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.BOOLEAN, false)
-                        .validate(subject, input, context);
-            }
+    private static String getSupportedSchemeList() {
+        return String.join(", ", Arrays.stream(MqttProtocolScheme.values()).map(value -> value.name().toLowerCase()).toArray(String[]::new));
+    }
 
+    public static final Validator RETAIN_VALIDATOR = (subject, input, context) -> {
+        if ("true".equalsIgnoreCase(input) || "false".equalsIgnoreCase(input)) {
+            return new ValidationResult.Builder().subject(subject).valid(true).build();
+        } else {
+            return StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.BOOLEAN, false)
+                    .validate(subject, input, context);
         }
+
     };
 
+    public static final PropertyDescriptor PROP_MQTT_VERSION = new PropertyDescriptor.Builder()
+            .name("MQTT Specification Version")
+            .description("The MQTT specification version when connecting with the broker. See the allowable value descriptions for more details.")
+            .allowableValues(
+                    ALLOWABLE_VALUE_MQTT_VERSION_AUTO,
+                    ALLOWABLE_VALUE_MQTT_VERSION_500,
+                    ALLOWABLE_VALUE_MQTT_VERSION_311,
+                    ALLOWABLE_VALUE_MQTT_VERSION_310
+            )
+            .defaultValue(ALLOWABLE_VALUE_MQTT_VERSION_AUTO.getValue())
+            .required(true)
+            .build();
+
     public static final PropertyDescriptor PROP_BROKER_URI = new PropertyDescriptor.Builder()
             .name("Broker URI")
             .description("The URI to use to connect to the MQTT broker (e.g. tcp://localhost:1883). The 'tcp', 'ssl', 'ws' and 'wss' schemes are supported. In order to use 'ssl', the SSL Context " +
@@ -123,7 +128,6 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
             .addValidator(BROKER_VALIDATOR)
             .build();
 
-
     public static final PropertyDescriptor PROP_CLIENTID = new PropertyDescriptor.Builder()
             .name("Client ID")
             .description("MQTT client ID to use. If not set, a UUID will be generated.")
@@ -155,7 +159,6 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
             .identifiesControllerService(SSLContextService.class)
             .build();
 
-
     public static final PropertyDescriptor PROP_LAST_WILL_TOPIC = new PropertyDescriptor.Builder()
             .name("Last Will Topic")
             .description("The topic to send the client's Last Will to. If the Last Will topic and message are not set then a Last Will will not be sent.")
@@ -174,7 +177,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
             .name("Last Will Retain")
             .description("Whether to retain the client's Last Will. If the Last Will topic and message are not set then a Last Will will not be sent.")
             .required(false)
-            .allowableValues("true","false")
+            .allowableValues("true", "false")
             .build();
 
     public static final PropertyDescriptor PROP_LAST_WILL_QOS = new PropertyDescriptor.Builder()
@@ -190,7 +193,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
 
     public static final PropertyDescriptor PROP_CLEAN_SESSION = new PropertyDescriptor.Builder()
             .name("Session state")
-            .description("Whether to start afresh or resume previous flows. See the allowable value descriptions for more details.")
+            .description("Whether to start a fresh or resume previous flows. See the allowable value descriptions for more details.")
             .required(true)
             .allowableValues(
                     ALLOWABLE_VALUE_CLEAN_SESSION_TRUE,
@@ -199,16 +202,13 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
             .defaultValue(ALLOWABLE_VALUE_CLEAN_SESSION_TRUE.getValue())
             .build();
 
-    public static final PropertyDescriptor PROP_MQTT_VERSION = new PropertyDescriptor.Builder()
-            .name("MQTT Specification Version")
-            .description("The MQTT specification version when connecting with the broker. See the allowable value descriptions for more details.")
-            .allowableValues(
-                    ALLOWABLE_VALUE_MQTT_VERSION_AUTO,
-                    ALLOWABLE_VALUE_MQTT_VERSION_311,
-                    ALLOWABLE_VALUE_MQTT_VERSION_310
-            )
-            .defaultValue(ALLOWABLE_VALUE_MQTT_VERSION_AUTO.getValue())
-            .required(true)
+    public static final PropertyDescriptor PROP_SESSION_EXPIRY_INTERVAL = new PropertyDescriptor.Builder()
+            .name("Session Expiry Interval")
+            .description("After this interval the broker will expire the client and clear the session state.")
+            .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+            .dependsOn(PROP_MQTT_VERSION, ALLOWABLE_VALUE_MQTT_VERSION_500)
+            .dependsOn(PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE)
+            .defaultValue(DEFAULT_SESSION_EXPIRY_INTERVAL)
             .build();
 
     public static final PropertyDescriptor PROP_CONN_TIMEOUT = new PropertyDescriptor.Builder()
@@ -232,8 +232,8 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
             .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
             .build();
 
-    public static List<PropertyDescriptor> getAbstractPropertyDescriptors(){
-        final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
+    public static List<PropertyDescriptor> getAbstractPropertyDescriptors() {
+        final List<PropertyDescriptor> descriptors = new ArrayList<>();
         descriptors.add(PROP_BROKER_URI);
         descriptors.add(PROP_CLIENTID);
         descriptors.add(PROP_USERNAME);
@@ -244,6 +244,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
         descriptors.add(PROP_LAST_WILL_RETAIN);
         descriptors.add(PROP_LAST_WILL_QOS);
         descriptors.add(PROP_CLEAN_SESSION);
+        descriptors.add(PROP_SESSION_EXPIRY_INTERVAL);
         descriptors.add(PROP_MQTT_VERSION);
         descriptors.add(PROP_CONN_TIMEOUT);
         descriptors.add(PROP_KEEP_ALIVE_INTERVAL);
@@ -257,7 +258,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
         final boolean passwordSet = validationContext.getProperty(PROP_PASSWORD).isSet();
 
         if ((usernameSet && !passwordSet) || (!usernameSet && passwordSet)) {
-            results.add(new ValidationResult.Builder().subject("Username and Password").valid(false).explanation("if username or password is set, both must be set").build());
+            results.add(new ValidationResult.Builder().subject("Username and Password").valid(false).explanation("if username or password is set, both must be set.").build());
         }
 
         final boolean lastWillTopicSet = validationContext.getProperty(PROP_LAST_WILL_TOPIC).isSet();
@@ -269,7 +270,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
         // If any of the Last Will Properties are set
         if (lastWillTopicSet || lastWillMessageSet || lastWillRetainSet || lastWillQosSet) {
             // And any are not set
-            if(!(lastWillTopicSet && lastWillMessageSet && lastWillRetainSet && lastWillQosSet)){
+            if (!(lastWillTopicSet && lastWillMessageSet && lastWillRetainSet && lastWillQosSet)) {
                 // Then mark as invalid
                 results.add(new ValidationResult.Builder().subject("Last Will Properties").valid(false).explanation("if any of the Last Will Properties (message, topic, retain and QoS) are " +
                         "set, all must be set.").build());
@@ -289,88 +290,34 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
         return results;
     }
 
-    public static Properties transformSSLContextService(SSLContextService sslContextService){
-        Properties properties = new Properties();
-        if (sslContextService.getSslAlgorithm() != null) {
-            properties.setProperty("com.ibm.ssl.protocol", sslContextService.getSslAlgorithm());
-        }
-        if (sslContextService.getKeyStoreFile() != null) {
-            properties.setProperty("com.ibm.ssl.keyStore", sslContextService.getKeyStoreFile());
-        }
-        if (sslContextService.getKeyStorePassword() != null) {
-            properties.setProperty("com.ibm.ssl.keyStorePassword", sslContextService.getKeyStorePassword());
-        }
-        if (sslContextService.getKeyStoreType() != null) {
-            properties.setProperty("com.ibm.ssl.keyStoreType", sslContextService.getKeyStoreType());
-        }
-        if (sslContextService.getTrustStoreFile() != null) {
-            properties.setProperty("com.ibm.ssl.trustStore", sslContextService.getTrustStoreFile());
-        }
-        if (sslContextService.getTrustStorePassword() != null) {
-            properties.setProperty("com.ibm.ssl.trustStorePassword", sslContextService.getTrustStorePassword());
-        }
-        if (sslContextService.getTrustStoreType() != null) {
-            properties.setProperty("com.ibm.ssl.trustStoreType", sslContextService.getTrustStoreType());
-        }
-        return  properties;
+    protected void onScheduled(final ProcessContext context) {
+        clientProperties = getMqttClientProperties(context);
     }
 
-    protected void onScheduled(final ProcessContext context){
-        broker = context.getProperty(PROP_BROKER_URI).evaluateAttributeExpressions().getValue();
-        brokerUri = broker.endsWith("/") ? broker : broker + "/";
-        clientID = context.getProperty(PROP_CLIENTID).evaluateAttributeExpressions().getValue();
-
-        if (clientID == null) {
-            clientID = UUID.randomUUID().toString();
-        }
-
-        connOpts = new MqttConnectOptions();
-        connOpts.setCleanSession(context.getProperty(PROP_CLEAN_SESSION).asBoolean());
-        connOpts.setKeepAliveInterval(context.getProperty(PROP_KEEP_ALIVE_INTERVAL).asInteger());
-        connOpts.setMqttVersion(context.getProperty(PROP_MQTT_VERSION).asInteger());
-        connOpts.setConnectionTimeout(context.getProperty(PROP_CONN_TIMEOUT).asInteger());
-
-        PropertyValue sslProp = context.getProperty(PROP_SSL_CONTEXT_SERVICE);
-        if (sslProp.isSet()) {
-            Properties sslProps = transformSSLContextService((SSLContextService) sslProp.asControllerService());
-            connOpts.setSSLProperties(sslProps);
-        }
-
-        PropertyValue lastWillTopicProp = context.getProperty(PROP_LAST_WILL_TOPIC);
-        if (lastWillTopicProp.isSet()){
-            String lastWillMessage = context.getProperty(PROP_LAST_WILL_MESSAGE).getValue();
-            PropertyValue lastWillRetain = context.getProperty(PROP_LAST_WILL_RETAIN);
-            Integer lastWillQOS = context.getProperty(PROP_LAST_WILL_QOS).asInteger();
-            connOpts.setWill(lastWillTopicProp.getValue(), lastWillMessage.getBytes(), lastWillQOS, lastWillRetain.isSet() ? lastWillRetain.asBoolean() : false);
-        }
-
-
-        PropertyValue usernameProp = context.getProperty(PROP_USERNAME);
-        if(usernameProp.isSet()) {
-            connOpts.setUserName(usernameProp.evaluateAttributeExpressions().getValue());
-            connOpts.setPassword(context.getProperty(PROP_PASSWORD).getValue().toCharArray());
-        }
-    }
+    protected void stopClient() {
+        // Since client is created in the onTrigger method it can happen that it never will be created because of an initialization error.
+        // We are preventing additional nullPtrException here, but the clean solution would be to create the client in the onScheduled method.
+        if (mqttClient != null) {
+            try {
+                logger.info("Disconnecting client");
+                mqttClient.disconnect();
+            } catch (Exception e) {
+                logger.error("Error disconnecting MQTT client", e);
+            }
 
-    protected void onStopped() {
-        try {
-            logger.info("Disconnecting client");
-            mqttClient.disconnect(DISCONNECT_TIMEOUT);
-        } catch(MqttException me) {
-            logger.error("Error disconnecting MQTT client due to {}", new Object[]{me.getMessage()}, me);
-        }
+            try {
+                logger.info("Closing client");
+                mqttClient.close();
+            } catch (Exception e) {
+                logger.error("Error closing MQTT client", e);
+            }
 
-        try {
-            logger.info("Closing client");
-            mqttClient.close();
             mqttClient = null;
-        } catch (MqttException me) {
-            logger.error("Error closing MQTT client due to {}", new Object[]{me.getMessage()}, me);
         }
     }
 
-    protected IMqttClient createMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException {
-        return new MqttClient(broker, clientID, persistence);
+    protected MqttClient createMqttClient() throws TlsException {
+        return mqttClientFactory.create(clientProperties, getLogger());
     }
 
 
@@ -384,7 +331,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
             onTrigger(context, session);
             session.commitAsync();
         } catch (final Throwable t) {
-            getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t});
+            getLogger().error("{} failed to process due to {}; rolling back session", this, t);
             session.rollback(true);
             throw t;
         }
@@ -392,8 +339,52 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
 
     public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
 
-    protected boolean isConnected(){
+    protected boolean isConnected() {
         return (mqttClient != null && mqttClient.isConnected());
     }
 
+    protected MqttClientProperties getMqttClientProperties(final ProcessContext context) {
+        final MqttClientProperties clientProperties = new MqttClientProperties();
+
+        try {
+            clientProperties.setBrokerUri(new URI(context.getProperty(PROP_BROKER_URI).evaluateAttributeExpressions().getValue()));
+        } catch (URISyntaxException e) {
+            throw new IllegalArgumentException("Invalid Broker URI", e);
+        }
+
+        String clientId = context.getProperty(PROP_CLIENTID).evaluateAttributeExpressions().getValue();
+        if (clientId == null) {
+            clientId = UUID.randomUUID().toString();
+        }
+        clientProperties.setClientId(clientId);
+
+        clientProperties.setMqttVersion(MqttVersion.fromVersionCode(context.getProperty(PROP_MQTT_VERSION).asInteger()));
+
+        clientProperties.setCleanSession(context.getProperty(PROP_CLEAN_SESSION).asBoolean());
+        clientProperties.setSessionExpiryInterval(context.getProperty(PROP_SESSION_EXPIRY_INTERVAL).asTimePeriod(TimeUnit.SECONDS));
+
+        clientProperties.setKeepAliveInterval(context.getProperty(PROP_KEEP_ALIVE_INTERVAL).asInteger());
+        clientProperties.setConnectionTimeout(context.getProperty(PROP_CONN_TIMEOUT).asInteger());
+
+        final PropertyValue sslProp = context.getProperty(PROP_SSL_CONTEXT_SERVICE);
+        if (sslProp.isSet()) {
+            final SSLContextService sslContextService = (SSLContextService) sslProp.asControllerService();
+            clientProperties.setTlsConfiguration(sslContextService.createTlsConfiguration());
+        }
+
+        clientProperties.setLastWillTopic(context.getProperty(PROP_LAST_WILL_TOPIC).getValue());
+        clientProperties.setLastWillMessage(context.getProperty(PROP_LAST_WILL_MESSAGE).getValue());
+        final PropertyValue lastWillRetain = context.getProperty(PROP_LAST_WILL_RETAIN);
+        clientProperties.setLastWillRetain(lastWillRetain.isSet() ? lastWillRetain.asBoolean() : false);
+        clientProperties.setLastWillQos(context.getProperty(PROP_LAST_WILL_QOS).asInteger());
+
+        final PropertyValue usernameProp = context.getProperty(PROP_USERNAME);
+        if (usernameProp.isSet()) {
+            clientProperties.setUsername(usernameProp.evaluateAttributeExpressions().getValue());
+        }
+
+        clientProperties.setPassword(context.getProperty(PROP_PASSWORD).getValue());
+
+        return clientProperties;
+    }
 }
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttCallback.java
similarity index 51%
copy from nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java
copy to nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttCallback.java
index d5e63c789b..a890616f5c 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttCallback.java
@@ -14,44 +14,10 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.nifi.processors.mqtt.common;
 
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-
-public class MQTTQueueMessage {
-    private String topic;
-
-    private byte[] payload;
-    private int qos = 1;
-    private boolean retained = false;
-    private boolean duplicate = false;
-
-    public MQTTQueueMessage(String topic, MqttMessage message) {
-        this.topic = topic;
-        payload = message.getPayload();
-        qos = message.getQos();
-        retained = message.isRetained();
-        duplicate = message.isDuplicate();
-    }
-
-    public String getTopic() {
-        return topic;
-    }
-
-    public byte[] getPayload() {
-        return payload;
-    }
-
-    public int getQos() {
-        return qos;
-    }
-
-    public boolean isRetained() {
-        return retained;
-    }
-
-    public boolean isDuplicate() {
-        return duplicate;
-    }
+public interface MqttCallback {
+    void connectionLost(Throwable cause);
+    void messageArrived(ReceivedMqttMessage message);
+    void deliveryComplete(String token);
 }
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClient.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClient.java
new file mode 100644
index 0000000000..f21d3e9242
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClient.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mqtt.common;
+
+public interface MqttClient {
+
+    /**
+     * Determines if this client is currently connected to an MQTT broker.
+     *
+     * @return whether the client is connected.
+     */
+    boolean isConnected();
+
+    /**
+     * Connects the client to an MQTT broker.
+     */
+    void connect();
+
+    /**
+     * Disconnects client from an MQTT broker.
+     */
+    void disconnect();
+
+    /**
+     * Releases all resource associated with the client. After the client has
+     * been closed it cannot be reused. For instance attempts to connect will fail.
+     */
+    void close();
+
+    /**
+     * Publishes a message to a topic on the MQTT broker.
+     *
+     * @param topic to deliver the message to, for example "pipe-1/flow-rate"
+     * @param message to deliver to the MQTT broker
+     */
+    void publish(String topic, StandardMqttMessage message);
+
+    /**
+     * Subscribe to a topic.
+     *
+     * @param topicFilter the topic to subscribe to, which can include wildcards.
+     * @param qos the maximum quality of service at which to subscribe. Messages
+     *            published at a lower quality of service will be received at the published
+     *            QoS. Messages published at a higher quality of service will be received using
+     *            the QoS specified on the subscribe.
+     */
+    void subscribe(String topicFilter, int qos);
+
+    /**
+     * Sets a callback listener to use for events that happen asynchronously.
+     *
+     * @param callback for matching events
+     */
+    void setCallback(MqttCallback callback);
+}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClientFactory.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClientFactory.java
new file mode 100644
index 0000000000..630385f5e1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClientFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mqtt.common;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.mqtt.adapters.HiveMqV5ClientAdapter;
+import org.apache.nifi.processors.mqtt.adapters.PahoMqttClientAdapter;
+import org.apache.nifi.security.util.TlsException;
+
+public class MqttClientFactory {
+    public MqttClient create(MqttClientProperties clientProperties, ComponentLog logger) throws TlsException {
+        switch (clientProperties.getMqttVersion()) {
+            case MQTT_VERSION_3_AUTO:
+            case MQTT_VERSION_3_1:
+            case MQTT_VERSION_3_1_1:
+                return new PahoMqttClientAdapter(clientProperties, logger);
+            case MQTT_VERSION_5_0:
+                return new HiveMqV5ClientAdapter(clientProperties, logger);
+            default:
+                throw new MqttException("Unsupported Mqtt version: " + clientProperties.getMqttVersion());
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClientProperties.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClientProperties.java
new file mode 100644
index 0000000000..eecde9b1b2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClientProperties.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mqtt.common;
+
+import org.apache.nifi.security.util.TlsConfiguration;
+
+import java.net.URI;
+
+public class MqttClientProperties {
+    private URI brokerUri;
+    private String clientId;
+
+    private MqttVersion mqttVersion;
+
+    private int keepAliveInterval;
+    private int connectionTimeout;
+
+    private boolean cleanSession;
+    private Long sessionExpiryInterval;
+
+    private TlsConfiguration tlsConfiguration;
+
+    private String lastWillTopic;
+    private String lastWillMessage;
+    private Boolean lastWillRetain;
+    private Integer lastWillQos;
+
+    private String username;
+    private String password;
+
+    public String getBroker() {
+        return brokerUri.toString();
+    }
+
+    public MqttProtocolScheme getScheme() {
+        return MqttProtocolScheme.valueOf(brokerUri.getScheme().toUpperCase());
+    }
+
+    public URI getBrokerUri() {
+        return brokerUri;
+    }
+
+    public void setBrokerUri(URI brokerUri) {
+        this.brokerUri = brokerUri;
+    }
+
+    public String getClientId() {
+        return clientId;
+    }
+
+    public void setClientId(String clientId) {
+        this.clientId = clientId;
+    }
+
+    public MqttVersion getMqttVersion() {
+        return mqttVersion;
+    }
+
+    public void setMqttVersion(MqttVersion mqttVersion) {
+        this.mqttVersion = mqttVersion;
+    }
+
+    public int getKeepAliveInterval() {
+        return keepAliveInterval;
+    }
+
+    public void setKeepAliveInterval(int keepAliveInterval) {
+        this.keepAliveInterval = keepAliveInterval;
+    }
+
+    public int getConnectionTimeout() {
+        return connectionTimeout;
+    }
+
+    public void setConnectionTimeout(int connectionTimeout) {
+        this.connectionTimeout = connectionTimeout;
+    }
+
+    public boolean isCleanSession() {
+        return cleanSession;
+    }
+
+    public void setCleanSession(boolean cleanSession) {
+        this.cleanSession = cleanSession;
+    }
+
+    public Long getSessionExpiryInterval() {
+        return sessionExpiryInterval;
+    }
+
+    public void setSessionExpiryInterval(Long sessionExpiryInterval) {
+        this.sessionExpiryInterval = sessionExpiryInterval;
+    }
+
+    public TlsConfiguration getTlsConfiguration() {
+        return tlsConfiguration;
+    }
+
+    public void setTlsConfiguration(TlsConfiguration tlsConfiguration) {
+        this.tlsConfiguration = tlsConfiguration;
+    }
+
+    public String getLastWillTopic() {
+        return lastWillTopic;
+    }
+
+    public void setLastWillTopic(String lastWillTopic) {
+        this.lastWillTopic = lastWillTopic;
+    }
+
+    public String getLastWillMessage() {
+        return lastWillMessage;
+    }
+
+    public void setLastWillMessage(String lastWillMessage) {
+        this.lastWillMessage = lastWillMessage;
+    }
+
+    public Boolean getLastWillRetain() {
+        return lastWillRetain;
+    }
+
+    public void setLastWillRetain(Boolean lastWillRetain) {
+        this.lastWillRetain = lastWillRetain;
+    }
+
+    public Integer getLastWillQos() {
+        return lastWillQos;
+    }
+
+    public void setLastWillQos(Integer lastWillQos) {
+        this.lastWillQos = lastWillQos;
+    }
+
+    public String getUsername() {
+        return username;
+    }
+
+    public void setUsername(String username) {
+        this.username = username;
+    }
+
+    public String getPassword() {
+        return password;
+    }
+
+    public void setPassword(String password) {
+        this.password = password;
+    }
+}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttConstants.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttConstants.java
index a29e6ff616..072708f6da 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttConstants.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttConstants.java
@@ -18,7 +18,11 @@
 package org.apache.nifi.processors.mqtt.common;
 
 import org.apache.nifi.components.AllowableValue;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+
+import static org.apache.nifi.processors.mqtt.common.MqttVersion.MQTT_VERSION_3_1;
+import static org.apache.nifi.processors.mqtt.common.MqttVersion.MQTT_VERSION_3_1_1;
+import static org.apache.nifi.processors.mqtt.common.MqttVersion.MQTT_VERSION_3_AUTO;
+import static org.apache.nifi.processors.mqtt.common.MqttVersion.MQTT_VERSION_5_0;
 
 public class MqttConstants {
 
@@ -66,15 +70,16 @@ public class MqttConstants {
       ------------------------------------------
      */
     public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_AUTO =
-            new AllowableValue(String.valueOf(MqttConnectOptions.MQTT_VERSION_DEFAULT),
-                    "AUTO",
+            new AllowableValue(String.valueOf(MQTT_VERSION_3_AUTO.getVersionCode()), MQTT_VERSION_3_AUTO.getDisplayName(),
                     "Start with v3.1.1 and fallback to v3.1.0 if not supported by a broker");
 
+    public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_500 =
+            new AllowableValue(String.valueOf(MQTT_VERSION_5_0.getVersionCode()), MQTT_VERSION_5_0.getDisplayName());
+
     public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_311 =
-            new AllowableValue(String.valueOf(MqttConnectOptions.MQTT_VERSION_3_1_1),
-                    "v3.1.1");
+            new AllowableValue(String.valueOf(MQTT_VERSION_3_1_1.getVersionCode()), MQTT_VERSION_3_1_1.getDisplayName());
 
     public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_310 =
-            new AllowableValue(String.valueOf(MqttConnectOptions.MQTT_VERSION_3_1),
-                    "v3.1.0");
+            new AllowableValue(String.valueOf(MQTT_VERSION_3_1.getVersionCode()), MQTT_VERSION_3_1.getDisplayName());
+
 }
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttException.java
similarity index 52%
copy from nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java
copy to nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttException.java
index d5e63c789b..cee1599136 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttException.java
@@ -14,44 +14,15 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.nifi.processors.mqtt.common;
 
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-
-public class MQTTQueueMessage {
-    private String topic;
-
-    private byte[] payload;
-    private int qos = 1;
-    private boolean retained = false;
-    private boolean duplicate = false;
-
-    public MQTTQueueMessage(String topic, MqttMessage message) {
-        this.topic = topic;
-        payload = message.getPayload();
-        qos = message.getQos();
-        retained = message.isRetained();
-        duplicate = message.isDuplicate();
-    }
-
-    public String getTopic() {
-        return topic;
-    }
-
-    public byte[] getPayload() {
-        return payload;
-    }
-
-    public int getQos() {
-        return qos;
-    }
+public class MqttException extends RuntimeException {
 
-    public boolean isRetained() {
-        return retained;
+    public MqttException(String message) {
+        super(message);
     }
 
-    public boolean isDuplicate() {
-        return duplicate;
+    public MqttException(String message, Throwable cause) {
+        super(message, cause);
     }
 }
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttProtocolScheme.java
similarity index 51%
copy from nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java
copy to nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttProtocolScheme.java
index d5e63c789b..1474694223 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttProtocolScheme.java
@@ -14,44 +14,11 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.nifi.processors.mqtt.common;
 
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-
-public class MQTTQueueMessage {
-    private String topic;
-
-    private byte[] payload;
-    private int qos = 1;
-    private boolean retained = false;
-    private boolean duplicate = false;
-
-    public MQTTQueueMessage(String topic, MqttMessage message) {
-        this.topic = topic;
-        payload = message.getPayload();
-        qos = message.getQos();
-        retained = message.isRetained();
-        duplicate = message.isDuplicate();
-    }
-
-    public String getTopic() {
-        return topic;
-    }
-
-    public byte[] getPayload() {
-        return payload;
-    }
-
-    public int getQos() {
-        return qos;
-    }
-
-    public boolean isRetained() {
-        return retained;
-    }
-
-    public boolean isDuplicate() {
-        return duplicate;
-    }
+public enum MqttProtocolScheme {
+    TCP,
+    SSL,
+    WS,
+    WSS
 }
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttVersion.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttVersion.java
new file mode 100644
index 0000000000..0aec86f138
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttVersion.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mqtt.common;
+
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+
+public enum MqttVersion {
+    MQTT_VERSION_3_AUTO(MqttConnectOptions.MQTT_VERSION_DEFAULT, "v3 AUTO"),
+    MQTT_VERSION_3_1(MqttConnectOptions.MQTT_VERSION_3_1, "v3.1.0"),
+    MQTT_VERSION_3_1_1(MqttConnectOptions.MQTT_VERSION_3_1_1, "v3.1.1"),
+    MQTT_VERSION_5_0(5, "v5.0");
+
+    private final int versionCode;
+    private final String displayName;
+
+    MqttVersion(int versionCode, String displayName) {
+        this.versionCode = versionCode;
+        this.displayName = displayName;
+    }
+
+    public int getVersionCode() {
+        return versionCode;
+    }
+
+    public String getDisplayName() {
+        return displayName;
+    }
+
+    public static MqttVersion fromVersionCode(int versionCode) {
+        for (MqttVersion version : values()) {
+            if (version.getVersionCode() == versionCode) {
+                return version;
+            }
+        }
+        throw new IllegalArgumentException("Unable to map MqttVersionCode from version code: " + versionCode);
+    }
+}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/ReceivedMqttMessage.java
similarity index 61%
copy from nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java
copy to nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/ReceivedMqttMessage.java
index d5e63c789b..abe1fe6281 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/ReceivedMqttMessage.java
@@ -14,44 +14,25 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.nifi.processors.mqtt.common;
 
-import org.eclipse.paho.client.mqttv3.MqttMessage;
+/**
+ * Represents a received MQTT message
+ */
+public class ReceivedMqttMessage extends StandardMqttMessage {
 
-public class MQTTQueueMessage {
     private String topic;
 
-    private byte[] payload;
-    private int qos = 1;
-    private boolean retained = false;
-    private boolean duplicate = false;
-
-    public MQTTQueueMessage(String topic, MqttMessage message) {
+    public ReceivedMqttMessage(byte[] payload, int qos, boolean retained, String topic) {
+        super(payload, qos, retained);
         this.topic = topic;
-        payload = message.getPayload();
-        qos = message.getQos();
-        retained = message.isRetained();
-        duplicate = message.isDuplicate();
     }
 
     public String getTopic() {
         return topic;
     }
 
-    public byte[] getPayload() {
-        return payload;
-    }
-
-    public int getQos() {
-        return qos;
-    }
-
-    public boolean isRetained() {
-        return retained;
-    }
-
     public boolean isDuplicate() {
-        return duplicate;
+        return false;
     }
 }
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/StandardMqttMessage.java
similarity index 64%
rename from nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java
rename to nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/StandardMqttMessage.java
index d5e63c789b..2a232eab86 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/StandardMqttMessage.java
@@ -14,29 +14,20 @@
  * See the License for the specific language governing permissions and
  * limitations under the License.
  */
-
 package org.apache.nifi.processors.mqtt.common;
 
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-
-public class MQTTQueueMessage {
-    private String topic;
-
+/**
+ * Represents a MQTT message.
+ */
+public class StandardMqttMessage {
     private byte[] payload;
-    private int qos = 1;
-    private boolean retained = false;
-    private boolean duplicate = false;
+    private int qos;
+    private boolean retained;
 
-    public MQTTQueueMessage(String topic, MqttMessage message) {
-        this.topic = topic;
-        payload = message.getPayload();
-        qos = message.getQos();
-        retained = message.isRetained();
-        duplicate = message.isDuplicate();
-    }
-
-    public String getTopic() {
-        return topic;
+    public StandardMqttMessage(byte[] payload, int qos, boolean retained) {
+        this.payload = payload;
+        this.qos = qos;
+        this.retained = retained;
     }
 
     public byte[] getPayload() {
@@ -50,8 +41,4 @@ public class MQTTQueueMessage {
     public boolean isRetained() {
         return retained;
     }
-
-    public boolean isDuplicate() {
-        return duplicate;
-    }
 }
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
index f66b4dd402..81ea6d7db1 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
@@ -18,8 +18,10 @@
 package org.apache.nifi.processors.mqtt;
 
 import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage;
+import org.apache.nifi.processors.mqtt.common.MqttClient;
 import org.apache.nifi.processors.mqtt.common.MqttTestClient;
+import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
+import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
 import org.apache.nifi.processors.mqtt.common.TestConsumeMqttCommon;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.security.util.SslContextFactory;
@@ -29,10 +31,6 @@ import org.apache.nifi.security.util.TlsException;
 import org.apache.nifi.ssl.SSLContextService;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.eclipse.paho.client.mqttv3.IMqttClient;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import org.junit.jupiter.api.BeforeAll;
 import org.junit.jupiter.api.BeforeEach;
 import org.junit.jupiter.api.Test;
@@ -59,8 +57,8 @@ public class TestConsumeMQTT extends TestConsumeMqttCommon {
         }
 
         @Override
-        public IMqttClient createMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException {
-            mqttTestClient =  new MqttTestClient(broker, clientID, MqttTestClient.ConnectType.Subscriber);
+        protected MqttClient createMqttClient() {
+            mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
             return mqttTestClient;
         }
     }
@@ -111,10 +109,10 @@ public class TestConsumeMQTT extends TestConsumeMqttCommon {
     public void testMessageNotConsumedOnCommitFail() throws NoSuchFieldException, IllegalAccessException {
         testRunner.run(1, false);
         ConsumeMQTT processor = (ConsumeMQTT) testRunner.getProcessor();
-        MQTTQueueMessage mock = mock(MQTTQueueMessage.class);
+        ReceivedMqttMessage mock = mock(ReceivedMqttMessage.class);
         when(mock.getPayload()).thenReturn(new byte[0]);
         when(mock.getTopic()).thenReturn("testTopic");
-        BlockingQueue<MQTTQueueMessage> mqttQueue = getMqttQueue(processor);
+        BlockingQueue<ReceivedMqttMessage> mqttQueue = getMqttQueue(processor);
         mqttQueue.add(mock);
 
         ProcessSession session = testRunner.getProcessSessionFactory().createSession();
@@ -131,11 +129,7 @@ public class TestConsumeMQTT extends TestConsumeMqttCommon {
     }
 
     @Override
-    public void internalPublish(final MqttMessage message, final String topicName) {
-        try {
-            mqttTestClient.publish(topicName, message);
-        } catch (MqttException e) {
-            throw new RuntimeException(e);
-        }
+    public void internalPublish(final StandardMqttMessage message, final String topicName) {
+        mqttTestClient.publish(topicName, message);
     }
 }
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
index 1755365c1c..41181c65ed 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
@@ -17,13 +17,12 @@
 
 package org.apache.nifi.processors.mqtt;
 
-import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage;
+import org.apache.nifi.processors.mqtt.common.MqttClient;
+import org.apache.nifi.processors.mqtt.common.MqttException;
 import org.apache.nifi.processors.mqtt.common.MqttTestClient;
+import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
 import org.apache.nifi.processors.mqtt.common.TestPublishMqttCommon;
 import org.apache.nifi.util.TestRunners;
-import org.eclipse.paho.client.mqttv3.IMqttClient;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
 import org.junit.jupiter.api.BeforeEach;
 
 import java.util.Arrays;
@@ -34,11 +33,12 @@ public class TestPublishMQTT extends TestPublishMqttCommon {
 
     @Override
     public void verifyPublishedMessage(byte[] payload, int qos, boolean retain) {
-        MQTTQueueMessage mqttQueueMessage = mqttTestClient.publishedMessage;
-        assertEquals(Arrays.toString(payload), Arrays.toString(mqttQueueMessage.getPayload()));
-        assertEquals(qos, mqttQueueMessage.getQos());
-        assertEquals(retain, mqttQueueMessage.isRetained());
-        assertEquals(topic, mqttQueueMessage.getTopic());
+        StandardMqttMessage lastPublishedMessage = mqttTestClient.getLastPublishedMessage();
+        String lastPublishedTopic = mqttTestClient.getLastPublishedTopic();
+        assertEquals(Arrays.toString(payload), Arrays.toString(lastPublishedMessage.getPayload()));
+        assertEquals(qos, lastPublishedMessage.getQos());
+        assertEquals(retain, lastPublishedMessage.isRetained());
+        assertEquals(topic, lastPublishedTopic);
     }
 
     private MqttTestClient mqttTestClient;
@@ -50,8 +50,8 @@ public class TestPublishMQTT extends TestPublishMqttCommon {
         }
 
         @Override
-        public IMqttClient createMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException {
-            mqttTestClient =  new MqttTestClient(broker, clientID, MqttTestClient.ConnectType.Publisher);
+        protected MqttClient createMqttClient() throws MqttException {
+            mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
             return mqttTestClient;
         }
     }
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java
index b135dcfe44..91997061bc 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java
@@ -17,24 +17,9 @@
 
 package org.apache.nifi.processors.mqtt.common;
 
-import org.eclipse.paho.client.mqttv3.IMqttClient;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
-import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
-import org.eclipse.paho.client.mqttv3.IMqttToken;
-import org.eclipse.paho.client.mqttv3.MqttCallback;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
-import org.eclipse.paho.client.mqttv3.MqttSecurityException;
-import org.eclipse.paho.client.mqttv3.MqttTopic;
-
 import java.util.concurrent.atomic.AtomicBoolean;
 
-public class MqttTestClient implements IMqttClient {
-
-    public String serverURI;
-    public String clientId;
+public class MqttTestClient implements MqttClient {
 
     public AtomicBoolean connected = new AtomicBoolean(false);
 
@@ -42,184 +27,54 @@ public class MqttTestClient implements IMqttClient {
     public ConnectType type;
     public enum ConnectType {Publisher, Subscriber}
 
-    public MQTTQueueMessage publishedMessage;
+    private StandardMqttMessage lastPublishedMessage;
+    private String lastPublishedTopic;
 
     public String subscribedTopic;
     public int subscribedQos;
 
 
-    public MqttTestClient(String serverURI, String clientId, ConnectType type) throws MqttException {
-        this.serverURI = serverURI;
-        this.clientId = clientId;
+    public MqttTestClient(ConnectType type) {
         this.type = type;
     }
 
     @Override
-    public void connect() throws MqttSecurityException, MqttException {
-        connected.set(true);
+    public boolean isConnected() {
+        return connected.get();
     }
 
     @Override
-    public void connect(MqttConnectOptions options) throws MqttSecurityException, MqttException {
+    public void connect() {
         connected.set(true);
     }
 
     @Override
-    public IMqttToken connectWithResult(MqttConnectOptions options) throws MqttSecurityException, MqttException {
-        return null;
-    }
-
-    @Override
-    public void disconnect() throws MqttException {
-        connected.set(false);
-    }
-
-    @Override
-    public void disconnect(long quiesceTimeout) throws MqttException {
-        connected.set(false);
-    }
-
-    @Override
-    public void disconnectForcibly() throws MqttException {
-        connected.set(false);
-    }
-
-    @Override
-    public void disconnectForcibly(long disconnectTimeout) throws MqttException {
+    public void disconnect() {
         connected.set(false);
     }
 
     @Override
-    public void disconnectForcibly(long quiesceTimeout, long disconnectTimeout) throws MqttException {
-        connected.set(false);
-    }
-
-    @Override
-    public void subscribe(String topicFilter) throws MqttException, MqttSecurityException {
-        subscribedTopic = topicFilter;
-        subscribedQos = -1;
-    }
-
-    @Override
-    public void subscribe(String[] topicFilters) throws MqttException {
-        throw new UnsupportedOperationException("Multiple topic filters is not supported");
-    }
-
-    @Override
-    public void subscribe(String topicFilter, int qos) throws MqttException {
-        subscribedTopic = topicFilter;
-        subscribedQos = qos;
-    }
-
-    @Override
-    public void subscribe(String[] topicFilters, int[] qos) throws MqttException {
-        throw new UnsupportedOperationException("Multiple topic filters is not supported");
-    }
-
-    @Override
-    public void subscribe(String s, IMqttMessageListener iMqttMessageListener) throws MqttException, MqttSecurityException {
-
-    }
-
-    @Override
-    public void subscribe(String[] strings, IMqttMessageListener[] iMqttMessageListeners) throws MqttException {
-
-    }
-
-    @Override
-    public void subscribe(String s, int i, IMqttMessageListener iMqttMessageListener) throws MqttException {
-
-    }
-
-    @Override
-    public void subscribe(String[] strings, int[] ints, IMqttMessageListener[] iMqttMessageListeners) throws MqttException {
-
-    }
-
-    @Override
-    public IMqttToken subscribeWithResponse(String s) throws MqttException {
-        return null;
-    }
-
-    @Override
-    public IMqttToken subscribeWithResponse(String s, IMqttMessageListener iMqttMessageListener) throws MqttException {
-        return null;
-    }
-
-    @Override
-    public IMqttToken subscribeWithResponse(String s, int i) throws MqttException {
-        return null;
-    }
-
-    @Override
-    public IMqttToken subscribeWithResponse(String s, int i, IMqttMessageListener iMqttMessageListener) throws MqttException {
-        return null;
-    }
-
-    @Override
-    public IMqttToken subscribeWithResponse(String[] strings) throws MqttException {
-        return null;
-    }
-
-    @Override
-    public IMqttToken subscribeWithResponse(String[] strings, IMqttMessageListener[] iMqttMessageListeners) throws MqttException {
-        return null;
-    }
-
-    @Override
-    public IMqttToken subscribeWithResponse(String[] strings, int[] ints) throws MqttException {
-        return null;
-    }
-
-    @Override
-    public IMqttToken subscribeWithResponse(String[] strings, int[] ints, IMqttMessageListener[] iMqttMessageListeners) throws MqttException {
-        return null;
-    }
-
-    @Override
-    public void unsubscribe(String topicFilter) throws MqttException {
-        subscribedTopic = "";
-        subscribedQos = -2;
-    }
+    public void close() {
 
-    @Override
-    public void unsubscribe(String[] topicFilters) throws MqttException {
-        throw new UnsupportedOperationException("Multiple topic filters is not supported");
     }
 
     @Override
-    public void publish(String topic, byte[] payload, int qos, boolean retained) throws MqttException, MqttPersistenceException {
-        MqttMessage message = new MqttMessage(payload);
-        message.setQos(qos);
-        message.setRetained(retained);
+    public void publish(String topic, StandardMqttMessage message) {
         switch (type) {
             case Publisher:
-                publishedMessage = new MQTTQueueMessage(topic, message);
+                lastPublishedMessage = message;
+                lastPublishedTopic = topic;
                 break;
             case Subscriber:
-                try {
-                    mqttCallback.messageArrived(topic, message);
-                } catch (Exception e) {
-                    throw new MqttException(e);
-                }
+                mqttCallback.messageArrived(new ReceivedMqttMessage(message.getPayload(), message.getQos(), message.isRetained(), topic));
                 break;
         }
     }
 
     @Override
-    public void publish(String topic, MqttMessage message) throws MqttException, MqttPersistenceException {
-        switch (type) {
-            case Publisher:
-                publishedMessage = new MQTTQueueMessage(topic, message);
-                break;
-            case Subscriber:
-                try {
-                    mqttCallback.messageArrived(topic, message);
-                } catch (Exception e) {
-                    throw new MqttException(e);
-                }
-                break;
-        }
+    public void subscribe(String topicFilter, int qos) {
+        subscribedTopic = topicFilter;
+        subscribedQos = qos;
     }
 
     @Override
@@ -227,48 +82,11 @@ public class MqttTestClient implements IMqttClient {
         this.mqttCallback = callback;
     }
 
-    @Override
-    public MqttTopic getTopic(String topic) {
-        return null;
-    }
-
-    @Override
-    public boolean isConnected() {
-        return connected.get();
-    }
-
-    @Override
-    public String getClientId() {
-        return clientId;
-    }
-
-    @Override
-    public String getServerURI() {
-        return serverURI;
-    }
-
-    @Override
-    public IMqttDeliveryToken[] getPendingDeliveryTokens() {
-        return new IMqttDeliveryToken[0];
+    public StandardMqttMessage getLastPublishedMessage() {
+        return lastPublishedMessage;
     }
 
-    @Override
-    public void setManualAcks(boolean b) {
-
-    }
-
-    @Override
-    public void reconnect() throws MqttException {
-
-    }
-
-    @Override
-    public void messageArrivedComplete(int i, int i1) throws MqttException {
-
-    }
-
-    @Override
-    public void close() throws MqttException {
-
+    public String getLastPublishedTopic() {
+        return lastPublishedTopic;
     }
 }
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
index 0eec42ce71..7cbaa11c8d 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
@@ -28,8 +28,6 @@ import org.apache.nifi.schema.access.SchemaAccessUtils;
 import org.apache.nifi.util.MockFlowFile;
 import org.apache.nifi.util.TestRunner;
 import org.apache.nifi.util.TestRunners;
-import org.eclipse.paho.client.mqttv3.IMqttClient;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
 import org.junit.jupiter.api.Test;
 
 import java.lang.reflect.Field;
@@ -66,7 +64,7 @@ public abstract class TestConsumeMqttCommon {
     private static final int LEAST_ONE = 1;
     private static final int EXACTLY_ONCE = 2;
 
-    public abstract void internalPublish(MqttMessage message, String topicName);
+    public abstract void internalPublish(StandardMqttMessage message, String topicName);
 
     @Test
     public void testClientIDConfiguration() {
@@ -295,10 +293,8 @@ public abstract class TestConsumeMqttCommon {
 
         testRunner.assertValid();
 
-        MqttMessage innerMessage = new MqttMessage();
-        innerMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()).array());
-        innerMessage.setQos(2);
-        MQTTQueueMessage testMessage = new MQTTQueueMessage("testTopic", innerMessage);
+        final byte[] content = ByteBuffer.wrap("testMessage".getBytes()).array();
+        ReceivedMqttMessage testMessage = new ReceivedMqttMessage(content, 2, false, "testTopic");
 
         ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
         consumeMQTT.onScheduled(testRunner.getProcessContext());
@@ -313,7 +309,7 @@ public abstract class TestConsumeMqttCommon {
         Field f = ConsumeMQTT.class.getDeclaredField("mqttQueue");
         f.setAccessible(true);
         @SuppressWarnings("unchecked")
-        LinkedBlockingQueue<MQTTQueueMessage> queue = (LinkedBlockingQueue<MQTTQueueMessage>) f.get(consumeMQTT);
+        LinkedBlockingQueue<ReceivedMqttMessage> queue = (LinkedBlockingQueue<ReceivedMqttMessage>) f.get(consumeMQTT);
         queue.add(testMessage);
 
         consumeMQTT.onUnscheduled(testRunner.getProcessContext());
@@ -551,7 +547,7 @@ public abstract class TestConsumeMqttCommon {
     private static boolean isConnected(AbstractMQTTProcessor processor) throws NoSuchFieldException, IllegalAccessException {
         Field f = AbstractMQTTProcessor.class.getDeclaredField("mqttClient");
         f.setAccessible(true);
-        IMqttClient mqttClient = (IMqttClient) f.get(processor);
+        MqttClient mqttClient = (MqttClient) f.get(processor);
         return mqttClient.isConnected();
     }
 
@@ -563,10 +559,10 @@ public abstract class TestConsumeMqttCommon {
     }
 
     @SuppressWarnings("unchecked")
-    public static BlockingQueue<MQTTQueueMessage> getMqttQueue(ConsumeMQTT consumeMQTT) throws IllegalAccessException, NoSuchFieldException {
+    public static BlockingQueue<ReceivedMqttMessage> getMqttQueue(ConsumeMQTT consumeMQTT) throws IllegalAccessException, NoSuchFieldException {
         Field mqttQueueField = ConsumeMQTT.class.getDeclaredField("mqttQueue");
         mqttQueueField.setAccessible(true);
-        return (BlockingQueue<MQTTQueueMessage>) mqttQueueField.get(consumeMQTT);
+        return (BlockingQueue<ReceivedMqttMessage>) mqttQueueField.get(consumeMQTT);
     }
 
     public static void transferQueue(ConsumeMQTT consumeMQTT, ProcessSession session) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
@@ -585,11 +581,7 @@ public abstract class TestConsumeMqttCommon {
     }
 
     private void publishMessage(final String payload, final int qos) {
-        final MqttMessage message = new MqttMessage();
-        message.setPayload(payload.getBytes(StandardCharsets.UTF_8));
-        message.setQos(qos);
-        message.setRetained(false);
-
+        final StandardMqttMessage message = new StandardMqttMessage(payload.getBytes(StandardCharsets.UTF_8), qos, false);
         internalPublish(message, "testTopic");
     }
 }