Posted to commits@nifi.apache.org by oz...@apache.org on 2016/04/20 16:33:31 UTC

nifi git commit: NIFI-1672 Improved the Provenance Events emitted by PutKafka This closes #355

Repository: nifi
Updated Branches:
  refs/heads/master dd8c26e35 -> 3d6e66409


NIFI-1672 Improved the Provenance Events emitted by PutKafka
This closes #355


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/3d6e6640
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/3d6e6640
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/3d6e6640

Branch: refs/heads/master
Commit: 3d6e6640972f62bb748c0c741fc74e1f7835a920
Parents: dd8c26e
Author: Pierre Villard <pi...@gmail.com>
Authored: Fri Apr 15 15:25:28 2016 +0200
Committer: Oleg Zhurakousky <ol...@suitcase.io>
Committed: Wed Apr 20 10:33:03 2016 -0400

----------------------------------------------------------------------
 .../nifi/processors/kafka/KafkaPublisher.java   | 28 ++++++++++++++++++--
 .../apache/nifi/processors/kafka/PutKafka.java  | 25 +++++++++++++----
 2 files changed, 46 insertions(+), 7 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/3d6e6640/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
index bcf10a4..afb2cc6 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/KafkaPublisher.java
@@ -119,6 +119,30 @@ class KafkaPublisher implements AutoCloseable {
      */
     BitSet publish(SplittableMessageContext messageContext, InputStream contentStream, Integer partitionKey,
             int maxBufferSize) {
+        List<Future<RecordMetadata>> sendFutures = this.split(messageContext, contentStream, partitionKey, maxBufferSize);
+        return this.publish(sendFutures);
+    }
+
+    /**
+     * This method splits (if required) the incoming content stream into
+     * messages to publish to the Kafka topic. See the publish method for
+     * more details.
+     *
+     * @param messageContext
+     *            instance of {@link SplittableMessageContext} which holds
+     *            context information about the message to be sent
+     * @param contentStream
+     *            instance of open {@link InputStream} carrying the content of
+     *            the message(s) to be sent to Kafka
+     * @param partitionKey
+     *            the value of the partition key. Only relevant if the user
+     *            wishes to provide a custom partition key instead of relying
+     *            on the variety of provided {@link Partitioner}(s)
+     * @param maxBufferSize maximum message size
+     * @return the list of futures for the messages sent to Kafka
+     */
+    List<Future<RecordMetadata>> split(SplittableMessageContext messageContext, InputStream contentStream, Integer partitionKey,
+            int maxBufferSize) {
         List<Future<RecordMetadata>> sendFutures = new ArrayList<>();
         BitSet prevFailedSegmentIndexes = messageContext.getFailedSegments();
         int segmentCounter = 0;
@@ -139,13 +163,13 @@ class KafkaPublisher implements AutoCloseable {
                 segmentCounter++;
             }
         }
-        return this.processAcks(sendFutures);
+        return sendFutures;
     }
 
     /**
      *
      */
-    private BitSet processAcks(List<Future<RecordMetadata>> sendFutures) {
+    BitSet publish(List<Future<RecordMetadata>> sendFutures) {
         int segmentCounter = 0;
         BitSet failedSegments = new BitSet();
         for (Future<RecordMetadata> future : sendFutures) {

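Taken together, the two hunks above decompose the former single-step publish into a split/ack pipeline, which is what lets PutKafka (below) see how many records actually reached Kafka. A minimal sketch of how a caller consumes the new two-step API, matching the usage in the PutKafka diff below (variable setup is elided; kafkaPublisher, messageContext, contentStream, partitionKey and maxRecordSize are the names used in that diff):

    // Step 1: hand the (possibly delimited) content to the Kafka producer;
    // each record sent yields a Future<RecordMetadata> from the client.
    List<Future<RecordMetadata>> sendFutures =
            kafkaPublisher.split(messageContext, contentStream, partitionKey, maxRecordSize);

    // Step 2: wait on the acks; the returned BitSet flags the indexes of
    // the segments whose sends failed.
    BitSet failedSegments = kafkaPublisher.publish(sendFutures);

    // Both totals are now visible to the caller, which is what the
    // provenance event details in PutKafka are built from.
    int messagesToSend = sendFutures.size();
    int messagesSent = messagesToSend - failedSegments.cardinality();
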
http://git-wip-us.apache.org/repos/asf/nifi/blob/3d6e6640/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
index 3b5eb4f..2cf0245 100644
--- a/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
+++ b/nifi-nar-bundles/nifi-kafka-bundle/nifi-kafka-processors/src/main/java/org/apache/nifi/processors/kafka/PutKafka.java
@@ -30,10 +30,12 @@ import java.util.Map;
 import java.util.Map.Entry;
 import java.util.Properties;
 import java.util.Set;
+import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicReference;
 import java.util.regex.Pattern;
 
+import org.apache.kafka.clients.producer.RecordMetadata;
 import org.apache.nifi.annotation.behavior.DynamicProperty;
 import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
@@ -54,6 +56,7 @@ import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.exception.ProcessException;
 import org.apache.nifi.processor.io.InputStreamCallback;
 import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StopWatch;
 
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @Tags({ "Apache", "Kafka", "Put", "Send", "Message", "PubSub" })
@@ -159,9 +162,9 @@ public class PutKafka extends AbstractProcessor {
                             + "If not specified, the entire content of the FlowFile will be used as a single message. If specified, "
                             + "the contents of the FlowFile will be split on this delimiter and each section sent as a separate Kafka "
                             + "message. Note that if messages are delimited and some messages for a given FlowFile are transferred "
-                            + "successfully while others are not, the messages will be split into individual FlowFiles, such that those "
-                            + "messages that were successfully sent are routed to the 'success' relationship while other messages are "
-                            + "sent to the 'failure' relationship.")
+                            + "successfully while others are not, the FlowFile will be transferred to the 'failure' relationship. In "
+                            + "case the FlowFile is sent back to this processor, only the messages not previously transferred "
+                            + "successfully will be handled by the processor to be retransferred to Kafka.")
             .required(false)
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .expressionLanguageSupported(true)
@@ -292,19 +295,31 @@ public class PutKafka extends AbstractProcessor {
             final SplittableMessageContext messageContext = this.buildMessageContext(flowFile, context, session);
             final Integer partitionKey = this.determinePartition(messageContext, context, flowFile);
             final AtomicReference<BitSet> failedSegmentsRef = new AtomicReference<BitSet>();
+            final List<Future<RecordMetadata>> sendFutures = new ArrayList<>();
+
+            StopWatch timer = new StopWatch(true);
             session.read(flowFile, new InputStreamCallback() {
                 @Override
                 public void process(InputStream contentStream) throws IOException {
                     int maxRecordSize = context.getProperty(MAX_RECORD_SIZE).asDataSize(DataUnit.B).intValue();
-                    failedSegmentsRef.set(kafkaPublisher.publish(messageContext, contentStream, partitionKey, maxRecordSize));
+                    sendFutures.addAll(kafkaPublisher.split(messageContext, contentStream, partitionKey, maxRecordSize));
+                    failedSegmentsRef.set(kafkaPublisher.publish(sendFutures));
                 }
             });
+            timer.stop();
 
+            final long duration = timer.getDuration(TimeUnit.MILLISECONDS);
+            final int messagesToSend = sendFutures.size();
+            final int messagesSent = messagesToSend - failedSegmentsRef.get().cardinality();
+            final String details = messagesSent + " of " + messagesToSend + " message(s) sent successfully";
             if (failedSegmentsRef.get().isEmpty()) {
-                session.getProvenanceReporter().send(flowFile, context.getProperty(SEED_BROKERS).getValue() + "/" + messageContext.getTopicName());
+                session.getProvenanceReporter().send(flowFile, "kafka://" + context.getProperty(SEED_BROKERS).getValue() + "/" + messageContext.getTopicName(), details, duration);
                 flowFile = this.cleanUpFlowFileIfNecessary(flowFile, session);
                 session.transfer(flowFile, REL_SUCCESS);
             } else {
+                if (messagesSent != 0) {
+                    session.getProvenanceReporter().send(flowFile, "kafka://" + context.getProperty(SEED_BROKERS).getValue() + "/" + messageContext.getTopicName(), details, duration);
+                }
                 flowFile = session.putAllAttributes(flowFile, this.buildFailedFlowFileAttributes(failedSegmentsRef.get(), messageContext));
                 session.transfer(session.penalize(flowFile), REL_FAILURE);
             }