You are viewing a plain text version of this content. The canonical link for it is here.
Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2022/08/19 13:33:43 UTC

[GitHub] [nifi] turcsanyip commented on a diff in pull request #6225: NIFI-10251 Add v5 protocol support for existing MQTT processors

turcsanyip commented on code in PR #6225:
URL: https://github.com/apache/nifi/pull/6225#discussion_r950100231


##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttConstants.java:
##########
@@ -67,14 +69,45 @@ public class MqttConstants {
      */
     public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_AUTO =
             new AllowableValue(String.valueOf(MqttConnectOptions.MQTT_VERSION_DEFAULT),
-                    "AUTO",
+                    "v3 AUTO",
                     "Start with v3.1.1 and fallback to v3.1.0 if not supported by a broker");
 
+    public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_500 =
+            new AllowableValue(String.valueOf(MqttVersion.MQTT_VERSION_5_0.getNumericValue()),
+                    "v5.0");
+
     public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_311 =
-            new AllowableValue(String.valueOf(MqttConnectOptions.MQTT_VERSION_3_1_1),
+            new AllowableValue(String.valueOf(MqttVersion.MQTT_VERSION_3_1_1.getNumericValue()),
                     "v3.1.1");
 
     public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_310 =
-            new AllowableValue(String.valueOf(MqttConnectOptions.MQTT_VERSION_3_1),
+            new AllowableValue(String.valueOf(MqttVersion.MQTT_VERSION_3_1.getNumericValue()),
                     "v3.1.0");
+
+    public enum MqttVersion {
+        MQTT_VERSION_3_1(3),
+        MQTT_VERSION_3_1_1(4),
+        MQTT_VERSION_5_0(5);
+
+        private final int numericValue;

Review Comment:
   `versionCode` may be more descriptive.
   Its type could be `String` because it is always be used as `String`.



##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/HiveMqV5ClientAdapter.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mqtt.adapters;
+
+import com.hivemq.client.mqtt.datatypes.MqttQos;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
+import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect;
+import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5ConnectBuilder;
+import org.apache.nifi.processors.mqtt.common.MqttConnectionProperties;
+import org.apache.nifi.processors.mqtt.common.NifiMqttCallback;
+import org.apache.nifi.processors.mqtt.common.NifiMqttClient;
+import org.apache.nifi.processors.mqtt.common.NifiMqttException;
+import org.apache.nifi.processors.mqtt.common.NifiMqttMessage;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManagerFactory;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.StandardCharsets;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.util.Arrays;
+import java.util.Objects;
+
+public class HiveMqV5ClientAdapter implements NifiMqttClient {
+
+    private final Mqtt5Client mqtt5Client;
+
+    private NifiMqttCallback callback;
+
+    public HiveMqV5ClientAdapter(Mqtt5BlockingClient mqtt5BlockingClient) {
+        this.mqtt5Client = mqtt5BlockingClient;
+    }
+
+    @Override
+    public boolean isConnected() {
+        return mqtt5Client.getState().isConnected();
+    }
+
+    @Override
+    public void connect(MqttConnectionProperties connectionProperties) throws NifiMqttException {
+        final Mqtt5ConnectBuilder connectBuilder = Mqtt5Connect.builder()
+                .keepAlive(connectionProperties.getKeepAliveInterval());
+
+        final boolean cleanSession = connectionProperties.isCleanSession();
+        connectBuilder.cleanStart(cleanSession);
+        if (!cleanSession) {
+            connectBuilder.sessionExpiryInterval(connectionProperties.getSessionExpiryInterval());
+        }
+
+        final String lastWillTopic = connectionProperties.getLastWillTopic();
+        if (lastWillTopic != null) {
+            connectBuilder.willPublish()
+                    .topic(lastWillTopic)
+                    .payload(connectionProperties.getLastWillMessage().getBytes())
+                    .retain(connectionProperties.getLastWillRetain())
+                    .qos(MqttQos.fromCode(connectionProperties.getLastWillQOS()))
+                    .applyWillPublish();
+        }
+
+        // checking for presence of password because username can be null
+        final char[] password = connectionProperties.getPassword();
+        if (password != null) {
+            connectBuilder.simpleAuth()
+                    .username(connectionProperties.getUsername())
+                    .password(toBytes(password))
+                    .applySimpleAuth();
+
+            clearSensitive(connectionProperties.getPassword());
+            clearSensitive(password);
+        }
+
+        final Mqtt5Connect mqtt5Connect = connectBuilder.build();
+        mqtt5Client.toBlocking().connect(mqtt5Connect);
+    }
+
+    @Override
+    public void disconnect(long disconnectTimeout) throws NifiMqttException {
+        // Currently it is not possible to set timeout for disconnect with HiveMQ Client. (Only connect timeout exists.)
+        mqtt5Client.toBlocking().disconnect();
+    }
+
+    @Override
+    public void close() throws NifiMqttException {
+        // there is no paho's close equivalent in hivemq client
+    }
+
+    @Override
+    public void publish(String topic, NifiMqttMessage message) throws NifiMqttException {
+        mqtt5Client.toAsync().publishWith()
+                .topic(topic)
+                .payload(message.getPayload())
+                .retain(message.isRetained())
+                .qos(Objects.requireNonNull(MqttQos.fromCode(message.getQos())))
+                .send();

Review Comment:
   I think we need to publish in blocking mode (no `mqtt5Client.toAsync()`). Otherwise we will not be notified when a publish is unsuccessful, will we?
   The returned `Future` could be used for it but blocking mode seems to me simpler.



##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClientFactory.java:
##########
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mqtt.common;
+
+import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5ClientBuilder;
+import org.apache.nifi.processors.mqtt.adapters.HiveMqV5ClientAdapter;
+import org.apache.nifi.processors.mqtt.adapters.PahoMqttClientAdapter;
+import org.eclipse.paho.client.mqttv3.MqttClient;
+import org.eclipse.paho.client.mqttv3.MqttException;
+import org.eclipse.paho.client.mqttv3.persist.MemoryPersistence;
+
+import static org.apache.nifi.processors.mqtt.common.MqttConstants.SupportedSchemes.SSL;
+import static org.apache.nifi.processors.mqtt.common.MqttConstants.SupportedSchemes.WS;
+import static org.apache.nifi.processors.mqtt.common.MqttConstants.SupportedSchemes.WSS;
+
+public class MqttClientFactory {
+    public NifiMqttClient create(MqttClientProperties clientProperties, MqttConnectionProperties connectionProperties) {
+        switch (clientProperties.getMqttVersion()) {
+            case 0:
+            case 3:
+            case 4:
+                return createPahoMqttV3ClientAdapter(clientProperties);
+            case 5:
+                return createHiveMqV5ClientAdapter(clientProperties, connectionProperties);

Review Comment:
   Using some constants could to be useful here.
   `MqttVersion` enum may be used in `MqttClientProperties` instead of `int`, though it does not contain the AUTO option currently.



##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttConstants.java:
##########
@@ -67,14 +69,45 @@ public class MqttConstants {
      */
     public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_AUTO =
             new AllowableValue(String.valueOf(MqttConnectOptions.MQTT_VERSION_DEFAULT),
-                    "AUTO",
+                    "v3 AUTO",
                     "Start with v3.1.1 and fallback to v3.1.0 if not supported by a broker");
 
+    public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_500 =
+            new AllowableValue(String.valueOf(MqttVersion.MQTT_VERSION_5_0.getNumericValue()),
+                    "v5.0");
+
     public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_311 =
-            new AllowableValue(String.valueOf(MqttConnectOptions.MQTT_VERSION_3_1_1),
+            new AllowableValue(String.valueOf(MqttVersion.MQTT_VERSION_3_1_1.getNumericValue()),
                     "v3.1.1");
 
     public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_310 =
-            new AllowableValue(String.valueOf(MqttConnectOptions.MQTT_VERSION_3_1),
+            new AllowableValue(String.valueOf(MqttVersion.MQTT_VERSION_3_1.getNumericValue()),
                     "v3.1.0");
+
+    public enum MqttVersion {
+        MQTT_VERSION_3_1(3),
+        MQTT_VERSION_3_1_1(4),
+        MQTT_VERSION_5_0(5);

Review Comment:
   Values 3 and 4 come from the Paho library (`MqttConnectOptions.MQTT_VERSION_3_1` and `MqttConnectOptions.MQTT_VERSION_3_1_1`). I think it would make sense to reference those constants here.



##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttClientProperties.java:
##########
@@ -0,0 +1,58 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mqtt.common;
+
+import java.net.URI;
+
+public class MqttClientProperties {
+    private URI brokerURI;
+    private String clientID;

Review Comment:
   Minor: Following the conventions you use in the PR, these should be `brokerUri` and `clientId`. Also `MqttConnectionProperties.lastWillQos`.



##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/MqttConstants.java:
##########
@@ -67,14 +69,45 @@ public class MqttConstants {
      */
     public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_AUTO =
             new AllowableValue(String.valueOf(MqttConnectOptions.MQTT_VERSION_DEFAULT),
-                    "AUTO",
+                    "v3 AUTO",
                     "Start with v3.1.1 and fallback to v3.1.0 if not supported by a broker");
 
+    public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_500 =
+            new AllowableValue(String.valueOf(MqttVersion.MQTT_VERSION_5_0.getNumericValue()),
+                    "v5.0");
+
     public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_311 =
-            new AllowableValue(String.valueOf(MqttConnectOptions.MQTT_VERSION_3_1_1),
+            new AllowableValue(String.valueOf(MqttVersion.MQTT_VERSION_3_1_1.getNumericValue()),
                     "v3.1.1");
 
     public static final AllowableValue ALLOWABLE_VALUE_MQTT_VERSION_310 =
-            new AllowableValue(String.valueOf(MqttConnectOptions.MQTT_VERSION_3_1),
+            new AllowableValue(String.valueOf(MqttVersion.MQTT_VERSION_3_1.getNumericValue()),
                     "v3.1.0");
+
+    public enum MqttVersion {
+        MQTT_VERSION_3_1(3),
+        MQTT_VERSION_3_1_1(4),
+        MQTT_VERSION_5_0(5);
+
+        private final int numericValue;
+
+        MqttVersion(int numericValue) {
+            this.numericValue = numericValue;
+        }
+
+        public int getNumericValue() {
+            return numericValue;
+        }
+    }
+
+    public enum SupportedSchemes {

Review Comment:
   Please use singular for enum names (like `MqttVersion` above).
   I would also omit the "Supported" prefix. `MqttScheme` or `MqttProtocolScheme` sound better to me.



##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/NifiMqttMessage.java:
##########
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mqtt.common;
+
+public class NifiMqttMessage {

Review Comment:
   Do we really need it as a separate class besides `MQTTQueueMessage`?
   `MQTTQueueMessage` wraps a topic name and the fields from `NifiMqttMessage`. In the places where `NifiMqttMessage` is constructed, the topic is always present and they travel together to the point where `MQTTQueueMessage` is created. Could we just create a single object with all the topic and other fields at the beginning?



##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/HiveMqV5ClientAdapter.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mqtt.adapters;
+
+import com.hivemq.client.mqtt.datatypes.MqttQos;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
+import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect;
+import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5ConnectBuilder;
+import org.apache.nifi.processors.mqtt.common.MqttConnectionProperties;
+import org.apache.nifi.processors.mqtt.common.NifiMqttCallback;
+import org.apache.nifi.processors.mqtt.common.NifiMqttClient;
+import org.apache.nifi.processors.mqtt.common.NifiMqttException;
+import org.apache.nifi.processors.mqtt.common.NifiMqttMessage;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManagerFactory;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.StandardCharsets;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.util.Arrays;
+import java.util.Objects;
+
+public class HiveMqV5ClientAdapter implements NifiMqttClient {
+
+    private final Mqtt5Client mqtt5Client;
+
+    private NifiMqttCallback callback;
+
+    public HiveMqV5ClientAdapter(Mqtt5BlockingClient mqtt5BlockingClient) {
+        this.mqtt5Client = mqtt5BlockingClient;
+    }
+
+    @Override
+    public boolean isConnected() {
+        return mqtt5Client.getState().isConnected();
+    }
+
+    @Override
+    public void connect(MqttConnectionProperties connectionProperties) throws NifiMqttException {
+        final Mqtt5ConnectBuilder connectBuilder = Mqtt5Connect.builder()
+                .keepAlive(connectionProperties.getKeepAliveInterval());
+
+        final boolean cleanSession = connectionProperties.isCleanSession();
+        connectBuilder.cleanStart(cleanSession);
+        if (!cleanSession) {
+            connectBuilder.sessionExpiryInterval(connectionProperties.getSessionExpiryInterval());
+        }
+
+        final String lastWillTopic = connectionProperties.getLastWillTopic();
+        if (lastWillTopic != null) {
+            connectBuilder.willPublish()
+                    .topic(lastWillTopic)
+                    .payload(connectionProperties.getLastWillMessage().getBytes())
+                    .retain(connectionProperties.getLastWillRetain())
+                    .qos(MqttQos.fromCode(connectionProperties.getLastWillQOS()))
+                    .applyWillPublish();
+        }
+
+        // checking for presence of password because username can be null
+        final char[] password = connectionProperties.getPassword();
+        if (password != null) {
+            connectBuilder.simpleAuth()
+                    .username(connectionProperties.getUsername())
+                    .password(toBytes(password))
+                    .applySimpleAuth();
+
+            clearSensitive(connectionProperties.getPassword());
+            clearSensitive(password);
+        }
+
+        final Mqtt5Connect mqtt5Connect = connectBuilder.build();
+        mqtt5Client.toBlocking().connect(mqtt5Connect);
+    }
+
+    @Override
+    public void disconnect(long disconnectTimeout) throws NifiMqttException {
+        // Currently it is not possible to set timeout for disconnect with HiveMQ Client. (Only connect timeout exists.)
+        mqtt5Client.toBlocking().disconnect();
+    }
+
+    @Override
+    public void close() throws NifiMqttException {
+        // there is no paho's close equivalent in hivemq client
+    }
+
+    @Override
+    public void publish(String topic, NifiMqttMessage message) throws NifiMqttException {
+        mqtt5Client.toAsync().publishWith()
+                .topic(topic)
+                .payload(message.getPayload())
+                .retain(message.isRetained())
+                .qos(Objects.requireNonNull(MqttQos.fromCode(message.getQos())))
+                .send();
+    }
+
+    @Override
+    public void subscribe(String topicFilter, int qos) {
+        Objects.requireNonNull(callback, "callback should be set");
+
+        mqtt5Client.toAsync().subscribeWith()
+                .topicFilter(topicFilter)
+                .qos(Objects.requireNonNull(MqttQos.fromCode(qos)))
+                .callback(mqtt5Publish -> {
+                    final NifiMqttMessage nifiMqttMessage = new NifiMqttMessage();
+                    nifiMqttMessage.setPayload(mqtt5Publish.getPayloadAsBytes());
+                    nifiMqttMessage.setQos(mqtt5Publish.getQos().getCode());
+                    nifiMqttMessage.setRetained(mqtt5Publish.isRetain());
+                    try {
+                        callback.messageArrived(mqtt5Publish.getTopic().toString(), nifiMqttMessage);
+                    } catch (Exception e) {
+                        throw new NifiMqttException(e);
+                    }
+                })
+                .send();

Review Comment:
   The result of the `subscribe()` call could be checked via the returned `Future` in order to provide an error message on the processor's bulletin. If I understand it correctly, the error message is only logged by the asynchronous thread currently.



##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/HiveMqV5ClientAdapter.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mqtt.adapters;
+
+import com.hivemq.client.mqtt.datatypes.MqttQos;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
+import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect;
+import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5ConnectBuilder;
+import org.apache.nifi.processors.mqtt.common.MqttConnectionProperties;
+import org.apache.nifi.processors.mqtt.common.NifiMqttCallback;
+import org.apache.nifi.processors.mqtt.common.NifiMqttClient;
+import org.apache.nifi.processors.mqtt.common.NifiMqttException;
+import org.apache.nifi.processors.mqtt.common.NifiMqttMessage;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManagerFactory;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.StandardCharsets;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.util.Arrays;
+import java.util.Objects;
+
+public class HiveMqV5ClientAdapter implements NifiMqttClient {
+
+    private final Mqtt5Client mqtt5Client;
+
+    private NifiMqttCallback callback;
+
+    public HiveMqV5ClientAdapter(Mqtt5BlockingClient mqtt5BlockingClient) {
+        this.mqtt5Client = mqtt5BlockingClient;
+    }
+
+    @Override
+    public boolean isConnected() {
+        return mqtt5Client.getState().isConnected();
+    }
+
+    @Override
+    public void connect(MqttConnectionProperties connectionProperties) throws NifiMqttException {
+        final Mqtt5ConnectBuilder connectBuilder = Mqtt5Connect.builder()
+                .keepAlive(connectionProperties.getKeepAliveInterval());
+
+        final boolean cleanSession = connectionProperties.isCleanSession();
+        connectBuilder.cleanStart(cleanSession);
+        if (!cleanSession) {
+            connectBuilder.sessionExpiryInterval(connectionProperties.getSessionExpiryInterval());
+        }
+
+        final String lastWillTopic = connectionProperties.getLastWillTopic();
+        if (lastWillTopic != null) {
+            connectBuilder.willPublish()
+                    .topic(lastWillTopic)
+                    .payload(connectionProperties.getLastWillMessage().getBytes())
+                    .retain(connectionProperties.getLastWillRetain())
+                    .qos(MqttQos.fromCode(connectionProperties.getLastWillQOS()))
+                    .applyWillPublish();
+        }
+
+        // checking for presence of password because username can be null
+        final char[] password = connectionProperties.getPassword();
+        if (password != null) {
+            connectBuilder.simpleAuth()
+                    .username(connectionProperties.getUsername())
+                    .password(toBytes(password))
+                    .applySimpleAuth();

Review Comment:
   Not clear the intent for me here. Does it make sense to pass a password in without a username? 



##########
nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/adapters/HiveMqV5ClientAdapter.java:
##########
@@ -0,0 +1,194 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.mqtt.adapters;
+
+import com.hivemq.client.mqtt.datatypes.MqttQos;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5BlockingClient;
+import com.hivemq.client.mqtt.mqtt5.Mqtt5Client;
+import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5Connect;
+import com.hivemq.client.mqtt.mqtt5.message.connect.Mqtt5ConnectBuilder;
+import org.apache.nifi.processors.mqtt.common.MqttConnectionProperties;
+import org.apache.nifi.processors.mqtt.common.NifiMqttCallback;
+import org.apache.nifi.processors.mqtt.common.NifiMqttClient;
+import org.apache.nifi.processors.mqtt.common.NifiMqttException;
+import org.apache.nifi.processors.mqtt.common.NifiMqttMessage;
+
+import javax.net.ssl.KeyManagerFactory;
+import javax.net.ssl.TrustManagerFactory;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.io.InputStream;
+import java.nio.ByteBuffer;
+import java.nio.CharBuffer;
+import java.nio.charset.StandardCharsets;
+import java.security.KeyStore;
+import java.security.KeyStoreException;
+import java.security.NoSuchAlgorithmException;
+import java.security.cert.CertificateException;
+import java.util.Arrays;
+import java.util.Objects;
+
+public class HiveMqV5ClientAdapter implements NifiMqttClient {
+
+    private final Mqtt5Client mqtt5Client;
+
+    private NifiMqttCallback callback;
+
+    public HiveMqV5ClientAdapter(Mqtt5BlockingClient mqtt5BlockingClient) {
+        this.mqtt5Client = mqtt5BlockingClient;
+    }
+
+    @Override
+    public boolean isConnected() {
+        return mqtt5Client.getState().isConnected();
+    }
+
+    @Override
+    public void connect(MqttConnectionProperties connectionProperties) throws NifiMqttException {
+        final Mqtt5ConnectBuilder connectBuilder = Mqtt5Connect.builder()
+                .keepAlive(connectionProperties.getKeepAliveInterval());
+
+        final boolean cleanSession = connectionProperties.isCleanSession();
+        connectBuilder.cleanStart(cleanSession);
+        if (!cleanSession) {
+            connectBuilder.sessionExpiryInterval(connectionProperties.getSessionExpiryInterval());
+        }
+
+        final String lastWillTopic = connectionProperties.getLastWillTopic();
+        if (lastWillTopic != null) {
+            connectBuilder.willPublish()
+                    .topic(lastWillTopic)
+                    .payload(connectionProperties.getLastWillMessage().getBytes())
+                    .retain(connectionProperties.getLastWillRetain())
+                    .qos(MqttQos.fromCode(connectionProperties.getLastWillQOS()))
+                    .applyWillPublish();
+        }
+
+        // checking for presence of password because username can be null
+        final char[] password = connectionProperties.getPassword();
+        if (password != null) {
+            connectBuilder.simpleAuth()
+                    .username(connectionProperties.getUsername())
+                    .password(toBytes(password))
+                    .applySimpleAuth();
+
+            clearSensitive(connectionProperties.getPassword());
+            clearSensitive(password);
+        }
+
+        final Mqtt5Connect mqtt5Connect = connectBuilder.build();
+        mqtt5Client.toBlocking().connect(mqtt5Connect);

Review Comment:
   The client is already in blocking mode (ensured by the constructor) so no further `toBlocking()` call would be needed (like in `isConnected()` method).



-- 
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.

To unsubscribe, e-mail: issues-unsubscribe@nifi.apache.org

For queries about this service, please contact Infrastructure at:
users@infra.apache.org