You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by oz...@apache.org on 2016/04/20 16:33:31 UTC
nifi git commit: NIFI-1672 Improved the Provenance Events emitted by
PutKafka This closes #355
Repository: nifi
Updated Branches:
refs/heads/master dd8c26e35 -> 3d6e66409
NIFI-1672 Improved the Provenance Events emitted by PutKafka
This closes #355
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3d6e6640
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3d6e6640
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3d6e6640
Branch: refs/heads/master
Commit: 3d6e6640972f62bb748c0c741fc74e1f7835a920
Parents: dd8c26e
Author: Pierre Villard <pi...@gmail.com>
Authored: Fri Apr 15 15:25:28 2016 +0200
Committer: Oleg Zhurakousky <ol...@suitcase.io>
Committed: Wed Apr 20 10:33:03 2016 -0400
----------------------------------------------------------------------
.../nifi/processors/kafka/KafkaPublisher.java | 28 ++++++++++++++++++--
.../apache/nifi/processors/kafka/PutKafka.java | 25 +++++++++++++----
2 files changed, 46 insertions(+), 7 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/3d6e6640/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
index bcf10a4..afb2cc6 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
@@ -119,6 +119,30 @@ class KafkaPublisher implements AutoCloseable {
*/
BitSet publish(SplittableMessageContext messageContext, InputStream contentStream, Integer partitionKey,
int maxBufferSize) {
+ List<Future<RecordMetadata>> sendFutures = this.split(messageContext, contentStream, partitionKey, maxBufferSize);
+ return this.publish(sendFutures);
+ }
+
+ /**
+ * This method splits (if required) the incoming content stream into
+ * messages to publish to Kafka topic. See publish method for more
+ * details
+ *
+ * @param messageContext
+ * instance of {@link SplittableMessageContext} which holds
+ * context information about the message to be sent
+ * @param contentStream
+ * instance of open {@link InputStream} carrying the content of
+ * the message(s) to be sent to Kafka
+ * @param partitionKey
+ * the value of the partition key. Only relevant if the user wishes
+ * to provide a custom partition key instead of relying on
+ * variety of provided {@link Partitioner}(s)
+ * @param maxBufferSize maximum message size
+ * @return The list of messages to publish
+ */
+ List<Future<RecordMetadata>> split(SplittableMessageContext messageContext, InputStream contentStream, Integer partitionKey,
+ int maxBufferSize) {
List<Future<RecordMetadata>> sendFutures = new ArrayList<>();
BitSet prevFailedSegmentIndexes = messageContext.getFailedSegments();
int segmentCounter = 0;
@@ -139,13 +163,13 @@ class KafkaPublisher implements AutoCloseable {
segmentCounter++;
}
}
- return this.processAcks(sendFutures);
+ return sendFutures;
}
/**
*
*/
- private BitSet processAcks(List<Future<RecordMetadata>> sendFutures) {
+ BitSet publish(List<Future<RecordMetadata>> sendFutures) {
int segmentCounter = 0;
BitSet failedSegments = new BitSet();
for (Future<RecordMetadata> future : sendFutures) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/3d6e6640/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
index 3b5eb4f..2cf0245 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -30,10 +30,12 @@ import java.util.Map;
import java.util.Map.Entry;
import java.util.Properties;
import java.util.Set;
+import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicReference;
import java.util.regex.Pattern;
+import org.apache.kafka.clients.producer.RecordMetadata;
import org.apache.nifi.annotation.behavior.DynamicProperty;
import org.apache.nifi.annotation.behavior.InputRequirement;
import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -54,6 +56,7 @@ import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.exception.ProcessException;
import org.apache.nifi.processor.io.InputStreamCallback;
import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
@InputRequirement(Requirement.INPUT_REQUIRED)
@Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" })
@@ -159,9 +162,9 @@ public class PutKafka extends AbstractProcessor {
+ "If not specified, the entire content of the FlowFile will be used as a single message. If specified, "
+ "the contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka "
+ "message. Note that if messages are delimited and some messages for a given FlowFile are transferred "
- + "successfully while others are not, the messages will be split into individual FlowFiles, such that those "
- + "messages that were successfully sent are routed to the 'success' relationship while other messages are "
- + "sent to the 'failure' relationship.")
+ + "successfully while others are not, the FlowFile will be transferred to the 'failure' relationship. In "
+ + "case the FlowFile is sent back to this processor, only the messages not previously transferred "
+ + "successfully will be handled by the processor to be retransferred to Kafka.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.expressionLanguageSupported(true)
@@ -292,19 +295,31 @@ public class PutKafka extends AbstractProcessor {
final SplittableMessageContext messageContext = this.buildMessageContext(flowFile, context, session);
final Integer partitionKey = this.determinePartition(messageContext, context, flowFile);
final AtomicReference<BitSet> failedSegmentsRef = new AtomicReference<BitSet>();
+ final List<Future<RecordMetadata>> sendFutures = new ArrayList<>();
+
+ StopWatch timer = new StopWatch(true);
session.read(flowFile, new InputStreamCallback() {
@Override
public void process(InputStream contentStream) throws IOException {
int maxRecordSize = context.getProperty(MAX_RECORD_SIZE).asDataSize(DataUnit.B).intValue();
- failedSegmentsRef.set(kafkaPublisher.publish(messageContext, contentStream, partitionKey, maxRecordSize));
+ sendFutures.addAll(kafkaPublisher.split(messageContext, contentStream, partitionKey, maxRecordSize));
+ failedSegmentsRef.set(kafkaPublisher.publish(sendFutures));
}
});
+ timer.stop();
+ final long duration = timer.getDuration(TimeUnit.MILLISECONDS);
+ final int messagesToSend = sendFutures.size();
+ final int messagesSent = messagesToSend - failedSegmentsRef.get().cardinality();
+ final String details = messagesSent + " message(s) over " + messagesToSend + " sent successfully";
if (failedSegmentsRef.get().isEmpty()) {
- session.getProvenanceReporter().send(flowFile, context.getProperty(SEED_BROKERS).getValue() + "/" + messageContext.getTopicName());
+ session.getProvenanceReporter().send(flowFile, "kafka://" + context.getProperty(SEED_BROKERS).getValue() + "/" + messageContext.getTopicName(), details, duration);
flowFile = this.cleanUpFlowFileIfNecessary(flowFile, session);
session.transfer(flowFile, REL_SUCCESS);
} else {
+ if(messagesSent != 0) {
+ session.getProvenanceReporter().send(flowFile, "kafka://" + context.getProperty(SEED_BROKERS).getValue() + "/" + messageContext.getTopicName(), details, duration);
+ }
flowFile = session.putAllAttributes(flowFile, this.buildFailedFlowFileAttributes(failedSegmentsRef.get(), messageContext));
session.transfer(session.penalize(flowFile), REL_FAILURE);
}