You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by oz...@apache.org on 2017/01/27 17:44:38 UTC
nifi git commit: NIFI-3363: PutKafka NPE with User-Defined partition
Repository: nifi
Updated Branches:
refs/heads/0.x 3a0948188 -> 008bffd9c
NIFI-3363: PutKafka NPE with User-Defined partition
- Marked the PutKafka Partition Strategy property as deprecated: the
Kafka 0.8 client doesn't use 'partitioner.class' as a producer property, so we don't have to specify it.
- Changed the Partition Strategy property from a required property to a dynamic property, so that existing processor configurations can stay in a valid state.
- Fixed the Partition property so that it takes effect whenever it is set (regardless of Partition Strategy) and accepts partition 0.
- Route a flow file to the failure relationship if it failed to be published due to an invalid partition.
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/008bffd9
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/008bffd9
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/008bffd9
Branch: refs/heads/0.x
Commit: 008bffd9cd1787295840b411f1498439265bc8c5
Parents: 3a09481
Author: Koji Kawamura <ij...@apache.org>
Authored: Wed Jan 18 17:44:40 2017 +0900
Committer: Oleg Zhurakousky <ol...@suitcase.io>
Committed: Fri Jan 27 12:43:24 2017 -0500
----------------------------------------------------------------------
.../apache/nifi/processors/kafka/PutKafka.java | 77 ++++++++++----------
.../nifi/processors/kafka/PutKafkaTest.java | 59 ++++++++++++++-
2 files changed, 95 insertions(+), 41 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/008bffd9/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
index abdf73d..38ec20c 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -54,8 +54,7 @@ import org.apache.nifi.processors.kafka.KafkaPublisher.KafkaPublisherResult;
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub", "0.8.x"})
-@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka, , specifically for 0.8.x versions. " +
- "The messages to send may be individual FlowFiles or may be delimited, using a "
+@CapabilityDescription("Sends the contents of a FlowFile as a message to Apache Kafka, specifically for 0.8.x versions. The messages to send may be individual FlowFiles or may be delimited, using a "
+ "user-specified delimiter, such as a new-line. The complementary NiFi processor for fetching messages is GetKafka.")
@DynamicProperty(name = "The name of a Kafka configuration property.", value = "The value of a given Kafka configuration property.",
description = "These properties will be added on the Kafka configuration after loading any provided configuration properties."
@@ -98,11 +97,20 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
public static final AllowableValue COMPRESSION_CODEC_SNAPPY = new AllowableValue("snappy", "Snappy",
"Compress messages using Snappy");
+ /**
+ * @deprecated Kafka 0.8.x producer doesn't use 'partitioner.class' property.
+ */
static final AllowableValue ROUND_ROBIN_PARTITIONING = new AllowableValue("Round Robin", "Round Robin",
"Messages will be assigned partitions in a round-robin fashion, sending the first message to Partition 1, "
+ "the next Partition to Partition 2, and so on, wrapping as necessary.");
+ /**
+ * @deprecated Kafka 0.8.x producer doesn't use 'partitioner.class' property.
+ */
static final AllowableValue RANDOM_PARTITIONING = new AllowableValue("Random Robin", "Random",
"Messages will be assigned to random partitions.");
+ /**
+ * @deprecated Kafka 0.8.x producer doesn't use 'partitioner.class' property. To specify partition, simply configure the 'partition' property.
+ */
static final AllowableValue USER_DEFINED_PARTITIONING = new AllowableValue("User-Defined", "User-Defined",
"The <Partition> property will be used to determine the partition. All messages within the same FlowFile will be "
+ "assigned to the same partition.");
@@ -121,19 +129,22 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
.build();
+ /**
+ * @deprecated Kafka 0.8.x producer doesn't use 'partitioner.class' property.
+ * This property is still valid as a dynamic property, so that existing processor configuration can stay valid.
+ */
static final PropertyDescriptor PARTITION_STRATEGY = new PropertyDescriptor.Builder()
.name("Partition Strategy")
- .description("Specifies how messages should be partitioned when sent to Kafka")
+ .description("Deprecated. Used to specify how messages should be partitioned when sent to Kafka, but it's no longer used.")
.allowableValues(ROUND_ROBIN_PARTITIONING, RANDOM_PARTITIONING, USER_DEFINED_PARTITIONING)
- .defaultValue(ROUND_ROBIN_PARTITIONING.getValue())
- .required(true)
+ .dynamic(true)
.build();
public static final PropertyDescriptor PARTITION = new PropertyDescriptor.Builder()
.name("Partition")
.description("Specifies which Kafka Partition to add the message to. If using a message delimiter, all messages "
+ "in the same FlowFile will be sent to the same partition. If a partition is specified but is not valid, "
- + "then all messages within the same FlowFile will use the same partition but it remains undefined which partition is used.")
- .addValidator(StandardValidators.POSITIVE_INTEGER_VALIDATOR)
+ + "then the FlowFile will be routed to failure relationship.")
+ .addValidator(StandardValidators.NON_NEGATIVE_INTEGER_VALIDATOR)
.expressionLanguageSupported(true)
.required(false)
.build();
@@ -250,7 +261,6 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
List<PropertyDescriptor> _propertyDescriptors = new ArrayList<>();
_propertyDescriptors.add(SEED_BROKERS);
_propertyDescriptors.add(TOPIC);
- _propertyDescriptors.add(PARTITION_STRATEGY);
_propertyDescriptors.add(PARTITION);
_propertyDescriptors.add(KEY);
_propertyDescriptors.add(DELIVERY_GUARANTEE);
@@ -313,7 +323,14 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
@Override
public void process(InputStream contentStream) throws IOException {
PublishingContext publishingContext = PutKafka.this.buildPublishingContext(flowFile, context, contentStream);
- KafkaPublisherResult result = PutKafka.this.kafkaResource.publish(publishingContext);
+ KafkaPublisherResult result = null;
+ try {
+ result = PutKafka.this.kafkaResource.publish(publishingContext);
+ } catch (final IllegalArgumentException e) {
+ getLogger().error("Failed to publish {}, due to {}", new Object[]{flowFile, e}, e);
+ result = new KafkaPublisherResult(0, -1);
+
+ }
publishResultRef.set(result);
}
});
@@ -402,26 +419,16 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
@Override
protected PropertyDescriptor getSupportedDynamicPropertyDescriptor(final String propertyDescriptorName) {
+ if (PARTITION_STRATEGY.getName().equals(propertyDescriptorName)) {
+ return PARTITION_STRATEGY;
+ }
+
return new PropertyDescriptor.Builder()
.description("Specifies the value for '" + propertyDescriptorName + "' Kafka Configuration.")
.name(propertyDescriptorName).addValidator(StandardValidators.NON_EMPTY_VALIDATOR).dynamic(true)
.build();
}
- @Override
- protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
- final List<ValidationResult> results = new ArrayList<>();
-
- final String partitionStrategy = validationContext.getProperty(PARTITION_STRATEGY).getValue();
- if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue())
- && !validationContext.getProperty(PARTITION).isSet()) {
- results.add(new ValidationResult.Builder().subject("Partition").valid(false)
- .explanation("The <Partition> property must be set when configured to use the User-Defined Partitioning Strategy")
- .build());
- }
- return results;
- }
-
/**
*
*/
@@ -442,15 +449,11 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
*
*/
private Integer determinePartition(ProcessContext context, FlowFile flowFile) {
- String partitionStrategy = context.getProperty(PARTITION_STRATEGY).getValue();
- Integer partitionValue = null;
- if (partitionStrategy.equalsIgnoreCase(USER_DEFINED_PARTITIONING.getValue())) {
- String pv = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
- if (pv != null){
- partitionValue = Integer.parseInt(context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue());
- }
+ String pv = context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue();
+ if (pv != null){
+ return Integer.parseInt(context.getProperty(PARTITION).evaluateAttributeExpressions(flowFile).getValue());
}
- return partitionValue;
+ return null;
}
/**
@@ -496,19 +499,13 @@ public class PutKafka extends AbstractKafkaProcessor<KafkaPublisher> {
properties.setProperty("timeout.ms", timeout);
properties.setProperty("metadata.fetch.timeout.ms", timeout);
- String partitionStrategy = context.getProperty(PARTITION_STRATEGY).getValue();
- String partitionerClass = null;
- if (partitionStrategy.equalsIgnoreCase(ROUND_ROBIN_PARTITIONING.getValue())) {
- partitionerClass = Partitioners.RoundRobinPartitioner.class.getName();
- } else if (partitionStrategy.equalsIgnoreCase(RANDOM_PARTITIONING.getValue())) {
- partitionerClass = Partitioners.RandomPartitioner.class.getName();
- }
- properties.setProperty("partitioner.class", partitionerClass);
-
// Set Dynamic Properties
for (final Entry<PropertyDescriptor, String> entry : context.getProperties().entrySet()) {
PropertyDescriptor descriptor = entry.getKey();
if (descriptor.isDynamic()) {
+ if (PARTITION_STRATEGY.equals(descriptor)) {
+ continue;
+ }
if (properties.containsKey(descriptor.getName())) {
this.getLogger().warn("Overriding existing property '" + descriptor.getName() + "' which had value of '"
+ properties.getProperty(descriptor.getName()) + "' with dynamically set value '"
http://git-wip-us.apache.org/repos/asf/nifi/blob/008bffd9/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java
index fbd2963..77b2bb9 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/test/java/org/apache/nifi/processors/kafka/PutKafkaTest.java
@@ -18,6 +18,7 @@ package org.apache.nifi.processors.kafka;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
import static org.junit.Assert.fail;
import java.nio.charset.StandardCharsets;
@@ -89,7 +90,7 @@ public class PutKafkaTest {
}
@Test
- public void validateMultiCharacterDelimiyedMessages() {
+ public void validateMultiCharacterDelimitedMessages() {
String topicName = "validateMultiCharacterDemarcatedMessagesAndCustomPartitioner";
PutKafka putKafka = new PutKafka();
TestRunner runner = TestRunners.newTestRunner(putKafka);
@@ -210,6 +211,62 @@ public class PutKafkaTest {
runner.shutdown();
}
+ @Test
+ public void validateDeprecatedPartitionStrategy() {
+ String topicName = "validateDeprecatedPartitionStrategy";
+ PutKafka putKafka = new PutKafka();
+ TestRunner runner = TestRunners.newTestRunner(putKafka);
+ runner.setProperty(PutKafka.TOPIC, topicName);
+ runner.setProperty(PutKafka.CLIENT_NAME, "foo");
+ runner.setProperty(PutKafka.KEY, "key1");
+ runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort());
+ runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
+
+ // Old configuration using deprecated property still work.
+ runner.setProperty(PutKafka.PARTITION_STRATEGY, PutKafka.USER_DEFINED_PARTITIONING);
+ runner.setProperty(PutKafka.PARTITION, "${partition}");
+
+ runner.assertValid();
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("partition", "0");
+ runner.enqueue("Hello World\nGoodbye".getBytes(StandardCharsets.UTF_8), attributes);
+ runner.run(1, false);
+
+ runner.assertAllFlowFilesTransferred(PutKafka.REL_SUCCESS, 1);
+ ConsumerIterator<byte[], byte[]> consumer = this.buildConsumer(topicName);
+ assertEquals("Hello World", new String(consumer.next().message(), StandardCharsets.UTF_8));
+ assertEquals("Goodbye", new String(consumer.next().message(), StandardCharsets.UTF_8));
+
+ runner.shutdown();
+ }
+
+ @Test
+ public void validatePartitionOutOfBounds() {
+ String topicName = "validatePartitionOutOfBounds";
+ PutKafka putKafka = new PutKafka();
+ TestRunner runner = TestRunners.newTestRunner(putKafka);
+ runner.setProperty(PutKafka.TOPIC, topicName);
+ runner.setProperty(PutKafka.CLIENT_NAME, "foo");
+ runner.setProperty(PutKafka.KEY, "key1");
+ runner.setProperty(PutKafka.SEED_BROKERS, "localhost:" + kafkaLocal.getKafkaPort());
+ runner.setProperty(PutKafka.MESSAGE_DELIMITER, "\n");
+ runner.setProperty(PutKafka.PARTITION, "${partition}");
+
+ runner.assertValid();
+
+ final Map<String, String> attributes = new HashMap<>();
+ attributes.put("partition", "123");
+ runner.enqueue("Hello World\nGoodbye".getBytes(StandardCharsets.UTF_8), attributes);
+ runner.run(1, false);
+
+ assertTrue("Error message should be logged", runner.getLogger().getErrorMessages().size() > 0);
+ runner.assertTransferCount(PutKafka.REL_SUCCESS, 0);
+ runner.assertTransferCount(PutKafka.REL_FAILURE, 1);
+
+ runner.shutdown();
+ }
+
private ConsumerIterator<byte[], byte[]> buildConsumer(String topic) {
Properties props = new Properties();
props.put("zookeeper.connect", "0.0.0.0:" + kafkaLocal.getZookeeperPort());