You are viewing a plain text version of this content. The canonical link for it was a hyperlink that is not preserved in this plain-text version.
Posted to commits@nifi.apache.org by jo...@apache.org on 2017/04/08 01:54:48 UTC

nifi git commit: NIFI-2714 This closes #1549. Added regex support to ConsumeKafka_0_10

Repository: nifi
Updated Branches:
  refs/heads/master 778ba3957 -> 4bfb905f3


NIFI-2714 This closes #1549. Added regex support to ConsumeKafka_0_10

Added the ability to specify the topic(s) as a single regular expression
(wildcard subscription), as supported by the Kafka client library.

Signed-off-by: joewitt <jo...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4bfb905f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4bfb905f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4bfb905f

Branch: refs/heads/master
Commit: 4bfb905f37d5084f70f228185431c16ce73369fb
Parents: 778ba39
Author: Jack Pickett <p1...@users.noreply.github.com>
Authored: Wed Mar 1 09:43:32 2017 -0500
Committer: joewitt <jo...@apache.org>
Committed: Fri Apr 7 21:52:33 2017 -0400

----------------------------------------------------------------------
 .../kafka/pubsub/ConsumeKafka_0_10.java         | 39 ++++++++++++++++----
 .../processors/kafka/pubsub/ConsumerPool.java   | 33 ++++++++++++++++-
 .../kafka/pubsub/ConsumeKafkaTest.java          | 32 ++++++++++++++++
 nifi-nar-bundles/nifi-kafka-bundle/pom.xml      |  2 +-
 4 files changed, 95 insertions(+), 11 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/4bfb905f/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
index e859f94..4da485f 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.regex.Pattern;
 import java.util.concurrent.TimeUnit;
 import org.apache.kafka.clients.consumer.ConsumerConfig;
 import org.apache.kafka.common.KafkaException;
@@ -78,6 +79,10 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
 
     static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none", "Throw exception to the consumer if no previous offset is found for the consumer's group");
 
+    static final AllowableValue TOPIC_NAME = new AllowableValue("names", "names", "Topic is a full topic name or comma separated list of names");
+
+    static final AllowableValue TOPIC_PATTERN = new AllowableValue("pattern", "pattern", "Topic is a regex using the Java Pattern syntax");
+
     static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder()
             .name("topic")
             .displayName("Topic Name(s)")
@@ -87,6 +92,15 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
             .expressionLanguageSupported(true)
             .build();
 
+    static final PropertyDescriptor TOPIC_TYPE = new PropertyDescriptor.Builder()
+            .name("topic_type")
+            .displayName("Topic Name Format")
+            .description("Specifies whether the Topic(s) provided are a comma separated list of names or a single regular expression")
+            .required(true)
+            .allowableValues(TOPIC_NAME, TOPIC_PATTERN)
+            .defaultValue(TOPIC_NAME.getValue())
+            .build();
+
     static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder()
             .name(ConsumerConfig.GROUP_ID_CONFIG)
             .displayName("Group ID")
@@ -166,6 +180,7 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
         List<PropertyDescriptor> descriptors = new ArrayList<>();
         descriptors.addAll(KafkaProcessorUtils.getCommonPropertyDescriptors());
         descriptors.add(TOPICS);
+        descriptors.add(TOPIC_TYPE);
         descriptors.add(GROUP_ID);
         descriptors.add(AUTO_OFFSET_RESET);
         descriptors.add(KEY_ATTRIBUTE_ENCODING);
@@ -229,18 +244,26 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
         props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
         props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
         final String topicListing = context.getProperty(ConsumeKafka_0_10.TOPICS).evaluateAttributeExpressions().getValue();
+        final String topicType = context.getProperty(ConsumeKafka_0_10.TOPIC_TYPE).evaluateAttributeExpressions().getValue();
         final List<String> topics = new ArrayList<>();
-        for (final String topic : topicListing.split(",", 100)) {
-            final String trimmedName = topic.trim();
-            if (!trimmedName.isEmpty()) {
-                topics.add(trimmedName);
-            }
-        }
         final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
         final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
         final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue();
-
-        return new ConsumerPool(maxLeases, demarcator, props, topics, maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log);
+        if (topicType.equals(TOPIC_NAME.getValue())) {
+          for (final String topic : topicListing.split(",", 100)) {
+              final String trimmedName = topic.trim();
+              if (!trimmedName.isEmpty()) {
+                  topics.add(trimmedName);
+              }
+          }
+          return new ConsumerPool(maxLeases, demarcator, props, topics, maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log);
+        } else if (topicType.equals(TOPIC_PATTERN.getValue())) {
+          final Pattern topicPattern = Pattern.compile(topicListing.trim());
+          return new ConsumerPool(maxLeases, demarcator, props, topicPattern, maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log);
+        } else {
+          getLogger().error("Subscription type has an unknown value {}", new Object[] {topicType});
+          return null;
+        }
     }
 
     @OnUnscheduled

http://git-wip-us.apache.org/repos/asf/nifi/blob/4bfb905f/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index baacdc7..b375b34 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
 import java.util.Collections;
 import java.util.List;
 import java.util.Map;
+import java.util.regex.Pattern;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.atomic.AtomicLong;
@@ -40,6 +41,7 @@ public class ConsumerPool implements Closeable {
 
     private final BlockingQueue<SimpleConsumerLease> pooledLeases;
     private final List<String> topics;
+    private final Pattern topicPattern;
     private final Map<String, Object> kafkaProperties;
     private final long maxWaitMillis;
     private final ComponentLog logger;
@@ -74,7 +76,7 @@ public class ConsumerPool implements Closeable {
     public ConsumerPool(
             final int maxConcurrentLeases,
             final byte[] demarcator,
-        final Map<String, Object> kafkaProperties,
+            final Map<String, Object> kafkaProperties,
             final List<String> topics,
             final long maxWaitMillis,
             final String keyEncoding,
@@ -90,6 +92,29 @@ public class ConsumerPool implements Closeable {
         this.bootstrapServers = bootstrapServers;
         this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
         this.topics = Collections.unmodifiableList(topics);
+        this.topicPattern = null;
+    }
+
+    public ConsumerPool(
+            final int maxConcurrentLeases,
+            final byte[] demarcator,
+            final Map<String, Object> kafkaProperties,
+            final Pattern topics,
+            final long maxWaitMillis,
+            final String keyEncoding,
+            final String securityProtocol,
+            final String bootstrapServers,
+            final ComponentLog logger) {
+        this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
+        this.maxWaitMillis = maxWaitMillis;
+        this.logger = logger;
+        this.demarcatorBytes = demarcator;
+        this.keyEncoding = keyEncoding;
+        this.securityProtocol = securityProtocol;
+        this.bootstrapServers = bootstrapServers;
+        this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
+        this.topics = null;
+        this.topicPattern = topics;
     }
 
     /**
@@ -119,7 +144,11 @@ public class ConsumerPool implements Closeable {
              * This subscription tightly couples the lease to the given
              * consumer. They cannot be separated from then on.
              */
-            consumer.subscribe(topics, lease);
+            if (topics != null) {
+              consumer.subscribe(topics, lease);
+            } else {
+              consumer.subscribe(topicPattern, lease);
+            }
         }
         lease.setProcessSession(session);
         leasesObtainedCountRef.incrementAndGet();

http://git-wip-us.apache.org/repos/asf/nifi/blob/4bfb905f/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index 9b380d5..4a5c4fb 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -134,6 +134,38 @@ public class ConsumeKafkaTest {
     }
 
     @Test
+    public void validateGetAllMessagesPattern() throws Exception {
+        String groupName = "validateGetAllMessagesPattern";
+
+        when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
+        when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
+        when(mockLease.commit()).thenReturn(Boolean.TRUE);
+
+        ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
+            @Override
+            protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+                return mockConsumerPool;
+            }
+        };
+        final TestRunner runner = TestRunners.newTestRunner(proc);
+        runner.setValidateExpressionUsage(false);
+        runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+        runner.setProperty(ConsumeKafka_0_10.TOPICS, "(fo.*)|(ba)");
+        runner.setProperty(ConsumeKafka_0_10.TOPIC_TYPE, "pattern");
+        runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
+        runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
+        runner.run(1, false);
+
+        verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
+        verify(mockLease, times(3)).continuePolling();
+        verify(mockLease, times(2)).poll();
+        verify(mockLease, times(1)).commit();
+        verify(mockLease, times(1)).close();
+        verifyNoMoreInteractions(mockConsumerPool);
+        verifyNoMoreInteractions(mockLease);
+    }
+
+    @Test
     public void validateGetErrorMessages() throws Exception {
         String groupName = "validateGetErrorMessages";
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/4bfb905f/nifi-nar-bundles/nifi-kafka-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/pom.xml
index 130609d..c963f12 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-kafka-bundle/pom.xml
@@ -54,5 +54,5 @@
                 <version>1.2.0-SNAPSHOT</version>
             </dependency>
         </dependencies>
-    </dependencyManagement> 
+    </dependencyManagement>
 </project>