You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2017/05/19 19:52:11 UTC

nifi git commit: NIFI-3854 This closes #1773. Expand expression language support for Kafka processors

Repository: nifi
Updated Branches:
  refs/heads/master 9294a2613 -> 58ce52d5d


NIFI-3854 This closes #1773. Expand expression language support for Kafka processors

Signed-off-by: joewitt <jo...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/58ce52d5
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/58ce52d5
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/58ce52d5

Branch: refs/heads/master
Commit: 58ce52d5d64d0599110c5b66addb1b093246f157
Parents: 9294a26
Author: Tim Reardon <te...@gmail.com>
Authored: Tue May 9 12:59:35 2017 -0400
Committer: joewitt <jo...@apache.org>
Committed: Fri May 19 15:51:23 2017 -0400

----------------------------------------------------------------------
 .../kafka/pubsub/ConsumeKafkaRecord_0_10.java   |  2 +-
 .../kafka/pubsub/ConsumeKafka_0_10.java         |  2 +-
 .../kafka/pubsub/ConsumeKafkaTest.java          |  3 ---
 .../pubsub/TestConsumeKafkaRecord_0_10.java     |  3 ---
 .../apache/nifi/processors/kafka/GetKafka.java  | 22 ++++++++++----------
 .../apache/nifi/processors/kafka/PutKafka.java  |  6 +++---
 .../processors/kafka/pubsub/ConsumeKafka.java   |  2 +-
 .../kafka/pubsub/ConsumeKafkaTest.java          |  2 --
 8 files changed, 17 insertions(+), 25 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/58ce52d5/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java
index 0882870..c44e25e 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaRecord_0_10.java
@@ -253,7 +253,7 @@ public class ConsumeKafkaRecord_0_10 extends AbstractProcessor {
         final String topicType = context.getProperty(ConsumeKafkaRecord_0_10.TOPIC_TYPE).evaluateAttributeExpressions().getValue();
         final List<String> topics = new ArrayList<>();
         final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
-        final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue();
+        final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
 
         final RecordReaderFactory readerFactory = context.getProperty(RECORD_READER).asControllerService(RecordReaderFactory.class);
         final RecordSetWriterFactory writerFactory = context.getProperty(RECORD_WRITER).asControllerService(RecordSetWriterFactory.class);

http://git-wip-us.apache.org/repos/asf/nifi/blob/58ce52d5/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
index f678fa3..84e9da6 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
@@ -248,7 +248,7 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
         final List<String> topics = new ArrayList<>();
         final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
         final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
-        final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue();
+        final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
         if (topicType.equals(TOPIC_NAME.getValue())) {
           for (final String topic : topicListing.split(",", 100)) {
               final String trimmedName = topic.trim();

http://git-wip-us.apache.org/repos/asf/nifi/blob/58ce52d5/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index cc524dd..bd93e3b 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -114,7 +114,6 @@ public class ConsumeKafkaTest {
             }
         };
         final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setValidateExpressionUsage(false);
         runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
         runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
         runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
@@ -145,7 +144,6 @@ public class ConsumeKafkaTest {
             }
         };
         final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setValidateExpressionUsage(false);
         runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
         runner.setProperty(ConsumeKafka_0_10.TOPICS, "(fo.*)|(ba)");
         runner.setProperty(ConsumeKafka_0_10.TOPIC_TYPE, "pattern");
@@ -177,7 +175,6 @@ public class ConsumeKafkaTest {
             }
         };
         final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setValidateExpressionUsage(false);
         runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
         runner.setProperty(ConsumeKafka_0_10.TOPICS, "foo,bar");
         runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);

http://git-wip-us.apache.org/repos/asf/nifi/blob/58ce52d5/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java
index da63877..3da74e4 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/TestConsumeKafkaRecord_0_10.java
@@ -132,7 +132,6 @@ public class TestConsumeKafkaRecord_0_10 {
         when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
         when(mockLease.commit()).thenReturn(Boolean.TRUE);
 
-        runner.setValidateExpressionUsage(false);
         runner.setProperty(ConsumeKafkaRecord_0_10.TOPICS, "foo,bar");
         runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, groupName);
         runner.setProperty(ConsumeKafkaRecord_0_10.AUTO_OFFSET_RESET, ConsumeKafkaRecord_0_10.OFFSET_EARLIEST);
@@ -155,7 +154,6 @@ public class TestConsumeKafkaRecord_0_10 {
         when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
         when(mockLease.commit()).thenReturn(Boolean.TRUE);
 
-        runner.setValidateExpressionUsage(false);
         runner.setProperty(ConsumeKafkaRecord_0_10.TOPICS, "(fo.*)|(ba)");
         runner.setProperty(ConsumeKafkaRecord_0_10.TOPIC_TYPE, "pattern");
         runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, groupName);
@@ -179,7 +177,6 @@ public class TestConsumeKafkaRecord_0_10 {
         when(mockLease.continuePolling()).thenReturn(true, false);
         when(mockLease.commit()).thenReturn(Boolean.FALSE);
 
-        runner.setValidateExpressionUsage(false);
         runner.setProperty(ConsumeKafkaRecord_0_10.TOPICS, "foo,bar");
         runner.setProperty(ConsumeKafkaRecord_0_10.GROUP_ID, groupName);
         runner.setProperty(ConsumeKafkaRecord_0_10.AUTO_OFFSET_RESET, ConsumeKafkaRecord_0_10.OFFSET_EARLIEST);

http://git-wip-us.apache.org/repos/asf/nifi/blob/58ce52d5/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
index 84d6b89..b1eadcf 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@ -93,14 +93,14 @@ public class GetKafka extends AbstractProcessor {
                     + " combinations. For example, host1:2181,host2:2181,host3:2188")
             .required(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(false)
+            .expressionLanguageSupported(true)
             .build();
     public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
             .name("Topic Name")
             .description("The Kafka Topic to pull messages from")
             .required(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(false)
+            .expressionLanguageSupported(true)
             .build();
     public static final PropertyDescriptor ZOOKEEPER_COMMIT_DELAY = new PropertyDescriptor.Builder()
             .name("Zookeeper Commit Frequency")
@@ -160,7 +160,7 @@ public class GetKafka extends AbstractProcessor {
             .description("A Group ID is used to identify consumers that are within the same consumer group")
             .required(true)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
-            .expressionLanguageSupported(false)
+            .expressionLanguageSupported(true)
             .build();
 
     public static final PropertyDescriptor AUTO_OFFSET_RESET = new PropertyDescriptor.Builder()
@@ -218,11 +218,11 @@ public class GetKafka extends AbstractProcessor {
     }
 
     public void createConsumers(final ProcessContext context) {
-        final String topic = context.getProperty(TOPIC).getValue();
+        final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions().getValue();
 
         final Properties props = new Properties();
-        props.setProperty("zookeeper.connect", context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue());
-        props.setProperty("group.id", context.getProperty(GROUP_ID).getValue());
+        props.setProperty("zookeeper.connect", context.getProperty(ZOOKEEPER_CONNECTION_STRING).evaluateAttributeExpressions().getValue());
+        props.setProperty("group.id", context.getProperty(GROUP_ID).evaluateAttributeExpressions().getValue());
         props.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue());
         props.setProperty("auto.commit.interval.ms", String.valueOf(context.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS)));
         props.setProperty("auto.offset.reset", context.getProperty(AUTO_OFFSET_RESET).getValue());
@@ -257,7 +257,7 @@ public class GetKafka extends AbstractProcessor {
         }
 
         int partitionCount = KafkaUtils.retrievePartitionCountForTopic(
-                context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue(), context.getProperty(TOPIC).getValue());
+                context.getProperty(ZOOKEEPER_CONNECTION_STRING).evaluateAttributeExpressions().getValue(), context.getProperty(TOPIC).evaluateAttributeExpressions().getValue());
 
         final ConsumerConfig consumerConfig = new ConsumerConfig(props);
         consumer = Consumer.createJavaConsumerConnector(consumerConfig);
@@ -267,12 +267,12 @@ public class GetKafka extends AbstractProcessor {
         int concurrentTaskToUse = context.getMaxConcurrentTasks();
         if (context.getMaxConcurrentTasks() < partitionCount){
             this.getLogger().warn("The amount of concurrent tasks '" + context.getMaxConcurrentTasks() + "' configured for "
-                    + "this processor is less than the amount of partitions '" + partitionCount + "' for topic '" + context.getProperty(TOPIC).getValue() + "'. "
+                    + "this processor is less than the amount of partitions '" + partitionCount + "' for topic '" + context.getProperty(TOPIC).evaluateAttributeExpressions().getValue() + "'. "
                 + "Consider making it equal to the amount of partition count for most efficient event consumption.");
         } else if (context.getMaxConcurrentTasks() > partitionCount){
             concurrentTaskToUse = partitionCount;
             this.getLogger().warn("The amount of concurrent tasks '" + context.getMaxConcurrentTasks() + "' configured for "
-                    + "this processor is greater than the amount of partitions '" + partitionCount + "' for topic '" + context.getProperty(TOPIC).getValue() + "'. "
+                    + "this processor is greater than the amount of partitions '" + partitionCount + "' for topic '" + context.getProperty(TOPIC).evaluateAttributeExpressions().getValue() + "'. "
                 + "Therefore those tasks would never see a message. To avoid that the '" + partitionCount + "'(partition count) will be used to consume events");
         }
 
@@ -400,7 +400,7 @@ public class GetKafka extends AbstractProcessor {
         final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
         final String demarcator = context.getProperty(MESSAGE_DEMARCATOR).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
         final byte[] demarcatorBytes = demarcator.getBytes(StandardCharsets.UTF_8);
-        final String topic = context.getProperty(TOPIC).getValue();
+        final String topic = context.getProperty(TOPIC).evaluateAttributeExpressions().getValue();
 
         FlowFile flowFile = session.create();
 
@@ -481,4 +481,4 @@ public class GetKafka extends AbstractProcessor {
             session.transfer(flowFile, REL_SUCCESS);
         }
     }
-}
\ No newline at end of file
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/58ce52d5/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
index 8a4c5d5..ab24e57 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-8-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -112,7 +112,7 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
             .description("A comma-separated list of known Kafka Brokers in the format <host>:<port>")
             .required(true)
             .addValidator(StandardValidators.HOSTNAME_PORT_LIST_VALIDATOR)
-            .expressionLanguageSupported(false)
+            .expressionLanguageSupported(true)
             .build();
     public static final PropertyDescriptor TOPIC = new PropertyDescriptor.Builder()
             .name("Topic Name")
@@ -288,7 +288,7 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
             flowFile = this.doRendezvousWithKafka(flowFile, context, session);
             if (!this.isFailedFlowFile(flowFile)) {
                 session.getProvenanceReporter().send(flowFile,
-                        context.getProperty(SEED_BROKERS).getValue() + "/"
+                        context.getProperty(SEED_BROKERS).evaluateAttributeExpressions(flowFile).getValue() + "/"
                         + context.getProperty(TOPIC).evaluateAttributeExpressions(flowFile).getValue());
                 session.transfer(flowFile, REL_SUCCESS);
             } else {
@@ -474,7 +474,7 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
     private Properties buildKafkaConfigProperties(final ProcessContext context) {
         Properties properties = new Properties();
         String timeout = String.valueOf(context.getProperty(TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).longValue());
-        properties.setProperty("bootstrap.servers", context.getProperty(SEED_BROKERS).getValue());
+        properties.setProperty("bootstrap.servers", context.getProperty(SEED_BROKERS).evaluateAttributeExpressions().getValue());
         properties.setProperty("acks", context.getProperty(DELIVERY_GUARANTEE).getValue());
         properties.setProperty("buffer.memory", String.valueOf(context.getProperty(MAX_BUFFER_SIZE).asDataSize(DataUnit.B).longValue()));
         properties.setProperty("compression.type", context.getProperty(COMPRESSION_CODEC).getValue());

http://git-wip-us.apache.org/repos/asf/nifi/blob/58ce52d5/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
index ff01746..d02d011 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka.java
@@ -247,7 +247,7 @@ public class ConsumeKafka extends AbstractProcessor {
         }
         final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
         final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
-        final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue();
+        final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).evaluateAttributeExpressions().getValue();
 
         return new ConsumerPool(maxLeases, demarcator, props, topics, maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log);
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/58ce52d5/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index 0c1c4a7..c4b0140 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-9-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -121,7 +121,6 @@ public class ConsumeKafkaTest {
             }
         };
         final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setValidateExpressionUsage(false);
         runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
         runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
         runner.setProperty(ConsumeKafka.GROUP_ID, groupName);
@@ -152,7 +151,6 @@ public class ConsumeKafkaTest {
             }
         };
         final TestRunner runner = TestRunners.newTestRunner(proc);
-        runner.setValidateExpressionUsage(false);
         runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
         runner.setProperty(ConsumeKafka.TOPICS, "foo,bar");
         runner.setProperty(ConsumeKafka.GROUP_ID, groupName);