You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by pv...@apache.org on 2016/09/22 19:46:43 UTC

nifi git commit: NIFI-2805: Add fragment attributes to SplitAvro

Repository: nifi
Updated Branches:
  refs/heads/master 937dc71ae -> 49297b725


NIFI-2805: Add fragment attributes to SplitAvro

This closes #1046.


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/49297b72
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/49297b72
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/49297b72

Branch: refs/heads/master
Commit: 49297b725d4770040c2cc2c01514ef49b2680c88
Parents: 937dc71
Author: Matt Burgess <ma...@apache.org>
Authored: Thu Sep 22 11:00:11 2016 -0400
Committer: Pierre Villard <pi...@gmail.com>
Committed: Thu Sep 22 21:46:23 2016 +0200

----------------------------------------------------------------------
 .../apache/nifi/processors/avro/SplitAvro.java  | 26 ++++++++++++++--
 .../nifi/processors/avro/TestSplitAvro.java     | 31 +++++++++++++++-----
 2 files changed, 47 insertions(+), 10 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/49297b72/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
index ac6936f..e3eb6ec 100644
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/main/java/org/apache/nifi/processors/avro/SplitAvro.java
@@ -27,7 +27,9 @@ import java.util.HashSet;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
+import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
+import java.util.stream.IntStream;
 
 import org.apache.avro.file.CodecFactory;
 import org.apache.avro.file.DataFileConstants;
@@ -43,11 +45,14 @@ import org.apache.nifi.annotation.behavior.InputRequirement;
 import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
 import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.processor.AbstractProcessor;
 import org.apache.nifi.processor.ProcessContext;
 import org.apache.nifi.processor.ProcessSession;
@@ -65,6 +70,15 @@ import org.apache.nifi.stream.io.BufferedOutputStream;
 @InputRequirement(Requirement.INPUT_REQUIRED)
 @CapabilityDescription("Splits a binary encoded Avro datafile into smaller files based on the configured Output Size. The Output Strategy determines if " +
         "the smaller files will be Avro datafiles, or bare Avro records with metadata in the FlowFile attributes. The output will always be binary encoded.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "fragment.identifier",
+                description = "All split FlowFiles produced from the same parent FlowFile will have the same randomly generated UUID added for this attribute"),
+        @WritesAttribute(attribute = "fragment.index",
+                description = "A one-up number that indicates the ordering of the split FlowFiles that were created from a single parent FlowFile"),
+        @WritesAttribute(attribute = "fragment.count",
+                description = "The number of split FlowFiles generated from the parent FlowFile"),
+        @WritesAttribute(attribute = "segment.original.filename ", description = "The filename of the parent FlowFile")
+})
 public class SplitAvro extends AbstractProcessor {
 
     public static final String RECORD_SPLIT_VALUE = "Record";
@@ -200,10 +214,18 @@ public class SplitAvro extends AbstractProcessor {
 
         try {
             final List<FlowFile> splits = splitter.split(session, flowFile, splitWriter);
-            session.transfer(splits, REL_SPLIT);
+            final String fragmentIdentifier = UUID.randomUUID().toString();
+            IntStream.range(0, splits.size()).forEach((i) -> {
+                FlowFile split = splits.get(i);
+                split = session.putAttribute(split, "fragment.identifier", fragmentIdentifier);
+                split = session.putAttribute(split, "fragment.index", Integer.toString(i));
+                split = session.putAttribute(split, "segment.original.filename", flowFile.getAttribute(CoreAttributes.FILENAME.key()));
+                split = session.putAttribute(split, "fragment.count", Integer.toString(splits.size()));
+                session.transfer(split, REL_SPLIT);
+            });
             session.transfer(flowFile, REL_ORIGINAL);
         } catch (ProcessException e) {
-            getLogger().error("Failed to split {} due to {}", new Object[] {flowFile, e.getMessage()}, e);
+            getLogger().error("Failed to split {} due to {}", new Object[]{flowFile, e.getMessage()}, e);
             session.transfer(flowFile, REL_FAILURE);
         }
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/49297b72/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java
index 73da818..32d43e3 100644
--- a/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java
+++ b/nifi-nar-bundles/nifi-avro-bundle/nifi-avro-processors/src/test/java/org/apache/nifi/processors/avro/TestSplitAvro.java
@@ -26,6 +26,7 @@ import org.apache.avro.generic.GenericRecord;
 import org.apache.avro.io.DatumReader;
 import org.apache.avro.io.Decoder;
 import org.apache.avro.io.DecoderFactory;
+import org.apache.nifi.flowfile.attributes.CoreAttributes;
 import org.apache.nifi.stream.io.ByteArrayInputStream;
 import org.apache.nifi.stream.io.ByteArrayOutputStream;
 import org.apache.nifi.util.MockFlowFile;
@@ -39,7 +40,11 @@ import java.io.EOFException;
 import java.io.File;
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.stream.IntStream;
+
+import static org.junit.Assert.assertEquals;
 
 public class TestSplitAvro {
 
@@ -102,15 +107,25 @@ public class TestSplitAvro {
     public void testRecordSplitDatafileOutputWithSingleRecords() throws IOException {
         final TestRunner runner = TestRunners.newTestRunner(new SplitAvro());
 
-        runner.enqueue(users.toByteArray());
+        final String filename = "users.avro";
+        runner.enqueue(users.toByteArray(), new HashMap<String,String>() {{
+            put(CoreAttributes.FILENAME.key(), filename);
+        }});
         runner.run();
 
         runner.assertTransferCount(SplitAvro.REL_SPLIT, 100);
         runner.assertTransferCount(SplitAvro.REL_ORIGINAL, 1);
         runner.assertTransferCount(SplitAvro.REL_FAILURE, 0);
-
         final List<MockFlowFile> flowFiles = runner.getFlowFilesForRelationship(SplitAvro.REL_SPLIT);
         checkDataFileSplitSize(flowFiles, 1, true);
+        final String fragmentIdentifier = flowFiles.get(0).getAttribute("fragment.identifier");
+        IntStream.range(0, flowFiles.size()).forEach((i) -> {
+            MockFlowFile flowFile = flowFiles.get(i);
+            assertEquals(i, Integer.parseInt(flowFile.getAttribute("fragment.index")));
+            assertEquals(fragmentIdentifier, flowFile.getAttribute("fragment.identifier"));
+            assertEquals(flowFiles.size(), Integer.parseInt(flowFile.getAttribute("fragment.count")));
+            assertEquals(filename, flowFile.getAttribute("segment.original.filename"));
+        });
     }
 
     @Test
@@ -261,7 +276,7 @@ public class TestSplitAvro {
                 } catch (EOFException eof) {
                     // expected
                 }
-                Assert.assertEquals(expectedRecordsPerSplit, count);
+                assertEquals(expectedRecordsPerSplit, count);
             }
 
             if (checkMetadata) {
@@ -285,12 +300,12 @@ public class TestSplitAvro {
                     Assert.assertNotNull(record.get("favorite_number"));
                     count++;
                 }
-                Assert.assertEquals(expectedRecordsPerSplit, count);
+                assertEquals(expectedRecordsPerSplit, count);
 
                 if (checkMetadata) {
-                    Assert.assertEquals(META_VALUE1, reader.getMetaString(META_KEY1));
-                    Assert.assertEquals(META_VALUE2, reader.getMetaLong(META_KEY2));
-                    Assert.assertEquals(META_VALUE3, new String(reader.getMeta(META_KEY3), "UTF-8"));
+                    assertEquals(META_VALUE1, reader.getMetaString(META_KEY1));
+                    assertEquals(META_VALUE2, reader.getMetaLong(META_KEY2));
+                    assertEquals(META_VALUE3, new String(reader.getMeta(META_KEY3), "UTF-8"));
                 }
             }
         }
@@ -311,7 +326,7 @@ public class TestSplitAvro {
                 }
             }
         }
-        Assert.assertEquals(expectedTotalRecords, count);
+        assertEquals(expectedTotalRecords, count);
     }
 
 }