You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ex...@apache.org on 2021/04/14 21:09:06 UTC
[nifi] branch main updated: NIFI-8409 - ConsumeMQTT Processor Broker URI and Username Expression Language
This is an automated email from the ASF dual-hosted git repository.
exceptionfactory pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/main by this push:
new 0f8789b NIFI-8409 - ConsumeMQTT Processor Broker URI and Username Expression Language
0f8789b is described below
commit 0f8789b8b07df40336386b2831721f1e0c4a497b
Author: Pierre Villard <pi...@gmail.com>
AuthorDate: Fri Apr 9 17:27:22 2021 +0200
NIFI-8409 - ConsumeMQTT Processor Broker URI and Username Expression Language
This closes #4984
Signed-off-by: David Handermann <ex...@apache.org>
---
.../apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java | 8 +++++---
.../java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java | 3 ++-
2 files changed, 7 insertions(+), 4 deletions(-)
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
index 3d34d94..a83bcc5 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/main/java/org/apache/nifi/processors/mqtt/common/AbstractMQTTProcessor.java
@@ -119,6 +119,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
.description("The URI to use to connect to the MQTT broker (e.g. tcp://localhost:1883). The 'tcp', 'ssl', 'ws' and 'wss' schemes are supported. In order to use 'ssl', the SSL Context " +
"Service property must be set.")
.required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(BROKER_VALIDATOR)
.build();
@@ -135,6 +136,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
.name("Username")
.description("Username to use when connecting to the broker")
.required(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
@@ -275,7 +277,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
}
try {
- URI brokerURI = new URI(validationContext.getProperty(PROP_BROKER_URI).getValue());
+ URI brokerURI = new URI(validationContext.getProperty(PROP_BROKER_URI).evaluateAttributeExpressions().getValue());
if (brokerURI.getScheme().equalsIgnoreCase("ssl") && !validationContext.getProperty(PROP_SSL_CONTEXT_SERVICE).isSet()) {
results.add(new ValidationResult.Builder().subject(PROP_SSL_CONTEXT_SERVICE.getName() + " or " + PROP_BROKER_URI.getName()).valid(false).explanation("if the 'ssl' scheme is used in " +
"the broker URI, the SSL Context Service must be set.").build());
@@ -314,7 +316,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
}
protected void onScheduled(final ProcessContext context){
- broker = context.getProperty(PROP_BROKER_URI).getValue();
+ broker = context.getProperty(PROP_BROKER_URI).evaluateAttributeExpressions().getValue();
brokerUri = broker.endsWith("/") ? broker : broker + "/";
clientID = context.getProperty(PROP_CLIENTID).evaluateAttributeExpressions().getValue();
@@ -345,7 +347,7 @@ public abstract class AbstractMQTTProcessor extends AbstractSessionFactoryProces
PropertyValue usernameProp = context.getProperty(PROP_USERNAME);
if(usernameProp.isSet()) {
- connOpts.setUserName(usernameProp.getValue());
+ connOpts.setUserName(usernameProp.evaluateAttributeExpressions().getValue());
connOpts.setPassword(context.getProperty(PROP_PASSWORD).getValue().toCharArray());
}
}
diff --git a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
index d8c86d6..cf66d1a 100644
--- a/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
+++ b/nifi-nar-bundles/nifi-mqtt-bundle/nifi-mqtt-processors/src/test/java/org/apache/nifi/processors/mqtt/TestConsumeMQTT.java
@@ -82,7 +82,8 @@ public class TestConsumeMQTT extends TestConsumeMqttCommon {
public void testSSLContextServiceTruststoreOnly() throws InitializationException {
String brokerURI = "ssl://localhost:8883";
TestRunner runner = TestRunners.newTestRunner(ConsumeMQTT.class);
- runner.setProperty(ConsumeMQTT.PROP_BROKER_URI, brokerURI);
+ runner.setVariable("brokerURI", brokerURI);
+ runner.setProperty(ConsumeMQTT.PROP_BROKER_URI, "${brokerURI}");
runner.setProperty(ConsumeMQTT.PROP_CLIENTID, "TestClient");
runner.setProperty(ConsumeMQTT.PROP_TOPIC_FILTER, "testTopic");
runner.setProperty(ConsumeMQTT.PROP_MAX_QUEUE_SIZE, "100");