You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mt...@apache.org on 2018/08/30 11:08:59 UTC

nifi git commit: NIFI-5388: enabled EL support for dynamic properties of Kafka 1.0 processors

Repository: nifi
Updated Branches:
  refs/heads/master c755ed932 -> 89186fb96


NIFI-5388: enabled Expression Language (EL) support for dynamic properties of Kafka 1.0 processors

This closes #2915

Signed-off-by: Mike Thomsen <mi...@gmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/89186fb9
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/89186fb9
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/89186fb9

Branch: refs/heads/master
Commit: 89186fb96d0bd1367f234d36f8c45e130e6aad4b
Parents: c755ed9
Author: Corey Fritz <co...@snagajob.com>
Authored: Mon Jul 16 21:06:01 2018 -0400
Committer: Mike Thomsen <mi...@gmail.com>
Committed: Thu Aug 30 07:08:00 2018 -0400

----------------------------------------------------------------------
 .../nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java | 8 ++++++--
 .../nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java       | 8 ++++++--
 .../nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java | 4 +++-
 .../nifi/processors/kafka/pubsub/PublishKafka_1_0.java       | 4 +++-
 4 files changed, 18 insertions(+), 6 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/89186fb9/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java
index 6b05a03..9a831db 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_1_0.java
@@ -74,7 +74,8 @@ import org.apache.nifi.serialization.RecordSetWriterFactory;
 @DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
         description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
         + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
-        + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ")
+        + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ",
+        expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
 @SeeAlso({ConsumeKafka_1_0.class, PublishKafka_1_0.class, PublishKafkaRecord_1_0.class})
 public class ConsumeKafkaRecord_1_0 extends AbstractProcessor {
 
@@ -264,7 +265,10 @@ public class ConsumeKafkaRecord_1_0 extends AbstractProcessor {
     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
         return new PropertyDescriptor.Builder()
                 .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
-                .name(propertyDescriptorName).addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ConsumerConfig.class)).dynamic(true)
+                .name(propertyDescriptorName)
+                .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ConsumerConfig.class))
+                .dynamic(true)
+                .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
                 .build();
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/89186fb9/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java
index b175eb5..a5f4286 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_1_0.java
@@ -70,7 +70,8 @@ import static org.apache.nifi.processors.kafka.pubsub.KafkaProcessorUtils.UTF8_E
 @DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
         description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
         + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
-        + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ")
+        + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ",
+        expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
 public class ConsumeKafka_1_0 extends AbstractProcessor {
 
     static final AllowableValue OFFSET_EARLIEST = new AllowableValue("earliest", "earliest", "Automatically reset the offset to the earliest offset");
@@ -253,7 +254,10 @@ public class ConsumeKafka_1_0 extends AbstractProcessor {
     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
         return new PropertyDescriptor.Builder()
                 .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
-                .name(propertyDescriptorName).addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ConsumerConfig.class)).dynamic(true)
+                .name(propertyDescriptorName)
+                .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ConsumerConfig.class))
+                .dynamic(true)
+                .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
                 .build();
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/89186fb9/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java
index d0f368d..82a3918 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafkaRecord_1_0.java
@@ -73,7 +73,8 @@ import org.apache.nifi.serialization.record.RecordSet;
 @DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
     description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
     + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
-    + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ")
+    + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ",
+    expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
 @WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to "
     + "FlowFiles that are routed to success.")
 @SeeAlso({PublishKafka_1_0.class, ConsumeKafka_1_0.class, ConsumeKafkaRecord_1_0.class})
@@ -293,6 +294,7 @@ public class PublishKafkaRecord_1_0 extends AbstractProcessor {
             .name(propertyDescriptorName)
             .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
             .dynamic(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/89186fb9/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java
index d7e910d..477aefd 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-1-0-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/PublishKafka_1_0.java
@@ -68,7 +68,8 @@ import org.apache.nifi.processor.util.StandardValidators;
 @DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
     description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
         + " In the event a dynamic property represents a property that was already set, its value will be ignored and WARN message logged."
-        + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ")
+        + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration. ",
+    expressionLanguageScope = ExpressionLanguageScope.VARIABLE_REGISTRY)
 @WritesAttribute(attribute = "msg.count", description = "The number of messages that were sent to Kafka for this FlowFile. This attribute is added only to "
     + "FlowFiles that are routed to success. If the <Message Demarcator> Property is not set, this will always be 1, but if the Property is set, it may "
     + "be greater than 1.")
@@ -289,6 +290,7 @@ public class PublishKafka_1_0 extends AbstractProcessor {
             .name(propertyDescriptorName)
             .addValidator(new KafkaProcessorUtils.KafkaConfigValidator(ProducerConfig.class))
             .dynamic(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
             .build();
     }