You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by jo...@apache.org on 2015/04/01 05:09:25 UTC
incubator-nifi git commit: NIFI-478: Fixed bug that caused byte sequence to be dropped for last split under certain circumstances; added new unit tests
Repository: incubator-nifi
Updated Branches:
refs/heads/develop e9647717e -> 4752e284f
NIFI-478: Fixed bug that caused byte sequence to be dropped for last split under certain circumstances; added new unit tests
Signed-off-by: joewitt <jo...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/4752e284
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/4752e284
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/4752e284
Branch: refs/heads/develop
Commit: 4752e284f6b4c4d3ae082af6b6f66d393c2a1ad3
Parents: e964771
Author: Mark Payne <ma...@hotmail.com>
Authored: Tue Mar 31 08:34:23 2015 -0400
Committer: joewitt <jo...@apache.org>
Committed: Tue Mar 31 23:03:25 2015 -0400
----------------------------------------------------------------------
.../nifi/processors/standard/SplitContent.java | 93 +++++++++++---
.../processors/standard/TestSplitContent.java | 123 +++++++++++++++++++
2 files changed, 202 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4752e284/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java
index 9838af7..419e12d 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/SplitContent.java
@@ -18,7 +18,9 @@ package org.apache.nifi.processors.standard;
import java.io.IOException;
import java.io.InputStream;
+import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.Collections;
import java.util.HashMap;
import java.util.HashSet;
@@ -28,6 +30,7 @@ import java.util.Set;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicReference;
+import org.apache.commons.codec.DecoderException;
import org.apache.commons.codec.binary.Hex;
import org.apache.nifi.annotation.behavior.EventDriven;
import org.apache.nifi.annotation.behavior.SideEffectFree;
@@ -35,8 +38,10 @@ import org.apache.nifi.annotation.behavior.SupportsBatching;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.SeeAlso;
import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.annotation.behavior.WritesAttribute;
import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.components.AllowableValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.ValidationContext;
import org.apache.nifi.components.ValidationResult;
@@ -50,6 +55,7 @@ import org.apache.nifi.processor.ProcessSession;
import org.apache.nifi.processor.ProcessorInitializationContext;
import org.apache.nifi.processor.Relationship;
import org.apache.nifi.processor.io.InputStreamCallback;
+import org.apache.nifi.processor.util.StandardValidators;
import org.apache.nifi.stream.io.BufferedInputStream;
import org.apache.nifi.util.NaiveSearchRingBuffer;
import org.apache.nifi.util.Tuple;
@@ -72,19 +78,39 @@ public class SplitContent extends AbstractProcessor {
public static final String FRAGMENT_COUNT = "fragment.count";
public static final String SEGMENT_ORIGINAL_FILENAME = "segment.original.filename";
+ static final AllowableValue HEX_FORMAT = new AllowableValue("Hexadecimal", "Hexadecimal", "The Byte Sequence will be interpreted as a hexadecimal representation of bytes");
+ static final AllowableValue UTF8_FORMAT = new AllowableValue("Text", "Text", "The Byte Sequence will be interpreted as UTF-8 Encoded text");
+
+ static final AllowableValue TRAILING_POSITION = new AllowableValue("Trailing", "Trailing", "Keep the Byte Sequence at the end of the first split if <Keep Byte Sequence> is true");
+ static final AllowableValue LEADING_POSITION = new AllowableValue("Leading", "Leading", "Keep the Byte Sequence at the beginning of the second split if <Keep Byte Sequence> is true");
+
+ public static final PropertyDescriptor FORMAT = new PropertyDescriptor.Builder()
+ .name("Byte Sequence Format")
+ .description("Specifies how the <Byte Sequence> property should be interpreted")
+ .required(true)
+ .allowableValues(HEX_FORMAT, UTF8_FORMAT)
+ .defaultValue(HEX_FORMAT.getValue())
+ .build();
public static final PropertyDescriptor BYTE_SEQUENCE = new PropertyDescriptor.Builder()
.name("Byte Sequence")
- .description("A hex representation of bytes to look for and upon which to split the source file into separate files")
- .addValidator(new HexStringPropertyValidator())
+ .description("A representation of bytes to look for and upon which to split the source file into separate files")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.required(true)
.build();
public static final PropertyDescriptor KEEP_SEQUENCE = new PropertyDescriptor.Builder()
.name("Keep Byte Sequence")
- .description("Determines whether or not the Byte Sequence should be included at the end of each Split")
+ .description("Determines whether or not the Byte Sequence should be included with each Split")
.required(true)
.allowableValues("true", "false")
.defaultValue("false")
.build();
+ public static final PropertyDescriptor BYTE_SEQUENCE_LOCATION = new PropertyDescriptor.Builder()
+ .name("Byte Sequence Location")
+ .description("If <Keep Byte Sequence> is set to true, specifies whether the byte sequence should be added to the end of the first split or the beginning of the second; if <Keep Byte Sequence> is false, this property is ignored.")
+ .required(true)
+ .allowableValues(TRAILING_POSITION, LEADING_POSITION)
+ .defaultValue(TRAILING_POSITION.getValue())
+ .build();
public static final Relationship REL_SPLITS = new Relationship.Builder()
.name("splits")
@@ -108,8 +134,10 @@ public class SplitContent extends AbstractProcessor {
this.relationships = Collections.unmodifiableSet(relationships);
final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(FORMAT);
properties.add(BYTE_SEQUENCE);
properties.add(KEEP_SEQUENCE);
+ properties.add(BYTE_SEQUENCE_LOCATION);
this.properties = Collections.unmodifiableList(properties);
}
@@ -124,13 +152,27 @@ public class SplitContent extends AbstractProcessor {
}
@Override
- public void onPropertyModified(final PropertyDescriptor descriptor, final String oldValue, final String newValue) {
- if (descriptor.equals(BYTE_SEQUENCE)) {
- try {
- this.byteSequence.set(Hex.decodeHex(newValue.toCharArray()));
- } catch (final Exception e) {
- this.byteSequence.set(null);
- }
+ protected Collection<ValidationResult> customValidate(final ValidationContext validationContext) {
+ final List<ValidationResult> results = new ArrayList<>(1);
+ final String format = validationContext.getProperty(FORMAT).getValue();
+ if ( HEX_FORMAT.getValue().equals(format) ) {
+ final String byteSequence = validationContext.getProperty(BYTE_SEQUENCE).getValue();
+ final ValidationResult result = new HexStringPropertyValidator().validate(BYTE_SEQUENCE.getName(), byteSequence, validationContext);
+ results.add(result);
+ }
+ return results;
+ }
+
+
+ @OnScheduled
+ public void initializeByteSequence(final ProcessContext context) throws DecoderException {
+ final String bytePattern = context.getProperty(BYTE_SEQUENCE).getValue();
+
+ final String format = context.getProperty(FORMAT).getValue();
+ if ( HEX_FORMAT.getValue().equals(format) ) {
+ this.byteSequence.set(Hex.decodeHex(bytePattern.toCharArray()));
+ } else {
+ this.byteSequence.set(bytePattern.getBytes(StandardCharsets.UTF_8));
}
}
@@ -143,6 +185,21 @@ public class SplitContent extends AbstractProcessor {
final ProcessorLog logger = getLogger();
final boolean keepSequence = context.getProperty(KEEP_SEQUENCE).asBoolean();
+ final boolean keepTrailingSequence;
+ final boolean keepLeadingSequence;
+ if ( keepSequence ) {
+ if ( context.getProperty(BYTE_SEQUENCE_LOCATION).getValue().equals(TRAILING_POSITION.getValue()) ) {
+ keepTrailingSequence = true;
+ keepLeadingSequence = false;
+ } else {
+ keepTrailingSequence = false;
+ keepLeadingSequence = true;
+ }
+ } else {
+ keepTrailingSequence = false;
+ keepLeadingSequence = false;
+ }
+
final byte[] byteSequence = this.byteSequence.get();
if (byteSequence == null) { // should never happen. But just in case...
logger.error("{} Unable to obtain Byte Sequence", new Object[]{this});
@@ -169,15 +226,20 @@ public class SplitContent extends AbstractProcessor {
bytesRead++;
boolean matched = buffer.addAndCompare((byte) (nextByte & 0xFF));
if (matched) {
- final long splitLength;
+ long splitLength;
- if (keepSequence) {
+ if (keepTrailingSequence) {
splitLength = bytesRead - startOffset;
} else {
splitLength = bytesRead - startOffset - byteSequence.length;
}
- splits.add(new Tuple<>(startOffset, splitLength));
+ if ( keepLeadingSequence && startOffset > 0 ) {
+ splitLength += byteSequence.length;
+ }
+
+ final long splitStart = (keepLeadingSequence && startOffset > 0) ? startOffset - byteSequence.length : startOffset;
+ splits.add(new Tuple<>(splitStart, splitLength));
startOffset = bytesRead;
buffer.clear();
}
@@ -207,8 +269,11 @@ public class SplitContent extends AbstractProcessor {
lastOffsetPlusSize = offset + size;
}
+ // lastOffsetPlusSize indicates the ending position of the last split.
+ // if the data didn't end with the byte sequence, we need one final split to run from the end
+ // of the last split to the end of the content.
long finalSplitOffset = lastOffsetPlusSize;
- if (!keepSequence) {
+ if (!keepTrailingSequence && !keepLeadingSequence) {
finalSplitOffset += byteSequence.length;
}
if (finalSplitOffset > -1L && finalSplitOffset < flowFile.getSize()) {
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/4752e284/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitContent.java
----------------------------------------------------------------------
diff --git a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitContent.java b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitContent.java
index 2e08062..07c255b 100644
--- a/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitContent.java
+++ b/nifi/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestSplitContent.java
@@ -30,6 +30,129 @@ import org.junit.Test;
public class TestSplitContent {
@Test
+ public void testTextFormatLeadingPosition() {
+ final TestRunner runner = TestRunners.newTestRunner(new SplitContent());
+ runner.setProperty(SplitContent.FORMAT, SplitContent.UTF8_FORMAT.getValue());
+ runner.setProperty(SplitContent.BYTE_SEQUENCE, "ub");
+ runner.setProperty(SplitContent.KEEP_SEQUENCE, "true");
+ runner.setProperty(SplitContent.BYTE_SEQUENCE_LOCATION, SplitContent.LEADING_POSITION.getValue());
+
+ runner.enqueue("rub-a-dub-dub".getBytes());
+ runner.run();
+
+ runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
+ runner.assertTransferCount(SplitContent.REL_SPLITS, 4);
+
+ runner.assertQueueEmpty();
+ final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS);
+ splits.get(0).assertContentEquals("r");
+ splits.get(1).assertContentEquals("ub-a-d");
+ splits.get(2).assertContentEquals("ub-d");
+ splits.get(3).assertContentEquals("ub");
+ }
+
+
+ @Test
+ public void testTextFormatSplits() {
+ final TestRunner runner = TestRunners.newTestRunner(new SplitContent());
+ runner.setProperty(SplitContent.FORMAT, SplitContent.UTF8_FORMAT.getValue());
+ runner.setProperty(SplitContent.BYTE_SEQUENCE, "test");
+ runner.setProperty(SplitContent.KEEP_SEQUENCE, "true");
+ runner.setProperty(SplitContent.BYTE_SEQUENCE_LOCATION, SplitContent.LEADING_POSITION.getValue());
+
+ final byte[] input = "This is a test. This is another test. And this is yet another test. Finally this is the last Test.".getBytes();
+ runner.enqueue(input);
+ runner.run();
+
+ runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
+ runner.assertTransferCount(SplitContent.REL_SPLITS, 4);
+
+ runner.assertQueueEmpty();
+ List<MockFlowFile> splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS);
+ splits.get(0).assertContentEquals("This is a ");
+ splits.get(1).assertContentEquals("test. This is another ");
+ splits.get(2).assertContentEquals("test. And this is yet another ");
+ splits.get(3).assertContentEquals("test. Finally this is the last Test.");
+ runner.clearTransferState();
+
+ runner.setProperty(SplitContent.KEEP_SEQUENCE, "false");
+ runner.enqueue(input);
+ runner.run();
+ runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
+ runner.assertTransferCount(SplitContent.REL_SPLITS, 4);
+ splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS);
+ splits.get(0).assertContentEquals("This is a ");
+ splits.get(1).assertContentEquals(". This is another ");
+ splits.get(2).assertContentEquals(". And this is yet another ");
+ splits.get(3).assertContentEquals(". Finally this is the last Test.");
+ runner.clearTransferState();
+
+ runner.setProperty(SplitContent.KEEP_SEQUENCE, "true");
+ runner.setProperty(SplitContent.BYTE_SEQUENCE_LOCATION, SplitContent.TRAILING_POSITION.getValue());
+ runner.enqueue(input);
+ runner.run();
+ runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
+ runner.assertTransferCount(SplitContent.REL_SPLITS, 4);
+ splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS);
+ splits.get(0).assertContentEquals("This is a test");
+ splits.get(1).assertContentEquals(". This is another test");
+ splits.get(2).assertContentEquals(". And this is yet another test");
+ splits.get(3).assertContentEquals(". Finally this is the last Test.");
+ runner.clearTransferState();
+
+ runner.setProperty(SplitContent.KEEP_SEQUENCE, "true");
+ runner.setProperty(SplitContent.BYTE_SEQUENCE_LOCATION, SplitContent.TRAILING_POSITION.getValue());
+ runner.enqueue("This is a test. This is another test. And this is yet another test. Finally this is the last test".getBytes());
+ runner.run();
+ runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
+ runner.assertTransferCount(SplitContent.REL_SPLITS, 4);
+ splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS);
+ splits.get(0).assertContentEquals("This is a test");
+ splits.get(1).assertContentEquals(". This is another test");
+ splits.get(2).assertContentEquals(". And this is yet another test");
+ splits.get(3).assertContentEquals(". Finally this is the last test");
+ runner.clearTransferState();
+
+ runner.setProperty(SplitContent.KEEP_SEQUENCE, "true");
+ runner.setProperty(SplitContent.BYTE_SEQUENCE_LOCATION, SplitContent.LEADING_POSITION.getValue());
+ runner.enqueue("This is a test. This is another test. And this is yet another test. Finally this is the last test".getBytes());
+ runner.run();
+ runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
+ runner.assertTransferCount(SplitContent.REL_SPLITS, 5);
+ splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS);
+ splits.get(0).assertContentEquals("This is a ");
+ splits.get(1).assertContentEquals("test. This is another ");
+ splits.get(2).assertContentEquals("test. And this is yet another ");
+ splits.get(3).assertContentEquals("test. Finally this is the last ");
+ splits.get(4).assertContentEquals("test");
+
+ runner.clearTransferState();
+ }
+
+
+ @Test
+ public void testTextFormatTrailingPosition() {
+ final TestRunner runner = TestRunners.newTestRunner(new SplitContent());
+ runner.setProperty(SplitContent.FORMAT, SplitContent.UTF8_FORMAT.getValue());
+ runner.setProperty(SplitContent.BYTE_SEQUENCE, "ub");
+ runner.setProperty(SplitContent.KEEP_SEQUENCE, "true");
+ runner.setProperty(SplitContent.BYTE_SEQUENCE_LOCATION, SplitContent.TRAILING_POSITION.getValue());
+
+ runner.enqueue("rub-a-dub-dub".getBytes());
+ runner.run();
+
+ runner.assertTransferCount(SplitContent.REL_ORIGINAL, 1);
+ runner.assertTransferCount(SplitContent.REL_SPLITS, 3);
+
+ runner.assertQueueEmpty();
+ final List<MockFlowFile> splits = runner.getFlowFilesForRelationship(SplitContent.REL_SPLITS);
+ splits.get(0).assertContentEquals("rub");
+ splits.get(1).assertContentEquals("-a-dub");
+ splits.get(2).assertContentEquals("-dub");
+ }
+
+
+ @Test
public void testSmallSplits() throws IOException {
final TestRunner runner = TestRunners.newTestRunner(new SplitContent());
runner.setProperty(SplitContent.KEEP_SEQUENCE, "false");