You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2017/04/08 01:54:48 UTC
nifi git commit: NIFI-2714 This closes #1549. Added regex support to
ConsumeKafka_0_10
Repository: nifi
Updated Branches:
refs/heads/master 778ba3957 -> 4bfb905f3
NIFI-2714 This closes #1549. Added regex support to ConsumeKafka_0_10
Enabled the ability to specify wildcard topics as a regular expression
as supported in the Kafka client library.
Signed-off-by: joewitt <jo...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4bfb905f
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4bfb905f
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4bfb905f
Branch: refs/heads/master
Commit: 4bfb905f37d5084f70f228185431c16ce73369fb
Parents: 778ba39
Author: Jack Pickett <p1...@users.noreply.github.com>
Authored: Wed Mar 1 09:43:32 2017 -0500
Committer: joewitt <jo...@apache.org>
Committed: Fri Apr 7 21:52:33 2017 -0400
----------------------------------------------------------------------
.../kafka/pubsub/ConsumeKafka_0_10.java | 39 ++++++++++++++++----
.../processors/kafka/pubsub/ConsumerPool.java | 33 ++++++++++++++++-
.../kafka/pubsub/ConsumeKafkaTest.java | 32 ++++++++++++++++
nifi-nar-bundles/nifi-kafka-bundle/pom.xml | 2 +-
4 files changed, 95 insertions(+), 11 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/4bfb905f/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
index e859f94..4da485f 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafka_0_10.java
@@ -25,6 +25,7 @@ import java.util.HashSet;
import java.util.List;
import java.util.Map;
import java.util.Set;
+import java.util.regex.Pattern;
import java.util.concurrent.TimeUnit;
import org.apache.kafka.clients.consumer.ConsumerConfig;
import org.apache.kafka.common.KafkaException;
@@ -78,6 +79,10 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
static final AllowableValue OFFSET_NONE = new AllowableValue("none", "none", "Throw exception to the consumer if no previous offset is found for the consumer's group");
+ static final AllowableValue TOPIC_NAME = new AllowableValue("names", "names", "Topic is a full topic name or comma separated list of names");
+
+ static final AllowableValue TOPIC_PATTERN = new AllowableValue("pattern", "pattern", "Topic is a regex using the Java Pattern syntax");
+
static final PropertyDescriptor TOPICS = new PropertyDescriptor.Builder()
.name("topic")
.displayName("Topic Name(s)")
@@ -87,6 +92,15 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
.expressionLanguageSupported(true)
.build();
+ static final PropertyDescriptor TOPIC_TYPE = new PropertyDescriptor.Builder()
+ .name("topic_type")
+ .displayName("Topic Name Format")
+ .description("Specifies whether the Topic(s) provided are a comma separated list of names or a single regular expression")
+ .required(true)
+ .allowableValues(TOPIC_NAME, TOPIC_PATTERN)
+ .defaultValue(TOPIC_NAME.getValue())
+ .build();
+
static final PropertyDescriptor GROUP_ID = new PropertyDescriptor.Builder()
.name(ConsumerConfig.GROUP_ID_CONFIG)
.displayName("Group ID")
@@ -166,6 +180,7 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
List<PropertyDescriptor> descriptors = new ArrayList<>();
descriptors.addAll(KafkaProcessorUtils.getCommonPropertyDescriptors());
descriptors.add(TOPICS);
+ descriptors.add(TOPIC_TYPE);
descriptors.add(GROUP_ID);
descriptors.add(AUTO_OFFSET_RESET);
descriptors.add(KEY_ATTRIBUTE_ENCODING);
@@ -229,18 +244,26 @@ public class ConsumeKafka_0_10 extends AbstractProcessor {
props.put(ConsumerConfig.KEY_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
props.put(ConsumerConfig.VALUE_DESERIALIZER_CLASS_CONFIG, ByteArrayDeserializer.class.getName());
final String topicListing = context.getProperty(ConsumeKafka_0_10.TOPICS).evaluateAttributeExpressions().getValue();
+ final String topicType = context.getProperty(ConsumeKafka_0_10.TOPIC_TYPE).evaluateAttributeExpressions().getValue();
final List<String> topics = new ArrayList<>();
- for (final String topic : topicListing.split(",", 100)) {
- final String trimmedName = topic.trim();
- if (!trimmedName.isEmpty()) {
- topics.add(trimmedName);
- }
- }
final String keyEncoding = context.getProperty(KEY_ATTRIBUTE_ENCODING).getValue();
final String securityProtocol = context.getProperty(KafkaProcessorUtils.SECURITY_PROTOCOL).getValue();
final String bootstrapServers = context.getProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS).getValue();
-
- return new ConsumerPool(maxLeases, demarcator, props, topics, maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log);
+ if (topicType.equals(TOPIC_NAME.getValue())) {
+ for (final String topic : topicListing.split(",", 100)) {
+ final String trimmedName = topic.trim();
+ if (!trimmedName.isEmpty()) {
+ topics.add(trimmedName);
+ }
+ }
+ return new ConsumerPool(maxLeases, demarcator, props, topics, maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log);
+ } else if (topicType.equals(TOPIC_PATTERN.getValue())) {
+ final Pattern topicPattern = Pattern.compile(topicListing.trim());
+ return new ConsumerPool(maxLeases, demarcator, props, topicPattern, maxUncommittedTime, keyEncoding, securityProtocol, bootstrapServers, log);
+ } else {
+ getLogger().error("Subscription type has an unknown value {}", new Object[] {topicType});
+ return null;
+ }
}
@OnUnscheduled
http://git-wip-us.apache.org/repos/asf/nifi/blob/4bfb905f/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
index baacdc7..b375b34 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/main/java/org/apache/nifi/processors/kafka/pubsub/ConsumerPool.java
@@ -25,6 +25,7 @@ import java.util.ArrayList;
import java.util.Collections;
import java.util.List;
import java.util.Map;
+import java.util.regex.Pattern;
import java.util.concurrent.ArrayBlockingQueue;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.atomic.AtomicLong;
@@ -40,6 +41,7 @@ public class ConsumerPool implements Closeable {
private final BlockingQueue<SimpleConsumerLease> pooledLeases;
private final List<String> topics;
+ private final Pattern topicPattern;
private final Map<String, Object> kafkaProperties;
private final long maxWaitMillis;
private final ComponentLog logger;
@@ -74,7 +76,7 @@ public class ConsumerPool implements Closeable {
public ConsumerPool(
final int maxConcurrentLeases,
final byte[] demarcator,
- final Map<String, Object> kafkaProperties,
+ final Map<String, Object> kafkaProperties,
final List<String> topics,
final long maxWaitMillis,
final String keyEncoding,
@@ -90,6 +92,29 @@ public class ConsumerPool implements Closeable {
this.bootstrapServers = bootstrapServers;
this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
this.topics = Collections.unmodifiableList(topics);
+ this.topicPattern = null;
+ }
+
+ public ConsumerPool(
+ final int maxConcurrentLeases,
+ final byte[] demarcator,
+ final Map<String, Object> kafkaProperties,
+ final Pattern topics,
+ final long maxWaitMillis,
+ final String keyEncoding,
+ final String securityProtocol,
+ final String bootstrapServers,
+ final ComponentLog logger) {
+ this.pooledLeases = new ArrayBlockingQueue<>(maxConcurrentLeases);
+ this.maxWaitMillis = maxWaitMillis;
+ this.logger = logger;
+ this.demarcatorBytes = demarcator;
+ this.keyEncoding = keyEncoding;
+ this.securityProtocol = securityProtocol;
+ this.bootstrapServers = bootstrapServers;
+ this.kafkaProperties = Collections.unmodifiableMap(kafkaProperties);
+ this.topics = null;
+ this.topicPattern = topics;
}
/**
@@ -119,7 +144,11 @@ public class ConsumerPool implements Closeable {
* This subscription tightly couples the lease to the given
* consumer. They cannot be separated from then on.
*/
- consumer.subscribe(topics, lease);
+ if (topics != null) {
+ consumer.subscribe(topics, lease);
+ } else {
+ consumer.subscribe(topicPattern, lease);
+ }
}
lease.setProcessSession(session);
leasesObtainedCountRef.incrementAndGet();
http://git-wip-us.apache.org/repos/asf/nifi/blob/4bfb905f/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
index 9b380d5..4a5c4fb 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-0-10-processors/src/test/java/org/apache/nifi/processors/kafka/pubsub/ConsumeKafkaTest.java
@@ -134,6 +134,38 @@ public class ConsumeKafkaTest {
}
@Test
+ public void validateGetAllMessagesPattern() throws Exception {
+ String groupName = "validateGetAllMessagesPattern";
+
+ when(mockConsumerPool.obtainConsumer(anyObject())).thenReturn(mockLease);
+ when(mockLease.continuePolling()).thenReturn(Boolean.TRUE, Boolean.TRUE, Boolean.FALSE);
+ when(mockLease.commit()).thenReturn(Boolean.TRUE);
+
+ ConsumeKafka_0_10 proc = new ConsumeKafka_0_10() {
+ @Override
+ protected ConsumerPool createConsumerPool(final ProcessContext context, final ComponentLog log) {
+ return mockConsumerPool;
+ }
+ };
+ final TestRunner runner = TestRunners.newTestRunner(proc);
+ runner.setValidateExpressionUsage(false);
+ runner.setProperty(KafkaProcessorUtils.BOOTSTRAP_SERVERS, "0.0.0.0:1234");
+ runner.setProperty(ConsumeKafka_0_10.TOPICS, "(fo.*)|(ba)");
+ runner.setProperty(ConsumeKafka_0_10.TOPIC_TYPE, "pattern");
+ runner.setProperty(ConsumeKafka_0_10.GROUP_ID, groupName);
+ runner.setProperty(ConsumeKafka_0_10.AUTO_OFFSET_RESET, ConsumeKafka_0_10.OFFSET_EARLIEST);
+ runner.run(1, false);
+
+ verify(mockConsumerPool, times(1)).obtainConsumer(anyObject());
+ verify(mockLease, times(3)).continuePolling();
+ verify(mockLease, times(2)).poll();
+ verify(mockLease, times(1)).commit();
+ verify(mockLease, times(1)).close();
+ verifyNoMoreInteractions(mockConsumerPool);
+ verifyNoMoreInteractions(mockLease);
+ }
+
+ @Test
public void validateGetErrorMessages() throws Exception {
String groupName = "validateGetErrorMessages";
http://git-wip-us.apache.org/repos/asf/nifi/blob/4bfb905f/nifi-nar-bundles/nifi-kafka-bundle/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/pom.xml
index 130609d..c963f12 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/pom.xml
+++ b/nifi-nar-bundles/nifi-kafka-bundle/pom.xml
@@ -54,5 +54,5 @@
<version>1.2.0-SNAPSHOT</version>
</dependency>
</dependencies>
- </dependencyManagement>
+ </dependencyManagement>
</project>