You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ze...@apache.org on 2018/08/09 17:27:50 UTC

nifi git commit: NIFI-5489: Add expression language support to the AMQP processors' HOST, PORT, VHOST and USER fields.

Repository: nifi
Updated Branches:
  refs/heads/master 32ee552ad -> 3731cc855


NIFI-5489: Add expression language support to the AMQP processors' HOST, PORT, VHOST and USER fields.

This closes #2936

Signed-off-by: zenfenan <ze...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3731cc85
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3731cc85
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3731cc85

Branch: refs/heads/master
Commit: 3731cc855849c6bc971909f561be56138f13d182
Parents: 32ee552
Author: Daniel Jimenez <da...@gmail.com>
Authored: Sat Aug 4 11:11:28 2018 -0500
Committer: zenfenan <si...@gmail.com>
Committed: Thu Aug 9 22:57:23 2018 +0530

----------------------------------------------------------------------
 .../amqp/processors/AbstractAMQPProcessor.java  | 21 ++++++++++++--------
 1 file changed, 13 insertions(+), 8 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/3731cc85/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
index 18e176c..2a83497 100644
--- a/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
+++ b/nifi-nar-bundles/nifi-amqp-bundle/nifi-amqp-processors/src/main/java/org/apache/nifi/amqp/processors/AbstractAMQPProcessor.java
@@ -28,6 +28,7 @@ import org.apache.commons.lang3.StringUtils;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
 import org.apache.nifi.authentication.exception.ProviderCreationException;
 import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -56,27 +57,31 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
             .description("Network address of AMQP broker (e.g., localhost)")
             .required(true)
             .defaultValue("localhost")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
             .build();
     public static final PropertyDescriptor PORT = new PropertyDescriptor.Builder()
             .name("Port")
             .description("Numeric value identifying Port of AMQP broker (e.g., 5671)")
             .required(true)
             .defaultValue("5672")
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .addValidator(StandardValidators.PORT_VALIDATOR)
             .build();
     public static final PropertyDescriptor V_HOST = new PropertyDescriptor.Builder()
             .name("Virtual Host")
             .description("Virtual Host name which segregates AMQP system for enhanced security.")
             .required(false)
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
             .build();
     public static final PropertyDescriptor USER = new PropertyDescriptor.Builder()
             .name("User Name")
             .description("User Name used for authentication and authorization.")
             .required(true)
             .defaultValue("guest")
-            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(StandardValidators.NON_EMPTY_EL_VALIDATOR)
             .build();
     public static final PropertyDescriptor PASSWORD = new PropertyDescriptor.Builder()
             .name("Password")
@@ -145,7 +150,7 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
 
     /**
      * Will builds target resource ({@link AMQPPublisher} or {@link AMQPConsumer}) upon first invocation and will delegate to the
-     * implementation of {@link #processResource(ProcessContext, ProcessSession)} method for further processing.
+     * implementation of {@link #processResource} method for further processing.
      */
     @Override
     public final void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
@@ -204,12 +209,12 @@ abstract class AbstractAMQPProcessor<T extends AMQPWorker> extends AbstractProce
 
     protected Connection createConnection(ProcessContext context) {
         final ConnectionFactory cf = new ConnectionFactory();
-        cf.setHost(context.getProperty(HOST).getValue());
-        cf.setPort(Integer.parseInt(context.getProperty(PORT).getValue()));
-        cf.setUsername(context.getProperty(USER).getValue());
+        cf.setHost(context.getProperty(HOST).evaluateAttributeExpressions().getValue());
+        cf.setPort(Integer.parseInt(context.getProperty(PORT).evaluateAttributeExpressions().getValue()));
+        cf.setUsername(context.getProperty(USER).evaluateAttributeExpressions().getValue());
         cf.setPassword(context.getProperty(PASSWORD).getValue());
 
-        final String vHost = context.getProperty(V_HOST).getValue();
+        final String vHost = context.getProperty(V_HOST).evaluateAttributeExpressions().getValue();
         if (vHost != null) {
             cf.setVirtualHost(vHost);
         }