You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by oz...@apache.org on 2017/01/27 17:44:38 UTC

nifi git commit: NIFI-3363: PutKafka NPE with User-Defined partition

Repository: nifi
Updated Branches:
  refs/heads/0.x 3a0948188 -> 008bffd9c


NIFI-3363: PutKafka NPE with User-Defined partition

- Marked the PutKafka Partition Strategy property as deprecated: the Kafka 0.8 client doesn't read
  'partitioner.class' as a producer property, so there is no need to specify it.
- Changed the Partition Strategy property from a required property to a dynamic property, so that existing processor configurations remain valid.
- Fixed the Partition property so that it is actually applied when publishing.
- Route a FlowFile to the failure relationship if it fails to be published due to an invalid partition.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/008bffd9
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/008bffd9
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/008bffd9

Branch: refs/heads/0.x
Commit: 008bffd9cd1787295840b411f1498439265bc8c5
Parents: 3a09481
Author: Koji Kawamura <ij...@apache.org>
Authored: Wed Jan 18 17:44:40 2017 +0900
Committer: Oleg Zhurakousky <ol...@suitcase.io>
Committed: Fri Jan 27 12:43:24 2017 -0500

----------------------------------------------------------------------
 .../apache/nifi/processors/kafka/PutKafka.java  | 77 ++++++++++----------
 .../nifi/processors/kafka/PutKafkaTest.java     | 59 ++++++++++++++-
 2 files changed, 95 insertions(+), 41 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/008bffd9/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
index abdf73d..38ec20c 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -54,8 +54,7 @@ import org.apache.nifi.processors.kafka.KafkaPublisher.KafkaPublisherResult;
 
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub", "0.8.x"})
-@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka, , specifically for 0.8.x versions. " +
-        "The messages to send may be individual FlowFiles or may be delimited, using a "
+@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka, specifically for 0.8.x versions. The messages to send may be individual FlowFiles or may be delimited, using a "
         + "user-specified delimiter, such as a new-line. The complementary NiFi processor for fetching messages is GetKafka.")
 @DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
                  description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
@@ -98,11 +97,20 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
     public static final AllowableValue COMPRESSION_CODEC_SNAPPY = new AllowableValue("snappy", "Snappy",
             "Compress messages using Snappy");
 
+    /**
+     * @deprecated Kafka 0.8.x producer doesn't use 'partitioner.class' property.
+     */
     static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue("Round Robin", "Round Robin",
             "Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, "
                     + "the next Partition to Partition 2, and so on, wrapping as necessary.");
+    /**
+     * @deprecated Kafka 0.8.x producer doesn't use 'partitioner.class' property.
+     */
     static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("Random Robin", "Random",
             "Messages will be assigned to random partitions.");
+    /**
+     * @deprecated Kafka 0.8.x producer doesn't use 'partitioner.class' property. To specify partition, simply configure the 'partition' property.
+     */
     static final AllowableValue USER_DEFINED_PARTITIONING = new AllowableValue("User-Defined", "User-Defined",
             "The <Partition> property will be used to determine the partition. All messages within the same FlowFile will be "
                     + "assigned to the same partition.");
@@ -121,19 +129,22 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(true)
             .build();
+    /**
+     * @deprecated Kafka 0.8.x producer doesn't use 'partitioner.class' property.
+     * This property is still valid as a dynamic property, so that existing processor configuration can stay valid.
+     */
     static final PropertyDescriptor PARTITION_STRATEGY = new PropertyDescriptor.Builder()
             .name("Partition Strategy")
-            .description("Specifies how messages should be partitioned when sent to Kafka")
+            .description("Deprecated. Used to specify how messages should be partitioned when sent to Kafka, but it's no longer used.")
             .allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING, USER_DEFINED_PARTITIONING)
-            .defaultValue(ROUND_ROBIN_PARTITIONING.getValue())
-            .required(true)
+            .dynamic(true)
             .build();
     public static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder()
             .name("Partition")
             .description("Specifies which Kafka Partition to add the message to. If using a message delimiter, all messages "
                             + "in the same FlowFile will be sent to the same partition. If a partition is specified but is not valid, "
-                            + "then all messages within the same FlowFile will use the same partition but it remains undefined which partition is used.")
-            .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+                            + "then the FlowFile will be routed to failure relationship.")
+            .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
             .expressionLanguageSupported(true)
             .required(false)
             .build();
@@ -250,7 +261,6 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
         List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
         _propertyDescriptors.add(SEED_BROKERS);
         _propertyDescriptors.add(TOPIC);
-        _propertyDescriptors.add(PARTITION_STRATEGY);
         _propertyDescriptors.add(PARTITION);
         _propertyDescriptors.add(KEY);
         _propertyDescriptors.add(DELIVERY_GUARANTEE);
@@ -313,7 +323,14 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
             @Override
             public void process(InputStream contentStream) throws IOException {
                 PublishingContext publishingContext = PutKafka.this.buildPublishingContext(flowFile, context, contentStream);
-                KafkaPublisherResult result = PutKafka.this.kafkaResource.publish(publishingContext);
+                KafkaPublisherResult result = null;
+                try {
+                    result = PutKafka.this.kafkaResource.publish(publishingContext);
+                } catch (final IllegalArgumentException e) {
+                    getLogger().error("Failed to publish {}, due to {}", new Object[]{flowFile, e}, e);
+                    result = new KafkaPublisherResult(0, -1);
+
+                }
                 publishResultRef.set(result);
             }
         });
@@ -402,26 +419,16 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
 
     @Override
     protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+        if (PARTITION_STRATEGY.getName().equals(propertyDescriptorName)) {
+            return PARTITION_STRATEGY;
+        }
+
         return new PropertyDescriptor.Builder()
                 .description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
                 .name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true)
                 .build();
     }
 
-    @Override
-    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
-        final List<ValidationResult> results = new ArrayList<>();
-
-        final String partitionStrategy = validationContext.getProperty(PARTITION_STRATEGY).getValue();
-        if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue())
-                && !validationContext.getProperty(PARTITION).isSet()) {
-            results.add(new ValidationResult.Builder().subject("Partition").valid(false)
-                    .explanation("The <Partition> property must be set when configured to use the User-Defined Partitioning Strategy")
-                    .build());
-        }
-        return results;
-    }
-
     /**
      *
      */
@@ -442,15 +449,11 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
      *
      */
     private Integer determinePartition(ProcessContext context, FlowFile flowFile) {
-        String partitionStrategy = context.getProperty(PARTITION_STRATEGY).getValue();
-        Integer partitionValue = null;
-        if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue())) {
-            String pv = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
-            if (pv != null){
-                partitionValue = Integer.parseInt(context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue());
-            }
+        String pv = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
+        if (pv != null){
+            return Integer.parseInt(context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue());
         }
-        return partitionValue;
+        return null;
     }
 
     /**
@@ -496,19 +499,13 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
         properties.setProperty("timeout.ms", timeout);
         properties.setProperty("metadata.fetch.timeout.ms", timeout);
 
-        String partitionStrategy = context.getProperty(PARTITION_STRATEGY).getValue();
-        String partitionerClass = null;
-        if (partitionStrategy.equalsIgnoreCase(ROUND_ROBIN_PARTITIONING.getValue())) {
-            partitionerClass = Partitioners.RoundRobinPartitioner.class.getName();
-        } else if (partitionStrategy.equalsIgnoreCase(RANDOM_PARTITIONING.getValue())) {
-            partitionerClass = Partitioners.RandomPartitioner.class.getName();
-        }
-        properties.setProperty("partitioner.class", partitionerClass);
-
         // Set Dynamic Properties
         for (final Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
             PropertyDescriptor descriptor = entry.getKey();
             if (descriptor.isDynamic()) {
+                if (PARTITION_STRATEGY.equals(descriptor)) {
+                    continue;
+                }
                 if (properties.containsKey(descriptor.getName())) {
                     this.getLogger().warn("Overriding existing property '" + descriptor.getName() + "' which had value of '"
                                     + properties.getProperty(descriptor.getName()) + "' with dynamically set value '"

http://git-wip-us.apache.org/repos/asf/nifi/blob/008bffd9/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java
index fbd2963..77b2bb9 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors.kafka;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 import static org.junit.Assert.fail;
 
 import java.nio.charset.StandardCharsets;
@@ -89,7 +90,7 @@ public class PutKafkaTest {
     }
 
     @Test
-    public void validateMultiCharacterDelimiyedMessages() {
+    public void validateMultiCharacterDelimitedMessages() {
         String topicName = "validateMultiCharacterDemarcatedMessagesAndCustomPartitioner";
         PutKafka putKafka = new PutKafka();
         TestRunner runner = TestRunners.newTestRunner(putKafka);
@@ -210,6 +211,62 @@ public class PutKafkaTest {
         runner.shutdown();
     }
 
+    @Test
+    public void validateDeprecatedPartitionStrategy() {
+        String topicName = "validateDeprecatedPartitionStrategy";
+        PutKafka putKafka = new PutKafka();
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PutKafka.TOPIC, topicName);
+        runner.setProperty(PutKafka.CLIENT_NAME, "foo");
+        runner.setProperty(PutKafka.KEY, "key1");
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort());
+        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
+
+        // Old configuration using deprecated property still work.
+        runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.USER_DEFINED_PARTITIONING);
+        runner.setProperty(PutKafka.PARTITION, "${partition}");
+
+        runner.assertValid();
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("partition", "0");
+        runner.enqueue("Hello World\nGoodbye".getBytes(StandardCharsets.UTF_8), attributes);
+        runner.run(1, false);
+
+        runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
+        ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName);
+        assertEquals("Hello World", new String(consumer.next().message(), StandardCharsets.UTF_8));
+        assertEquals("Goodbye", new String(consumer.next().message(), StandardCharsets.UTF_8));
+
+        runner.shutdown();
+    }
+
+    @Test
+    public void validatePartitionOutOfBounds() {
+        String topicName = "validatePartitionOutOfBounds";
+        PutKafka putKafka = new PutKafka();
+        TestRunner runner = TestRunners.newTestRunner(putKafka);
+        runner.setProperty(PutKafka.TOPIC, topicName);
+        runner.setProperty(PutKafka.CLIENT_NAME, "foo");
+        runner.setProperty(PutKafka.KEY, "key1");
+        runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort());
+        runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
+        runner.setProperty(PutKafka.PARTITION, "${partition}");
+
+        runner.assertValid();
+
+        final Map<String, String> attributes = new HashMap<>();
+        attributes.put("partition", "123");
+        runner.enqueue("Hello World\nGoodbye".getBytes(StandardCharsets.UTF_8), attributes);
+        runner.run(1, false);
+
+        assertTrue("Error message should be logged", runner.getLogger().getErrorMessages().size() > 0);
+        runner.assertTransferCount(PutKafka.REL_SUCCESS, 0);
+        runner.assertTransferCount(PutKafka.REL_FAILURE, 1);
+
+        runner.shutdown();
+    }
+
     private ConsumerIterator<byte[], byte[]> buildConsumer(String topic) {
         Properties props = new Properties();
         props.put("zookeeper.connect", "0.0.0.0:" + kafkaLocal.getZookeeperPort());