Posted to commits@nifi.apache.org by ma...@apache.org on 2015/11/25 19:03:31 UTC

[1/4] nifi git commit: NIFI-1192 added support for dynamic properties to GetKafka Due to the fact that current component uses artificial names for properties set via UI and then maps those properties to the actual names used by Kafka, we can not rely on

Repository: nifi
Updated Branches:
  refs/heads/master 4281a51c8 -> e1742c5a0


NIFI-1192 added support for dynamic properties to GetKafka
Because the current component uses artificial names for properties set via the UI and then maps them to the actual names used by Kafka, we cannot rely on the NiFi UI to display an error if a user attempts to set a dynamic property that will eventually map to the same Kafka property. So I’ve decided that any dynamic property will simply override an existing property, with a WARNING message displayed. This is consistent with how Kafka itself does it, displaying the overrides in the console. Updated the relevant annotation description.
It is also worth mentioning that the current code was using an old property from Kafka 0.7 (“zk.connectiontimeout.ms”) which is no longer present in Kafka 0.8 (WARN Timer-Driven Process Thread-7 utils.VerifiableProperties:83 - Property zk.connectiontimeout.ms is not valid). The add/override strategy provides more flexibility when dealing with Kafka’s volatile configuration until things settle down and we can get some sensible defaults in place.
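
The override strategy described above can be sketched without any NiFi or Kafka dependency. This is only an illustration, assuming the dynamic properties arrive as a plain map; the class and method names (DynamicPropertyOverrideSketch, applyDynamic) are hypothetical and are not part of this commit. The warning text mirrors the one logged by the patch.

```java
import java.util.ArrayList;
import java.util.LinkedHashMap;
import java.util.List;
import java.util.Map;
import java.util.Properties;

// Hypothetical, dependency-free sketch of the add/override strategy.
final class DynamicPropertyOverrideSketch {

    /**
     * Applies every dynamic property on top of the statically configured ones.
     * Returns one warning message per property that was overridden.
     */
    static List<String> applyDynamic(Properties props, Map<String, String> dynamic) {
        List<String> warnings = new ArrayList<>();
        for (Map.Entry<String, String> entry : dynamic.entrySet()) {
            String existing = props.getProperty(entry.getKey());
            if (existing != null) {
                warnings.add("Overriding existing property '" + entry.getKey()
                        + "' which had value of '" + existing
                        + "' with dynamically set value '" + entry.getValue() + "'.");
            }
            // The dynamic value always wins, whether or not a static value existed.
            props.setProperty(entry.getKey(), entry.getValue());
        }
        return warnings;
    }

    public static void main(String[] args) {
        Properties props = new Properties();
        props.setProperty("group.id", "static-group");

        Map<String, String> dynamic = new LinkedHashMap<>();
        dynamic.put("group.id", "dynamic-group");         // overrides a static property
        dynamic.put("fetch.message.max.bytes", "2048");   // adds a new property

        for (String warning : applyDynamic(props, dynamic)) {
            System.out.println("WARN " + warning);
        }
        System.out.println(props.getProperty("group.id"));
    }
}
```

Collecting the warnings in a list rather than logging them directly is a choice made here only to keep the sketch testable; the patch itself logs through the processor's logger.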

While doing this, I addressed the following issues that were discovered during modification and testing:
ISSUE: When GetKafka started and there were no messages in the Kafka topic, the onTrigger(..) method would block because Kafka’s ConsumerIterator.hasNext() blocks. When an attempt was made to stop GetKafka, it would stop successfully due to the interrupt. However, in the UI it would appear as an ERROR because the InterruptException was not handled.
RESOLUTION: After discussing it with @markap14, the general desire is to let the task exit as quickly as possible. The whole thread-maintenance logic was there initially because there was no way to tell the Kafka consumer to return immediately if there are no events. In this patch we now use the ‘consumer.timeout.ms’ property of Kafka and set its value to 1 millisecond (the default is -1, which blocks indefinitely). This ensures that tasks that attempt to read an empty topic exit immediately, only to be rescheduled by NiFi based on user configuration.
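
The defaulting rule can be shown in isolation: the 1 millisecond timeout is applied only when the user has not already supplied a value, so an explicit setting (including -1) is left untouched. A minimal sketch using only java.util.Properties; the class and method names are hypothetical.

```java
import java.util.Properties;

// Hypothetical sketch: default 'consumer.timeout.ms' to 1 ms unless already set.
final class ConsumerTimeoutDefaultSketch {

    static final String TIMEOUT_KEY = "consumer.timeout.ms";

    /** Sets the timeout to "1" only if no value is present; returns the same Properties. */
    static Properties withDefaultTimeout(Properties props) {
        if (!props.containsKey(TIMEOUT_KEY)) {
            props.setProperty(TIMEOUT_KEY, "1");
        }
        return props;
    }

    public static void main(String[] args) {
        Properties defaults = withDefaultTimeout(new Properties());
        System.out.println(defaults.getProperty(TIMEOUT_KEY));

        Properties explicit = new Properties();
        explicit.setProperty(TIMEOUT_KEY, "-1"); // user explicitly asks for blocking behaviour
        System.out.println(withDefaultTimeout(explicit).getProperty(TIMEOUT_KEY));
    }
}
```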

ISSUE: GetKafka would not release a FlowFile with events if it didn’t have enough to complete the batch, since it would block waiting for more messages (based on the blocking issue described above).
RESOLUTION: The invocation of hasNext() now results in Kafka’s ConsumerTimeoutException, which is handled in the catch block where the FlowFile with the partial batch is released to success. Not sure if we need to log a WARN message. In my opinion we should not, as it may create unnecessary confusion.
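
The partial-batch behaviour can be illustrated with a stand-in for the Kafka iterator. In this sketch a BlockingQueue plays the role of the stream, and a hypothetical SourceTimeoutException plays the role of Kafka’s ConsumerTimeoutException; none of these names come from the commit. The point is only that a timeout ends the read loop and whatever was accumulated is still returned.

```java
import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.BlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;

// Hypothetical sketch of releasing a partial batch when the source times out.
final class PartialBatchSketch {

    /** Stand-in for Kafka's ConsumerTimeoutException (an assumption for this sketch). */
    static final class SourceTimeoutException extends RuntimeException {
    }

    /** Stand-in for iterator.next(): gives up after timeoutMs instead of blocking forever. */
    static byte[] next(BlockingQueue<byte[]> source, long timeoutMs) {
        try {
            byte[] message = source.poll(timeoutMs, TimeUnit.MILLISECONDS);
            if (message == null) {
                throw new SourceTimeoutException();
            }
            return message;
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new SourceTimeoutException();
        }
    }

    /** Reads up to batchSize messages; a timeout ends the batch early without failing. */
    static List<byte[]> readBatch(BlockingQueue<byte[]> source, int batchSize, long timeoutMs) {
        List<byte[]> batch = new ArrayList<>();
        try {
            while (batch.size() < batchSize) {
                batch.add(next(source, timeoutMs));
            }
        } catch (SourceTimeoutException e) {
            // No more messages right now: fall through and release what was accumulated.
        }
        return batch;
    }

    public static void main(String[] args) {
        BlockingQueue<byte[]> source = new LinkedBlockingQueue<>();
        source.add(new byte[] {1});
        source.add(new byte[] {2});
        // Batch size is 5 but only 2 messages exist; the partial batch is still returned.
        System.out.println(readBatch(source, 5, 1).size());
    }
}
```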

ISSUE: When configuring a consumer for a topic and specifying multiple concurrent consumers in ‘topicCountMap’ based on ‘context.getMaxConcurrentTasks()’, each consumer would bind to a topic partition. If you have fewer partitions than the value returned by ‘context.getMaxConcurrentTasks()’, you would essentially allocate Kafka resources that would never get a chance to receive a single message (see more here https://cwiki.apache.org/confluence/display/KAFKA/Consumer+Group+Example).
RESOLUTION: Logic was added to determine the number of partitions for a topic. In the event that the ‘context.getMaxConcurrentTasks()’ value is greater than the number of partitions, the partition count will be used when creating ‘topicCountMap’ and a WARNING message will be displayed (see code). Unfortunately we can’t do anything about the actual tasks, but based on the current state of the code they will exit immediately, only to be rescheduled, where the process will repeat. NOTE: That is not ideal, as it will keep rescheduling tasks that will never have a chance to do anything, but at least it can be fixed on the user side after reading the warning message.
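
The stream-count rule reduces to taking the smaller of the two values. A minimal sketch, assuming the partition count has already been retrieved; the class and method names are hypothetical.

```java
// Hypothetical sketch: never request more consumer streams than the topic has partitions.
final class StreamCountSketch {

    /** Returns the number of streams to put into the topic count map. */
    static int streamsToRequest(int maxConcurrentTasks, int partitionCount) {
        // Extra streams beyond the partition count would never receive a message.
        return Math.min(maxConcurrentTasks, partitionCount);
    }

    public static void main(String[] args) {
        System.out.println(streamsToRequest(8, 3)); // more tasks than partitions
        System.out.println(streamsToRequest(2, 3)); // fewer tasks than partitions
    }
}
```

When the task count is below the partition count the configured value is kept, which matches the patch: that case only produces a warning suggesting the user raise the number of concurrent tasks.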

NIFI-1192 added dynamic properties support for PutKafka

NIFI-1192 polishing

NIFI-1192 polished and addressed PR comments


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d949ee1a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d949ee1a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d949ee1a

Branch: refs/heads/master
Commit: d949ee1a1ee529fbf0103b4620fef47832132a53
Parents: f1f67f6
Author: Oleg Zhurakousky <ol...@suitcase.io>
Authored: Mon Nov 23 15:15:03 2015 -0500
Committer: Oleg Zhurakousky <ol...@suitcase.io>
Committed: Tue Nov 24 12:14:36 2015 -0500

----------------------------------------------------------------------
 .../nifi-kafka-processors/pom.xml               |   5 +
 .../apache/nifi/processors/kafka/GetKafka.java  | 209 +++++++++++--------
 .../nifi/processors/kafka/KafkaUtils.java       |  56 +++++
 .../apache/nifi/processors/kafka/PutKafka.java  |  27 +++
 4 files changed, 213 insertions(+), 84 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/d949ee1a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml
index 1a8dc9d..7793102 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/pom.xml
@@ -66,6 +66,11 @@
             <scope>test</scope>
         </dependency>
         <dependency>
+            <groupId>commons-io</groupId>
+            <artifactId>commons-io</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-simple</artifactId>
             <scope>test</scope>

http://git-wip-us.apache.org/repos/asf/nifi/blob/d949ee1a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
index 4be6194..8f8d2e9 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@ -24,14 +24,15 @@ import java.util.HashMap;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
+import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.Lock;
-import java.util.concurrent.locks.ReentrantLock;
+import java.util.concurrent.atomic.AtomicBoolean;
 
+import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -39,9 +40,7 @@ import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
-import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.lifecycle.OnStopped;
-import org.apache.nifi.annotation.lifecycle.OnUnscheduled;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.Validator;
 import org.apache.nifi.flowfile.FlowFile;
@@ -56,6 +55,7 @@ import org.apache.nifi.processor.util.StandardValidators;
 import kafka.consumer.Consumer;
 import kafka.consumer.ConsumerConfig;
 import kafka.consumer.ConsumerIterator;
+import kafka.consumer.ConsumerTimeoutException;
 import kafka.consumer.KafkaStream;
 import kafka.javaapi.consumer.ConsumerConnector;
 import kafka.message.MessageAndMetadata;
@@ -70,6 +70,11 @@ import kafka.message.MessageAndMetadata;
                 + " the message does not have a key, or if the batch size is greater than 1, this attribute will not be added"),
         @WritesAttribute(attribute = "kafka.partition", description = "The partition of the Kafka Topic from which the message was received. This attribute is added only if the batch size is 1"),
         @WritesAttribute(attribute = "kafka.offset", description = "The offset of the message within the Kafka partition. This attribute is added only if the batch size is 1")})
+@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
+            description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
+        + " In the event a dynamic property represents a property that was already set as part of the static properties, its value wil be"
+        + " overriden with warning message describing the override."
+        + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.")
 public class GetKafka extends AbstractProcessor {
 
     public static final String SMALLEST = "smallest";
@@ -167,9 +172,7 @@ public class GetKafka extends AbstractProcessor {
     private final BlockingQueue<ConsumerIterator<byte[], byte[]>> streamIterators = new LinkedBlockingQueue<>();
     private volatile ConsumerConnector consumer;
 
-    final Lock interruptionLock = new ReentrantLock();
-    // guarded by interruptionLock
-    private final Set<Thread> interruptableThreads = new HashSet<>();
+    private final AtomicBoolean consumerStreamsReady = new AtomicBoolean();
 
     @Override
     protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
@@ -203,26 +206,69 @@ public class GetKafka extends AbstractProcessor {
         return relationships;
     }
 
-    @OnScheduled
     public void createConsumers(final ProcessContext context) {
         final String topic = context.getProperty(TOPIC).getValue();
 
-        final Map<String, Integer> topicCountMap = new HashMap<>(1);
-        topicCountMap.put(topic, context.getMaxConcurrentTasks());
-
         final Properties props = new Properties();
         props.setProperty("zookeeper.connect", context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue());
         props.setProperty("group.id", context.getProperty(GROUP_ID).getValue());
         props.setProperty("client.id", context.getProperty(CLIENT_NAME).getValue());
         props.setProperty("auto.commit.interval.ms", String.valueOf(context.getProperty(ZOOKEEPER_COMMIT_DELAY).asTimePeriod(TimeUnit.MILLISECONDS)));
-        props.setProperty("auto.commit.enable", "true"); // just be explicit
         props.setProperty("auto.offset.reset", context.getProperty(AUTO_OFFSET_RESET).getValue());
-        props.setProperty("zk.connectiontimeout.ms", context.getProperty(ZOOKEEPER_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString());
+        props.setProperty("zookeeper.connection.timeout.ms", context.getProperty(ZOOKEEPER_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString());
         props.setProperty("socket.timeout.ms", context.getProperty(KAFKA_TIMEOUT).asTimePeriod(TimeUnit.MILLISECONDS).toString());
 
+        for (final Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
+            PropertyDescriptor descriptor = entry.getKey();
+            if (descriptor.isDynamic()) {
+                if (props.containsKey(descriptor.getName())) {
+                    this.getLogger().warn("Overriding existing property '" + descriptor.getName() + "' which had value of '"
+                                    + props.getProperty(descriptor.getName()) + "' with dynamically set value '"
+                                    + entry.getValue() + "'.");
+                }
+                props.setProperty(descriptor.getName(), entry.getValue());
+            }
+        }
+
+        /*
+         * Unless user sets it to some explicit value we are setting it to the
+         * lowest possible value of 1 millisecond to ensure the
+         * consumerStream.hasNext() doesn't block. See
+         * http://kafka.apache.org/documentation.html#configuration) as well as
+         * comment in 'catch ConsumerTimeoutException' in onTrigger() for more
+         * explanation as to the reasoning behind it.
+         */
+        if (!props.containsKey("consumer.timeout.ms")) {
+            this.getLogger().info("Setting 'consumer.timeout.ms' to 1 milliseconds to avoid consumer"
+                            + " block in the event when no events are present in Kafka topic. If you wish to change this value "
+                            + " set it as dynamic property. If you wish to explicitly enable consumer block (at your own risk)"
+                            + " set its value to -1.");
+            props.setProperty("consumer.timeout.ms", "1");
+        }
+
         final ConsumerConfig consumerConfig = new ConsumerConfig(props);
         consumer = Consumer.createJavaConsumerConnector(consumerConfig);
 
+        final Map<String, Integer> topicCountMap = new HashMap<>(1);
+
+        int partitionCount = KafkaUtils.retrievePartitionCountForTopic(
+                context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue(), context.getProperty(TOPIC).getValue());
+
+        int concurrentTaskToUse = context.getMaxConcurrentTasks();
+        if (context.getMaxConcurrentTasks() < partitionCount){
+            this.getLogger().warn("The amount of concurrent tasks '" + context.getMaxConcurrentTasks() + "' configured for "
+                    + "this processor is less than the amount of partitions '" + partitionCount + "' for topic '" + context.getProperty(TOPIC).getValue() + "'. "
+                            + "Consider making it equal to the amount of partition count for most efficient event consumption.");
+        } else if (context.getMaxConcurrentTasks() > partitionCount){
+            concurrentTaskToUse = partitionCount;
+            this.getLogger().warn("The amount of concurrent tasks '" + context.getMaxConcurrentTasks() + "' configured for "
+                    + "this processor is greater than the amount of partitions '" + partitionCount + "' for topic '" + context.getProperty(TOPIC).getValue() + "'. "
+                            + "Therefore those tasks would never see a message. To avoid that the '" + partitionCount + "'(partition count) will be used to "
+                                    + "consume events");
+        }
+
+        topicCountMap.put(topic, concurrentTaskToUse);
+
         final Map<String, List<KafkaStream<byte[], byte[]>>> consumerMap = consumer.createMessageStreams(topicCountMap);
         final List<KafkaStream<byte[], byte[]>> streams = consumerMap.get(topic);
 
@@ -231,10 +277,12 @@ public class GetKafka extends AbstractProcessor {
         for (final KafkaStream<byte[], byte[]> stream : streams) {
             streamIterators.add(stream.iterator());
         }
+        this.consumerStreamsReady.set(true);
     }
 
     @OnStopped
     public void shutdownConsumer() {
+        this.consumerStreamsReady.set(false);
         if (consumer != null) {
             try {
                 consumer.commitOffsets();
@@ -244,75 +292,57 @@ public class GetKafka extends AbstractProcessor {
         }
     }
 
-    @OnUnscheduled
-    public void interruptIterators() {
-        // Kafka doesn't provide a non-blocking API for pulling messages. We can, however,
-        // interrupt the Threads. We do this when the Processor is stopped so that we have the
-        // ability to shutdown the Processor.
-        interruptionLock.lock();
-        try {
-            for (final Thread t : interruptableThreads) {
-                t.interrupt();
-            }
+    @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
+                .name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true)
+                .build();
+    }
 
-            interruptableThreads.clear();
-        } finally {
-            interruptionLock.unlock();
+    @Override
+    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
+        /*
+         * Will ensure that consumer streams are ready upon the first invocation
+         * of onTrigger. Will be reset to 'false' in the event of exception
+         */
+        synchronized (this.consumerStreamsReady) {
+            if (!this.consumerStreamsReady.get()) {
+                this.createConsumers(context);
+            }
+        }
+        ConsumerIterator<byte[], byte[]> iterator = this.getStreamIterator();
+        if (iterator != null) {
+            this.consumeFromKafka(context, session, iterator);
         }
     }
 
     protected ConsumerIterator<byte[], byte[]> getStreamIterator() {
-        return streamIterators.poll();
+        return this.streamIterators.poll();
     }
 
-    @Override
-    public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        final ConsumerIterator<byte[], byte[]> iterator = getStreamIterator();
-        if (iterator == null) {
-            return;
-        }
+
+    private void consumeFromKafka(final ProcessContext context, final ProcessSession session,
+            ConsumerIterator<byte[], byte[]> iterator) throws ProcessException {
 
         final int batchSize = context.getProperty(BATCH_SIZE).asInteger();
         final String demarcator = context.getProperty(MESSAGE_DEMARCATOR).getValue().replace("\\n", "\n").replace("\\r", "\r").replace("\\t", "\t");
         final byte[] demarcatorBytes = demarcator.getBytes(StandardCharsets.UTF_8);
         final String topic = context.getProperty(TOPIC).getValue();
 
-        FlowFile flowFile = null;
-        try {
-            // add the current thread to the Set of those to be interrupted if processor stopped.
-            interruptionLock.lock();
-            try {
-                interruptableThreads.add(Thread.currentThread());
-            } finally {
-                interruptionLock.unlock();
-            }
-
-            final long start = System.nanoTime();
-            flowFile = session.create();
-
-            final Map<String, String> attributes = new HashMap<>();
-            attributes.put("kafka.topic", topic);
+        FlowFile flowFile = session.create();
 
-            int numMessages = 0;
-            for (int msgCount = 0; msgCount < batchSize; msgCount++) {
-                // if the processor is stopped, iterator.hasNext() will throw an Exception.
-                // In this case, we just break out of the loop.
-                try {
-                    if (!iterator.hasNext()) {
-                        break;
-                    }
-                } catch (final Exception e) {
-                    break;
-                }
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("kafka.topic", topic);
+        final long start = System.nanoTime();
+        int msgCount = 0;
 
+        try {
+            for (; msgCount < batchSize && iterator.hasNext(); msgCount++) {
                 final MessageAndMetadata<byte[], byte[]> mam = iterator.next();
-                if (mam == null) {
-                    return;
-                }
-
-                final byte[] key = mam.key();
 
                 if (batchSize == 1) {
+                    final byte[] key = mam.key();
                     // the kafka.key, kafka.offset, and kafka.partition attributes are added only
                     // for a batch size of 1.
                     if (key != null) {
@@ -334,33 +364,26 @@ public class GetKafka extends AbstractProcessor {
                         out.write(mam.message());
                     }
                 });
-                numMessages++;
-            }
-
-            // If we received no messages, remove the FlowFile. Otherwise, send to success.
-            if (flowFile.getSize() == 0L) {
-                session.remove(flowFile);
-            } else {
-                flowFile = session.putAllAttributes(flowFile, attributes);
-                final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
-                session.getProvenanceReporter().receive(flowFile, "kafka://" + topic, "Received " + numMessages + " Kafka messages", millis);
-                getLogger().info("Successfully received {} from Kafka with {} messages in {} millis", new Object[]{flowFile, numMessages, millis});
-                session.transfer(flowFile, REL_SUCCESS);
             }
+            this.releaseFlowFile(flowFile, session, attributes, start, topic, msgCount);
+        } catch (ConsumerTimeoutException e) {
+            /*
+             * By default Kafka blocks indefinitely if topic is empty via
+             * stream.hasNext(). If 'consumer.timeout.ms' property is set (see
+             * http://kafka.apache.org/documentation.html#configuration) the
+             * hasNext() will fail with this exception. To this processor it
+             * simply means there are no messages and current task should exit
+             * in non-failure releasing the flow file if it was able to
+             * accumulate any events.
+             */
+            this.releaseFlowFile(flowFile, session, attributes, start, topic, msgCount);
         } catch (final Exception e) {
+            this.shutdownConsumer();
             getLogger().error("Failed to receive FlowFile from Kafka due to {}", new Object[]{e});
             if (flowFile != null) {
                 session.remove(flowFile);
             }
         } finally {
-            // Remove the current thread from the Set of Threads to interrupt.
-            interruptionLock.lock();
-            try {
-                interruptableThreads.remove(Thread.currentThread());
-            } finally {
-                interruptionLock.unlock();
-            }
-
             // Add the iterator back to the queue
             if (iterator != null) {
                 streamIterators.offer(iterator);
@@ -368,4 +391,22 @@ public class GetKafka extends AbstractProcessor {
         }
     }
 
+    /**
+     * Will release flow file. Releasing of the flow file in the context of this
+     * operation implies the following:
+     *
+     * If Empty then remove from session and return
+     * If has something then transfer to REL_SUCCESS
+     */
+    private void releaseFlowFile(FlowFile flowFile, ProcessSession session, Map<String, String> attributes, long start, String topic, int msgCount){
+        if (flowFile.getSize() == 0L) {
+            session.remove(flowFile);
+        } else {
+            flowFile = session.putAllAttributes(flowFile, attributes);
+            final long millis = TimeUnit.NANOSECONDS.toMillis(System.nanoTime() - start);
+            session.getProvenanceReporter().receive(flowFile, "kafka://" + topic, "Received " + msgCount + " Kafka messages", millis);
+            getLogger().info("Successfully received {} from Kafka with {} messages in {} millis", new Object[]{flowFile, msgCount, millis});
+            session.transfer(flowFile, REL_SUCCESS);
+        }
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/nifi/blob/d949ee1a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
new file mode 100644
index 0000000..657d88b
--- /dev/null
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaUtils.java
@@ -0,0 +1,56 @@
+package org.apache.nifi.processors.kafka;
+
+import java.util.Collections;
+
+import org.I0Itec.zkclient.ZkClient;
+import org.I0Itec.zkclient.exception.ZkMarshallingError;
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+import org.I0Itec.zkclient.serialize.ZkSerializer;
+
+import kafka.admin.AdminUtils;
+import kafka.api.TopicMetadata;
+import kafka.utils.ZKStringSerializer;
+import scala.collection.JavaConversions;
+
+/**
+ * Utility class to support interruction with Kafka internals.
+ *
+ */
+class KafkaUtils {
+
+    /**
+     * Will retrieve the amount of partitions for a given Kafka topic.
+     */
+    static int retrievePartitionCountForTopic(String zookeeperConnectionString, String topicName) {
+        ZkClient zkClient = new ZkClient(zookeeperConnectionString);
+        zkClient.setZkSerializer(new ZkSerializer() {
+            @Override
+            public byte[] serialize(Object o) throws ZkMarshallingError {
+                return ZKStringSerializer.serialize(o);
+            }
+
+            @Override
+            public Object deserialize(byte[] bytes) throws ZkMarshallingError {
+                return ZKStringSerializer.deserialize(bytes);
+            }
+        });
+        scala.collection.Set<TopicMetadata> topicMetadatas = AdminUtils
+                .fetchTopicMetadataFromZk(JavaConversions.asScalaSet(Collections.singleton(topicName)), zkClient);
+        return topicMetadatas.size();
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/d949ee1a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
index ea7f7bb..b5766e4 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -27,6 +27,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
 import java.util.Set;
+import java.util.Map.Entry;
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -44,6 +45,7 @@ import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.kafka.common.Node;
 import org.apache.kafka.common.PartitionInfo;
 import org.apache.kafka.common.serialization.ByteArraySerializer;
+import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
@@ -81,6 +83,11 @@ import scala.actors.threadpool.Arrays;
 @CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka. The messages to send may be individual FlowFiles or may be delimited, using a "
     + "user-specified delimiter, such as a new-line.")
 @TriggerWhenEmpty // because we have a queue of sessions that are ready to be committed
+@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
+            description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
+        + " In the event a dynamic property represents a property that was already set as part of the static properties, its value wil be"
+        + " overriden with warning message describing the override."
+        + " For the list of available Kafka properties please refer to: http://kafka.apache.org/documentation.html#configuration.")
 public class PutKafka extends AbstractSessionFactoryProcessor {
 
     private static final String SINGLE_BROKER_REGEX = ".*?\\:\\d{3,5}";
@@ -356,6 +363,18 @@ public class PutKafka extends AbstractSessionFactoryProcessor {
         properties.setProperty("retries", "0");
         properties.setProperty("block.on.buffer.full", "false");
 
+        for (final Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
+            PropertyDescriptor descriptor = entry.getKey();
+            if (descriptor.isDynamic()) {
+                if (properties.containsKey(descriptor.getName())) {
+                    this.getLogger().warn("Overriding existing property '" + descriptor.getName() + "' which had value of '"
+                                    + properties.getProperty(descriptor.getName()) + "' with dynamically set value '"
+                                    + entry.getValue() + "'.");
+                }
+                properties.setProperty(descriptor.getName(), entry.getValue());
+            }
+        }
+
         return properties;
     }
 
@@ -398,6 +417,14 @@ public class PutKafka extends AbstractSessionFactoryProcessor {
     }
 
     @Override
+    protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        return new PropertyDescriptor.Builder()
+                .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
+                .name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true)
+                .build();
+    }
+
+    @Override
     public void onTrigger(final ProcessContext context, final ProcessSessionFactory sessionFactory) throws ProcessException {
         FlowFileMessageBatch batch;
         while ((batch = completeBatches.poll()) != null) {


[4/4] nifi git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/nifi

Posted by ma...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/nifi


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/e1742c5a
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/e1742c5a
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/e1742c5a

Branch: refs/heads/master
Commit: e1742c5a0414077bfd6688cc96332713b7dc7468
Parents: 4fa2a71 4281a51
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Nov 25 13:03:22 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Nov 25 13:03:22 2015 -0500

----------------------------------------------------------------------
 .../java/org/apache/nifi/processors/standard/ListenSyslog.java    | 3 ++-
 .../main/java/org/apache/nifi/processors/standard/PutSyslog.java  | 1 +
 2 files changed, 3 insertions(+), 1 deletion(-)
----------------------------------------------------------------------



[2/4] nifi git commit: Merge branch 'NIFI-1192B' of https://github.com/olegz/nifi into NIFI-1192

Posted by ma...@apache.org.
Merge branch 'NIFI-1192B' of https://github.com/olegz/nifi into NIFI-1192


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/657885e5
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/657885e5
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/657885e5

Branch: refs/heads/master
Commit: 657885e5ba8210c76c7d023b2841ef0ac0a64f28
Parents: 4e2c94d d949ee1
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Nov 25 12:30:15 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Nov 25 12:30:15 2015 -0500

----------------------------------------------------------------------
 .../nifi-kafka-processors/pom.xml               |   5 +
 .../apache/nifi/processors/kafka/GetKafka.java  | 209 +++++++++++--------
 .../nifi/processors/kafka/KafkaUtils.java       |  56 +++++
 .../apache/nifi/processors/kafka/PutKafka.java  |  27 +++
 4 files changed, 213 insertions(+), 84 deletions(-)
----------------------------------------------------------------------



[3/4] nifi git commit: NIFI-1192: Removed some additional white space

Posted by ma...@apache.org.
NIFI-1192: Removed some additional white space


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/4fa2a713
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/4fa2a713
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/4fa2a713

Branch: refs/heads/master
Commit: 4fa2a713a2f4df901f0efa321db40682bb950ce9
Parents: 657885e
Author: Mark Payne <ma...@hotmail.com>
Authored: Wed Nov 25 12:57:06 2015 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Wed Nov 25 12:57:06 2015 -0500

----------------------------------------------------------------------
 .../java/org/apache/nifi/processors/kafka/GetKafka.java  | 11 ++++-------
 1 file changed, 4 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/4fa2a713/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
index 8f8d2e9..5e7a7ae 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/GetKafka.java
@@ -223,8 +223,7 @@ public class GetKafka extends AbstractProcessor {
             if (descriptor.isDynamic()) {
                 if (props.containsKey(descriptor.getName())) {
                     this.getLogger().warn("Overriding existing property '" + descriptor.getName() + "' which had value of '"
-                                    + props.getProperty(descriptor.getName()) + "' with dynamically set value '"
-                                    + entry.getValue() + "'.");
+                        + props.getProperty(descriptor.getName()) + "' with dynamically set value '" + entry.getValue() + "'.");
                 }
                 props.setProperty(descriptor.getName(), entry.getValue());
             }
@@ -251,20 +250,18 @@ public class GetKafka extends AbstractProcessor {
 
         final Map<String, Integer> topicCountMap = new HashMap<>(1);
 
-        int partitionCount = KafkaUtils.retrievePartitionCountForTopic(
-                context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue(), context.getProperty(TOPIC).getValue());
+        int partitionCount = KafkaUtils.retrievePartitionCountForTopic(context.getProperty(ZOOKEEPER_CONNECTION_STRING).getValue(), context.getProperty(TOPIC).getValue());
 
         int concurrentTaskToUse = context.getMaxConcurrentTasks();
         if (context.getMaxConcurrentTasks() < partitionCount){
             this.getLogger().warn("The amount of concurrent tasks '" + context.getMaxConcurrentTasks() + "' configured for "
                     + "this processor is less than the amount of partitions '" + partitionCount + "' for topic '" + context.getProperty(TOPIC).getValue() + "'. "
-                            + "Consider making it equal to the amount of partition count for most efficient event consumption.");
+                + "Consider making it equal to the amount of partition count for most efficient event consumption.");
         } else if (context.getMaxConcurrentTasks() > partitionCount){
             concurrentTaskToUse = partitionCount;
             this.getLogger().warn("The amount of concurrent tasks '" + context.getMaxConcurrentTasks() + "' configured for "
                     + "this processor is greater than the amount of partitions '" + partitionCount + "' for topic '" + context.getProperty(TOPIC).getValue() + "'. "
-                            + "Therefore those tasks would never see a message. To avoid that the '" + partitionCount + "'(partition count) will be used to "
-                                    + "consume events");
+                + "Therefore those tasks would never see a message. To avoid that the '" + partitionCount + "'(partition count) will be used to consume events");
         }
 
         topicCountMap.put(topic, concurrentTaskToUse);
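
Note on the stream count selection in the GetKafka hunk above: the configured number of concurrent tasks is kept when it does not exceed the partition count and is capped at the partition count otherwise, with a warning in both mismatch cases. A minimal standalone sketch of that rule follows. It assumes plain integers in place of context.getMaxConcurrentTasks() and KafkaUtils.retrievePartitionCountForTopic(..); the class ConsumerStreamCount and its methods are hypothetical names, and the warning texts are shortened paraphrases.

```java
class ConsumerStreamCount {

    // Hypothetical helper: never request more consumer streams than the topic
    // has partitions, because the extra streams would never see a message.
    static int streamsToUse(final int maxConcurrentTasks, final int partitionCount) {
        return Math.min(maxConcurrentTasks, partitionCount);
    }

    // Hypothetical helper: returns a warning for a mismatch, or null when the
    // two values are equal and no warning is needed.
    static String mismatchWarning(final int maxConcurrentTasks, final int partitionCount) {
        if (maxConcurrentTasks < partitionCount) {
            return "Concurrent tasks (" + maxConcurrentTasks + ") is less than partition count ("
                    + partitionCount + "); consider making them equal.";
        }
        if (maxConcurrentTasks > partitionCount) {
            return "Concurrent tasks (" + maxConcurrentTasks + ") is greater than partition count ("
                    + partitionCount + "); only " + partitionCount + " streams will be used.";
        }
        return null;
    }

    public static void main(final String[] args) {
        // Illustrative inputs: {configured tasks, partitions}.
        final int[][] cases = {{2, 4}, {8, 4}, {4, 4}};
        for (final int[] c : cases) {
            final String warning = mismatchWarning(c[0], c[1]);
            System.out.println("tasks=" + c[0] + " partitions=" + c[1]
                    + " streams=" + streamsToUse(c[0], c[1])
                    + (warning == null ? "" : " WARN " + warning));
        }
    }
}
```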