You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by tu...@apache.org on 2022/08/30 08:32:52 UTC
[nifi] branch main updated: NIFI-10251 Add v5 protocol support for existing MQTT processors
This is an automated email from the ASF dual-hosted git repository.
turcsanyi pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 4d4a5ca4be NIFI-10251 Add v5 protocol support for existing MQTT processors
4d4a5ca4be is described below
commit 4d4a5ca4be2f98e82ccf0b96290b27e0b06f277c
Author: Nandor Soma Abonyi <ab...@gmail.com>
AuthorDate: Thu Jul 7 17:54:11 2022 +0200
NIFI-10251 Add v5 protocol support for existing MQTT processors
This closes #6225.
Signed-off-by: Peter Turcsanyi <tu...@apache.org>
---
nifi-assembly/NOTICE | 299 ++++++++++++++++++++
.../src/main/resources/META-INF/NOTICE | 307 +++++++++++++++++++++
.../nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml | 29 +-
.../apache/nifi/processors/mqtt/ConsumeMQTT.java | 176 ++++++------
.../apache/nifi/processors/mqtt/PublishMQTT.java | 58 ++--
.../mqtt/adapters/HiveMqV5ClientAdapter.java | 200 ++++++++++++++
.../mqtt/adapters/PahoMqttClientAdapter.java | 191 +++++++++++++
.../mqtt/common/AbstractMQTTProcessor.java | 273 +++++++++---------
.../{MQTTQueueMessage.java => MqttCallback.java} | 42 +--
.../nifi/processors/mqtt/common/MqttClient.java | 69 +++++
.../processors/mqtt/common/MqttClientFactory.java | 37 +++
.../mqtt/common/MqttClientProperties.java | 164 +++++++++++
.../nifi/processors/mqtt/common/MqttConstants.java | 19 +-
.../{MQTTQueueMessage.java => MqttException.java} | 39 +--
...TTQueueMessage.java => MqttProtocolScheme.java} | 43 +--
.../nifi/processors/mqtt/common/MqttVersion.java | 51 ++++
...TQueueMessage.java => ReceivedMqttMessage.java} | 33 +--
...TQueueMessage.java => StandardMqttMessage.java} | 33 +--
.../nifi/processors/mqtt/TestConsumeMQTT.java | 24 +-
.../nifi/processors/mqtt/TestPublishMQTT.java | 22 +-
.../processors/mqtt/common/MqttTestClient.java | 222 ++-------------
.../mqtt/common/TestConsumeMqttCommon.java | 24 +-
22 files changed, 1654 insertions(+), 701 deletions(-)
diff --git a/nifi-assembly/NOTICE b/nifi-assembly/NOTICE
index 242b7c4305..b62e10a256 100644
--- a/nifi-assembly/NOTICE
+++ b/nifi-assembly/NOTICE
@@ -1565,6 +1565,275 @@ The following binary components are provided under the Apache Software License v
* http://tomcat.apache.org/native-doc/
* https://svn.apache.org/repos/asf/tomcat/native/
+ (ASLv2) The Netty Project (4.1.77.Final)
+ The following NOTICE information applies:
+ netty/netty
+ Copyright 2014 The Netty Project
+ -------------------------------------------------------------------------------
+ The Netty Project
+ =================
+
+ Please visit the Netty web site for more information:
+
+ * https://netty.io/
+
+ Copyright 2014 The Netty Project
+
+ The Netty Project licenses this file to you under the Apache License,
+ version 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at:
+
+ https://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ License for the specific language governing permissions and limitations
+ under the License.
+
+ Also, please refer to each LICENSE.<component>.txt file, which is located in
+ the 'license' directory of the distribution file, for the license terms of the
+ components that this product depends on.
+
+ -------------------------------------------------------------------------------
+ This product contains the extensions to Java Collections Framework which has
+ been derived from the works by JSR-166 EG, Doug Lea, and Jason T. Greene:
+
+ * LICENSE:
+ * license/LICENSE.jsr166y.txt (Public Domain)
+ * HOMEPAGE:
+ * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/
+ * http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/
+
+ This product contains a modified version of Robert Harder's Public Domain
+ Base64 Encoder and Decoder, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.base64.txt (Public Domain)
+ * HOMEPAGE:
+ * http://iharder.sourceforge.net/current/java/base64/
+
+ This product contains a modified portion of 'Webbit', an event based
+ WebSocket and HTTP server, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.webbit.txt (BSD License)
+ * HOMEPAGE:
+ * https://github.com/joewalnes/webbit
+
+ This product contains a modified portion of 'SLF4J', a simple logging
+ facade for Java, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.slf4j.txt (MIT License)
+ * HOMEPAGE:
+ * https://www.slf4j.org/
+
+ This product contains a modified portion of 'Apache Harmony', an open source
+ Java SE, which can be obtained at:
+
+ * NOTICE:
+ * license/NOTICE.harmony.txt
+ * LICENSE:
+ * license/LICENSE.harmony.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://archive.apache.org/dist/harmony/
+
+ This product contains a modified portion of 'jbzip2', a Java bzip2 compression
+ and decompression library written by Matthew J. Francis. It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.jbzip2.txt (MIT License)
+ * HOMEPAGE:
+ * https://code.google.com/p/jbzip2/
+
+ This product contains a modified portion of 'libdivsufsort', a C API library to construct
+ the suffix array and the Burrows-Wheeler transformed string for any input string of
+ a constant-size alphabet written by Yuta Mori. It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.libdivsufsort.txt (MIT License)
+ * HOMEPAGE:
+ * https://github.com/y-256/libdivsufsort
+
+ This product contains a modified portion of Nitsan Wakart's 'JCTools', Java Concurrency Tools for the JVM,
+ which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.jctools.txt (ASL2 License)
+ * HOMEPAGE:
+ * https://github.com/JCTools/JCTools
+
+ This product optionally depends on 'JZlib', a re-implementation of zlib in
+ pure Java, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.jzlib.txt (BSD style License)
+ * HOMEPAGE:
+ * http://www.jcraft.com/jzlib/
+
+ This product optionally depends on 'Compress-LZF', a Java library for encoding and
+ decoding data in LZF format, written by Tatu Saloranta. It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.compress-lzf.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/ning/compress
+
+ This product optionally depends on 'lz4', a LZ4 Java compression
+ and decompression library written by Adrien Grand. It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.lz4.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/jpountz/lz4-java
+
+ This product optionally depends on 'lzma-java', a LZMA Java compression
+ and decompression library, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.lzma-java.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/jponge/lzma-java
+
+ This product optionally depends on 'zstd-jni', a zstd-jni Java compression
+ and decompression library, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.zstd-jni.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/luben/zstd-jni
+
+ This product contains a modified portion of 'jfastlz', a Java port of FastLZ compression
+ and decompression library written by William Kinney. It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.jfastlz.txt (MIT License)
+ * HOMEPAGE:
+ * https://code.google.com/p/jfastlz/
+
+ This product contains a modified portion of and optionally depends on 'Protocol Buffers', Google's data
+ interchange format, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.protobuf.txt (New BSD License)
+ * HOMEPAGE:
+ * https://github.com/google/protobuf
+
+ This product optionally depends on 'Bouncy Castle Crypto APIs' to generate
+ a temporary self-signed X.509 certificate when the JVM does not provide the
+ equivalent functionality. It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.bouncycastle.txt (MIT License)
+ * HOMEPAGE:
+ * https://www.bouncycastle.org/
+
+ This product optionally depends on 'Snappy', a compression library produced
+ by Google Inc, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.snappy.txt (New BSD License)
+ * HOMEPAGE:
+ * https://github.com/google/snappy
+
+ This product optionally depends on 'JBoss Marshalling', an alternative Java
+ serialization API, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.jboss-marshalling.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/jboss-remoting/jboss-marshalling
+
+ This product optionally depends on 'Caliper', Google's micro-
+ benchmarking framework, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.caliper.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/google/caliper
+
+ This product optionally depends on 'Apache Commons Logging', a logging
+ framework, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.commons-logging.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://commons.apache.org/logging/
+
+ This product optionally depends on 'Apache Log4J', a logging framework, which
+ can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.log4j.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://logging.apache.org/log4j/
+
+ This product optionally depends on 'Aalto XML', an ultra-high performance
+ non-blocking XML processor, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.aalto-xml.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://wiki.fasterxml.com/AaltoHome
+
+ This product contains a modified version of 'HPACK', a Java implementation of
+ the HTTP/2 HPACK algorithm written by Twitter. It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.hpack.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/twitter/hpack
+
+ This product contains a modified version of 'HPACK', a Java implementation of
+ the HTTP/2 HPACK algorithm written by Cory Benfield. It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.hyper-hpack.txt (MIT License)
+ * HOMEPAGE:
+ * https://github.com/python-hyper/hpack/
+
+ This product contains a modified version of 'HPACK', a Java implementation of
+ the HTTP/2 HPACK algorithm written by Tatsuhiro Tsujikawa. It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.nghttp2-hpack.txt (MIT License)
+ * HOMEPAGE:
+ * https://github.com/nghttp2/nghttp2/
+
+ This product contains a modified portion of 'Apache Commons Lang', a Java library
+ that provides utilities for the java.lang API, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.commons-lang.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://commons.apache.org/proper/commons-lang/
+
+
+ This product contains the Maven wrapper scripts from 'Maven Wrapper', that provides an easy way to ensure a user has everything necessary to run the Maven build.
+
+ * LICENSE:
+ * license/LICENSE.mvn-wrapper.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/takari/maven-wrapper
+
+ This product contains the dnsinfo.h header file, that provides a way to retrieve the system DNS configuration on MacOS.
+ This private header is also used by Apple's open source
+ mDNSResponder (https://opensource.apple.com/tarballs/mDNSResponder/).
+
+ * LICENSE:
+ * license/LICENSE.dnsinfo.txt (Apple Public Source License 2.0)
+ * HOMEPAGE:
+ * https://www.opensource.apple.com/source/configd/configd-453.19/dnsinfo/dnsinfo.h
+
+ This product optionally depends on 'Brotli4j', Brotli compression and
+ decompression for Java, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.brotli4j.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/hyperxpro/Brotli4j
+
(ASLv2) Error Prone
The following NOTICE information applies:
Copyright 2017 Google Inc.
@@ -1958,6 +2227,36 @@ The following binary components are provided under the Apache Software License v
Copyright (c) 2014-2019 Appsicle
Copyright (c) 2019-2020 QuestDB
+ (ASLv2) HiveMQ MQTT Client
+ The following NOTICE information applies:
+ HiveMQ MQTT Client
+ Copyright 2018-2022 HiveMQ and the HiveMQ Community
+
+ (ASLv2) ReactiveX/RxJava
+ The following NOTICE information applies:
+ ReactiveX/RxJava
+ Copyright 2018-present RxJava Contributors
+
+ (ASLv2) Java Concurrency Tools Core Library (org.jctools:jctools-core)
+ The following NOTICE information applies:
+ JCTools/JCTools
+ Copyright
+
+ (ASLv2) JetBrains/java-annotations
+ The following NOTICE information applies:
+ JetBrains/java-annotations
+ Copyright 2000-2016 JetBrains s.r.o.
+
+ (ASLv2) google/dagger
+ The following NOTICE information applies:
+ google/dagger
+ Copyright 2012 The Dagger Authors
+
+ (ASLv2) atinject (javax.inject:javax.inject)
+ The following NOTICE information applies:
+ atinject
+ Copyright
+
************************
Common Development and Distribution License 1.1
************************
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/src/main/resources/META-INF/NOTICE b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/src/main/resources/META-INF/NOTICE
index f95db89b8c..306617c48e 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/src/main/resources/META-INF/NOTICE
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-nar/src/main/resources/META-INF/NOTICE
@@ -41,6 +41,305 @@ The following binary components are provided under the Apache Software License v
in some artifacts (usually source distributions); but is always available
from the source code management (SCM) system project uses.
+ (ASLv2) HiveMQ MQTT Client
+ The following NOTICE information applies:
+ HiveMQ MQTT Client
+ Copyright 2018-2022 HiveMQ and the HiveMQ Community
+
+ (ASLv2) ReactiveX/RxJava
+ The following NOTICE information applies:
+ ReactiveX/RxJava
+ Copyright 2018-present RxJava Contributors
+
+ (ASLv2) Java Concurrency Tools Core Library (org.jctools:jctools-core)
+ The following NOTICE information applies:
+ JCTools/JCTools
+ Copyright
+
+ (ASLv2) JetBrains/java-annotations
+ The following NOTICE information applies:
+ JetBrains/java-annotations
+ Copyright 2000-2016 JetBrains s.r.o.
+
+ (ASLv2) google/dagger
+ The following NOTICE information applies:
+ google/dagger
+ Copyright 2012 The Dagger Authors
+
+ (ASLv2) atinject (javax.inject:javax.inject)
+ The following NOTICE information applies:
+ atinject
+ Copyright
+
+ (ASLv2) The Netty Project
+ The following NOTICE information applies:
+ netty/netty
+ Copyright 2014 The Netty Project
+ -------------------------------------------------------------------------------
+ The Netty Project
+ =================
+
+ Please visit the Netty web site for more information:
+
+ * https://netty.io/
+
+ Copyright 2014 The Netty Project
+
+ The Netty Project licenses this file to you under the Apache License,
+ version 2.0 (the "License"); you may not use this file except in compliance
+ with the License. You may obtain a copy of the License at:
+
+ https://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS, WITHOUT
+ WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. See the
+ License for the specific language governing permissions and limitations
+ under the License.
+
+ Also, please refer to each LICENSE.<component>.txt file, which is located in
+ the 'license' directory of the distribution file, for the license terms of the
+ components that this product depends on.
+
+ -------------------------------------------------------------------------------
+ This product contains the extensions to Java Collections Framework which has
+ been derived from the works by JSR-166 EG, Doug Lea, and Jason T. Greene:
+
+ * LICENSE:
+ * license/LICENSE.jsr166y.txt (Public Domain)
+ * HOMEPAGE:
+ * http://gee.cs.oswego.edu/cgi-bin/viewcvs.cgi/jsr166/
+ * http://viewvc.jboss.org/cgi-bin/viewvc.cgi/jbosscache/experimental/jsr166/
+
+ This product contains a modified version of Robert Harder's Public Domain
+ Base64 Encoder and Decoder, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.base64.txt (Public Domain)
+ * HOMEPAGE:
+ * http://iharder.sourceforge.net/current/java/base64/
+
+ This product contains a modified portion of 'Webbit', an event based
+ WebSocket and HTTP server, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.webbit.txt (BSD License)
+ * HOMEPAGE:
+ * https://github.com/joewalnes/webbit
+
+ This product contains a modified portion of 'SLF4J', a simple logging
+ facade for Java, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.slf4j.txt (MIT License)
+ * HOMEPAGE:
+ * https://www.slf4j.org/
+
+ This product contains a modified portion of 'Apache Harmony', an open source
+ Java SE, which can be obtained at:
+
+ * NOTICE:
+ * license/NOTICE.harmony.txt
+ * LICENSE:
+ * license/LICENSE.harmony.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://archive.apache.org/dist/harmony/
+
+ This product contains a modified portion of 'jbzip2', a Java bzip2 compression
+ and decompression library written by Matthew J. Francis. It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.jbzip2.txt (MIT License)
+ * HOMEPAGE:
+ * https://code.google.com/p/jbzip2/
+
+ This product contains a modified portion of 'libdivsufsort', a C API library to construct
+ the suffix array and the Burrows-Wheeler transformed string for any input string of
+ a constant-size alphabet written by Yuta Mori. It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.libdivsufsort.txt (MIT License)
+ * HOMEPAGE:
+ * https://github.com/y-256/libdivsufsort
+
+ This product contains a modified portion of Nitsan Wakart's 'JCTools', Java Concurrency Tools for the JVM,
+ which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.jctools.txt (ASL2 License)
+ * HOMEPAGE:
+ * https://github.com/JCTools/JCTools
+
+ This product optionally depends on 'JZlib', a re-implementation of zlib in
+ pure Java, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.jzlib.txt (BSD style License)
+ * HOMEPAGE:
+ * http://www.jcraft.com/jzlib/
+
+ This product optionally depends on 'Compress-LZF', a Java library for encoding and
+ decoding data in LZF format, written by Tatu Saloranta. It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.compress-lzf.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/ning/compress
+
+ This product optionally depends on 'lz4', a LZ4 Java compression
+ and decompression library written by Adrien Grand. It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.lz4.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/jpountz/lz4-java
+
+ This product optionally depends on 'lzma-java', a LZMA Java compression
+ and decompression library, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.lzma-java.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/jponge/lzma-java
+
+ This product optionally depends on 'zstd-jni', a zstd-jni Java compression
+ and decompression library, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.zstd-jni.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/luben/zstd-jni
+
+ This product contains a modified portion of 'jfastlz', a Java port of FastLZ compression
+ and decompression library written by William Kinney. It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.jfastlz.txt (MIT License)
+ * HOMEPAGE:
+ * https://code.google.com/p/jfastlz/
+
+ This product contains a modified portion of and optionally depends on 'Protocol Buffers', Google's data
+ interchange format, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.protobuf.txt (New BSD License)
+ * HOMEPAGE:
+ * https://github.com/google/protobuf
+
+ This product optionally depends on 'Bouncy Castle Crypto APIs' to generate
+ a temporary self-signed X.509 certificate when the JVM does not provide the
+ equivalent functionality. It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.bouncycastle.txt (MIT License)
+ * HOMEPAGE:
+ * https://www.bouncycastle.org/
+
+ This product optionally depends on 'Snappy', a compression library produced
+ by Google Inc, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.snappy.txt (New BSD License)
+ * HOMEPAGE:
+ * https://github.com/google/snappy
+
+ This product optionally depends on 'JBoss Marshalling', an alternative Java
+ serialization API, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.jboss-marshalling.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/jboss-remoting/jboss-marshalling
+
+ This product optionally depends on 'Caliper', Google's micro-
+ benchmarking framework, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.caliper.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/google/caliper
+
+ This product optionally depends on 'Apache Commons Logging', a logging
+ framework, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.commons-logging.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://commons.apache.org/logging/
+
+ This product optionally depends on 'Apache Log4J', a logging framework, which
+ can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.log4j.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://logging.apache.org/log4j/
+
+ This product optionally depends on 'Aalto XML', an ultra-high performance
+ non-blocking XML processor, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.aalto-xml.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://wiki.fasterxml.com/AaltoHome
+
+ This product contains a modified version of 'HPACK', a Java implementation of
+ the HTTP/2 HPACK algorithm written by Twitter. It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.hpack.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/twitter/hpack
+
+ This product contains a modified version of 'HPACK', a Java implementation of
+ the HTTP/2 HPACK algorithm written by Cory Benfield. It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.hyper-hpack.txt (MIT License)
+ * HOMEPAGE:
+ * https://github.com/python-hyper/hpack/
+
+ This product contains a modified version of 'HPACK', a Java implementation of
+ the HTTP/2 HPACK algorithm written by Tatsuhiro Tsujikawa. It can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.nghttp2-hpack.txt (MIT License)
+ * HOMEPAGE:
+ * https://github.com/nghttp2/nghttp2/
+
+ This product contains a modified portion of 'Apache Commons Lang', a Java library
+ that provides utilities for the java.lang API, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.commons-lang.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://commons.apache.org/proper/commons-lang/
+
+
+ This product contains the Maven wrapper scripts from 'Maven Wrapper', that provides an easy way to ensure a user has everything necessary to run the Maven build.
+
+ * LICENSE:
+ * license/LICENSE.mvn-wrapper.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/takari/maven-wrapper
+
+ This product contains the dnsinfo.h header file, that provides a way to retrieve the system DNS configuration on MacOS.
+ This private header is also used by Apple's open source
+ mDNSResponder (https://opensource.apple.com/tarballs/mDNSResponder/).
+
+ * LICENSE:
+ * license/LICENSE.dnsinfo.txt (Apple Public Source License 2.0)
+ * HOMEPAGE:
+ * https://www.opensource.apple.com/source/configd/configd-453.19/dnsinfo/dnsinfo.h
+
+ This product optionally depends on 'Brotli4j', Brotli compression and
+ decompression for Java, which can be obtained at:
+
+ * LICENSE:
+ * license/LICENSE.brotli4j.txt (Apache License 2.0)
+ * HOMEPAGE:
+ * https://github.com/hyperxpro/Brotli4j
+
************************
Eclipse Public License 1.0
************************
@@ -48,3 +347,11 @@ Eclipse Public License 1.0
The following binary components are provided under the Eclipse Public License 1.0. See project link for details.
(EPL 1.0) Eclipse Paho MQTT Client (org.eclipse.paho:org.eclipse.paho.client.mqttv3:1.2.0 - https://github.com/eclipse/paho.mqtt.java)
+
+*****************
+Public Domain
+*****************
+
+The following binary components are provided under the Creative Commons Zero license version 1.0. See project link for details.
+
+ (CC0v1.0) Reactive Streams (org.reactivestreams:reactive-streams:jar:1.0.3 - http://www.reactive-streams.org/)
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml
index 2ed90145e7..3e9bed91ef 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/pom.xml
@@ -48,6 +48,11 @@
<groupId>org.apache.nifi</groupId>
<artifactId>nifi-record</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-security-utils</artifactId>
+ <version>1.18.0-SNAPSHOT</version>
+ </dependency>
<!-- External dependencies -->
<dependency>
@@ -55,6 +60,11 @@
<artifactId>org.eclipse.paho.client.mqttv3</artifactId>
<version>1.2.2</version>
</dependency>
+ <dependency>
+ <groupId>com.hivemq</groupId>
+ <artifactId>hivemq-mqtt-client</artifactId>
+ <version>1.3.0</version>
+ </dependency>
<dependency>
<groupId>org.apache.commons</groupId>
<artifactId>commons-lang3</artifactId>
@@ -84,23 +94,4 @@
<scope>test</scope>
</dependency>
</dependencies>
-
-
- <build>
- <plugins>
- <plugin>
- <groupId>org.apache.maven.plugins</groupId>
- <artifactId>maven-surefire-plugin</artifactId>
- <configuration>
- <excludes>
- <exclude>**/integration/TestConsumeMQTT.java</exclude>
- <exclude>**/integration/TestConsumeMqttSSL.java</exclude>
- <exclude>**/integration/TestPublishAndSubscribeMqttIntegration.java</exclude>
- <exclude>**/integration/TestPublishMQTT.java</exclude>
- <exclude>**/integration/TestPublishMqttSSL.java</exclude>
- </excludes>
- </configuration>
- </plugin>
- </plugins>
- </build>
</project>
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
index 7c5a7ff49c..46d1452cea 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/ConsumeMQTT.java
@@ -42,10 +42,11 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.OutputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
-import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage;
+import org.apache.nifi.processors.mqtt.common.MqttCallback;
+import org.apache.nifi.processors.mqtt.common.MqttException;
+import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
import org.apache.nifi.serialization.MalformedRecordException;
import org.apache.nifi.serialization.RecordReader;
import org.apache.nifi.serialization.RecordReaderFactory;
@@ -58,10 +59,6 @@ import org.apache.nifi.serialization.record.Record;
import org.apache.nifi.serialization.record.RecordField;
import org.apache.nifi.serialization.record.RecordFieldType;
import org.apache.nifi.serialization.record.RecordSchema;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
-import org.eclipse.paho.client.mqttv3.MqttCallback;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
import java.io.ByteArrayInputStream;
import java.io.IOException;
@@ -209,7 +206,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
private volatile String topicFilter;
private final AtomicBoolean scheduled = new AtomicBoolean(false);
- private volatile LinkedBlockingQueue<MQTTQueueMessage> mqttQueue;
+ private volatile LinkedBlockingQueue<ReceivedMqttMessage> mqttQueue;
public static final Relationship REL_MESSAGE = new Relationship.Builder()
.name("Message")
@@ -226,7 +223,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
private static final List<PropertyDescriptor> descriptors;
private static final Set<Relationship> relationships;
- static{
+ static {
final List<PropertyDescriptor> innerDescriptorsList = getAbstractPropertyDescriptors();
innerDescriptorsList.add(PROP_GROUPID);
innerDescriptorsList.add(PROP_TOPIC_FILTER);
@@ -238,7 +235,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
innerDescriptorsList.add(MESSAGE_DEMARCATOR);
descriptors = Collections.unmodifiableList(innerDescriptorsList);
- final Set<Relationship> innerRelationshipsSet = new HashSet<Relationship>();
+ final Set<Relationship> innerRelationshipsSet = new HashSet<>();
innerRelationshipsSet.add(REL_MESSAGE);
innerRelationshipsSet.add(REL_PARSE_FAILURE);
relationships = Collections.unmodifiableSet(innerRelationshipsSet);
@@ -249,15 +246,14 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
// resize the receive buffer, but preserve data
if (descriptor == PROP_MAX_QUEUE_SIZE) {
// it's a mandatory integer, never null
- int newSize = Integer.valueOf(newValue);
+ int newSize = Integer.parseInt(newValue);
if (mqttQueue != null) {
int msgPending = mqttQueue.size();
if (msgPending > newSize) {
- logger.warn("New receive buffer size ({}) is smaller than the number of messages pending ({}), ignoring resize request. Processor will be invalid.",
- new Object[]{newSize, msgPending});
+ logger.warn("New receive buffer size ({}) is smaller than the number of messages pending ({}), ignoring resize request. Processor will be invalid.", newSize, msgPending);
return;
}
- LinkedBlockingQueue<MQTTQueueMessage> newBuffer = new LinkedBlockingQueue<>(newSize);
+ LinkedBlockingQueue<ReceivedMqttMessage> newBuffer = new LinkedBlockingQueue<>(newSize);
mqttQueue.drainTo(newBuffer);
mqttQueue = newBuffer;
}
@@ -297,15 +293,15 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
final boolean readerIsSet = context.getProperty(RECORD_READER).isSet();
final boolean writerIsSet = context.getProperty(RECORD_WRITER).isSet();
- if((readerIsSet && !writerIsSet) || (!readerIsSet && writerIsSet)) {
+ if ((readerIsSet && !writerIsSet) || (!readerIsSet && writerIsSet)) {
results.add(new ValidationResult.Builder().subject("Reader and Writer").valid(false)
- .explanation("Both Record Reader and Writer must be set when used").build());
+ .explanation("both Record Reader and Writer must be set when used.").build());
}
final boolean demarcatorIsSet = context.getProperty(MESSAGE_DEMARCATOR).isSet();
- if(readerIsSet && demarcatorIsSet) {
+ if (readerIsSet && demarcatorIsSet) {
results.add(new ValidationResult.Builder().subject("Reader and Writer").valid(false)
- .explanation("You cannot use both a demarcator and a Reader/Writer").build());
+ .explanation("Message Demarcator and Record Reader/Writer cannot be used at the same time.").build());
}
return results;
@@ -346,17 +342,17 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
public void onUnscheduled(final ProcessContext context) {
scheduled.set(false);
synchronized (this) {
- super.onStopped();
+ stopClient();
}
}
@OnStopped
- public void onStopped(final ProcessContext context) throws IOException {
- if(mqttQueue != null && !mqttQueue.isEmpty() && processSessionFactory != null) {
+ public void onStopped(final ProcessContext context) {
+ if (mqttQueue != null && !mqttQueue.isEmpty() && processSessionFactory != null) {
logger.info("Finishing processing leftover messages");
ProcessSession session = processSessionFactory.createSession();
- if(context.getProperty(RECORD_READER).isSet()) {
+ if (context.getProperty(RECORD_READER).isSet()) {
transferQueueRecord(context, session);
} else if (context.getProperty(MESSAGE_DEMARCATOR).isSet()) {
transferQueueDemarcator(context, session);
@@ -364,7 +360,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
transferQueue(session);
}
} else {
- if (mqttQueue!= null && !mqttQueue.isEmpty()){
+ if (mqttQueue != null && !mqttQueue.isEmpty()) {
throw new ProcessException("Stopping the processor but there is no ProcessSessionFactory stored and there are messages in the MQTT internal queue. Removing the processor now will " +
"clear the queue but will result in DATA LOSS. This is normally due to starting the processor, receiving messages and stopping before the onTrigger happens. The messages " +
"in the MQTT internal queue cannot finish processing until until the processor is triggered to run.");
@@ -375,7 +371,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
final boolean isScheduled = scheduled.get();
- if (!isConnected() && isScheduled){
+ if (!isConnected() && isScheduled) {
synchronized (this) {
if (!isConnected()) {
initializeClient(context);
@@ -402,42 +398,27 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
// non-null but not connected, so we need to handle each case and only create a new client when it is null
try {
if (mqttClient == null) {
- logger.debug("Creating client");
- mqttClient = createMqttClient(broker, clientID, persistence);
+ mqttClient = createMqttClient();
mqttClient.setCallback(this);
}
if (!mqttClient.isConnected()) {
- logger.debug("Connecting client");
- mqttClient.connect(connOpts);
+ mqttClient.connect();
mqttClient.subscribe(topicPrefix + topicFilter, qos);
}
- } catch (MqttException e) {
- logger.error("Connection to {} lost (or was never connected) and connection failed. Yielding processor", new Object[]{broker}, e);
+ } catch (Exception e) {
+ logger.error("Connection to {} lost (or was never connected) and connection failed. Yielding processor", new Object[]{clientProperties.getBroker()}, e);
+ mqttClient = null; // prevent a stuck processor when subscribe fails
context.yield();
}
}
- private void transferQueue(ProcessSession session){
+ private void transferQueue(ProcessSession session) {
while (!mqttQueue.isEmpty()) {
- final MQTTQueueMessage mqttMessage = mqttQueue.peek();
- FlowFile messageFlowfile = session.create();
-
- Map<String, String> attrs = new HashMap<>();
- attrs.put(BROKER_ATTRIBUTE_KEY, broker);
- attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
- attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
- attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isDuplicate()));
- attrs.put(IS_RETAINED_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isRetained()));
-
- messageFlowfile = session.putAllAttributes(messageFlowfile, attrs);
-
- messageFlowfile = session.write(messageFlowfile, new OutputStreamCallback() {
- @Override
- public void process(final OutputStream out) throws IOException {
- out.write(mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload());
- }
- });
+ final ReceivedMqttMessage mqttMessage = mqttQueue.peek();
+
+ final FlowFile messageFlowfile = session.write(createFlowFileAndPopulateAttributes(session, mqttMessage),
+ out -> out.write(mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload()));
session.getProvenanceReporter().receive(messageFlowfile, getTransitUri(mqttMessage.getTopic()));
session.transfer(messageFlowfile, REL_MESSAGE);
@@ -446,17 +427,16 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
}
}
- private void transferQueueDemarcator(final ProcessContext context, final ProcessSession session){
+ private void transferQueueDemarcator(final ProcessContext context, final ProcessSession session) {
final byte[] demarcator = context.getProperty(MESSAGE_DEMARCATOR).evaluateAttributeExpressions().getValue().getBytes(StandardCharsets.UTF_8);
FlowFile messageFlowfile = session.create();
- session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, broker);
-
+ session.putAttribute(messageFlowfile, BROKER_ATTRIBUTE_KEY, clientProperties.getBroker());
messageFlowfile = session.append(messageFlowfile, out -> {
int i = 0;
while (!mqttQueue.isEmpty() && i < MAX_MESSAGES_PER_FLOW_FILE) {
- final MQTTQueueMessage mqttMessage = mqttQueue.poll();
+ final ReceivedMqttMessage mqttMessage = mqttQueue.poll();
out.write(mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload());
out.write(demarcator);
session.adjustCounter(COUNTER_RECORDS_RECEIVED, 1L, false);
@@ -469,41 +449,40 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
session.commitAsync();
}
- private void transferFailure(final ProcessSession session, final MQTTQueueMessage mqttMessage) {
+ /**
+ * Routes a message that could not be processed to the parse failure relationship. The raw payload becomes
+ * the FlowFile content, a provenance receive event is reported and the parse failure counter is incremented.
+ *
+ * @param session session used to create, write and transfer the FlowFile
+ * @param mqttMessage the message that failed to be processed
+ */
+ private void transferFailure(final ProcessSession session, final ReceivedMqttMessage mqttMessage) {
+ final FlowFile messageFlowfile = session.write(createFlowFileAndPopulateAttributes(session, mqttMessage),
+ // Guard against a null payload the same way transferQueue and transferQueueDemarcator do,
+ // otherwise OutputStream.write(null) would throw a NullPointerException here.
+ out -> out.write(mqttMessage.getPayload() == null ? new byte[0] : mqttMessage.getPayload()));
+
+ session.getProvenanceReporter().receive(messageFlowfile, getTransitUri(mqttMessage.getTopic()));
+ session.transfer(messageFlowfile, REL_PARSE_FAILURE);
+ session.adjustCounter(COUNTER_PARSE_FAILURES, 1, false);
+ }
+
+ private FlowFile createFlowFileAndPopulateAttributes(ProcessSession session, ReceivedMqttMessage mqttMessage) {
FlowFile messageFlowfile = session.create();
Map<String, String> attrs = new HashMap<>();
- attrs.put(BROKER_ATTRIBUTE_KEY, broker);
+ attrs.put(BROKER_ATTRIBUTE_KEY, clientProperties.getBroker());
attrs.put(TOPIC_ATTRIBUTE_KEY, mqttMessage.getTopic());
attrs.put(QOS_ATTRIBUTE_KEY, String.valueOf(mqttMessage.getQos()));
attrs.put(IS_DUPLICATE_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isDuplicate()));
attrs.put(IS_RETAINED_ATTRIBUTE_KEY, String.valueOf(mqttMessage.isRetained()));
messageFlowfile = session.putAllAttributes(messageFlowfile, attrs);
-
- messageFlowfile = session.write(messageFlowfile, new OutputStreamCallback() {
- @Override
- public void process(final OutputStream out) throws IOException {
- out.write(mqttMessage.getPayload());
- }
- });
-
- session.getProvenanceReporter().receive(messageFlowfile, getTransitUri(mqttMessage.getTopic()));
- session.transfer(messageFlowfile, REL_PARSE_FAILURE);
- session.adjustCounter(COUNTER_PARSE_FAILURES, 1, false);
+ return messageFlowfile;
}
- private void transferQueueRecord(final ProcessContext context, final ProcessSession session){
+ private void transferQueueRecord(final ProcessContext context, final ProcessSession session) {
final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);
FlowFile flowFile = session.create();
- session.putAttribute(flowFile, BROKER_ATTRIBUTE_KEY, broker);
+ session.putAttribute(flowFile, BROKER_ATTRIBUTE_KEY, clientProperties.getBroker());
final Map<String, String> attributes = new HashMap<>();
final AtomicInteger recordCount = new AtomicInteger();
- final List<MQTTQueueMessage> doneList = new ArrayList<MQTTQueueMessage>();
+ final List<ReceivedMqttMessage> doneList = new ArrayList<>();
RecordSetWriter writer = null;
boolean isWriterInitialized = false;
@@ -511,8 +490,8 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
try {
while (!mqttQueue.isEmpty() && i < MAX_MESSAGES_PER_FLOW_FILE) {
- final MQTTQueueMessage mqttMessage = mqttQueue.poll();
- if(mqttMessage == null) {
+ final ReceivedMqttMessage mqttMessage = mqttQueue.poll();
+ if (mqttMessage == null) {
break;
}
@@ -533,16 +512,15 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
Record record;
while ((record = reader.nextRecord()) != null) {
- if(!isWriterInitialized) {
+ if (!isWriterInitialized) {
final RecordSchema recordSchema = record.getSchema();
final OutputStream rawOut = session.write(flowFile);
RecordSchema writeSchema;
try {
writeSchema = writerFactory.getSchema(flowFile.getAttributes(), recordSchema);
- if(context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
- final List<RecordField> fields = new ArrayList<>();
- fields.addAll(writeSchema.getFields());
+ if (context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
+ final List<RecordField> fields = new ArrayList<>(writeSchema.getFields());
fields.add(new RecordField(TOPIC_FIELD_KEY, RecordFieldType.STRING.getDataType()));
fields.add(new RecordField(QOS_FIELD_KEY, RecordFieldType.INT.getDataType()));
@@ -562,7 +540,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
}
try {
- if(context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
+ if (context.getProperty(ADD_ATTRIBUTES_AS_FIELDS).asBoolean()) {
record.setValue(TOPIC_FIELD_KEY, mqttMessage.getTopic());
record.setValue(QOS_FIELD_KEY, mqttMessage.getQos());
record.setValue(IS_RETAINED_FIELD_KEY, mqttMessage.isRetained());
@@ -583,16 +561,14 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
} catch (final IOException | MalformedRecordException | SchemaValidationException e) {
logger.error("Failed to write message, sending to the parse failure relationship", e);
transferFailure(session, mqttMessage);
- continue;
}
} catch (Exception e) {
logger.error("Failed to write message, sending to the parse failure relationship", e);
transferFailure(session, mqttMessage);
- continue;
}
}
- if(writer != null) {
+ if (writer != null) {
final WriteResult writeResult = writer.finishRecordSet();
attributes.put(RECORD_COUNT_KEY, String.valueOf(writeResult.getRecordCount()));
attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
@@ -605,26 +581,26 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
// we try to add the messages back into the internal queue
int numberOfMessages = 0;
- for(MQTTQueueMessage done : doneList) {
+ for (ReceivedMqttMessage done : doneList) {
try {
mqttQueue.offer(done, 1, TimeUnit.SECONDS);
} catch (InterruptedException ex) {
numberOfMessages++;
- if(getLogger().isDebugEnabled()) {
+ if (getLogger().isDebugEnabled()) {
logger.debug("Could not add message back into the internal queue, this could lead to data loss", ex);
}
}
}
- if(numberOfMessages > 0) {
- logger.error("Could not add {} message(s) back into the internal queue, this could mean data loss", new Object[] {numberOfMessages});
+ if (numberOfMessages > 0) {
+ logger.error("Could not add {} message(s) back into the internal queue, this could mean data loss", numberOfMessages);
}
- throw new ProcessException("Could not process data received from the MQTT broker(s): " + broker, e);
+ throw new ProcessException("Could not process data received from the MQTT broker(s): " + clientProperties.getBroker(), e);
} finally {
closeWriter(writer);
}
- if(recordCount.get() == 0) {
+ if (recordCount.get() == 0) {
session.remove(flowFile);
return;
}
@@ -635,7 +611,7 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
final int count = recordCount.get();
session.adjustCounter(COUNTER_RECORDS_PROCESSED, count, false);
- getLogger().info("Successfully processed {} records for {}", new Object[] {count, flowFile});
+ getLogger().info("Successfully processed {} records for {}", count, flowFile);
}
private void closeWriter(final RecordSetWriter writer) {
@@ -649,8 +625,9 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
}
private String getTransitUri(String... appends) {
- StringBuilder stringBuilder = new StringBuilder(brokerUri);
- for(String append : appends) {
+ String broker = clientProperties.getBrokerUri().toString();
+ StringBuilder stringBuilder = new StringBuilder(broker.endsWith("/") ? broker : broker + "/");
+ for (String append : appends) {
stringBuilder.append(append);
}
return stringBuilder.toString();
@@ -658,29 +635,34 @@ public class ConsumeMQTT extends AbstractMQTTProcessor implements MqttCallback {
@Override
public void connectionLost(Throwable cause) {
- logger.error("Connection to {} lost due to: {}", new Object[]{broker, cause.getMessage()}, cause);
+ logger.error("Connection to {} lost", clientProperties.getBroker(), cause);
}
+ /**
+ * Callback invoked with each message received from the broker; places the message on the internal queue.
+ *
+ * @param message the received message, including its topic and payload
+ * @throws IllegalStateException if the queue does not accept the message within one second
+ * @throws MqttException if the thread is interrupted while waiting for free capacity in the queue
+ */
@Override
- public void messageArrived(String topic, MqttMessage message) throws Exception {
+ public void messageArrived(ReceivedMqttMessage message) {
if (logger.isDebugEnabled()) {
byte[] payload = message.getPayload();
- String text = new String(payload, "UTF-8");
+ String text = new String(payload, StandardCharsets.UTF_8);
if (StringUtils.isAsciiPrintable(text)) {
- logger.debug("Message arrived from topic {}. Payload: {}", new Object[] {topic, text});
+ logger.debug("Message arrived from topic {}. Payload: {}", message.getTopic(), text);
} else {
- logger.debug("Message arrived from topic {}. Binary value of size {}", new Object[] {topic, payload.length});
+ logger.debug("Message arrived from topic {}. Binary value of size {}", message.getTopic(), payload.length);
}
}
- if(!mqttQueue.offer(new MQTTQueueMessage(topic, message), 1, TimeUnit.SECONDS)) {
- throw new IllegalStateException("The subscriber queue is full, cannot receive another message until the processor is scheduled to run.");
+ try {
+ if (!mqttQueue.offer(message, 1, TimeUnit.SECONDS)) {
+ throw new IllegalStateException("The subscriber queue is full, cannot receive another message until the processor is scheduled to run.");
+ }
+ } catch (InterruptedException e) {
+ // Restore the interrupt flag so code further up the stack can still observe the interruption,
+ // and keep the original exception as the cause instead of dropping it.
+ Thread.currentThread().interrupt();
+ throw new MqttException("Failed to process message arrived from topic " + message.getTopic(), e);
}
}
@Override
- public void deliveryComplete(IMqttDeliveryToken token) {
- logger.warn("Received MQTT 'delivery complete' message to subscriber: " + token);
+ public void deliveryComplete(String token) {
+ // Unlikely situation: the API uses the same callback interface for both publisher and consumer,
+ // so a subscriber is not expected to get this call. It is logged as an error to flag the unexpected event.
+ logger.error("Received MQTT 'delivery complete' message to subscriber. Token: [{}]", token);
}
-
}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
index 0db7615d7e..2ea80b1ded 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/PublishMQTT.java
@@ -36,18 +36,15 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
-import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.processors.mqtt.common.AbstractMQTTProcessor;
+import org.apache.nifi.processors.mqtt.common.MqttCallback;
+import org.apache.nifi.processors.mqtt.common.MqttException;
+import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
+import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
import org.apache.nifi.stream.io.StreamUtils;
import org.apache.nifi.util.StopWatch;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
-import org.eclipse.paho.client.mqttv3.MqttCallback;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-import java.io.IOException;
-import java.io.InputStream;
import java.util.Arrays;
import java.util.Collections;
import java.util.HashSet;
@@ -137,18 +134,18 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
@OnStopped
public void onStopped(final ProcessContext context) {
synchronized (this) {
- super.onStopped();
+ stopClient();
}
}
@Override
public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
- FlowFile flowfile = session.get();
+ final FlowFile flowfile = session.get();
if (flowfile == null) {
return;
}
- if (!isConnected()){
+ if (!isConnected()) {
synchronized (this) {
if (!isConnected()) {
initializeClient(context);
@@ -157,7 +154,7 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
}
// get the MQTT topic
- String topic = context.getProperty(PROP_TOPIC).evaluateAttributeExpressions(flowfile).getValue();
+ final String topic = context.getProperty(PROP_TOPIC).evaluateAttributeExpressions(flowfile).getValue();
if (topic == null || topic.isEmpty()) {
logger.warn("Evaluation of the topic property returned null or evaluated to be empty, routing to failure");
@@ -167,18 +164,11 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
// do the read
final byte[] messageContent = new byte[(int) flowfile.getSize()];
- session.read(flowfile, new InputStreamCallback() {
- @Override
- public void process(final InputStream in) throws IOException {
- StreamUtils.fillBuffer(in, messageContent, true);
- }
- });
+ session.read(flowfile, in -> StreamUtils.fillBuffer(in, messageContent, true));
int qos = context.getProperty(PROP_QOS).evaluateAttributeExpressions(flowfile).asInteger();
- final MqttMessage mqttMessage = new MqttMessage(messageContent);
- mqttMessage.setQos(qos);
- mqttMessage.setPayload(messageContent);
- mqttMessage.setRetained(context.getProperty(PROP_RETAIN).evaluateAttributeExpressions(flowfile).asBoolean());
+ boolean retained = context.getProperty(PROP_RETAIN).evaluateAttributeExpressions(flowfile).asBoolean();
+ final StandardMqttMessage mqttMessage = new StandardMqttMessage(messageContent, qos, retained);
try {
final StopWatch stopWatch = new StopWatch(true);
@@ -188,9 +178,9 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
*/
mqttClient.publish(topic, mqttMessage);
- session.getProvenanceReporter().send(flowfile, broker, stopWatch.getElapsed(TimeUnit.MILLISECONDS));
+ session.getProvenanceReporter().send(flowfile, clientProperties.getBroker(), stopWatch.getElapsed(TimeUnit.MILLISECONDS));
session.transfer(flowfile, REL_SUCCESS);
- } catch(MqttException me) {
+ } catch (MqttException me) {
logger.error("Failed to publish message.", me);
session.transfer(flowfile, REL_FAILURE);
}
@@ -201,35 +191,35 @@ public class PublishMQTT extends AbstractMQTTProcessor implements MqttCallback {
// non-null but not connected, so we need to handle each case and only create a new client when it is null
try {
if (mqttClient == null) {
- logger.debug("Creating client");
- mqttClient = createMqttClient(broker, clientID, persistence);
+ mqttClient = createMqttClient();
mqttClient.setCallback(this);
}
if (!mqttClient.isConnected()) {
- logger.debug("Connecting client");
- mqttClient.connect(connOpts);
+ mqttClient.connect();
}
- } catch (MqttException e) {
- logger.error("Connection to {} lost (or was never connected) and connection failed. Yielding processor", new Object[]{broker}, e);
+ } catch (Exception e) {
+ logger.error("Connection to {} lost (or was never connected) and connection failed. Yielding processor", clientProperties.getBroker(), e);
context.yield();
}
}
@Override
public void connectionLost(Throwable cause) {
- logger.error("Connection to {} lost due to: {}", new Object[]{broker, cause.getMessage()}, cause);
+ logger.error("Connection to {} lost", clientProperties.getBroker(), cause);
}
@Override
- public void messageArrived(String topic, MqttMessage message) throws Exception {
- logger.error("Message arrived to a PublishMQTT processor { topic:'" + topic +"; payload:"+ Arrays.toString(message.getPayload())+"}");
+ public void messageArrived(ReceivedMqttMessage message) {
+ // Unlikely situation: the API uses the same callback interface for both publisher and consumer,
+ // so a publisher is not expected to receive messages. It is logged as an error to flag the unexpected event.
+ // Parameterized logging replaces the concatenated message, which had an unbalanced quote around the topic.
+ logger.error("Message arrived to a PublishMQTT processor. Topic: [{}], payload: {}", message.getTopic(), Arrays.toString(message.getPayload()));
}
@Override
- public void deliveryComplete(IMqttDeliveryToken token) {
+ public void deliveryComplete(String token) {
// Client.publish waits for message to be delivered so this token will always have a null message and is useless in this application.
- logger.trace("Received 'delivery complete' message from broker for:" + token.toString());
+ logger.trace("Received 'delivery complete' message from broker. Token: [{}]", token);
}
}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/HiveMqV5ClientAdapter.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/HiveMqV5ClientAdapter.java
new file mode 100644
index 0000000000..7532411055
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/HiveMqV5ClientAdapter.java
@@ -0,0 +1,200 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mqtt.adapters;
+
+import com.hivemq.client.mqtt.datatypes.MqttQos;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientBuilder;
+import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect;
+import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5ConnectBuilder;
+import com.hivemq.client.mqtt.mqtt5.message.subscribe.suback.Mqtt5SubAck;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.mqtt.common.MqttCallback;
+import org.apache.nifi.processors.mqtt.common.MqttClient;
+import org.apache.nifi.processors.mqtt.common.MqttClientProperties;
+import org.apache.nifi.processors.mqtt.common.MqttException;
+import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
+import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
+import org.apache.nifi.security.util.KeyStoreUtils;
+import org.apache.nifi.security.util.TlsException;
+
+import java.nio.charset.StandardCharsets;
+import java.util.Objects;
+import java.util.concurrent.CompletableFuture;
+import java.util.concurrent.TimeUnit;
+
+import static org.apache.nifi.processors.mqtt.common.MqttProtocolScheme.SSL;
+import static org.apache.nifi.processors.mqtt.common.MqttProtocolScheme.WS;
+import static org.apache.nifi.processors.mqtt.common.MqttProtocolScheme.WSS;
+
+/**
+ * {@link MqttClient} implementation for MQTT v5 that delegates to the HiveMQ blocking client.
+ * The underlying client is built once in the constructor; {@link #connect()} applies the per-connection
+ * settings (keep alive, session handling, last will and credentials) taken from the client properties.
+ */
+public class HiveMqV5ClientAdapter implements MqttClient {
+
+    private final Mqtt5BlockingClient mqtt5BlockingClient;
+    private final MqttClientProperties clientProperties;
+    private final ComponentLog logger;
+
+    // Receives incoming messages; subscribe() refuses to run until this has been set.
+    private MqttCallback callback;
+
+    /**
+     * Builds the underlying HiveMQ client from the given properties.
+     *
+     * @param clientProperties broker URI, client id and connection options
+     * @param logger logger used for debug output
+     * @throws TlsException propagated from creating the trust or key manager factory
+     */
+    public HiveMqV5ClientAdapter(MqttClientProperties clientProperties, ComponentLog logger) throws TlsException {
+        this.mqtt5BlockingClient = createClient(clientProperties, logger);
+        this.clientProperties = clientProperties;
+        this.logger = logger;
+    }
+
+    /**
+     * @return the connection state as reported by the underlying client
+     */
+    @Override
+    public boolean isConnected() {
+        return mqtt5BlockingClient.getState().isConnected();
+    }
+
+    /**
+     * Connects to the broker using the keep alive interval, clean session flag, optional session expiry,
+     * optional last will and optional username/password from the client properties.
+     */
+    @Override
+    public void connect() {
+        logger.debug("Connecting to broker");
+
+        final Mqtt5ConnectBuilder connectBuilder = Mqtt5Connect.builder()
+                .keepAlive(clientProperties.getKeepAliveInterval());
+
+        final boolean cleanSession = clientProperties.isCleanSession();
+        connectBuilder.cleanStart(cleanSession);
+        if (!cleanSession) {
+            // A session expiry interval is only set when the session is meant to be kept.
+            connectBuilder.sessionExpiryInterval(clientProperties.getSessionExpiryInterval());
+        }
+
+        final String lastWillTopic = clientProperties.getLastWillTopic();
+        if (lastWillTopic != null) {
+            connectBuilder.willPublish()
+                    .topic(lastWillTopic)
+                    // Encode explicitly as UTF-8 (as done for the password below) so the bytes sent
+                    // do not depend on the platform default charset.
+                    .payload(clientProperties.getLastWillMessage().getBytes(StandardCharsets.UTF_8))
+                    .retain(clientProperties.getLastWillRetain())
+                    // Same null check on the QoS lookup as in publish() and subscribe().
+                    .qos(Objects.requireNonNull(MqttQos.fromCode(clientProperties.getLastWillQos())))
+                    .applyWillPublish();
+        }
+
+        final String username = clientProperties.getUsername();
+        final String password = clientProperties.getPassword();
+        if (username != null && password != null) {
+            connectBuilder.simpleAuth()
+                    .username(username)
+                    .password(password.getBytes(StandardCharsets.UTF_8))
+                    .applySimpleAuth();
+        }
+
+        final Mqtt5Connect mqtt5Connect = connectBuilder.build();
+        mqtt5BlockingClient.connect(mqtt5Connect);
+    }
+
+    /**
+     * Disconnects the underlying client.
+     */
+    @Override
+    public void disconnect() {
+        logger.debug("Disconnecting client");
+        // Currently it is not possible to set a timeout for disconnect with the HiveMQ client.
+        mqtt5BlockingClient.disconnect();
+    }
+
+    /**
+     * No-op for this adapter.
+     */
+    @Override
+    public void close() {
+        // The HiveMQ client has no equivalent of Paho's close.
+    }
+
+    /**
+     * Publishes the message to the given topic with the message's payload, retain flag and QoS.
+     *
+     * @param topic topic to publish to
+     * @param message message to send
+     */
+    @Override
+    public void publish(String topic, StandardMqttMessage message) {
+        logger.debug("Publishing message to {} with QoS: {}", topic, message.getQos());
+
+        mqtt5BlockingClient.publishWith()
+                .topic(topic)
+                .payload(message.getPayload())
+                .retain(message.isRetained())
+                .qos(Objects.requireNonNull(MqttQos.fromCode(message.getQos())))
+                .send();
+    }
+
+    /**
+     * Subscribes to the topic filter and forwards every received publish to the registered callback.
+     * Waits for the subscribe acknowledgement for at most the configured connection timeout (in seconds).
+     *
+     * @param topicFilter topic filter to subscribe to
+     * @param qos QoS code of the subscription
+     * @throws NullPointerException if no callback has been set or the QoS code cannot be mapped
+     * @throws MqttException if the acknowledgement does not arrive, the wait fails or the thread is interrupted
+     */
+    @Override
+    public void subscribe(String topicFilter, int qos) {
+        Objects.requireNonNull(callback, "callback should be set");
+
+        logger.debug("Subscribing to {} with QoS: {}", topicFilter, qos);
+
+        CompletableFuture<Mqtt5SubAck> futureAck = mqtt5BlockingClient.toAsync().subscribeWith()
+                .topicFilter(topicFilter)
+                .qos(Objects.requireNonNull(MqttQos.fromCode(qos)))
+                .callback(mqtt5Publish -> {
+                    final ReceivedMqttMessage receivedMessage = new ReceivedMqttMessage(
+                            mqtt5Publish.getPayloadAsBytes(),
+                            mqtt5Publish.getQos().getCode(),
+                            mqtt5Publish.isRetain(),
+                            mqtt5Publish.getTopic().toString());
+                    callback.messageArrived(receivedMessage);
+                })
+                .send();
+
+        // Registering a "listener" callback is only possible with the async client, but the subscribe message
+        // should still be sent in a blocking way so that the processor waits until the ack has arrived.
+        try {
+            Mqtt5SubAck ack = futureAck.get(clientProperties.getConnectionTimeout(), TimeUnit.SECONDS);
+            logger.debug("Received mqtt5 subscribe ack: {}", ack);
+        } catch (InterruptedException e) {
+            // Restore the interrupt flag before translating the exception so the interruption is not lost.
+            Thread.currentThread().interrupt();
+            throw new MqttException("An error has occurred during sending subscribe message to broker", e);
+        } catch (Exception e) {
+            throw new MqttException("An error has occurred during sending subscribe message to broker", e);
+        }
+    }
+
+    /**
+     * Registers the callback that receives messages delivered by {@link #subscribe(String, int)}.
+     *
+     * @param callback callback to notify about incoming messages
+     */
+    @Override
+    public void setCallback(MqttCallback callback) {
+        this.callback = callback;
+    }
+
+    /**
+     * Builds the blocking HiveMQ client: client identifier, host, optional port, WebSocket transport for
+     * the WS/WSS schemes and trust/key manager factories for the SSL scheme.
+     *
+     * @param clientProperties source of the client id, broker URI, scheme and TLS configuration
+     * @param logger logger used for debug output
+     * @return the configured blocking client
+     * @throws TlsException propagated from creating the trust or key manager factory
+     */
+    private static Mqtt5BlockingClient createClient(MqttClientProperties clientProperties, ComponentLog logger) throws TlsException {
+        logger.debug("Creating Mqtt v5 client");
+
+        Mqtt5ClientBuilder mqtt5ClientBuilder = Mqtt5Client.builder()
+                .identifier(clientProperties.getClientId())
+                .serverHost(clientProperties.getBrokerUri().getHost());
+
+        // -1 presumably means the broker URI carries no explicit port; the builder's default is then kept.
+        int port = clientProperties.getBrokerUri().getPort();
+        if (port != -1) {
+            mqtt5ClientBuilder.serverPort(port);
+        }
+
+        // default is tcp
+        if (WS.equals(clientProperties.getScheme()) || WSS.equals(clientProperties.getScheme())) {
+            mqtt5ClientBuilder.webSocketConfig().applyWebSocketConfig();
+        }
+
+        // NOTE(review): TLS settings are applied for the SSL scheme only; WSS gets the WebSocket config above
+        // but no sslConfig here - confirm whether that is intended.
+        if (SSL.equals(clientProperties.getScheme())) {
+            if (clientProperties.getTlsConfiguration().getTruststorePath() != null) {
+                mqtt5ClientBuilder
+                        .sslConfig()
+                        .trustManagerFactory(KeyStoreUtils.loadTrustManagerFactory(
+                                clientProperties.getTlsConfiguration().getTruststorePath(),
+                                clientProperties.getTlsConfiguration().getTruststorePassword(),
+                                clientProperties.getTlsConfiguration().getTruststoreType().getType()))
+                        .applySslConfig();
+            }
+
+            if (clientProperties.getTlsConfiguration().getKeystorePath() != null) {
+                mqtt5ClientBuilder
+                        .sslConfig()
+                        .keyManagerFactory(KeyStoreUtils.loadKeyManagerFactory(
+                                clientProperties.getTlsConfiguration().getKeystorePath(),
+                                clientProperties.getTlsConfiguration().getKeystorePassword(),
+                                null,
+                                clientProperties.getTlsConfiguration().getKeystoreType().getType()))
+                        .applySslConfig();
+            }
+        }
+
+        return mqtt5ClientBuilder.buildBlocking();
+    }
+}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/PahoMqttClientAdapter.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/PahoMqttClientAdapter.java
new file mode 100644
index 0000000000..90cdc5c1c8
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/PahoMqttClientAdapter.java
@@ -0,0 +1,191 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mqtt.adapters;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.mqtt.common.MqttCallback;
+import org.apache.nifi.processors.mqtt.common.MqttClient;
+import org.apache.nifi.processors.mqtt.common.MqttClientProperties;
+import org.apache.nifi.processors.mqtt.common.MqttException;
+import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
+import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
+import org.apache.nifi.security.util.TlsConfiguration;
+import org.eclipse.paho.client.mqttv3.IMqttClient;
+import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+import org.eclipse.paho.client.mqttv3.MqttMessage;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+import java.util.Properties;
+
+/**
+ * Adapts the Eclipse Paho MQTT v3 client to the bundle's {@link MqttClient} abstraction.
+ * <p>
+ * Every checked Paho exception is rethrown as the bundle's own {@link MqttException}
+ * with the original exception attached as the cause.
+ */
+public class PahoMqttClientAdapter implements MqttClient {
+
+    /** Timeout handed to the Paho client on disconnect (milliseconds per the Paho API). */
+    public static final int DISCONNECT_TIMEOUT = 5000;
+
+    private final IMqttClient client;
+    private final MqttClientProperties clientProperties;
+    private final ComponentLog logger;
+
+    /**
+     * Creates the underlying Paho client for the broker and client id held in the properties.
+     *
+     * @param clientProperties connection settings used now (broker, client id) and later in {@link #connect()}
+     * @param logger component logger used for debug output
+     * @throws MqttException if the Paho client cannot be created
+     */
+    public PahoMqttClientAdapter(MqttClientProperties clientProperties, ComponentLog logger) {
+        this.client = createClient(clientProperties, logger);
+        this.clientProperties = clientProperties;
+        this.logger = logger;
+    }
+
+    @Override
+    public boolean isConnected() {
+        return client.isConnected();
+    }
+
+    /**
+     * Connects to the broker using connect options assembled from the client properties:
+     * session/keep-alive/version/timeout settings, optional TLS properties, optional Last Will
+     * and optional username/password credentials.
+     *
+     * @throws MqttException if the Paho client fails to connect
+     */
+    @Override
+    public void connect() {
+        logger.debug("Connecting to broker");
+
+        try {
+            final MqttConnectOptions connectOptions = new MqttConnectOptions();
+
+            connectOptions.setCleanSession(clientProperties.isCleanSession());
+            connectOptions.setKeepAliveInterval(clientProperties.getKeepAliveInterval());
+            connectOptions.setMqttVersion(clientProperties.getMqttVersion().getVersionCode());
+            connectOptions.setConnectionTimeout(clientProperties.getConnectionTimeout());
+
+            final TlsConfiguration tlsConfiguration = clientProperties.getTlsConfiguration();
+            if (tlsConfiguration != null) {
+                connectOptions.setSSLProperties(transformSSLContextService(tlsConfiguration));
+            }
+
+            final String lastWillTopic = clientProperties.getLastWillTopic();
+            if (lastWillTopic != null) {
+                boolean lastWillRetain = clientProperties.getLastWillRetain() != null && clientProperties.getLastWillRetain();
+                // Encode the Last Will explicitly as UTF-8 so the payload does not depend on the
+                // platform default charset of the JVM running the processor.
+                final byte[] lastWillPayload = clientProperties.getLastWillMessage().getBytes(java.nio.charset.StandardCharsets.UTF_8);
+                connectOptions.setWill(lastWillTopic, lastWillPayload, clientProperties.getLastWillQos(), lastWillRetain);
+            }
+
+            final String username = clientProperties.getUsername();
+            if (username != null) {
+                connectOptions.setUserName(username);
+                connectOptions.setPassword(clientProperties.getPassword().toCharArray());
+            }
+
+            client.connect(connectOptions);
+        } catch (org.eclipse.paho.client.mqttv3.MqttException e) {
+            throw new MqttException("An error has occurred during connecting to broker", e);
+        }
+    }
+
+    /**
+     * Disconnects from the broker, passing {@link #DISCONNECT_TIMEOUT} to the Paho client.
+     *
+     * @throws MqttException if the Paho client fails to disconnect
+     */
+    @Override
+    public void disconnect() {
+        logger.debug("Disconnecting client with timeout: {}", DISCONNECT_TIMEOUT);
+
+        try {
+            client.disconnect(DISCONNECT_TIMEOUT);
+        } catch (org.eclipse.paho.client.mqttv3.MqttException e) {
+            throw new MqttException("An error has occurred during disconnecting client with timeout: " + DISCONNECT_TIMEOUT, e);
+        }
+    }
+
+    /**
+     * Closes the underlying Paho client.
+     *
+     * @throws MqttException if the Paho client fails to close
+     */
+    @Override
+    public void close() {
+        logger.debug("Closing client");
+
+        try {
+            client.close();
+        } catch (org.eclipse.paho.client.mqttv3.MqttException e) {
+            throw new MqttException("An error has occurred during closing client", e);
+        }
+    }
+
+    /**
+     * Publishes the message's payload to the topic with the message's QoS and retained flag.
+     *
+     * @param topic topic to publish to
+     * @param message payload, QoS and retained flag to send
+     * @throws MqttException if the Paho client fails to publish
+     */
+    @Override
+    public void publish(String topic, StandardMqttMessage message) {
+        logger.debug("Publishing message to {} with QoS: {}", topic, message.getQos());
+
+        try {
+            client.publish(topic, message.getPayload(), message.getQos(), message.isRetained());
+        } catch (org.eclipse.paho.client.mqttv3.MqttException e) {
+            throw new MqttException("An error has occurred during publishing message to " + topic + " with QoS: " + message.getQos(), e);
+        }
+    }
+
+    /**
+     * Subscribes to the topic filter with the given QoS.
+     *
+     * @param topicFilter topic filter to subscribe to
+     * @param qos quality of service requested for the subscription
+     * @throws MqttException if the Paho client fails to subscribe
+     */
+    @Override
+    public void subscribe(String topicFilter, int qos) {
+        logger.debug("Subscribing to {} with QoS: {}", topicFilter, qos);
+
+        try {
+            client.subscribe(topicFilter, qos);
+        } catch (org.eclipse.paho.client.mqttv3.MqttException e) {
+            throw new MqttException("An error has occurred during subscribing to " + topicFilter + " with QoS: " + qos, e);
+        }
+    }
+
+    /**
+     * Registers a Paho callback that forwards every event to the given bundle-level callback,
+     * translating Paho types (message, delivery token) into the bundle's own types.
+     *
+     * @param callback receiver of the forwarded events
+     */
+    @Override
+    public void setCallback(MqttCallback callback) {
+        client.setCallback(new org.eclipse.paho.client.mqttv3.MqttCallback() {
+            @Override
+            public void connectionLost(Throwable cause) {
+                callback.connectionLost(cause);
+            }
+
+            @Override
+            public void messageArrived(String topic, MqttMessage message) {
+                logger.debug("Message arrived with id: {}", message.getId());
+                final ReceivedMqttMessage receivedMessage = new ReceivedMqttMessage(message.getPayload(), message.getQos(), message.isRetained(), topic);
+                callback.messageArrived(receivedMessage);
+            }
+
+            @Override
+            public void deliveryComplete(IMqttDeliveryToken token) {
+                // Only the token's string form crosses the abstraction boundary.
+                callback.deliveryComplete(token.toString());
+            }
+        });
+    }
+
+    /**
+     * Translates a NiFi TLS configuration into the "com.ibm.ssl.*" properties handed to the
+     * Paho connect options. Only values that are non-null in the configuration are set.
+     *
+     * @param tlsConfiguration TLS settings to translate; must not be null
+     * @return a new Properties instance holding the available settings
+     */
+    public static Properties transformSSLContextService(TlsConfiguration tlsConfiguration) {
+        final Properties properties = new Properties();
+        if (tlsConfiguration.getProtocol() != null) {
+            properties.setProperty("com.ibm.ssl.protocol", tlsConfiguration.getProtocol());
+        }
+        if (tlsConfiguration.getKeystorePath() != null) {
+            properties.setProperty("com.ibm.ssl.keyStore", tlsConfiguration.getKeystorePath());
+        }
+        if (tlsConfiguration.getKeystorePassword() != null) {
+            properties.setProperty("com.ibm.ssl.keyStorePassword", tlsConfiguration.getKeystorePassword());
+        }
+        if (tlsConfiguration.getKeystoreType() != null) {
+            properties.setProperty("com.ibm.ssl.keyStoreType", tlsConfiguration.getKeystoreType().getType());
+        }
+        if (tlsConfiguration.getTruststorePath() != null) {
+            properties.setProperty("com.ibm.ssl.trustStore", tlsConfiguration.getTruststorePath());
+        }
+        if (tlsConfiguration.getTruststorePassword() != null) {
+            properties.setProperty("com.ibm.ssl.trustStorePassword", tlsConfiguration.getTruststorePassword());
+        }
+        if (tlsConfiguration.getTruststoreType() != null) {
+            properties.setProperty("com.ibm.ssl.trustStoreType", tlsConfiguration.getTruststoreType().getType());
+        }
+        return properties;
+    }
+
+    /**
+     * Instantiates the Paho v3 client with in-memory persistence.
+     *
+     * @throws MqttException if the Paho client constructor fails
+     */
+    private static org.eclipse.paho.client.mqttv3.MqttClient createClient(MqttClientProperties clientProperties, ComponentLog logger) {
+        logger.debug("Creating Mqtt v3 client");
+
+        try {
+            return new org.eclipse.paho.client.mqttv3.MqttClient(clientProperties.getBroker(), clientProperties.getClientId(), new MemoryPersistence());
+        } catch (org.eclipse.paho.client.mqttv3.MqttException e) {
+            throw new MqttException("An error has occurred during creating adapter for MQTT v3 client", e);
+        }
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
index 13bc624216..8b8c186360 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
@@ -31,25 +31,25 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessSessionFactory;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.security.util.TlsException;
import org.apache.nifi.ssl.SSLContextService;
-import org.eclipse.paho.client.mqttv3.IMqttClient;
-import org.eclipse.paho.client.mqttv3.MqttClient;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.Collection;
import java.util.List;
-import java.util.Properties;
import java.util.UUID;
+import java.util.concurrent.TimeUnit;
+import static org.apache.commons.lang3.EnumUtils.isValidEnumIgnoreCase;
+import static org.apache.commons.lang3.StringUtils.EMPTY;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_FALSE;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_CLEAN_SESSION_TRUE;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_310;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_311;
+import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_500;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_MQTT_VERSION_AUTO;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_0;
import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VALUE_QOS_1;
@@ -57,63 +57,68 @@ import static org.apache.nifi.processors.mqtt.common.MqttConstants.ALLOWABLE_VAL
public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProcessor {
- public static int DISCONNECT_TIMEOUT = 5000;
+ private static final String DEFAULT_SESSION_EXPIRY_INTERVAL = "24 hrs";
protected ComponentLog logger;
- protected IMqttClient mqttClient;
- protected volatile String broker;
- protected volatile String brokerUri;
- protected volatile String clientID;
- protected MqttConnectOptions connOpts;
- protected MemoryPersistence persistence = new MemoryPersistence();
- public ProcessSessionFactory processSessionFactory;
+ protected MqttClientProperties clientProperties;
- public static final Validator QOS_VALIDATOR = new Validator() {
+ protected MqttClientFactory mqttClientFactory = new MqttClientFactory();
+ protected MqttClient mqttClient;
- @Override
- public ValidationResult validate(String subject, String input, ValidationContext context) {
- Integer inputInt = Integer.parseInt(input);
- if (inputInt < 0 || inputInt > 2) {
- return new ValidationResult.Builder().subject(subject).valid(false).explanation("QoS must be an integer between 0 and 2").build();
- }
- return new ValidationResult.Builder().subject(subject).valid(true).build();
+ public ProcessSessionFactory processSessionFactory;
+
+ public static final Validator QOS_VALIDATOR = (subject, input, context) -> {
+ Integer inputInt = Integer.parseInt(input);
+ if (inputInt < 0 || inputInt > 2) {
+ return new ValidationResult.Builder().subject(subject).valid(false).explanation("QoS must be an integer between 0 and 2.").build();
}
+ return new ValidationResult.Builder().subject(subject).valid(true).build();
};
- public static final Validator BROKER_VALIDATOR = new Validator() {
-
- @Override
- public ValidationResult validate(String subject, String input, ValidationContext context) {
- try{
- URI brokerURI = new URI(input);
- if (!"".equals(brokerURI.getPath())) {
- return new ValidationResult.Builder().subject(subject).valid(false).explanation("the broker URI cannot have a path. It currently is:" + brokerURI.getPath()).build();
- }
- if (!("tcp".equals(brokerURI.getScheme()) || "ssl".equals(brokerURI.getScheme()) || "ws".equals(brokerURI.getScheme()) || "wss".equals(brokerURI.getScheme()))) {
- return new ValidationResult.Builder().subject(subject).valid(false).explanation("only the 'tcp', 'ssl', 'ws' and 'wss' schemes are supported.").build();
- }
- } catch (URISyntaxException e) {
- return new ValidationResult.Builder().subject(subject).valid(false).explanation("it is not valid URI syntax.").build();
+ public static final Validator BROKER_VALIDATOR = (subject, input, context) -> {
+ try {
+ URI brokerURI = new URI(input);
+ if (!EMPTY.equals(brokerURI.getPath())) {
+ return new ValidationResult.Builder().subject(subject).valid(false).explanation("the broker URI cannot have a path. It currently is: " + brokerURI.getPath()).build();
}
- return new ValidationResult.Builder().subject(subject).valid(true).build();
+ if (!isValidEnumIgnoreCase(MqttProtocolScheme.class, brokerURI.getScheme())) {
+ return new ValidationResult.Builder().subject(subject).valid(false)
+ .explanation("scheme is invalid. Supported schemes are: " + getSupportedSchemeList()).build();
+ }
+ } catch (URISyntaxException e) {
+ return new ValidationResult.Builder().subject(subject).valid(false).explanation("it is not valid URI syntax.").build();
}
+ return new ValidationResult.Builder().subject(subject).valid(true).build();
};
- public static final Validator RETAIN_VALIDATOR = new Validator() {
-
- @Override
- public ValidationResult validate(String subject, String input, ValidationContext context) {
- if("true".equalsIgnoreCase(input) || "false".equalsIgnoreCase(input)){
- return new ValidationResult.Builder().subject(subject).valid(true).build();
- } else{
- return StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.BOOLEAN, false)
- .validate(subject, input, context);
- }
+ private static String getSupportedSchemeList() {
+ return String.join(", ", Arrays.stream(MqttProtocolScheme.values()).map(value -> value.name().toLowerCase()).toArray(String[]::new));
+ }
+ public static final Validator RETAIN_VALIDATOR = (subject, input, context) -> {
+ if ("true".equalsIgnoreCase(input) || "false".equalsIgnoreCase(input)) {
+ return new ValidationResult.Builder().subject(subject).valid(true).build();
+ } else {
+ return StandardValidators.createAttributeExpressionLanguageValidator(AttributeExpression.ResultType.BOOLEAN, false)
+ .validate(subject, input, context);
}
+
};
+ public static final PropertyDescriptor PROP_MQTT_VERSION = new PropertyDescriptor.Builder()
+ .name("MQTT Specification Version")
+ .description("The MQTT specification version when connecting with the broker. See the allowable value descriptions for more details.")
+ .allowableValues(
+ ALLOWABLE_VALUE_MQTT_VERSION_AUTO,
+ ALLOWABLE_VALUE_MQTT_VERSION_500,
+ ALLOWABLE_VALUE_MQTT_VERSION_311,
+ ALLOWABLE_VALUE_MQTT_VERSION_310
+ )
+ .defaultValue(ALLOWABLE_VALUE_MQTT_VERSION_AUTO.getValue())
+ .required(true)
+ .build();
+
public static final PropertyDescriptor PROP_BROKER_URI = new PropertyDescriptor.Builder()
.name("Broker URI")
.description("The URI to use to connect to the MQTT broker (e.g. tcp://localhost:1883). The 'tcp', 'ssl', 'ws' and 'wss' schemes are supported. In order to use 'ssl', the SSL Context " +
@@ -123,7 +128,6 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
.addValidator(BROKER_VALIDATOR)
.build();
-
public static final PropertyDescriptor PROP_CLIENTID = new PropertyDescriptor.Builder()
.name("Client ID")
.description("MQTT client ID to use. If not set, a UUID will be generated.")
@@ -155,7 +159,6 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
.identifiesControllerService(SSLContextService.class)
.build();
-
public static final PropertyDescriptor PROP_LAST_WILL_TOPIC = new PropertyDescriptor.Builder()
.name("Last Will Topic")
.description("The topic to send the client's Last Will to. If the Last Will topic and message are not set then a Last Will will not be sent.")
@@ -174,7 +177,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
.name("Last Will Retain")
.description("Whether to retain the client's Last Will. If the Last Will topic and message are not set then a Last Will will not be sent.")
.required(false)
- .allowableValues("true","false")
+ .allowableValues("true", "false")
.build();
public static final PropertyDescriptor PROP_LAST_WILL_QOS = new PropertyDescriptor.Builder()
@@ -190,7 +193,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
public static final PropertyDescriptor PROP_CLEAN_SESSION = new PropertyDescriptor.Builder()
.name("Session state")
- .description("Whether to start afresh or resume previous flows. See the allowable value descriptions for more details.")
+ .description("Whether to start a fresh session or resume previous flows. See the allowable value descriptions for more details.")
.required(true)
.allowableValues(
ALLOWABLE_VALUE_CLEAN_SESSION_TRUE,
@@ -199,16 +202,13 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
.defaultValue(ALLOWABLE_VALUE_CLEAN_SESSION_TRUE.getValue())
.build();
- public static final PropertyDescriptor PROP_MQTT_VERSION = new PropertyDescriptor.Builder()
- .name("MQTT Specification Version")
- .description("The MQTT specification version when connecting with the broker. See the allowable value descriptions for more details.")
- .allowableValues(
- ALLOWABLE_VALUE_MQTT_VERSION_AUTO,
- ALLOWABLE_VALUE_MQTT_VERSION_311,
- ALLOWABLE_VALUE_MQTT_VERSION_310
- )
- .defaultValue(ALLOWABLE_VALUE_MQTT_VERSION_AUTO.getValue())
- .required(true)
+ public static final PropertyDescriptor PROP_SESSION_EXPIRY_INTERVAL = new PropertyDescriptor.Builder()
+ .name("Session Expiry Interval")
+ .description("After this interval the broker will expire the client and clear the session state.")
+ .addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
+ .dependsOn(PROP_MQTT_VERSION, ALLOWABLE_VALUE_MQTT_VERSION_500)
+ .dependsOn(PROP_CLEAN_SESSION, ALLOWABLE_VALUE_CLEAN_SESSION_FALSE)
+ .defaultValue(DEFAULT_SESSION_EXPIRY_INTERVAL)
.build();
public static final PropertyDescriptor PROP_CONN_TIMEOUT = new PropertyDescriptor.Builder()
@@ -232,8 +232,8 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
.build();
- public static List<PropertyDescriptor> getAbstractPropertyDescriptors(){
- final List<PropertyDescriptor> descriptors = new ArrayList<PropertyDescriptor>();
+ public static List<PropertyDescriptor> getAbstractPropertyDescriptors() {
+ final List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.add(PROP_BROKER_URI);
descriptors.add(PROP_CLIENTID);
descriptors.add(PROP_USERNAME);
@@ -244,6 +244,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
descriptors.add(PROP_LAST_WILL_RETAIN);
descriptors.add(PROP_LAST_WILL_QOS);
descriptors.add(PROP_CLEAN_SESSION);
+ descriptors.add(PROP_SESSION_EXPIRY_INTERVAL);
descriptors.add(PROP_MQTT_VERSION);
descriptors.add(PROP_CONN_TIMEOUT);
descriptors.add(PROP_KEEP_ALIVE_INTERVAL);
@@ -257,7 +258,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
final boolean passwordSet = validationContext.getProperty(PROP_PASSWORD).isSet();
if ((usernameSet && !passwordSet) || (!usernameSet && passwordSet)) {
- results.add(new ValidationResult.Builder().subject("Username and Password").valid(false).explanation("if username or password is set, both must be set").build());
+ results.add(new ValidationResult.Builder().subject("Username and Password").valid(false).explanation("if username or password is set, both must be set.").build());
}
final boolean lastWillTopicSet = validationContext.getProperty(PROP_LAST_WILL_TOPIC).isSet();
@@ -269,7 +270,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
// If any of the Last Will Properties are set
if (lastWillTopicSet || lastWillMessageSet || lastWillRetainSet || lastWillQosSet) {
// And any are not set
- if(!(lastWillTopicSet && lastWillMessageSet && lastWillRetainSet && lastWillQosSet)){
+ if (!(lastWillTopicSet && lastWillMessageSet && lastWillRetainSet && lastWillQosSet)) {
// Then mark as invalid
results.add(new ValidationResult.Builder().subject("Last Will Properties").valid(false).explanation("if any of the Last Will Properties (message, topic, retain and QoS) are " +
"set, all must be set.").build());
@@ -289,88 +290,34 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
return results;
}
- public static Properties transformSSLContextService(SSLContextService sslContextService){
- Properties properties = new Properties();
- if (sslContextService.getSslAlgorithm() != null) {
- properties.setProperty("com.ibm.ssl.protocol", sslContextService.getSslAlgorithm());
- }
- if (sslContextService.getKeyStoreFile() != null) {
- properties.setProperty("com.ibm.ssl.keyStore", sslContextService.getKeyStoreFile());
- }
- if (sslContextService.getKeyStorePassword() != null) {
- properties.setProperty("com.ibm.ssl.keyStorePassword", sslContextService.getKeyStorePassword());
- }
- if (sslContextService.getKeyStoreType() != null) {
- properties.setProperty("com.ibm.ssl.keyStoreType", sslContextService.getKeyStoreType());
- }
- if (sslContextService.getTrustStoreFile() != null) {
- properties.setProperty("com.ibm.ssl.trustStore", sslContextService.getTrustStoreFile());
- }
- if (sslContextService.getTrustStorePassword() != null) {
- properties.setProperty("com.ibm.ssl.trustStorePassword", sslContextService.getTrustStorePassword());
- }
- if (sslContextService.getTrustStoreType() != null) {
- properties.setProperty("com.ibm.ssl.trustStoreType", sslContextService.getTrustStoreType());
- }
- return properties;
+ protected void onScheduled(final ProcessContext context) {
+ clientProperties = getMqttClientProperties(context);
}
- protected void onScheduled(final ProcessContext context){
- broker = context.getProperty(PROP_BROKER_URI).evaluateAttributeExpressions().getValue();
- brokerUri = broker.endsWith("/") ? broker : broker + "/";
- clientID = context.getProperty(PROP_CLIENTID).evaluateAttributeExpressions().getValue();
-
- if (clientID == null) {
- clientID = UUID.randomUUID().toString();
- }
-
- connOpts = new MqttConnectOptions();
- connOpts.setCleanSession(context.getProperty(PROP_CLEAN_SESSION).asBoolean());
- connOpts.setKeepAliveInterval(context.getProperty(PROP_KEEP_ALIVE_INTERVAL).asInteger());
- connOpts.setMqttVersion(context.getProperty(PROP_MQTT_VERSION).asInteger());
- connOpts.setConnectionTimeout(context.getProperty(PROP_CONN_TIMEOUT).asInteger());
-
- PropertyValue sslProp = context.getProperty(PROP_SSL_CONTEXT_SERVICE);
- if (sslProp.isSet()) {
- Properties sslProps = transformSSLContextService((SSLContextService) sslProp.asControllerService());
- connOpts.setSSLProperties(sslProps);
- }
-
- PropertyValue lastWillTopicProp = context.getProperty(PROP_LAST_WILL_TOPIC);
- if (lastWillTopicProp.isSet()){
- String lastWillMessage = context.getProperty(PROP_LAST_WILL_MESSAGE).getValue();
- PropertyValue lastWillRetain = context.getProperty(PROP_LAST_WILL_RETAIN);
- Integer lastWillQOS = context.getProperty(PROP_LAST_WILL_QOS).asInteger();
- connOpts.setWill(lastWillTopicProp.getValue(), lastWillMessage.getBytes(), lastWillQOS, lastWillRetain.isSet() ? lastWillRetain.asBoolean() : false);
- }
-
-
- PropertyValue usernameProp = context.getProperty(PROP_USERNAME);
- if(usernameProp.isSet()) {
- connOpts.setUserName(usernameProp.evaluateAttributeExpressions().getValue());
- connOpts.setPassword(context.getProperty(PROP_PASSWORD).getValue().toCharArray());
- }
- }
+ protected void stopClient() {
+ // Since the client is created in the onTrigger method, it may never have been created if an initialization error occurred.
+ // The null check prevents an additional NullPointerException here, but the clean solution would be to create the client in the onScheduled method.
+ if (mqttClient != null) {
+ try {
+ logger.info("Disconnecting client");
+ mqttClient.disconnect();
+ } catch (Exception e) {
+ logger.error("Error disconnecting MQTT client", e);
+ }
- protected void onStopped() {
- try {
- logger.info("Disconnecting client");
- mqttClient.disconnect(DISCONNECT_TIMEOUT);
- } catch(MqttException me) {
- logger.error("Error disconnecting MQTT client due to {}", new Object[]{me.getMessage()}, me);
- }
+ try {
+ logger.info("Closing client");
+ mqttClient.close();
+ } catch (Exception e) {
+ logger.error("Error closing MQTT client", e);
+ }
- try {
- logger.info("Closing client");
- mqttClient.close();
mqttClient = null;
- } catch (MqttException me) {
- logger.error("Error closing MQTT client due to {}", new Object[]{me.getMessage()}, me);
}
}
- protected IMqttClient createMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException {
- return new MqttClient(broker, clientID, persistence);
+ protected MqttClient createMqttClient() throws TlsException {
+ return mqttClientFactory.create(clientProperties, getLogger());
}
@@ -384,7 +331,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
onTrigger(context, session);
session.commitAsync();
} catch (final Throwable t) {
- getLogger().error("{} failed to process due to {}; rolling back session", new Object[]{this, t});
+ getLogger().error("{} failed to process due to {}; rolling back session", this, t);
session.rollback(true);
throw t;
}
@@ -392,8 +339,52 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
public abstract void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException;
- protected boolean isConnected(){
+ protected boolean isConnected() {
return (mqttClient != null && mqttClient.isConnected());
}
+ /**
+  * Reads the MQTT connection settings from the processor context and packs them into a new
+  * {@link MqttClientProperties} instance.
+  *
+  * @param context process context supplying the configured property values
+  * @return the populated client properties
+  * @throws IllegalArgumentException if the evaluated Broker URI is not a syntactically valid URI
+  */
+ protected MqttClientProperties getMqttClientProperties(final ProcessContext context) {
+     final MqttClientProperties properties = new MqttClientProperties();
+
+     final String brokerUriValue = context.getProperty(PROP_BROKER_URI).evaluateAttributeExpressions().getValue();
+     try {
+         properties.setBrokerUri(new URI(brokerUriValue));
+     } catch (URISyntaxException e) {
+         throw new IllegalArgumentException("Invalid Broker URI", e);
+     }
+
+     // Fall back to a random UUID when no Client ID has been configured.
+     final String configuredClientId = context.getProperty(PROP_CLIENTID).evaluateAttributeExpressions().getValue();
+     properties.setClientId(configuredClientId != null ? configuredClientId : UUID.randomUUID().toString());
+
+     properties.setMqttVersion(MqttVersion.fromVersionCode(context.getProperty(PROP_MQTT_VERSION).asInteger()));
+
+     // Session handling
+     properties.setCleanSession(context.getProperty(PROP_CLEAN_SESSION).asBoolean());
+     properties.setSessionExpiryInterval(context.getProperty(PROP_SESSION_EXPIRY_INTERVAL).asTimePeriod(TimeUnit.SECONDS));
+
+     // Timing
+     properties.setKeepAliveInterval(context.getProperty(PROP_KEEP_ALIVE_INTERVAL).asInteger());
+     properties.setConnectionTimeout(context.getProperty(PROP_CONN_TIMEOUT).asInteger());
+
+     // TLS settings are only available when an SSL Context Service is configured.
+     final PropertyValue sslContextProperty = context.getProperty(PROP_SSL_CONTEXT_SERVICE);
+     if (sslContextProperty.isSet()) {
+         final SSLContextService sslService = (SSLContextService) sslContextProperty.asControllerService();
+         properties.setTlsConfiguration(sslService.createTlsConfiguration());
+     }
+
+     // Last Will
+     properties.setLastWillTopic(context.getProperty(PROP_LAST_WILL_TOPIC).getValue());
+     properties.setLastWillMessage(context.getProperty(PROP_LAST_WILL_MESSAGE).getValue());
+     final PropertyValue retainProperty = context.getProperty(PROP_LAST_WILL_RETAIN);
+     properties.setLastWillRetain(retainProperty.isSet() && retainProperty.asBoolean());
+     properties.setLastWillQos(context.getProperty(PROP_LAST_WILL_QOS).asInteger());
+
+     // Credentials: the username is copied only when set; the password value is always copied.
+     final PropertyValue usernameProperty = context.getProperty(PROP_USERNAME);
+     if (usernameProperty.isSet()) {
+         properties.setUsername(usernameProperty.evaluateAttributeExpressions().getValue());
+     }
+     properties.setPassword(context.getProperty(PROP_PASSWORD).getValue());
+
+     return properties;
+ }
}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttCallback.java
similarity index 51%
copy from nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java
copy to nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttCallback.java
index d5e63c789b..a890616f5c 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttCallback.java
@@ -14,44 +14,10 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.nifi.processors.mqtt.common;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-
-public class MQTTQueueMessage {
- private String topic;
-
- private byte[] payload;
- private int qos = 1;
- private boolean retained = false;
- private boolean duplicate = false;
-
- public MQTTQueueMessage(String topic, MqttMessage message) {
- this.topic = topic;
- payload = message.getPayload();
- qos = message.getQos();
- retained = message.isRetained();
- duplicate = message.isDuplicate();
- }
-
- public String getTopic() {
- return topic;
- }
-
- public byte[] getPayload() {
- return payload;
- }
-
- public int getQos() {
- return qos;
- }
-
- public boolean isRetained() {
- return retained;
- }
-
- public boolean isDuplicate() {
- return duplicate;
- }
+/**
+ * Library-independent listener for asynchronous MQTT client events.
+ * Client adapters (for example PahoMqttClientAdapter) translate the events of the
+ * underlying client library into calls on this interface.
+ */
+public interface MqttCallback {
+ /**
+ * Invoked when the underlying client reports that the broker connection was lost.
+ *
+ * @param cause the reason reported by the underlying client
+ */
+ void connectionLost(Throwable cause);
+ /**
+ * Invoked when a message arrives from the broker.
+ *
+ * @param message the received message (payload, QoS, retained flag and topic)
+ */
+ void messageArrived(ReceivedMqttMessage message);
+ /**
+ * Invoked when the underlying client reports that delivery of a message has completed.
+ *
+ * @param token string form of the delivery token supplied by the underlying client
+ */
+ void deliveryComplete(String token);
}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClient.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClient.java
new file mode 100644
index 0000000000..f21d3e9242
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClient.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mqtt.common;
+
+/**
+ * Library-independent abstraction of an MQTT client used by the MQTT processors.
+ * Concrete implementations adapt a specific client library to this interface
+ * (for example PahoMqttClientAdapter wraps the Eclipse Paho v3 client).
+ * <p>
+ * No method declares a checked exception; PahoMqttClientAdapter reports failures by
+ * throwing the bundle's unchecked MqttException.
+ */
+public interface MqttClient {
+
+ /**
+ * Determines if this client is currently connected to an MQTT broker.
+ *
+ * @return whether the client is connected.
+ */
+ boolean isConnected();
+
+ /**
+ * Connects the client to an MQTT broker.
+ */
+ void connect();
+
+ /**
+ * Disconnects the client from an MQTT broker.
+ */
+ void disconnect();
+
+ /**
+ * Releases all resources associated with the client. After the client has
+ * been closed it cannot be reused. For instance, attempts to connect will fail.
+ */
+ void close();
+
+ /**
+ * Publishes a message to a topic on the MQTT broker.
+ *
+ * @param topic to deliver the message to, for example "pipe-1/flow-rate"
+ * @param message to deliver to the MQTT broker
+ */
+ void publish(String topic, StandardMqttMessage message);
+
+ /**
+ * Subscribes to a topic.
+ *
+ * @param topicFilter the topic to subscribe to, which can include wildcards.
+ * @param qos the maximum quality of service at which to subscribe. Messages
+ * published at a lower quality of service will be received at the published
+ * QoS. Messages published at a higher quality of service will be received using
+ * the QoS specified on the subscribe.
+ */
+ void subscribe(String topicFilter, int qos);
+
+ /**
+ * Sets a callback listener to use for events that happen asynchronously.
+ *
+ * @param callback for matching events
+ */
+ void setCallback(MqttCallback callback);
+}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClientFactory.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClientFactory.java
new file mode 100644
index 0000000000..630385f5e1
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClientFactory.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mqtt.common;
+
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processors.mqtt.adapters.HiveMqV5ClientAdapter;
+import org.apache.nifi.processors.mqtt.adapters.PahoMqttClientAdapter;
+import org.apache.nifi.security.util.TlsException;
+
+public class MqttClientFactory {
+ public MqttClient create(MqttClientProperties clientProperties, ComponentLog logger) throws TlsException {
+ switch (clientProperties.getMqttVersion()) {
+ case MQTT_VERSION_3_AUTO:
+ case MQTT_VERSION_3_1:
+ case MQTT_VERSION_3_1_1:
+ return new PahoMqttClientAdapter(clientProperties, logger);
+ case MQTT_VERSION_5_0:
+ return new HiveMqV5ClientAdapter(clientProperties, logger);
+ default:
+ throw new MqttException("Unsupported Mqtt version: " + clientProperties.getMqttVersion());
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClientProperties.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClientProperties.java
new file mode 100644
index 0000000000..eecde9b1b2
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClientProperties.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mqtt.common;
+
+import org.apache.nifi.security.util.TlsConfiguration;
+
+import java.net.URI;
+
+public class MqttClientProperties {
+ private URI brokerUri;
+ private String clientId;
+
+ private MqttVersion mqttVersion;
+
+ private int keepAliveInterval;
+ private int connectionTimeout;
+
+ private boolean cleanSession;
+ private Long sessionExpiryInterval;
+
+ private TlsConfiguration tlsConfiguration;
+
+ private String lastWillTopic;
+ private String lastWillMessage;
+ private Boolean lastWillRetain;
+ private Integer lastWillQos;
+
+ private String username;
+ private String password;
+
+ public String getBroker() {
+ return brokerUri.toString();
+ }
+
+ public MqttProtocolScheme getScheme() {
+ return MqttProtocolScheme.valueOf(brokerUri.getScheme().toUpperCase());
+ }
+
+ public URI getBrokerUri() {
+ return brokerUri;
+ }
+
+ public void setBrokerUri(URI brokerUri) {
+ this.brokerUri = brokerUri;
+ }
+
+ public String getClientId() {
+ return clientId;
+ }
+
+ public void setClientId(String clientId) {
+ this.clientId = clientId;
+ }
+
+ public MqttVersion getMqttVersion() {
+ return mqttVersion;
+ }
+
+ public void setMqttVersion(MqttVersion mqttVersion) {
+ this.mqttVersion = mqttVersion;
+ }
+
+ public int getKeepAliveInterval() {
+ return keepAliveInterval;
+ }
+
+ public void setKeepAliveInterval(int keepAliveInterval) {
+ this.keepAliveInterval = keepAliveInterval;
+ }
+
+ public int getConnectionTimeout() {
+ return connectionTimeout;
+ }
+
+ public void setConnectionTimeout(int connectionTimeout) {
+ this.connectionTimeout = connectionTimeout;
+ }
+
+ public boolean isCleanSession() {
+ return cleanSession;
+ }
+
+ public void setCleanSession(boolean cleanSession) {
+ this.cleanSession = cleanSession;
+ }
+
+ public Long getSessionExpiryInterval() {
+ return sessionExpiryInterval;
+ }
+
+ public void setSessionExpiryInterval(Long sessionExpiryInterval) {
+ this.sessionExpiryInterval = sessionExpiryInterval;
+ }
+
+ public TlsConfiguration getTlsConfiguration() {
+ return tlsConfiguration;
+ }
+
+ public void setTlsConfiguration(TlsConfiguration tlsConfiguration) {
+ this.tlsConfiguration = tlsConfiguration;
+ }
+
+ public String getLastWillTopic() {
+ return lastWillTopic;
+ }
+
+ public void setLastWillTopic(String lastWillTopic) {
+ this.lastWillTopic = lastWillTopic;
+ }
+
+ public String getLastWillMessage() {
+ return lastWillMessage;
+ }
+
+ public void setLastWillMessage(String lastWillMessage) {
+ this.lastWillMessage = lastWillMessage;
+ }
+
+ public Boolean getLastWillRetain() {
+ return lastWillRetain;
+ }
+
+ public void setLastWillRetain(Boolean lastWillRetain) {
+ this.lastWillRetain = lastWillRetain;
+ }
+
+ public Integer getLastWillQos() {
+ return lastWillQos;
+ }
+
+ public void setLastWillQos(Integer lastWillQos) {
+ this.lastWillQos = lastWillQos;
+ }
+
+ public String getUsername() {
+ return username;
+ }
+
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ public String getPassword() {
+ return password;
+ }
+
+ public void setPassword(String password) {
+ this.password = password;
+ }
+}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttConstants.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttConstants.java
index a29e6ff616..072708f6da 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttConstants.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttConstants.java
@@ -18,7 +18,11 @@
package org.apache.nifi.processors.mqtt.common;
import org.apache.nifi.components.AllowableValue;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+
+import static org.apache.nifi.processors.mqtt.common.MqttVersion.MQTT_VERSION_3_1;
+import static org.apache.nifi.processors.mqtt.common.MqttVersion.MQTT_VERSION_3_1_1;
+import static org.apache.nifi.processors.mqtt.common.MqttVersion.MQTT_VERSION_3_AUTO;
+import static org.apache.nifi.processors.mqtt.common.MqttVersion.MQTT_VERSION_5_0;
public class MqttConstants {
@@ -66,15 +70,16 @@ public class MqttConstants {
------------------------------------------
*/
public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_AUTO =
- new AllowableValue(String.valueOf(MqttConnectOptions.MQTT_VERSION_DEFAULT),
- "AUTO",
+ new AllowableValue(String.valueOf(MQTT_VERSION_3_AUTO.getVersionCode()), MQTT_VERSION_3_AUTO.getDisplayName(),
"Start with v3.1.1 and fallback to v3.1.0 if not supported by a broker");
+ public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_500 =
+ new AllowableValue(String.valueOf(MQTT_VERSION_5_0.getVersionCode()), MQTT_VERSION_5_0.getDisplayName());
+
public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_311 =
- new AllowableValue(String.valueOf(MqttConnectOptions.MQTT_VERSION_3_1_1),
- "v3.1.1");
+ new AllowableValue(String.valueOf(MQTT_VERSION_3_1_1.getVersionCode()), MQTT_VERSION_3_1_1.getDisplayName());
public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_310 =
- new AllowableValue(String.valueOf(MqttConnectOptions.MQTT_VERSION_3_1),
- "v3.1.0");
+ new AllowableValue(String.valueOf(MQTT_VERSION_3_1.getVersionCode()), MQTT_VERSION_3_1.getDisplayName());
+
}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttException.java
similarity index 52%
copy from nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java
copy to nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttException.java
index d5e63c789b..cee1599136 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttException.java
@@ -14,44 +14,15 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.nifi.processors.mqtt.common;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-
-public class MQTTQueueMessage {
- private String topic;
-
- private byte[] payload;
- private int qos = 1;
- private boolean retained = false;
- private boolean duplicate = false;
-
- public MQTTQueueMessage(String topic, MqttMessage message) {
- this.topic = topic;
- payload = message.getPayload();
- qos = message.getQos();
- retained = message.isRetained();
- duplicate = message.isDuplicate();
- }
-
- public String getTopic() {
- return topic;
- }
-
- public byte[] getPayload() {
- return payload;
- }
-
- public int getQos() {
- return qos;
- }
+public class MqttException extends RuntimeException {
- public boolean isRetained() {
- return retained;
+ public MqttException(String message) {
+ super(message);
}
- public boolean isDuplicate() {
- return duplicate;
+ public MqttException(String message, Throwable cause) {
+ super(message, cause);
}
}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttProtocolScheme.java
similarity index 51%
copy from nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java
copy to nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttProtocolScheme.java
index d5e63c789b..1474694223 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttProtocolScheme.java
@@ -14,44 +14,11 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.nifi.processors.mqtt.common;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-
-public class MQTTQueueMessage {
- private String topic;
-
- private byte[] payload;
- private int qos = 1;
- private boolean retained = false;
- private boolean duplicate = false;
-
- public MQTTQueueMessage(String topic, MqttMessage message) {
- this.topic = topic;
- payload = message.getPayload();
- qos = message.getQos();
- retained = message.isRetained();
- duplicate = message.isDuplicate();
- }
-
- public String getTopic() {
- return topic;
- }
-
- public byte[] getPayload() {
- return payload;
- }
-
- public int getQos() {
- return qos;
- }
-
- public boolean isRetained() {
- return retained;
- }
-
- public boolean isDuplicate() {
- return duplicate;
- }
+public enum MqttProtocolScheme {
+ TCP,
+ SSL,
+ WS,
+ WSS
}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttVersion.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttVersion.java
new file mode 100644
index 0000000000..0aec86f138
--- /dev/null
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttVersion.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mqtt.common;
+
+import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
+
+public enum MqttVersion {
+ MQTT_VERSION_3_AUTO(MqttConnectOptions.MQTT_VERSION_DEFAULT, "v3 AUTO"),
+ MQTT_VERSION_3_1(MqttConnectOptions.MQTT_VERSION_3_1, "v3.1.0"),
+ MQTT_VERSION_3_1_1(MqttConnectOptions.MQTT_VERSION_3_1_1, "v3.1.1"),
+ MQTT_VERSION_5_0(5, "v5.0");
+
+ private final int versionCode;
+ private final String displayName;
+
+ MqttVersion(int versionCode, String displayName) {
+ this.versionCode = versionCode;
+ this.displayName = displayName;
+ }
+
+ public int getVersionCode() {
+ return versionCode;
+ }
+
+ public String getDisplayName() {
+ return displayName;
+ }
+
+ public static MqttVersion fromVersionCode(int versionCode) {
+ for (MqttVersion version : values()) {
+ if (version.getVersionCode() == versionCode) {
+ return version;
+ }
+ }
+ throw new IllegalArgumentException("Unable to map MqttVersionCode from version code: " + versionCode);
+ }
+}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/ReceivedMqttMessage.java
similarity index 61%
copy from nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java
copy to nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/ReceivedMqttMessage.java
index d5e63c789b..abe1fe6281 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/ReceivedMqttMessage.java
@@ -14,44 +14,25 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.nifi.processors.mqtt.common;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
+/**
+ * Represents a received MQTT message
+ */
+public class ReceivedMqttMessage extends StandardMqttMessage {
-public class MQTTQueueMessage {
private String topic;
- private byte[] payload;
- private int qos = 1;
- private boolean retained = false;
- private boolean duplicate = false;
-
- public MQTTQueueMessage(String topic, MqttMessage message) {
+ public ReceivedMqttMessage(byte[] payload, int qos, boolean retained, String topic) {
+ super(payload, qos, retained);
this.topic = topic;
- payload = message.getPayload();
- qos = message.getQos();
- retained = message.isRetained();
- duplicate = message.isDuplicate();
}
public String getTopic() {
return topic;
}
- public byte[] getPayload() {
- return payload;
- }
-
- public int getQos() {
- return qos;
- }
-
- public boolean isRetained() {
- return retained;
- }
-
public boolean isDuplicate() {
- return duplicate;
+ return false;
}
}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/StandardMqttMessage.java
similarity index 64%
rename from nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java
rename to nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/StandardMqttMessage.java
index d5e63c789b..2a232eab86 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MQTTQueueMessage.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/StandardMqttMessage.java
@@ -14,29 +14,20 @@
* See the License for the specific language governing permissions and
* limitations under the License.
*/
-
package org.apache.nifi.processors.mqtt.common;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-
-public class MQTTQueueMessage {
- private String topic;
-
+/**
+ * Represents an MQTT message.
+ */
+public class StandardMqttMessage {
private byte[] payload;
- private int qos = 1;
- private boolean retained = false;
- private boolean duplicate = false;
+ private int qos;
+ private boolean retained;
- public MQTTQueueMessage(String topic, MqttMessage message) {
- this.topic = topic;
- payload = message.getPayload();
- qos = message.getQos();
- retained = message.isRetained();
- duplicate = message.isDuplicate();
- }
-
- public String getTopic() {
- return topic;
+ public StandardMqttMessage(byte[] payload, int qos, boolean retained) {
+ this.payload = payload;
+ this.qos = qos;
+ this.retained = retained;
}
public byte[] getPayload() {
@@ -50,8 +41,4 @@ public class MQTTQueueMessage {
public boolean isRetained() {
return retained;
}
-
- public boolean isDuplicate() {
- return duplicate;
- }
}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
index f66b4dd402..81ea6d7db1 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
@@ -18,8 +18,10 @@
package org.apache.nifi.processors.mqtt;
import org.apache.nifi.processor.ProcessSession;
-import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage;
+import org.apache.nifi.processors.mqtt.common.MqttClient;
import org.apache.nifi.processors.mqtt.common.MqttTestClient;
+import org.apache.nifi.processors.mqtt.common.ReceivedMqttMessage;
+import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
import org.apache.nifi.processors.mqtt.common.TestConsumeMqttCommon;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.security.util.SslContextFactory;
@@ -29,10 +31,6 @@ import org.apache.nifi.security.util.TlsException;
import org.apache.nifi.ssl.SSLContextService;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-import org.eclipse.paho.client.mqttv3.IMqttClient;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.jupiter.api.BeforeAll;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
@@ -59,8 +57,8 @@ public class TestConsumeMQTT extends TestConsumeMqttCommon {
}
@Override
- public IMqttClient createMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException {
- mqttTestClient = new MqttTestClient(broker, clientID, MqttTestClient.ConnectType.Subscriber);
+ protected MqttClient createMqttClient() {
+ mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Subscriber);
return mqttTestClient;
}
}
@@ -111,10 +109,10 @@ public class TestConsumeMQTT extends TestConsumeMqttCommon {
public void testMessageNotConsumedOnCommitFail() throws NoSuchFieldException, IllegalAccessException {
testRunner.run(1, false);
ConsumeMQTT processor = (ConsumeMQTT) testRunner.getProcessor();
- MQTTQueueMessage mock = mock(MQTTQueueMessage.class);
+ ReceivedMqttMessage mock = mock(ReceivedMqttMessage.class);
when(mock.getPayload()).thenReturn(new byte[0]);
when(mock.getTopic()).thenReturn("testTopic");
- BlockingQueue<MQTTQueueMessage> mqttQueue = getMqttQueue(processor);
+ BlockingQueue<ReceivedMqttMessage> mqttQueue = getMqttQueue(processor);
mqttQueue.add(mock);
ProcessSession session = testRunner.getProcessSessionFactory().createSession();
@@ -131,11 +129,7 @@ public class TestConsumeMQTT extends TestConsumeMqttCommon {
}
@Override
- public void internalPublish(final MqttMessage message, final String topicName) {
- try {
- mqttTestClient.publish(topicName, message);
- } catch (MqttException e) {
- throw new RuntimeException(e);
- }
+ public void internalPublish(final StandardMqttMessage message, final String topicName) {
+ mqttTestClient.publish(topicName, message);
}
}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
index 1755365c1c..41181c65ed 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestPublishMQTT.java
@@ -17,13 +17,12 @@
package org.apache.nifi.processors.mqtt;
-import org.apache.nifi.processors.mqtt.common.MQTTQueueMessage;
+import org.apache.nifi.processors.mqtt.common.MqttClient;
+import org.apache.nifi.processors.mqtt.common.MqttException;
import org.apache.nifi.processors.mqtt.common.MqttTestClient;
+import org.apache.nifi.processors.mqtt.common.StandardMqttMessage;
import org.apache.nifi.processors.mqtt.common.TestPublishMqttCommon;
import org.apache.nifi.util.TestRunners;
-import org.eclipse.paho.client.mqttv3.IMqttClient;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
import org.junit.jupiter.api.BeforeEach;
import java.util.Arrays;
@@ -34,11 +33,12 @@ public class TestPublishMQTT extends TestPublishMqttCommon {
@Override
public void verifyPublishedMessage(byte[] payload, int qos, boolean retain) {
- MQTTQueueMessage mqttQueueMessage = mqttTestClient.publishedMessage;
- assertEquals(Arrays.toString(payload), Arrays.toString(mqttQueueMessage.getPayload()));
- assertEquals(qos, mqttQueueMessage.getQos());
- assertEquals(retain, mqttQueueMessage.isRetained());
- assertEquals(topic, mqttQueueMessage.getTopic());
+ StandardMqttMessage lastPublishedMessage = mqttTestClient.getLastPublishedMessage();
+ String lastPublishedTopic = mqttTestClient.getLastPublishedTopic();
+ assertEquals(Arrays.toString(payload), Arrays.toString(lastPublishedMessage.getPayload()));
+ assertEquals(qos, lastPublishedMessage.getQos());
+ assertEquals(retain, lastPublishedMessage.isRetained());
+ assertEquals(topic, lastPublishedTopic);
}
private MqttTestClient mqttTestClient;
@@ -50,8 +50,8 @@ public class TestPublishMQTT extends TestPublishMqttCommon {
}
@Override
- public IMqttClient createMqttClient(String broker, String clientID, MemoryPersistence persistence) throws MqttException {
- mqttTestClient = new MqttTestClient(broker, clientID, MqttTestClient.ConnectType.Publisher);
+ protected MqttClient createMqttClient() throws MqttException {
+ mqttTestClient = new MqttTestClient(MqttTestClient.ConnectType.Publisher);
return mqttTestClient;
}
}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java
index b135dcfe44..91997061bc 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/MqttTestClient.java
@@ -17,24 +17,9 @@
package org.apache.nifi.processors.mqtt.common;
-import org.eclipse.paho.client.mqttv3.IMqttClient;
-import org.eclipse.paho.client.mqttv3.IMqttDeliveryToken;
-import org.eclipse.paho.client.mqttv3.IMqttMessageListener;
-import org.eclipse.paho.client.mqttv3.IMqttToken;
-import org.eclipse.paho.client.mqttv3.MqttCallback;
-import org.eclipse.paho.client.mqttv3.MqttConnectOptions;
-import org.eclipse.paho.client.mqttv3.MqttException;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
-import org.eclipse.paho.client.mqttv3.MqttPersistenceException;
-import org.eclipse.paho.client.mqttv3.MqttSecurityException;
-import org.eclipse.paho.client.mqttv3.MqttTopic;
-
import java.util.concurrent.atomic.AtomicBoolean;
-public class MqttTestClient implements IMqttClient {
-
- public String serverURI;
- public String clientId;
+public class MqttTestClient implements MqttClient {
public AtomicBoolean connected = new AtomicBoolean(false);
@@ -42,184 +27,54 @@ public class MqttTestClient implements IMqttClient {
public ConnectType type;
public enum ConnectType {Publisher, Subscriber}
- public MQTTQueueMessage publishedMessage;
+ private StandardMqttMessage lastPublishedMessage;
+ private String lastPublishedTopic;
public String subscribedTopic;
public int subscribedQos;
- public MqttTestClient(String serverURI, String clientId, ConnectType type) throws MqttException {
- this.serverURI = serverURI;
- this.clientId = clientId;
+ public MqttTestClient(ConnectType type) {
this.type = type;
}
@Override
- public void connect() throws MqttSecurityException, MqttException {
- connected.set(true);
+ public boolean isConnected() {
+ return connected.get();
}
@Override
- public void connect(MqttConnectOptions options) throws MqttSecurityException, MqttException {
+ public void connect() {
connected.set(true);
}
@Override
- public IMqttToken connectWithResult(MqttConnectOptions options) throws MqttSecurityException, MqttException {
- return null;
- }
-
- @Override
- public void disconnect() throws MqttException {
- connected.set(false);
- }
-
- @Override
- public void disconnect(long quiesceTimeout) throws MqttException {
- connected.set(false);
- }
-
- @Override
- public void disconnectForcibly() throws MqttException {
- connected.set(false);
- }
-
- @Override
- public void disconnectForcibly(long disconnectTimeout) throws MqttException {
+ public void disconnect() {
connected.set(false);
}
@Override
- public void disconnectForcibly(long quiesceTimeout, long disconnectTimeout) throws MqttException {
- connected.set(false);
- }
-
- @Override
- public void subscribe(String topicFilter) throws MqttException, MqttSecurityException {
- subscribedTopic = topicFilter;
- subscribedQos = -1;
- }
-
- @Override
- public void subscribe(String[] topicFilters) throws MqttException {
- throw new UnsupportedOperationException("Multiple topic filters is not supported");
- }
-
- @Override
- public void subscribe(String topicFilter, int qos) throws MqttException {
- subscribedTopic = topicFilter;
- subscribedQos = qos;
- }
-
- @Override
- public void subscribe(String[] topicFilters, int[] qos) throws MqttException {
- throw new UnsupportedOperationException("Multiple topic filters is not supported");
- }
-
- @Override
- public void subscribe(String s, IMqttMessageListener iMqttMessageListener) throws MqttException, MqttSecurityException {
-
- }
-
- @Override
- public void subscribe(String[] strings, IMqttMessageListener[] iMqttMessageListeners) throws MqttException {
-
- }
-
- @Override
- public void subscribe(String s, int i, IMqttMessageListener iMqttMessageListener) throws MqttException {
-
- }
-
- @Override
- public void subscribe(String[] strings, int[] ints, IMqttMessageListener[] iMqttMessageListeners) throws MqttException {
-
- }
-
- @Override
- public IMqttToken subscribeWithResponse(String s) throws MqttException {
- return null;
- }
-
- @Override
- public IMqttToken subscribeWithResponse(String s, IMqttMessageListener iMqttMessageListener) throws MqttException {
- return null;
- }
-
- @Override
- public IMqttToken subscribeWithResponse(String s, int i) throws MqttException {
- return null;
- }
-
- @Override
- public IMqttToken subscribeWithResponse(String s, int i, IMqttMessageListener iMqttMessageListener) throws MqttException {
- return null;
- }
-
- @Override
- public IMqttToken subscribeWithResponse(String[] strings) throws MqttException {
- return null;
- }
-
- @Override
- public IMqttToken subscribeWithResponse(String[] strings, IMqttMessageListener[] iMqttMessageListeners) throws MqttException {
- return null;
- }
-
- @Override
- public IMqttToken subscribeWithResponse(String[] strings, int[] ints) throws MqttException {
- return null;
- }
-
- @Override
- public IMqttToken subscribeWithResponse(String[] strings, int[] ints, IMqttMessageListener[] iMqttMessageListeners) throws MqttException {
- return null;
- }
-
- @Override
- public void unsubscribe(String topicFilter) throws MqttException {
- subscribedTopic = "";
- subscribedQos = -2;
- }
+ public void close() {
- @Override
- public void unsubscribe(String[] topicFilters) throws MqttException {
- throw new UnsupportedOperationException("Multiple topic filters is not supported");
}
@Override
- public void publish(String topic, byte[] payload, int qos, boolean retained) throws MqttException, MqttPersistenceException {
- MqttMessage message = new MqttMessage(payload);
- message.setQos(qos);
- message.setRetained(retained);
+ public void publish(String topic, StandardMqttMessage message) {
switch (type) {
case Publisher:
- publishedMessage = new MQTTQueueMessage(topic, message);
+ lastPublishedMessage = message;
+ lastPublishedTopic = topic;
break;
case Subscriber:
- try {
- mqttCallback.messageArrived(topic, message);
- } catch (Exception e) {
- throw new MqttException(e);
- }
+ mqttCallback.messageArrived(new ReceivedMqttMessage(message.getPayload(), message.getQos(), message.isRetained(), topic));
break;
}
}
@Override
- public void publish(String topic, MqttMessage message) throws MqttException, MqttPersistenceException {
- switch (type) {
- case Publisher:
- publishedMessage = new MQTTQueueMessage(topic, message);
- break;
- case Subscriber:
- try {
- mqttCallback.messageArrived(topic, message);
- } catch (Exception e) {
- throw new MqttException(e);
- }
- break;
- }
+ public void subscribe(String topicFilter, int qos) {
+ subscribedTopic = topicFilter;
+ subscribedQos = qos;
}
@Override
@@ -227,48 +82,11 @@ public class MqttTestClient implements IMqttClient {
this.mqttCallback = callback;
}
- @Override
- public MqttTopic getTopic(String topic) {
- return null;
- }
-
- @Override
- public boolean isConnected() {
- return connected.get();
- }
-
- @Override
- public String getClientId() {
- return clientId;
- }
-
- @Override
- public String getServerURI() {
- return serverURI;
- }
-
- @Override
- public IMqttDeliveryToken[] getPendingDeliveryTokens() {
- return new IMqttDeliveryToken[0];
+ public StandardMqttMessage getLastPublishedMessage() {
+ return lastPublishedMessage;
}
- @Override
- public void setManualAcks(boolean b) {
-
- }
-
- @Override
- public void reconnect() throws MqttException {
-
- }
-
- @Override
- public void messageArrivedComplete(int i, int i1) throws MqttException {
-
- }
-
- @Override
- public void close() throws MqttException {
-
+ public String getLastPublishedTopic() {
+ return lastPublishedTopic;
}
}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
index 0eec42ce71..7cbaa11c8d 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/common/TestConsumeMqttCommon.java
@@ -28,8 +28,6 @@ import org.apache.nifi.schema.access.SchemaAccessUtils;
import org.apache.nifi.util.MockFlowFile;
import org.apache.nifi.util.TestRunner;
import org.apache.nifi.util.TestRunners;
-import org.eclipse.paho.client.mqttv3.IMqttClient;
-import org.eclipse.paho.client.mqttv3.MqttMessage;
import org.junit.jupiter.api.Test;
import java.lang.reflect.Field;
@@ -66,7 +64,7 @@ public abstract class TestConsumeMqttCommon {
private static final int LEAST_ONE = 1;
private static final int EXACTLY_ONCE = 2;
- public abstract void internalPublish(MqttMessage message, String topicName);
+ public abstract void internalPublish(StandardMqttMessage message, String topicName);
@Test
public void testClientIDConfiguration() {
@@ -295,10 +293,8 @@ public abstract class TestConsumeMqttCommon {
testRunner.assertValid();
- MqttMessage innerMessage = new MqttMessage();
- innerMessage.setPayload(ByteBuffer.wrap("testMessage".getBytes()).array());
- innerMessage.setQos(2);
- MQTTQueueMessage testMessage = new MQTTQueueMessage("testTopic", innerMessage);
+ final byte[] content = ByteBuffer.wrap("testMessage".getBytes()).array();
+ ReceivedMqttMessage testMessage = new ReceivedMqttMessage(content, 2, false, "testTopic");
ConsumeMQTT consumeMQTT = (ConsumeMQTT) testRunner.getProcessor();
consumeMQTT.onScheduled(testRunner.getProcessContext());
@@ -313,7 +309,7 @@ public abstract class TestConsumeMqttCommon {
Field f = ConsumeMQTT.class.getDeclaredField("mqttQueue");
f.setAccessible(true);
@SuppressWarnings("unchecked")
- LinkedBlockingQueue<MQTTQueueMessage> queue = (LinkedBlockingQueue<MQTTQueueMessage>) f.get(consumeMQTT);
+ LinkedBlockingQueue<ReceivedMqttMessage> queue = (LinkedBlockingQueue<ReceivedMqttMessage>) f.get(consumeMQTT);
queue.add(testMessage);
consumeMQTT.onUnscheduled(testRunner.getProcessContext());
@@ -551,7 +547,7 @@ public abstract class TestConsumeMqttCommon {
private static boolean isConnected(AbstractMQTTProcessor processor) throws NoSuchFieldException, IllegalAccessException {
Field f = AbstractMQTTProcessor.class.getDeclaredField("mqttClient");
f.setAccessible(true);
- IMqttClient mqttClient = (IMqttClient) f.get(processor);
+ MqttClient mqttClient = (MqttClient) f.get(processor);
return mqttClient.isConnected();
}
@@ -563,10 +559,10 @@ public abstract class TestConsumeMqttCommon {
}
@SuppressWarnings("unchecked")
- public static BlockingQueue<MQTTQueueMessage> getMqttQueue(ConsumeMQTT consumeMQTT) throws IllegalAccessException, NoSuchFieldException {
+ public static BlockingQueue<ReceivedMqttMessage> getMqttQueue(ConsumeMQTT consumeMQTT) throws IllegalAccessException, NoSuchFieldException {
Field mqttQueueField = ConsumeMQTT.class.getDeclaredField("mqttQueue");
mqttQueueField.setAccessible(true);
- return (BlockingQueue<MQTTQueueMessage>) mqttQueueField.get(consumeMQTT);
+ return (BlockingQueue<ReceivedMqttMessage>) mqttQueueField.get(consumeMQTT);
}
public static void transferQueue(ConsumeMQTT consumeMQTT, ProcessSession session) throws NoSuchMethodException, InvocationTargetException, IllegalAccessException {
@@ -585,11 +581,7 @@ public abstract class TestConsumeMqttCommon {
}
private void publishMessage(final String payload, final int qos) {
- final MqttMessage message = new MqttMessage();
- message.setPayload(payload.getBytes(StandardCharsets.UTF_8));
- message.setQos(qos);
- message.setRetained(false);
-
+ final StandardMqttMessage message = new StandardMqttMessage(payload.getBytes(StandardCharsets.UTF_8), qos, false);
internalPublish(message, "testTopic");
}
}