You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/04/06 16:43:22 UTC
[08/15] nifi git commit: NIFI-4149 - Indicate if EL is evaluated
against FFs or not - take into account input requirement for documentation
rendering - Renamed variable registry scope and added comments - Doc + change
in mock framework to check scope + u[...] (subject truncated by the archive)
http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java b/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java
index 4ceb12e..6039aa0 100644
--- a/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java
+++ b/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/ExtractHL7Attributes.java
@@ -41,6 +41,7 @@ import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@@ -88,7 +89,7 @@ public class ExtractHL7Attributes extends AbstractProcessor {
.displayName("Character Encoding")
.description("The Character Encoding that is used to encode the HL7 data")
.required(true)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.defaultValue("UTF-8")
.build();
http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java b/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java
index ba90370..6a8df89 100644
--- a/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java
+++ b/nifi-nar-bundles/nifi-hl7-bundle/nifi-hl7-processors/src/main/java/org/apache/nifi/processors/hl7/RouteHL7.java
@@ -41,6 +41,7 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.hl7.hapi.HapiMessage;
import org.apache.nifi.hl7.model.HL7Message;
@@ -81,7 +82,7 @@ public class RouteHL7 extends AbstractProcessor {
.name("Character Encoding")
.description("The Character Encoding that is used to encode the HL7 data")
.required(true)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.defaultValue("UTF-8")
.build();
http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/AbstractHTMLProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/AbstractHTMLProcessor.java b/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/AbstractHTMLProcessor.java
index 8ad6f8a..902f215 100644
--- a/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/AbstractHTMLProcessor.java
+++ b/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/AbstractHTMLProcessor.java
@@ -20,6 +20,7 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@@ -68,7 +69,7 @@ public abstract class AbstractHTMLProcessor extends AbstractProcessor {
" when an attribute value is extracted from a HTML element.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor CSS_SELECTOR = new PropertyDescriptor
@@ -76,7 +77,7 @@ public abstract class AbstractHTMLProcessor extends AbstractProcessor {
.description("CSS selector syntax string used to extract the desired HTML element(s).")
.required(true)
.addValidator(CSS_SELECTOR_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor HTML_CHARSET = new PropertyDescriptor
http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/GetHTMLElement.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/GetHTMLElement.java b/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/GetHTMLElement.java
index 713fabd..8346f03 100644
--- a/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/GetHTMLElement.java
+++ b/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/GetHTMLElement.java
@@ -20,6 +20,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@@ -77,7 +78,7 @@ public class GetHTMLElement
.description("Prepends the specified value to the resulting Element")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor APPEND_ELEMENT_VALUE = new PropertyDescriptor
@@ -85,7 +86,7 @@ public class GetHTMLElement
.description("Appends the specified value to the resulting Element")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor ATTRIBUTE_KEY = new PropertyDescriptor
@@ -97,10 +98,9 @@ public class GetHTMLElement
" an absolute URL form using the specified base URL."))
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
-
public static final PropertyDescriptor OUTPUT_TYPE = new PropertyDescriptor.Builder()
.name("Output Type")
.description("Controls the type of DOM value that is retrieved from the HTML element.")
http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/ModifyHTMLElement.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/ModifyHTMLElement.java b/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/ModifyHTMLElement.java
index 7f6e12e..24f9741 100644
--- a/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/ModifyHTMLElement.java
+++ b/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/ModifyHTMLElement.java
@@ -24,6 +24,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@@ -79,7 +80,7 @@ public class ModifyHTMLElement extends AbstractHTMLProcessor {
.description("Value to update the found HTML elements with")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor ATTRIBUTE_KEY = new PropertyDescriptor
@@ -88,7 +89,7 @@ public class ModifyHTMLElement extends AbstractHTMLProcessor {
" which attribute on the selected element will be modified with the new value."))
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
private List<PropertyDescriptor> descriptors;
http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/PutHTMLElement.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/PutHTMLElement.java b/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/PutHTMLElement.java
index bc9b70c..0d112cb 100644
--- a/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/PutHTMLElement.java
+++ b/nifi-nar-bundles/nifi-html-bundle/nifi-html-processors/src/main/java/org/apache/nifi/PutHTMLElement.java
@@ -22,6 +22,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@@ -78,7 +79,7 @@ public class PutHTMLElement extends AbstractHTMLProcessor {
"encoded in the resulting output.")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
private List<PropertyDescriptor> descriptors;
@@ -133,7 +134,7 @@ public class PutHTMLElement extends AbstractHTMLProcessor {
final Elements eles;
try {
doc = parseHTMLDocumentFromFlowfile(flowFile, context, session);
- eles = doc.select(context.getProperty(CSS_SELECTOR).evaluateAttributeExpressions().getValue());
+ eles = doc.select(context.getProperty(CSS_SELECTOR).evaluateAttributeExpressions(flowFile).getValue());
} catch (Exception ex) {
getLogger().error("Failed to extract HTML from {} due to {}; routing to {}", new Object[] {flowFile, ex.toString(), REL_INVALID_HTML.getName()}, ex);
session.transfer(flowFile, REL_INVALID_HTML);
http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/AbstractIgniteCacheProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/AbstractIgniteCacheProcessor.java b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/AbstractIgniteCacheProcessor.java
index ca6136c..2ea7343 100644
--- a/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/AbstractIgniteCacheProcessor.java
+++ b/nifi-nar-bundles/nifi-ignite-bundle/nifi-ignite-processors/src/main/java/org/apache/nifi/processors/ignite/cache/AbstractIgniteCacheProcessor.java
@@ -23,6 +23,7 @@ import java.util.Set;
import org.apache.ignite.IgniteCache;
import org.apache.nifi.annotation.lifecycle.OnShutdown;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.expression.AttributeExpression.ResultType;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.Relationship;
@@ -56,7 +57,7 @@ public abstract class AbstractIgniteCacheProcessor extends AbstractIgniteProcess
"for determining Ignite cache key for the Flow File content")
.required(true)
.addValidator(StandardValidators.createAttributeExpressionLanguageValidator(ResultType.STRING, true))
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
/**
http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/AbstractInfluxDBProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/AbstractInfluxDBProcessor.java b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/AbstractInfluxDBProcessor.java
index 13838de..cedde21 100644
--- a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/AbstractInfluxDBProcessor.java
+++ b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/AbstractInfluxDBProcessor.java
@@ -23,6 +23,7 @@ import org.apache.commons.lang3.StringUtils;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
@@ -44,7 +45,7 @@ abstract class AbstractInfluxDBProcessor extends AbstractProcessor {
.description("Specifies the character set of the document data.")
.required(true)
.defaultValue("UTF-8")
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.build();
@@ -54,7 +55,7 @@ abstract class AbstractInfluxDBProcessor extends AbstractProcessor {
.description("InfluxDB URL to connect to. Eg: http://influxdb:8086")
.defaultValue("http://localhost:8086")
.required(true)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.URL_VALIDATOR)
.build();
@@ -72,7 +73,7 @@ abstract class AbstractInfluxDBProcessor extends AbstractProcessor {
.displayName("Database Name")
.description("InfluxDB database to connect to")
.required(true)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
@@ -81,7 +82,7 @@ abstract class AbstractInfluxDBProcessor extends AbstractProcessor {
.displayName("Username")
.required(false)
.description("Username for accessing InfluxDB")
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
@@ -90,7 +91,7 @@ abstract class AbstractInfluxDBProcessor extends AbstractProcessor {
.displayName("Password")
.required(false)
.description("Password for user")
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.sensitive(true)
.build();
@@ -99,7 +100,7 @@ abstract class AbstractInfluxDBProcessor extends AbstractProcessor {
.name("influxdb-max-records-size")
.displayName("Max size of records")
.description("Maximum size of records allowed to be posted in one batch")
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("1 MB")
.required(true)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/PutInfluxDB.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/PutInfluxDB.java b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/PutInfluxDB.java
index ed45025..f507768 100644
--- a/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/PutInfluxDB.java
+++ b/nifi-nar-bundles/nifi-influxdb-bundle/nifi-influxdb-processors/src/main/java/org/apache/nifi/processors/influxdb/PutInfluxDB.java
@@ -27,6 +27,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.ProcessSession;
@@ -67,7 +68,7 @@ public class PutInfluxDB extends AbstractInfluxDBProcessor {
.description("InfluxDB consistency level")
.required(true)
.defaultValue(CONSISTENCY_LEVEL_ONE.getValue())
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.allowableValues(CONSISTENCY_LEVEL_ONE, CONSISTENCY_LEVEL_ANY, CONSISTENCY_LEVEL_ALL, CONSISTENCY_LEVEL_QUORUM)
.build();
@@ -77,7 +78,7 @@ public class PutInfluxDB extends AbstractInfluxDBProcessor {
.description("Retention policy for the saving the records")
.defaultValue("autogen")
.required(true)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java
index 5c822be..e9c24d5 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/cf/JMSConnectionFactoryProvider.java
@@ -39,6 +39,7 @@ import org.apache.nifi.annotation.lifecycle.OnEnabled;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.controller.AbstractControllerService;
import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.ssl.SSLContextService;
@@ -88,7 +89,7 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl
+ "class (i.e., org.apache.activemq.ActiveMQConnectionFactory)")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor CLIENT_LIB_DIR_PATH = new PropertyDescriptor.Builder()
.name(CF_LIB)
@@ -98,7 +99,7 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl
+ "ConnectionFactory implementation.")
.addValidator(new ClientLibValidator())
.required(true)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
// ConnectionFactory specific properties
@@ -109,7 +110,7 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl
+ "'tcp://myhost:61616' for ActiveMQ or 'myhost:1414' for IBM MQ")
.addValidator(new NonEmptyBrokerURIValidator())
.required(true)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
@@ -133,7 +134,9 @@ public class JMSConnectionFactoryProvider extends AbstractControllerService impl
return new PropertyDescriptor.Builder()
.description("Specifies the value for '" + propertyDescriptorName
+ "' property to be set on the provided ConnectionFactory implementation.")
- .name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true)
+ .name(propertyDescriptorName)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .dynamic(true)
.build();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
index 1ac468c..1b8a8f3 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/AbstractJMSProcessor.java
@@ -28,6 +28,7 @@ import javax.jms.ConnectionFactory;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProviderDefinition;
import org.apache.nifi.processor.AbstractProcessor;
@@ -73,7 +74,7 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
.description("The name of the JMS Destination. Usually provided by the administrator (e.g., 'topic://myTopic' or 'myTopic').")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor DESTINATION_TYPE = new PropertyDescriptor.Builder()
.name("Destination Type")
@@ -90,7 +91,7 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
"Please see JMS spec for further details")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor SESSION_CACHE_SIZE = new PropertyDescriptor.Builder()
.name("Session Cache size")
@@ -114,7 +115,7 @@ abstract class AbstractJMSProcessor<T extends JMSWorker> extends AbstractProcess
.required(true)
.addValidator(StandardValidators.CHARACTER_SET_VALIDATOR)
.defaultValue(Charset.defaultCharset().name())
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
index e3e9a75..b7103ff 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/ConsumeJMS.java
@@ -36,6 +36,7 @@ import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.jms.cf.JMSConnectionFactoryProvider;
import org.apache.nifi.jms.processors.JMSConsumer.ConsumerCallback;
@@ -106,7 +107,7 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
.description("If destination is Topic if present then make it the consumer durable. " +
"@see https://docs.oracle.com/javaee/7/api/javax/jms/Session.html#createDurableConsumer-javax.jms.Topic-java.lang.String-")
.required(false)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("false")
.allowableValues("true", "false")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@@ -116,7 +117,7 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
.description("If destination is Topic if present then make it the consumer shared. " +
"@see https://docs.oracle.com/javaee/7/api/javax/jms/Session.html#createSharedConsumer-javax.jms.Topic-java.lang.String-")
.required(false)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("false")
.allowableValues("true", "false")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@@ -126,7 +127,7 @@ public class ConsumeJMS extends AbstractJMSProcessor<JMSConsumer> {
.description("The name of the subscription to use if destination is Topic and is shared or durable.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final Relationship REL_SUCCESS = new Relationship.Builder()
http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
index c555fd7..f58b9cf 100644
--- a/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
+++ b/nifi-nar-bundles/nifi-jms-bundle/nifi-jms-processors/src/main/java/org/apache/nifi/jms/processors/PublishJMS.java
@@ -128,7 +128,7 @@ public class PublishJMS extends AbstractJMSProcessor<JMSPublisher> {
break;
}
processSession.transfer(flowFile, REL_SUCCESS);
- processSession.getProvenanceReporter().send(flowFile, context.getProperty(DESTINATION).evaluateAttributeExpressions().getValue());
+ processSession.getProvenanceReporter().send(flowFile, destinationName);
} catch (Exception e) {
processSession.transfer(flowFile, REL_FAILURE);
this.getLogger().error("Failed while sending message to JMS via " + publisher, e);
http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java
index 3c0bddc..f0026a4 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java
@@ -44,6 +44,7 @@ import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@@ -87,7 +88,7 @@ public class ConsumeKafkaRecord_0_10 extends AbstractProcessor {
.description("The name of the Kafka Topic(s) to pull from. More than one can be supplied if comma separated.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor TOPIC_TYPE = new PropertyDescriptor.Builder()
@@ -104,7 +105,7 @@ public class ConsumeKafkaRecord_0_10 extends AbstractProcessor {
.displayName("Record Reader")
.description("The Record Reader to use for incoming FlowFiles")
.identifiesControllerService(RecordReaderFactory.class)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.build();
@@ -113,7 +114,7 @@ public class ConsumeKafkaRecord_0_10 extends AbstractProcessor {
.displayName("Record Writer")
.description("The Record Writer to use in order to serialize the data before sending to Kafka")
.identifiesControllerService(RecordSetWriterFactory.class)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.build();
@@ -123,7 +124,7 @@ public class ConsumeKafkaRecord_0_10 extends AbstractProcessor {
.description("A Group ID is used to identify consumers that are within the same consumer group. Corresponds to Kafka's 'group.id' property.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder()
http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
index f6425de..cab15de 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
@@ -43,6 +43,7 @@ import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@@ -87,7 +88,7 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
.description("The name of the Kafka Topic(s) to pull from. More than one can be supplied if comma separated.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor TOPIC_TYPE = new PropertyDescriptor.Builder()
@@ -105,7 +106,7 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
.description("A Group ID is used to identify consumers that are within the same consumer group. Corresponds to Kafka's 'group.id' property.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder()
@@ -132,7 +133,7 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
.displayName("Message Demarcator")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.description("Since KafkaConsumer receives messages in batches, you have an option to output FlowFiles which contains "
+ "all Kafka messages in a single batch for a given topic and partition and this property allows you to provide a string (interpreted as UTF-8) to use "
+ "for demarcating apart multiple Kafka messages. This is an optional property and if not provided each Kafka message received "
http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index d835607..5b65b0d 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -41,6 +41,7 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
@@ -76,7 +77,7 @@ final class KafkaProcessorUtils {
.description("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
.required(true)
.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("localhost:9092")
.build();
static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder()
@@ -84,7 +85,7 @@ final class KafkaProcessorUtils {
.displayName("Security Protocol")
.description("Protocol used to communicate with brokers. Corresponds to Kafka's 'security.protocol' property.")
.required(true)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues(SEC_PLAINTEXT, SEC_SSL, SEC_SASL_PLAINTEXT, SEC_SASL_SSL)
.defaultValue(SEC_PLAINTEXT.getValue())
.build();
@@ -96,7 +97,7 @@ final class KafkaProcessorUtils {
+ "It is ignored unless one of the SASL options of the <Security Protocol> are selected.")
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor USER_PRINCIPAL = new PropertyDescriptor.Builder()
.name("sasl.kerberos.principal")
@@ -105,7 +106,7 @@ final class KafkaProcessorUtils {
+ "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.")
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor USER_KEYTAB = new PropertyDescriptor.Builder()
.name("sasl.kerberos.keytab")
@@ -114,7 +115,7 @@ final class KafkaProcessorUtils {
+ "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.")
.required(false)
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("ssl.context.service")
http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java
index b86f60b..e670c7d 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_10.java
@@ -43,6 +43,7 @@ import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
@@ -104,7 +105,7 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor {
.description("The name of the Kafka Topic to publish to.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
@@ -112,7 +113,7 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor {
.displayName("Record Reader")
.description("The Record Reader to use for incoming FlowFiles")
.identifiesControllerService(RecordReaderFactory.class)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.build();
@@ -121,7 +122,7 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor {
.displayName("Record Writer")
.description("The Record Writer to use in order to serialize the data before sending to Kafka")
.identifiesControllerService(RecordSetWriterFactory.class)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.build();
@@ -130,7 +131,7 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor {
.displayName("Message Key Field")
.description("The name of a field in the Input Records that should be used as the Key for the Kafka message.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.build();
@@ -139,7 +140,7 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor {
.displayName("Delivery Guarantee")
.description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.")
.required(true)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
.defaultValue(DELIVERY_BEST_EFFORT.getValue())
.build();
@@ -151,7 +152,7 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor {
+ "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("5 sec")
.build();
@@ -161,7 +162,7 @@ public class PublishKafkaRecord_0_10 extends AbstractProcessor {
.description("After sending a message to Kafka, this indicates the amount of time that we are willing to wait for a response from Kafka. "
+ "If Kafka does not acknowledge the message within this time period, the FlowFile will be routed to 'failure'.")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.defaultValue("5 secs")
.build();
http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java
index c9d0f37..5a2716a 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_10.java
@@ -45,6 +45,7 @@ import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
@@ -100,7 +101,7 @@ public class PublishKafka_0_10 extends AbstractProcessor {
.description("The name of the Kafka Topic to publish to.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder()
@@ -108,7 +109,7 @@ public class PublishKafka_0_10 extends AbstractProcessor {
.displayName("Delivery Guarantee")
.description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.")
.required(true)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
.defaultValue(DELIVERY_BEST_EFFORT.getValue())
.build();
@@ -120,7 +121,7 @@ public class PublishKafka_0_10 extends AbstractProcessor {
+ "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("5 sec")
.build();
@@ -130,7 +131,7 @@ public class PublishKafka_0_10 extends AbstractProcessor {
.description("After sending a message to Kafka, this indicates the amount of time that we are willing to wait for a response from Kafka. "
+ "If Kafka does not acknowledge the message within this time period, the FlowFile will be routed to 'failure'.")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.defaultValue("5 secs")
.build();
@@ -154,7 +155,7 @@ public class PublishKafka_0_10 extends AbstractProcessor {
+ "data loss on Kafka. During a topic compaction on Kafka, messages will be deduplicated based on this key.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder()
@@ -171,7 +172,7 @@ public class PublishKafka_0_10 extends AbstractProcessor {
.displayName("Message Demarcator")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.description("Specifies the string (interpreted as UTF-8) to use for demarcating multiple messages within "
+ "a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the "
+ "contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka message. "
http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_11.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_11.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_11.java
index c4b0920..2b202df 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_11.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_11.java
@@ -45,6 +45,7 @@ import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@@ -89,7 +90,7 @@ public class ConsumeKafkaRecord_0_11 extends AbstractProcessor {
.description("The name of the Kafka Topic(s) to pull from. More than one can be supplied if comma separated.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor TOPIC_TYPE = new PropertyDescriptor.Builder()
@@ -106,7 +107,7 @@ public class ConsumeKafkaRecord_0_11 extends AbstractProcessor {
.displayName("Record Reader")
.description("The Record Reader to use for incoming FlowFiles")
.identifiesControllerService(RecordReaderFactory.class)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.build();
@@ -115,7 +116,7 @@ public class ConsumeKafkaRecord_0_11 extends AbstractProcessor {
.displayName("Record Writer")
.description("The Record Writer to use in order to serialize the data before sending to Kafka")
.identifiesControllerService(RecordSetWriterFactory.class)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.build();
@@ -125,7 +126,7 @@ public class ConsumeKafkaRecord_0_11 extends AbstractProcessor {
.description("A Group ID is used to identify consumers that are within the same consumer group. Corresponds to Kafka's 'group.id' property.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder()
@@ -167,7 +168,7 @@ public class ConsumeKafkaRecord_0_11 extends AbstractProcessor {
+ "read_uncomitted. This means that messages will be received as soon as they are written to Kafka but will be pulled, even if the producer cancels the transactions. If "
+ "this value is true, NiFi will not receive any messages for which the producer's transaction was canceled, but this can result in some latency since the consumer must wait "
+ "for the producer to finish its entire transaction instead of pulling as the messages become available.")
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues("true", "false")
.defaultValue("true")
.required(true)
@@ -191,7 +192,7 @@ public class ConsumeKafkaRecord_0_11 extends AbstractProcessor {
+ "\".*\" if messages are expected to have header values that are unique per message, such as an identifier or timestamp, because it will prevent NiFi from bundling "
+ "the messages together efficiently.")
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(false)
.build();
http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_11.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_11.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_11.java
index a8d0457..29c9ee3 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_11.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_11.java
@@ -44,6 +44,7 @@ import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@@ -88,7 +89,7 @@ public class ConsumeKafka_0_11 extends AbstractProcessor {
.description("The name of the Kafka Topic(s) to pull from. More than one can be supplied if comma separated.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor TOPIC_TYPE = new PropertyDescriptor.Builder()
@@ -106,7 +107,7 @@ public class ConsumeKafka_0_11 extends AbstractProcessor {
.description("A Group ID is used to identify consumers that are within the same consumer group. Corresponds to Kafka's 'group.id' property.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder()
@@ -133,7 +134,7 @@ public class ConsumeKafka_0_11 extends AbstractProcessor {
.displayName("Message Demarcator")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.description("Since KafkaConsumer receives messages in batches, you have an option to output FlowFiles which contains "
+ "all Kafka messages in a single batch for a given topic and partition and this property allows you to provide a string (interpreted as UTF-8) to use "
+ "for demarcating apart multiple Kafka messages. This is an optional property and if not provided each Kafka message received "
@@ -150,7 +151,7 @@ public class ConsumeKafka_0_11 extends AbstractProcessor {
+ "\".*\" if messages are expected to have header values that are unique per message, such as an identifier or timestamp, because it will prevent NiFi from bundling "
+ "the messages together efficiently.")
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(false)
.build();
@@ -184,7 +185,7 @@ public class ConsumeKafka_0_11 extends AbstractProcessor {
+ "read_uncomitted. This means that messages will be received as soon as they are written to Kafka but will be pulled, even if the producer cancels the transactions. If "
+ "this value is true, NiFi will not receive any messages for which the producer's transaction was canceled, but this can result in some latency since the consumer must wait "
+ "for the producer to finish its entire transaction instead of pulling as the messages become available.")
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues("true", "false")
.defaultValue("true")
.required(true)
http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
index e88f3da..fa7ebcb 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/KafkaProcessorUtils.java
@@ -41,6 +41,7 @@ import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.kerberos.KerberosCredentialsService;
import org.apache.nifi.processor.ProcessContext;
import org.apache.nifi.processor.util.StandardValidators;
@@ -76,7 +77,7 @@ final class KafkaProcessorUtils {
.description("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
.required(true)
.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("localhost:9092")
.build();
static final PropertyDescriptor SECURITY_PROTOCOL = new PropertyDescriptor.Builder()
@@ -84,7 +85,7 @@ final class KafkaProcessorUtils {
.displayName("Security Protocol")
.description("Protocol used to communicate with brokers. Corresponds to Kafka's 'security.protocol' property.")
.required(true)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues(SEC_PLAINTEXT, SEC_SSL, SEC_SASL_PLAINTEXT, SEC_SASL_SSL)
.defaultValue(SEC_PLAINTEXT.getValue())
.build();
@@ -96,7 +97,7 @@ final class KafkaProcessorUtils {
+ "It is ignored unless one of the SASL options of the <Security Protocol> are selected.")
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor USER_PRINCIPAL = new PropertyDescriptor.Builder()
.name("sasl.kerberos.principal")
@@ -105,7 +106,7 @@ final class KafkaProcessorUtils {
+ "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.")
.required(false)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor USER_KEYTAB = new PropertyDescriptor.Builder()
.name("sasl.kerberos.keytab")
@@ -114,7 +115,7 @@ final class KafkaProcessorUtils {
+ "in the JVM properties defined in the bootstrap.conf file. This principal will be set into 'sasl.jaas.config' Kafka's property.")
.required(false)
.addValidator(StandardValidators.FILE_EXISTS_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
static final PropertyDescriptor SSL_CONTEXT_SERVICE = new PropertyDescriptor.Builder()
.name("ssl.context.service")
http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_11.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_11.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_11.java
index 1521bfa..8bbec17 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_11.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_0_11.java
@@ -46,6 +46,7 @@ import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
@@ -107,7 +108,7 @@ public class PublishKafkaRecord_0_11 extends AbstractProcessor {
.description("The name of the Kafka Topic to publish to.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
@@ -115,7 +116,7 @@ public class PublishKafkaRecord_0_11 extends AbstractProcessor {
.displayName("Record Reader")
.description("The Record Reader to use for incoming FlowFiles")
.identifiesControllerService(RecordReaderFactory.class)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.build();
@@ -124,7 +125,7 @@ public class PublishKafkaRecord_0_11 extends AbstractProcessor {
.displayName("Record Writer")
.description("The Record Writer to use in order to serialize the data before sending to Kafka")
.identifiesControllerService(RecordSetWriterFactory.class)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.build();
@@ -133,7 +134,7 @@ public class PublishKafkaRecord_0_11 extends AbstractProcessor {
.displayName("Message Key Field")
.description("The name of a field in the Input Records that should be used as the Key for the Kafka message.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.build();
@@ -142,7 +143,7 @@ public class PublishKafkaRecord_0_11 extends AbstractProcessor {
.displayName("Delivery Guarantee")
.description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.")
.required(true)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
.defaultValue(DELIVERY_BEST_EFFORT.getValue())
.build();
@@ -154,7 +155,7 @@ public class PublishKafkaRecord_0_11 extends AbstractProcessor {
+ "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("5 sec")
.build();
@@ -164,7 +165,7 @@ public class PublishKafkaRecord_0_11 extends AbstractProcessor {
.description("After sending a message to Kafka, this indicates the amount of time that we are willing to wait for a response from Kafka. "
+ "If Kafka does not acknowledge the message within this time period, the FlowFile will be routed to 'failure'.")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.defaultValue("5 secs")
.build();
@@ -204,7 +205,7 @@ public class PublishKafkaRecord_0_11 extends AbstractProcessor {
+ "Any attribute whose name matches the regex will be added to the Kafka messages as a Header. "
+ "If not specified, no FlowFile attributes will be added as headers.")
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(false)
.build();
static final PropertyDescriptor USE_TRANSACTIONS = new PropertyDescriptor.Builder()
@@ -214,7 +215,7 @@ public class PublishKafkaRecord_0_11 extends AbstractProcessor {
+ "and this property is set to false, then the messages that have already been sent to Kafka will continue on and be delivered to consumers. "
+ "If this is set to true, then the Kafka transaction will be rolled back so that those messages are not available to consumers. Setting this to true "
+ "requires that the <Delivery Guarantee> property be set to \"Guarantee Replicated Delivery.\"")
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues("true", "false")
.defaultValue("true")
.required(true)
http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_11.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_11.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_11.java
index 130954a..5efe913 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_11.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-11-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_0_11.java
@@ -47,6 +47,7 @@ import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.DataUnit;
@@ -102,7 +103,7 @@ public class PublishKafka_0_11 extends AbstractProcessor {
.description("The name of the Kafka Topic to publish to.")
.required(true)
.addValidator(StandardValidators.NON_BLANK_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder()
@@ -110,7 +111,7 @@ public class PublishKafka_0_11 extends AbstractProcessor {
.displayName("Delivery Guarantee")
.description("Specifies the requirement for guaranteeing that a message is sent to Kafka. Corresponds to Kafka's 'acks' property.")
.required(true)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
.defaultValue(DELIVERY_BEST_EFFORT.getValue())
.build();
@@ -122,7 +123,7 @@ public class PublishKafka_0_11 extends AbstractProcessor {
+ "entire 'send' call. Corresponds to Kafka's 'max.block.ms' property")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.defaultValue("5 sec")
.build();
@@ -132,7 +133,7 @@ public class PublishKafka_0_11 extends AbstractProcessor {
.description("After sending a message to Kafka, this indicates the amount of time that we are willing to wait for a response from Kafka. "
+ "If Kafka does not acknowledge the message within this time period, the FlowFile will be routed to 'failure'.")
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(true)
.defaultValue("5 secs")
.build();
@@ -156,7 +157,7 @@ public class PublishKafka_0_11 extends AbstractProcessor {
+ "data loss on Kafka. During a topic compaction on Kafka, messages will be deduplicated based on this key.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
static final PropertyDescriptor KEY_ATTRIBUTE_ENCODING = new PropertyDescriptor.Builder()
@@ -173,7 +174,7 @@ public class PublishKafka_0_11 extends AbstractProcessor {
.displayName("Message Demarcator")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.description("Specifies the string (interpreted as UTF-8) to use for demarcating multiple messages within "
+ "a single FlowFile. If not specified, the entire content of the FlowFile will be used as a single message. If specified, the "
+ "contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka message. "
@@ -206,7 +207,7 @@ public class PublishKafka_0_11 extends AbstractProcessor {
+ "Any attribute whose name matches the regex will be added to the Kafka messages as a Header. "
+ "If not specified, no FlowFile attributes will be added as headers.")
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.required(false)
.build();
static final PropertyDescriptor USE_TRANSACTIONS = new PropertyDescriptor.Builder()
@@ -216,7 +217,7 @@ public class PublishKafka_0_11 extends AbstractProcessor {
+ "and this property is set to false, then the messages that have already been sent to Kafka will continue on and be delivered to consumers. "
+ "If this is set to true, then the Kafka transaction will be rolled back so that those messages are not available to consumers. Setting this to true "
+ "requires that the <Delivery Guarantee> property be set to \"Guarantee Replicated Delivery.\"")
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues("true", "false")
.defaultValue("true")
.required(true)
http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
index b1eadcf..e01afdd 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@ -50,6 +50,7 @@ import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.lifecycle.OnStopped;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.AbstractProcessor;
import org.apache.nifi.processor.ProcessContext;
@@ -93,14 +94,14 @@ public class GetKafka extends AbstractProcessor {
+ " combinations. For example, host1:2181,host2:2181,host3:2188")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
.name("Topic Name")
.description("The Kafka Topic to pull messages from")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor ZOOKEEPER_COMMIT_DELAY = new PropertyDescriptor.Builder()
.name("Zookeeper Commit Frequency")
@@ -108,7 +109,7 @@ public class GetKafka extends AbstractProcessor {
+ " result in better overall performance but can result in more data duplication if a NiFi node is lost")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.defaultValue("60 secs")
.build();
public static final PropertyDescriptor ZOOKEEPER_TIMEOUT = new PropertyDescriptor.Builder()
@@ -116,7 +117,7 @@ public class GetKafka extends AbstractProcessor {
.description("The amount of time to wait for a response from ZooKeeper before determining that there is a communications error")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.defaultValue("30 secs")
.build();
public static final PropertyDescriptor KAFKA_TIMEOUT = new PropertyDescriptor.Builder()
@@ -124,7 +125,7 @@ public class GetKafka extends AbstractProcessor {
.description("The amount of time to wait for a response from Kafka before determining that there is a communications error")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.defaultValue("30 secs")
.build();
public static final PropertyDescriptor BATCH_SIZE = new PropertyDescriptor.Builder()
@@ -134,7 +135,7 @@ public class GetKafka extends AbstractProcessor {
+ "If the messages from Kafka should not be concatenated together, leave this value at 1.")
.required(true)
.addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.defaultValue("1")
.build();
public static final PropertyDescriptor MESSAGE_DEMARCATOR = new PropertyDescriptor.Builder()
@@ -144,7 +145,7 @@ public class GetKafka extends AbstractProcessor {
+ "this value will be placed in between them.")
.required(true)
.addValidator(Validator.VALID) // accept anything as a demarcator, including empty string
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.defaultValue("\\n")
.build();
@@ -153,14 +154,14 @@ public class GetKafka extends AbstractProcessor {
.description("Client Name to use when communicating with Kafka")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
public static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder()
.name("Group ID")
.description("A Group ID is used to identify consumers that are within the same consumer group")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder()
http://git-wip-us.apache.org/repos/asf/nifi/blob/4c787799/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
index ab24e57..be93736 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -38,6 +38,7 @@ import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.processor.DataUnit;
import org.apache.nifi.processor.ProcessContext;
@@ -112,14 +113,14 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
.description("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
.required(true)
.addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
.build();
public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
.name("Topic Name")
.description("The Kafka Topic of interest")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
/**
* @deprecated Kafka 0.8.x producer doesn't use 'partitioner.class' property.
@@ -137,7 +138,7 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
+ "in the same FlowFile will be sent to the same partition. If a partition is specified but is not valid, "
+ "then the FlowFile will be routed to failure relationship.")
.addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.required(false)
.build();
public static final PropertyDescriptor KEY = new PropertyDescriptor.Builder()
@@ -145,12 +146,12 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
.description("The Key to use for the Message")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor DELIVERY_GUARANTEE = new PropertyDescriptor.Builder()
.name("Delivery Guarantee")
.description("Specifies the requirement for guaranteeing that a message is sent to Kafka").required(true)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.allowableValues(DELIVERY_BEST_EFFORT, DELIVERY_ONE_NODE, DELIVERY_REPLICATED)
.defaultValue(DELIVERY_BEST_EFFORT.getValue())
.build();
@@ -165,14 +166,14 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
+ "sent to the 'failure' relationship.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.FLOWFILE_ATTRIBUTES)
.build();
public static final PropertyDescriptor MAX_BUFFER_SIZE = new PropertyDescriptor.Builder()
.name("Max Buffer Size")
.description("The maximum amount of data to buffer in memory before sending to Kafka")
.required(true)
.addValidator(StandardValidators.DATA_SIZE_VALIDATOR)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.defaultValue("5 MB")
.build();
static final PropertyDescriptor MAX_RECORD_SIZE = new PropertyDescriptor.Builder()
@@ -187,14 +188,14 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
.description("The amount of time to wait for a response from Kafka before determining that there is a communications error")
.required(true)
.addValidator(StandardValidators.TIME_PERIOD_VALIDATOR)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.defaultValue("30 secs").build();
public static final PropertyDescriptor CLIENT_NAME = new PropertyDescriptor.Builder()
.name("Client Name")
.description("Client Name to use when communicating with Kafka")
.required(true)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
- .expressionLanguageSupported(false)
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
.build();
public static final PropertyDescriptor BATCH_NUM_MESSAGES = new PropertyDescriptor.Builder()
.name("Async Batch Size")