You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/04/01 05:09:25 UTC

incubator-nifi git commit: NIFI-478: Fixed bug that caused byte sequence to be dropped for last split under certain circumstances; added new unit tests

Repository: incubator-nifi
Updated Branches:
  refs/heads/develop e9647717e -> 4752e284f


NIFI-478: Fixed bug that caused byte sequence to be dropped for last split under certain circumstances; added new unit tests

Signed-off-by: joewitt <jo...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/4752e284
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/4752e284
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/4752e284

Branch: refs/heads/develop
Commit: 4752e284f6b4c4d3ae082af6b6f66d393c2a1ad3
Parents: e964771
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Mar 31 08:34:23 2015 -0400
Committer: joewitt <jo...@apache.org>
Committed: Tue Mar 31 23:03:25 2015 -0400

----------------------------------------------------------------------
 .../nifi/processors/standard/SplitContent.java  |  93 +++++++++++---
 .../processors/standard/TestSplitContent.java   | 123 +++++++++++++++++++
 2 files changed, 202 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4752e284/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java
index 9838af7..419e12d 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java
@@ -18,7 +18,9 @@ package org.apache.nifi.processors.standard;
 
 import java.io.IOException;
 import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.Collections;
 import java.util.HashMap;
 import java.util.HashSet;
@@ -28,6 +30,7 @@ import java.util.Set;
 import java.util.UUID;
 import java.util.concurrent.atomic.AtomicReference;
 
+import org.apache.commons.codec.DecoderException;
 import org.apache.commons.codec.binary.Hex;
 import org.apache.nifi.annotation.behavior.EventDriven;
 import org.apache.nifi.annotation.behavior.SideEffectFree;
@@ -35,8 +38,10 @@ import org.apache.nifi.annotation.behavior.SupportsBatching;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.SeeAlso;
 import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.annotation.behavior.WritesAttribute;
 import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.components.AllowableValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.ValidationContext;
 import org.apache.nifi.components.ValidationResult;
@@ -50,6 +55,7 @@ import org.apache.nifi.processor.ProcessSession;
 import org.apache.nifi.processor.ProcessorInitializationContext;
 import org.apache.nifi.processor.Relationship;
 import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
 import org.apache.nifi.stream.io.BufferedInputStream;
 import org.apache.nifi.util.NaiveSearchRingBuffer;
 import org.apache.nifi.util.Tuple;
@@ -72,19 +78,39 @@ public class SplitContent extends AbstractProcessor {
     public static final String FRAGMENT_COUNT = "fragment.count";
     public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";
 
+    static final AllowableValue HEX_FORMAT = new AllowableValue("Hexadecimal", "Hexadecimal", "The Byte Sequence will be interpreted as a hexadecimal representation of bytes");
+    static final AllowableValue UTF8_FORMAT = new AllowableValue("Text", "Text", "The Byte Sequence will be interpreted as UTF-8 Encoded text");
+    
+    static final AllowableValue TRAILING_POSITION = new AllowableValue("Trailing", "Trailing", "Keep the Byte Sequence at the end of the first split if <Keep Byte Sequence> is true");
+    static final AllowableValue LEADING_POSITION = new AllowableValue("Leading", "Leading", "Keep the Byte Sequence at the beginning of the second split if <Keep Byte Sequence> is true");
+    
+    public static final PropertyDescriptor FORMAT = new PropertyDescriptor.Builder()
+            .name("Byte Sequence Format")
+            .description("Specifies how the <Byte Sequence> property should be interpreted")
+            .required(true)
+            .allowableValues(HEX_FORMAT, UTF8_FORMAT)
+            .defaultValue(HEX_FORMAT.getValue())
+            .build();
     public static final PropertyDescriptor BYTE_SEQUENCE = new PropertyDescriptor.Builder()
             .name("Byte Sequence")
-            .description("A hex representation of bytes to look for and upon which to split the source file into separate files")
-            .addValidator(new HexStringPropertyValidator())
+            .description("A representation of bytes to look for and upon which to split the source file into separate files")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
             .required(true)
             .build();
     public static final PropertyDescriptor KEEP_SEQUENCE = new PropertyDescriptor.Builder()
             .name("Keep Byte Sequence")
-            .description("Determines whether or not the Byte Sequence should be included at the end of each Split")
+            .description("Determines whether or not the Byte Sequence should be included with each Split")
             .required(true)
             .allowableValues("true", "false")
             .defaultValue("false")
             .build();
+    public static final PropertyDescriptor BYTE_SEQUENCE_LOCATION = new PropertyDescriptor.Builder()
+            .name("Byte Sequence Location")
+            .description("If <Keep Byte Sequence> is set to true, specifies whether the byte sequence should be added to the end of the first split or the beginning of the second; if <Keep Byte Sequence> is false, this property is ignored.")
+            .required(true)
+            .allowableValues(TRAILING_POSITION, LEADING_POSITION)
+            .defaultValue(TRAILING_POSITION.getValue())
+            .build();
 
     public static final Relationship REL_SPLITS = new Relationship.Builder()
             .name("splits")
@@ -108,8 +134,10 @@ public class SplitContent extends AbstractProcessor {
         this.relationships = Collections.unmodifiableSet(relationships);
 
         final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(FORMAT);
         properties.add(BYTE_SEQUENCE);
         properties.add(KEEP_SEQUENCE);
+        properties.add(BYTE_SEQUENCE_LOCATION);
         this.properties = Collections.unmodifiableList(properties);
     }
 
@@ -124,13 +152,27 @@ public class SplitContent extends AbstractProcessor {
     }
 
     @Override
-    public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
-        if (descriptor.equals(BYTE_SEQUENCE)) {
-            try {
-                this.byteSequence.set(Hex.decodeHex(newValue.toCharArray()));
-            } catch (final Exception e) {
-                this.byteSequence.set(null);
-            }
+    protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+        final List<ValidationResult> results = new ArrayList<>(1);
+        final String format = validationContext.getProperty(FORMAT).getValue();
+        if ( HEX_FORMAT.getValue().equals(format) ) {
+            final String byteSequence = validationContext.getProperty(BYTE_SEQUENCE).getValue();
+            final ValidationResult result = new HexStringPropertyValidator().validate(BYTE_SEQUENCE.getName(), byteSequence, validationContext);
+            results.add(result);
+        }
+        return results;
+    }
+    
+
+    @OnScheduled
+    public void initializeByteSequence(final ProcessContext context) throws DecoderException {
+        final String bytePattern = context.getProperty(BYTE_SEQUENCE).getValue();
+        
+        final String format = context.getProperty(FORMAT).getValue();
+        if ( HEX_FORMAT.getValue().equals(format) ) {
+            this.byteSequence.set(Hex.decodeHex(bytePattern.toCharArray()));
+        } else {
+            this.byteSequence.set(bytePattern.getBytes(StandardCharsets.UTF_8));
         }
     }
 
@@ -143,6 +185,21 @@ public class SplitContent extends AbstractProcessor {
 
         final ProcessorLog logger = getLogger();
         final boolean keepSequence = context.getProperty(KEEP_SEQUENCE).asBoolean();
+        final boolean keepTrailingSequence;
+        final boolean keepLeadingSequence;
+        if ( keepSequence ) {
+            if ( context.getProperty(BYTE_SEQUENCE_LOCATION).getValue().equals(TRAILING_POSITION.getValue()) ) {
+                keepTrailingSequence = true;
+                keepLeadingSequence = false;
+            } else {
+                keepTrailingSequence = false;
+                keepLeadingSequence = true;
+            }
+        } else {
+            keepTrailingSequence = false;
+            keepLeadingSequence = false;
+        }
+        
         final byte[] byteSequence = this.byteSequence.get();
         if (byteSequence == null) {   // should never happen. But just in case...
             logger.error("{} Unable to obtain Byte Sequence", new Object[]{this});
@@ -169,15 +226,20 @@ public class SplitContent extends AbstractProcessor {
                         bytesRead++;
                         boolean matched = buffer.addAndCompare((byte) (nextByte & 0xFF));
                         if (matched) {
-                            final long splitLength;
+                            long splitLength;
 
-                            if (keepSequence) {
+                            if (keepTrailingSequence) {
                                 splitLength = bytesRead - startOffset;
                             } else {
                                 splitLength = bytesRead - startOffset - byteSequence.length;
                             }
 
-                            splits.add(new Tuple<>(startOffset, splitLength));
+                            if ( keepLeadingSequence && startOffset > 0 ) {
+                                splitLength += byteSequence.length;
+                            }
+                            
+                            final long splitStart = (keepLeadingSequence && startOffset > 0) ? startOffset - byteSequence.length : startOffset;
+                            splits.add(new Tuple<>(splitStart, splitLength));
                             startOffset = bytesRead;
                             buffer.clear();
                         }
@@ -207,8 +269,11 @@ public class SplitContent extends AbstractProcessor {
             lastOffsetPlusSize = offset + size;
         }
 
+        // lastOffsetPlusSize indicates the ending position of the last split.
+        // if the data didn't end with the byte sequence, we need one final split to run from the end
+        // of the last split to the end of the content.
         long finalSplitOffset = lastOffsetPlusSize;
-        if (!keepSequence) {
+        if (!keepTrailingSequence && !keepLeadingSequence) {
             finalSplitOffset += byteSequence.length;
         }
         if (finalSplitOffset > -1L && finalSplitOffset < flowFile.getSize()) {

http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4752e284/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitContent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitContent.java
index 2e08062..07c255b 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitContent.java
@@ -30,6 +30,129 @@ import org.junit.Test;
 public class TestSplitContent {
 
     @Test
+    public void testTextFormatLeadingPosition() {
+        final TestRunner runner = TestRunners.newTestRunner(new SplitContent());
+        runner.setProperty(SplitContent.FORMAT, SplitContent.UTF8_FORMAT.getValue());
+        runner.setProperty(SplitContent.BYTE_SEQUENCE, "ub");
+        runner.setProperty(SplitContent.KEEP_SEQUENCE, "true");
+        runner.setProperty(SplitContent.BYTE_SEQUENCE_LOCATION, SplitContent.LEADING_POSITION.getValue());
+
+        runner.enqueue("rub-a-dub-dub".getBytes());
+        runner.run();
+
+        runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitContent.REL_SPLITS, 4);
+
+        runner.assertQueueEmpty();
+        final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS);
+        splits.get(0).assertContentEquals("r");
+        splits.get(1).assertContentEquals("ub-a-d");
+        splits.get(2).assertContentEquals("ub-d");
+        splits.get(3).assertContentEquals("ub");
+    }
+    
+    
+    @Test
+    public void testTextFormatSplits() {
+        final TestRunner runner = TestRunners.newTestRunner(new SplitContent());
+        runner.setProperty(SplitContent.FORMAT, SplitContent.UTF8_FORMAT.getValue());
+        runner.setProperty(SplitContent.BYTE_SEQUENCE, "test");
+        runner.setProperty(SplitContent.KEEP_SEQUENCE, "true");
+        runner.setProperty(SplitContent.BYTE_SEQUENCE_LOCATION, SplitContent.LEADING_POSITION.getValue());
+
+        final byte[] input = "This is a test. This is another test. And this is yet another test. Finally this is the last Test.".getBytes();
+        runner.enqueue(input);
+        runner.run();
+
+        runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitContent.REL_SPLITS, 4);
+
+        runner.assertQueueEmpty();
+        List<MockFlowFile> splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS);
+        splits.get(0).assertContentEquals("This is a ");
+        splits.get(1).assertContentEquals("test. This is another ");
+        splits.get(2).assertContentEquals("test. And this is yet another ");
+        splits.get(3).assertContentEquals("test. Finally this is the last Test.");
+        runner.clearTransferState();
+        
+        runner.setProperty(SplitContent.KEEP_SEQUENCE, "false");
+        runner.enqueue(input);
+        runner.run();
+        runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitContent.REL_SPLITS, 4);
+        splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS);
+        splits.get(0).assertContentEquals("This is a ");
+        splits.get(1).assertContentEquals(". This is another ");
+        splits.get(2).assertContentEquals(". And this is yet another ");
+        splits.get(3).assertContentEquals(". Finally this is the last Test.");
+        runner.clearTransferState();
+        
+        runner.setProperty(SplitContent.KEEP_SEQUENCE, "true");
+        runner.setProperty(SplitContent.BYTE_SEQUENCE_LOCATION, SplitContent.TRAILING_POSITION.getValue());
+        runner.enqueue(input);
+        runner.run();
+        runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitContent.REL_SPLITS, 4);
+        splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS);
+        splits.get(0).assertContentEquals("This is a test");
+        splits.get(1).assertContentEquals(". This is another test");
+        splits.get(2).assertContentEquals(". And this is yet another test");
+        splits.get(3).assertContentEquals(". Finally this is the last Test.");
+        runner.clearTransferState();
+        
+        runner.setProperty(SplitContent.KEEP_SEQUENCE, "true");
+        runner.setProperty(SplitContent.BYTE_SEQUENCE_LOCATION, SplitContent.TRAILING_POSITION.getValue());
+        runner.enqueue("This is a test. This is another test. And this is yet another test. Finally this is the last test".getBytes());
+        runner.run();
+        runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitContent.REL_SPLITS, 4);
+        splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS);
+        splits.get(0).assertContentEquals("This is a test");
+        splits.get(1).assertContentEquals(". This is another test");
+        splits.get(2).assertContentEquals(". And this is yet another test");
+        splits.get(3).assertContentEquals(". Finally this is the last test");
+        runner.clearTransferState();
+        
+        runner.setProperty(SplitContent.KEEP_SEQUENCE, "true");
+        runner.setProperty(SplitContent.BYTE_SEQUENCE_LOCATION, SplitContent.LEADING_POSITION.getValue());
+        runner.enqueue("This is a test. This is another test. And this is yet another test. Finally this is the last test".getBytes());
+        runner.run();
+        runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitContent.REL_SPLITS, 5);
+        splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS);
+        splits.get(0).assertContentEquals("This is a ");
+        splits.get(1).assertContentEquals("test. This is another ");
+        splits.get(2).assertContentEquals("test. And this is yet another ");
+        splits.get(3).assertContentEquals("test. Finally this is the last ");
+        splits.get(4).assertContentEquals("test");
+        
+        runner.clearTransferState();
+    }
+    
+    
+    @Test
+    public void testTextFormatTrailingPosition() {
+        final TestRunner runner = TestRunners.newTestRunner(new SplitContent());
+        runner.setProperty(SplitContent.FORMAT, SplitContent.UTF8_FORMAT.getValue());
+        runner.setProperty(SplitContent.BYTE_SEQUENCE, "ub");
+        runner.setProperty(SplitContent.KEEP_SEQUENCE, "true");
+        runner.setProperty(SplitContent.BYTE_SEQUENCE_LOCATION, SplitContent.TRAILING_POSITION.getValue());
+
+        runner.enqueue("rub-a-dub-dub".getBytes());
+        runner.run();
+
+        runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
+        runner.assertTransferCount(SplitContent.REL_SPLITS, 3);
+
+        runner.assertQueueEmpty();
+        final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS);
+        splits.get(0).assertContentEquals("rub");
+        splits.get(1).assertContentEquals("-a-dub");
+        splits.get(2).assertContentEquals("-dub");
+    }
+    
+    
+    @Test
     public void testSmallSplits() throws IOException {
         final TestRunner runner = TestRunners.newTestRunner(new SplitContent());
         runner.setProperty(SplitContent.KEEP_SEQUENCE, "false");