You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ze...@apache.org on 2018/08/09 17:27:50 UTC
nifi git commit: NIFI-5489: Add expression language support to AMQP
processors HOST, VHOST and USER Fields.
Repository: nifi
Updated Branches:
refs/heads/master 32ee552ad -> 3731cc855
NIFI-5489: Add expression language support to AMQP processors HOST, VHOST and USER Fields.
This closes #2936
Signed-off-by: zenfenan <ze...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3731cc85
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3731cc85
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3731cc85
Branch: refs/heads/master
Commit: 3731cc855849c6bc971909f561be56138f13d182
Parents: 32ee552
Author: Daniel Jimenez <da...@gmail.com>
Authored: Sat Aug 4 11:11:28 2018 -0500
Committer: zenfenan <si...@gmail.com>
Committed: Thu Aug 9 22:57:23 2018 +0530
----------------------------------------------------------------------
.../amqp/processors/AbstractAMQPProcessor.java | 21 ++++++++++++--------
1 file changed, 13 insertions(+), 8 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/3731cc85/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
index 18e176c..2a83497 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
@@ -28,6 +28,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.authentication.exception.ProviderCreationException;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@@ -56,27 +57,31 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
.description("Network address of AMQP broker (e.g., localhost)")
.required(true)
.defaultValue("localhost")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();
public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
.name("Port")
.description("Numeric value identifying Port of AMQP broker (e.g., 5671)")
.required(true)
.defaultValue("5672")
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.PORT_VALIDATOR)
.build();
public static final PropertyDescriptor V_HOST = new PropertyDescriptor.Builder()
.name("Virtual Host")
.description("Virtual Host name which segregates AMQP system for enhanced security.")
.required(false)
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();
public static final PropertyDescriptor USER = new PropertyDescriptor.Builder()
.name("User Name")
.description("User Name used for authentication and authorization.")
.required(true)
.defaultValue("guest")
- .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
.build();
public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
.name("Password")
@@ -145,7 +150,7 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
/**
* Will builds target resource ({@link AMQPPublisher} or {@link AMQPConsumer}) upon first invocation and will delegate to the
- * implementation of {@link #processResource(ProcessContext, ProcessSession)} method for further processing.
+ * implementation of {@link #processResource} method for further processing.
*/
@Override
public final void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
@@ -204,12 +209,12 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
protected Connection createConnection(ProcessContext context) {
final ConnectionFactory cf = new ConnectionFactory();
- cf.setHost(context.getProperty(HOST).getValue());
- cf.setPort(Integer.parseInt(context.getProperty(PORT).getValue()));
- cf.setUsername(context.getProperty(USER).getValue());
+ cf.setHost(context.getProperty(HOST).evaluateAttributeExpressions().getValue());
+ cf.setPort(Integer.parseInt(context.getProperty(PORT).evaluateAttributeExpressions().getValue()));
+ cf.setUsername(context.getProperty(USER).evaluateAttributeExpressions().getValue());
cf.setPassword(context.getProperty(PASSWORD).getValue());
- final String vHost = context.getProperty(V_HOST).getValue();
+ final String vHost = context.getProperty(V_HOST).evaluateAttributeExpressions().getValue();
if (vHost != null) {
cf.setVirtualHost(vHost);
}