You are viewing a plain text version of this content. The canonical link for it is here (the hyperlink is not preserved in this plain-text version).
Posted to commits@nifi.apache.org by ij...@apache.org on 2018/12/18 06:08:39 UTC

[nifi] branch master updated: NIFI-3988: Add fragment attributes to SplitRecord

This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7548d7c  NIFI-3988: Add fragment attributes to SplitRecord
7548d7c is described below

commit 7548d7c85914d9c64efeccce62d61de522170286
Author: Matthew Burgess <ma...@apache.org>
AuthorDate: Thu Dec 13 14:02:14 2018 -0500

    NIFI-3988: Add fragment attributes to SplitRecord
    
    This closes #3217.
    
    Signed-off-by: Koji Kawamura <ij...@apache.org>
---
 .../nifi/processors/standard/SplitRecord.java      | 25 ++++++++++++++++++++--
 .../nifi/processors/standard/TestSplitRecord.java  | 15 ++++++++++++-
 2 files changed, 37 insertions(+), 3 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java
index 2a5679d..d9e7bb5 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitRecord.java
@@ -26,6 +26,7 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.InputRequirement;
@@ -40,6 +41,7 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.flowfile.attributes.CoreAttributes;
+import org.apache.nifi.flowfile.attributes.FragmentAttributes;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -66,11 +68,20 @@ import org.apache.nifi.serialization.record.RecordSet;
 @Tags({"split", "generic", "schema", "json", "csv", "avro", "log", "logs", "freeform", "text"})
 @WritesAttributes({
     @WritesAttribute(attribute = "mime.type", description = "Sets the mime.type attribute to the MIME Type specified by the Record Writer for the FlowFiles routed to the 'splits' Relationship."),
-    @WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile. This is added to FlowFiles that are routed to the 'splits' Relationship.")
+    @WritesAttribute(attribute = "record.count", description = "The number of records in the FlowFile. This is added to FlowFiles that are routed to the 'splits' Relationship."),
+    @WritesAttribute(attribute = "fragment.identifier", description = "All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"),
+    @WritesAttribute(attribute = "fragment.index", description = "A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile"),
+    @WritesAttribute(attribute = "fragment.count", description = "The number of split FlowFiles generated from the parent FlowFile"),
+    @WritesAttribute(attribute = "segment.original.filename ", description = "The filename of the parent FlowFile")
 })
 @CapabilityDescription("Splits up an input FlowFile that is in a record-oriented data format into multiple smaller FlowFiles")
 public class SplitRecord extends AbstractProcessor {
 
+    public static final String FRAGMENT_ID = FragmentAttributes.FRAGMENT_ID.key();
+    public static final String FRAGMENT_INDEX = FragmentAttributes.FRAGMENT_INDEX.key();
+    public static final String FRAGMENT_COUNT = FragmentAttributes.FRAGMENT_COUNT.key();
+    public static final String SEGMENT_ORIGINAL_FILENAME = FragmentAttributes.SEGMENT_ORIGINAL_FILENAME.key();
+
     static final PropertyDescriptor RECORD_READER = new PropertyDescriptor.Builder()
         .name("Record Reader")
         .description("Specifies the Controller Service to use for reading incoming data")
@@ -125,7 +136,7 @@ public class SplitRecord extends AbstractProcessor {
 
     @Override
     public void onTrigger(final ProcessContext context, final ProcessSession session) throws ProcessException {
-        FlowFile original = session.get();
+        final FlowFile original = session.get();
         if (original == null) {
             return;
         }
@@ -137,6 +148,7 @@ public class SplitRecord extends AbstractProcessor {
 
         final List<FlowFile> splits = new ArrayList<>();
         final Map<String, String> originalAttributes = original.getAttributes();
+        final String fragmentId = UUID.randomUUID().toString();
         try {
             session.read(original, new InputStreamCallback() {
                 @Override
@@ -148,6 +160,7 @@ public class SplitRecord extends AbstractProcessor {
                         final RecordSet recordSet = reader.createRecordSet();
                         final PushBackRecordSet pushbackSet = new PushBackRecordSet(recordSet);
 
+                        int fragmentIndex = 0;
                         while (pushbackSet.isAnotherRecord()) {
                             FlowFile split = session.create(original);
 
@@ -167,6 +180,9 @@ public class SplitRecord extends AbstractProcessor {
 
                                         attributes.put("record.count", String.valueOf(writeResult.getRecordCount()));
                                         attributes.put(CoreAttributes.MIME_TYPE.key(), writer.getMimeType());
+                                        attributes.put(FRAGMENT_INDEX, String.valueOf(fragmentIndex));
+                                        attributes.put(FRAGMENT_ID, fragmentId);
+                                        attributes.put(SEGMENT_ORIGINAL_FILENAME, original.getAttribute(CoreAttributes.FILENAME.key()));
                                         attributes.putAll(writeResult.getAttributes());
 
                                         session.adjustCounter("Records Split", writeResult.getRecordCount(), false);
@@ -176,6 +192,7 @@ public class SplitRecord extends AbstractProcessor {
                             } finally {
                                 splits.add(split);
                             }
+                            fragmentIndex++;
                         }
                     } catch (final SchemaNotFoundException | MalformedRecordException e) {
                         throw new ProcessException("Failed to parse incoming data", e);
@@ -190,6 +207,10 @@ public class SplitRecord extends AbstractProcessor {
         }
 
         session.transfer(original, REL_ORIGINAL);
+        // Add the fragment count to each split
+        for(FlowFile split : splits) {
+            session.putAttribute(split, FRAGMENT_COUNT, String.valueOf(splits.size()));
+        }
         session.transfer(splits, REL_SPLITS);
         getLogger().info("Successfully split {} into {} FlowFiles, each containing up to {} records", new Object[] {original, splits.size(), maxRecords});
     }
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitRecord.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitRecord.java
index e2f5005..6ce1165 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitRecord.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitRecord.java
@@ -17,6 +17,7 @@
 
 package org.apache.nifi.processors.standard;
 
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.serialization.record.MockRecordParser;
 import org.apache.nifi.serialization.record.MockRecordWriter;
@@ -55,7 +56,7 @@ public class TestSplitRecord {
         readerService.addRecord("Jane Doe", 47);
         readerService.addRecord("Jimmy Doe", 14);
 
-        runner.enqueue("");
+        final MockFlowFile inputFlowFile = runner.enqueue("");
         runner.run();
 
         runner.assertTransferCount(SplitRecord.REL_SPLITS, 3);
@@ -63,11 +64,23 @@ public class TestSplitRecord {
         runner.assertTransferCount(SplitRecord.REL_FAILURE, 0);
         final List<MockFlowFile> out = runner.getFlowFilesForRelationship(SplitRecord.REL_SPLITS);
 
+        int fragmentIndex = 0;
+        String fragmentUUID = null;
         for (final MockFlowFile mff : out) {
+            if (fragmentUUID == null) {
+                fragmentUUID = mff.getAttribute(SplitRecord.FRAGMENT_ID);
+            } else {
+                mff.assertAttributeEquals(SplitRecord.FRAGMENT_ID, fragmentUUID);
+            }
+            mff.assertAttributeEquals(SplitRecord.FRAGMENT_COUNT, "3");
+            mff.assertAttributeEquals(SplitRecord.FRAGMENT_INDEX, String.valueOf(fragmentIndex));
+            mff.assertAttributeEquals(SplitRecord.SEGMENT_ORIGINAL_FILENAME, inputFlowFile.getAttribute(CoreAttributes.FILENAME.key()));
             mff.assertAttributeEquals("record.count", "1");
             mff.assertAttributeEquals("mime.type", "text/plain");
+            fragmentIndex++;
         }
 
+
         assertEquals(1, out.stream().filter(mff -> mff.isContentEqual("header\nJohn Doe,48\n")).count());
         assertEquals(1, out.stream().filter(mff -> mff.isContentEqual("header\nJane Doe,47\n")).count());
         assertEquals(1, out.stream().filter(mff -> mff.isContentEqual("header\nJimmy Doe,14\n")).count());