You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by al...@apache.org on 2018/01/08 22:26:23 UTC

nifi git commit: NIFI-4727 Added CountText processor and unit test.

Repository: nifi
Updated Branches:
  refs/heads/master 4196140e4 -> a7f1eb89c


NIFI-4727 Added CountText processor and unit test.

This closes #2371.

Signed-off-by: Kevin Doran <kd...@gmail.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/a7f1eb89
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/a7f1eb89
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/a7f1eb89

Branch: refs/heads/master
Commit: a7f1eb89c23e90f813be12adc4dca99de36c7c15
Parents: 4196140
Author: Andy LoPresto <al...@gmail.com>
Authored: Tue Jan 2 14:47:33 2018 -0500
Committer: Andy LoPresto <al...@apache.org>
Committed: Mon Jan 8 14:26:14 2018 -0800

----------------------------------------------------------------------
 .../nifi-standard-processors/pom.xml            |   1 +
 .../nifi/processors/standard/CountText.java     | 327 ++++++++++++++++++
 .../org.apache.nifi.processor.Processor         |   1 +
 .../processors/standard/CountTextTest.groovy    | 332 +++++++++++++++++++
 .../resources/TestCountText/jabberwocky.txt     |  34 ++
 5 files changed, 695 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/a7f1eb89/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
index 2557d1a..89021c6 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/pom.xml
@@ -395,6 +395,7 @@
                         <exclude>src/test/resources/TestConvertJSONToSQL/person-with-null-code.json</exclude>
                         <exclude>src/test/resources/TestConvertJSONToSQL/person-without-id.json</exclude>
                         <exclude>src/test/resources/TestConvertJSONToSQL/person-with-bool.json</exclude>
+                        <exclude>src/test/resources/TestCountText/jabberwocky.txt</exclude>
                         <exclude>src/test/resources/TestModifyBytes/noFooter.txt</exclude>
                         <exclude>src/test/resources/TestModifyBytes/noFooter_noHeader.txt</exclude>
                         <exclude>src/test/resources/TestModifyBytes/noHeader.txt</exclude>

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7f1eb89/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java
new file mode 100644
index 0000000..20195bd
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java
@@ -0,0 +1,327 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard;
+
+import java.io.BufferedReader;
+import java.io.IOException;
+import java.io.InputStreamReader;
+import java.nio.charset.Charset;
+import java.nio.charset.StandardCharsets;
+import java.text.DecimalFormat;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.regex.Pattern;
+import java.util.stream.Collectors;
+import org.apache.nifi.annotation.behavior.EventDriven;
+import org.apache.nifi.annotation.behavior.InputRequirement;
+import org.apache.nifi.annotation.behavior.InputRequirement.Requirement;
+import org.apache.nifi.annotation.behavior.SideEffectFree;
+import org.apache.nifi.annotation.behavior.SupportsBatching;
+import org.apache.nifi.annotation.behavior.WritesAttribute;
+import org.apache.nifi.annotation.behavior.WritesAttributes;
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.SeeAlso;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.flowfile.FlowFile;
+import org.apache.nifi.processor.AbstractProcessor;
+import org.apache.nifi.processor.ProcessContext;
+import org.apache.nifi.processor.ProcessSession;
+import org.apache.nifi.processor.Relationship;
+import org.apache.nifi.processor.exception.ProcessException;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.util.StringUtils;
+
+@EventDriven
+@SideEffectFree
+@SupportsBatching
+@Tags({"count", "text", "line", "word", "character"})
+@InputRequirement(Requirement.INPUT_REQUIRED)
+@CapabilityDescription("Counts various metrics on incoming text. The requested results will be recorded as attributes. "
+        + "The resulting flowfile will not have its content modified.")
+@WritesAttributes({
+        @WritesAttribute(attribute = "text.line.count", description = "The number of lines of text present in the FlowFile content"),
+        @WritesAttribute(attribute = "text.line.nonempty.count", description = "The number of lines of text (with at least one non-whitespace character) present in the original FlowFile"),
+        @WritesAttribute(attribute = "text.word.count", description = "The number of words present in the original FlowFile"),
+        @WritesAttribute(attribute = "text.character.count", description = "The number of characters (given the specified character encoding) present in the original FlowFile"),
+})
+@SeeAlso(SplitText.class)
+public class CountText extends AbstractProcessor {
+    private static final List<Charset> STANDARD_CHARSETS = Arrays.asList(
+            StandardCharsets.UTF_8,
+            StandardCharsets.US_ASCII,
+            StandardCharsets.ISO_8859_1,
+            StandardCharsets.UTF_16,
+            StandardCharsets.UTF_16LE,
+            StandardCharsets.UTF_16BE);
+
+    private static final Pattern SYMBOL_PATTERN = Pattern.compile("[\\s-\\._]");
+    private static final Pattern WHITESPACE_ONLY_PATTERN = Pattern.compile("\\s");
+
+    // Attribute keys
+    public static final String TEXT_LINE_COUNT = "text.line.count";
+    public static final String TEXT_LINE_NONEMPTY_COUNT = "text.line.nonempty.count";
+    public static final String TEXT_WORD_COUNT = "text.word.count";
+    public static final String TEXT_CHARACTER_COUNT = "text.character.count";
+
+
+    public static final PropertyDescriptor TEXT_LINE_COUNT_PD = new PropertyDescriptor.Builder()
+            .name("text-line-count")
+            .displayName("Count Lines")
+            .description("If enabled, will count the number of lines present in the incoming text.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("true")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor TEXT_LINE_NONEMPTY_COUNT_PD = new PropertyDescriptor.Builder()
+            .name("text-line-nonempty-count")
+            .displayName("Count Non-Empty Lines")
+            .description("If enabled, will count the number of lines that contain a non-whitespace character present in the incoming text.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor TEXT_WORD_COUNT_PD = new PropertyDescriptor.Builder()
+            .name("text-word-count")
+            .displayName("Count Words")
+            .description("If enabled, will count the number of words (alphanumeric character groups bounded by whitespace)" +
+                    " present in the incoming text. Common logical delimiters [_-.] do not bound a word unless 'Split Words on Symbols' is true.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor TEXT_CHARACTER_COUNT_PD = new PropertyDescriptor.Builder()
+            .name("text-character-count")
+            .displayName("Count Characters")
+            .description("If enabled, will count the number of characters (including whitespace and symbols, but not including newlines and carriage returns) present in the incoming text.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor SPLIT_WORDS_ON_SYMBOLS_PD = new PropertyDescriptor.Builder()
+            .name("split-words-on-symbols")
+            .displayName("Split Words on Symbols")
+            .description("If enabled, the word count will identify strings separated by common logical delimiters [ _ - . ] as independent words (ex. split-words-on-symbols = 4 words).")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
+    public static final PropertyDescriptor CHARACTER_ENCODING_PD = new PropertyDescriptor.Builder()
+            .name("character-encoding")
+            .displayName("Character Encoding")
+            .description("Specifies a character encoding to use.")
+            .required(true)
+            .allowableValues(getStandardCharsetNames())
+            .defaultValue(StandardCharsets.UTF_8.displayName())
+            .build();
+
+    private static Set<String> getStandardCharsetNames() {
+        return STANDARD_CHARSETS.stream().map(c -> c.displayName()).collect(Collectors.toSet());
+    }
+
+    public static final Relationship REL_SUCCESS = new Relationship.Builder()
+            .name("success")
+            .description("The flowfile contains the original content with one or more attributes added containing the respective counts")
+            .build();
+    public static final Relationship REL_FAILURE = new Relationship.Builder()
+            .name("failure")
+            .description("If the flowfile text cannot be counted for some reason, the original file will be routed to this destination and nothing will be routed elsewhere")
+            .build();
+
+    private static final List<PropertyDescriptor> properties;
+    private static final Set<Relationship> relationships;
+
+    static {
+        properties = Collections.unmodifiableList(Arrays.asList(TEXT_LINE_COUNT_PD,
+                TEXT_LINE_NONEMPTY_COUNT_PD,
+                TEXT_WORD_COUNT_PD,
+                TEXT_CHARACTER_COUNT_PD,
+                SPLIT_WORDS_ON_SYMBOLS_PD,
+                CHARACTER_ENCODING_PD));
+
+        relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS,
+                REL_FAILURE)));
+    }
+
+    private volatile boolean countLines;
+    private volatile boolean countLinesNonEmpty;
+    private volatile boolean countWords;
+    private volatile boolean countCharacters;
+    private volatile boolean splitWordsOnSymbols;
+    private volatile String characterEncoding = StandardCharsets.UTF_8.name();
+
+    private volatile int lineCount;
+    private volatile int lineNonEmptyCount;
+    private volatile int wordCount;
+    private volatile int characterCount;
+
+    @Override
+    public Set<Relationship> getRelationships() {
+        return relationships;
+    }
+
+    @OnScheduled
+    public void onSchedule(ProcessContext context) {
+        this.countLines = context.getProperty(TEXT_LINE_COUNT_PD).isSet()
+                ? context.getProperty(TEXT_LINE_COUNT_PD).asBoolean() : false;
+        this.countLinesNonEmpty = context.getProperty(TEXT_LINE_NONEMPTY_COUNT_PD).isSet()
+                ? context.getProperty(TEXT_LINE_NONEMPTY_COUNT_PD).asBoolean() : false;
+        this.countWords = context.getProperty(TEXT_WORD_COUNT_PD).isSet()
+                ? context.getProperty(TEXT_WORD_COUNT_PD).asBoolean() : false;
+        this.countCharacters = context.getProperty(TEXT_CHARACTER_COUNT_PD).isSet()
+                ? context.getProperty(TEXT_CHARACTER_COUNT_PD).asBoolean() : false;
+        this.splitWordsOnSymbols = context.getProperty(SPLIT_WORDS_ON_SYMBOLS_PD).isSet()
+                ? context.getProperty(SPLIT_WORDS_ON_SYMBOLS_PD).asBoolean() : false;
+        this.characterEncoding = context.getProperty(CHARACTER_ENCODING_PD).getValue();
+    }
+
+    /**
+     * Will count text attributes of the incoming stream.
+     */
+    @Override
+    public void onTrigger(ProcessContext context, ProcessSession processSession) throws ProcessException {
+        FlowFile sourceFlowFile = processSession.get();
+        if (sourceFlowFile == null) {
+            return;
+        }
+        AtomicBoolean error = new AtomicBoolean();
+
+        lineCount = 0;
+        lineNonEmptyCount = 0;
+        wordCount = 0;
+        characterCount = 0;
+
+        processSession.read(sourceFlowFile, in -> {
+            long start = System.nanoTime();
+
+            // Iterate over the lines in the text input
+            try {
+                BufferedReader bufferedReader = new BufferedReader(new InputStreamReader(in, characterEncoding));
+                String line;
+                while ((line = bufferedReader.readLine()) != null) {
+                    if (countLines) {
+                        lineCount++;
+                    }
+
+                    if (countLinesNonEmpty) {
+                        if (line.trim().length() > 0) {
+                            lineNonEmptyCount++;
+                        }
+                    }
+
+                    if (countWords) {
+                        wordCount += countWordsInLine(line, splitWordsOnSymbols);
+                    }
+
+                    if (countCharacters) {
+                        characterCount += line.length();
+                    }
+                }
+                long stop = System.nanoTime();
+                if (getLogger().isDebugEnabled()) {
+                    final long durationNanos = stop - start;
+                    DecimalFormat df = new DecimalFormat("#.###");
+                    getLogger().debug("Computed metrics in " + durationNanos + " nanoseconds (" + df.format(durationNanos / 1_000_000_000.0) + " seconds).");
+                }
+                if (getLogger().isInfoEnabled()) {
+                    String message = generateMetricsMessage();
+                    getLogger().info(message);
+                }
+
+                // Update session counters
+                processSession.adjustCounter("Lines Counted", (long) lineCount, false);
+                processSession.adjustCounter("Lines (non-empty) Counted", (long) lineNonEmptyCount, false);
+                processSession.adjustCounter("Words Counted", (long) wordCount, false);
+                processSession.adjustCounter("Characters Counted", (long) characterCount, false);
+            } catch (IOException e) {
+                error.set(true);
+                getLogger().error(e.getMessage() + " Routing to failure.", e);
+            }
+        });
+
+        if (error.get()) {
+            processSession.transfer(sourceFlowFile, REL_FAILURE);
+        } else {
+            Map<String, String> metricAttributes = new HashMap<>();
+            if (countLines) {
+                metricAttributes.put(TEXT_LINE_COUNT, String.valueOf(lineCount));
+            }
+            if (countLinesNonEmpty) {
+                metricAttributes.put(TEXT_LINE_NONEMPTY_COUNT, String.valueOf(lineNonEmptyCount));
+            }
+            if (countWords) {
+                metricAttributes.put(TEXT_WORD_COUNT, String.valueOf(wordCount));
+            }
+            if (countCharacters) {
+                metricAttributes.put(TEXT_CHARACTER_COUNT, String.valueOf(characterCount));
+            }
+            FlowFile updatedFlowFile = processSession.putAllAttributes(sourceFlowFile, metricAttributes);
+            processSession.transfer(updatedFlowFile, REL_SUCCESS);
+        }
+    }
+
+    private String generateMetricsMessage() {
+        StringBuilder sb = new StringBuilder("Counted ");
+        List<String> metrics = new ArrayList<>();
+        if (countLines) {
+            metrics.add(lineCount + " lines");
+        }
+        if (countLinesNonEmpty) {
+            metrics.add(lineNonEmptyCount + " non-empty lines");
+        }
+        if (countWords) {
+            metrics.add(wordCount + " words");
+        }
+        if (countCharacters) {
+            metrics.add(characterCount + " characters");
+        }
+        sb.append(StringUtils.join(metrics, ", "));
+        return sb.toString();
+    }
+
+    int countWordsInLine(String line, boolean splitWordsOnSymbols) throws IOException {
+        if (line == null || line.trim().length() == 0) {
+            return 0;
+        } else {
+            Pattern regex = splitWordsOnSymbols ? SYMBOL_PATTERN : WHITESPACE_ONLY_PATTERN;
+            final String[] words = regex.split(line);
+            // TODO: Trim individual words before counting to eliminate whitespace words?
+            if (getLogger().isDebugEnabled()) {
+                getLogger().debug("Split [" + line + "] to [" + StringUtils.join(Arrays.asList(words), ", ") + "] (" + words.length + ")");
+            }
+            return words.length;
+        }
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7f1eb89/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
index 3fb0de3..a93b37f 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/resources/META-INF/services/org.apache.nifi.processor.Processor
@@ -19,6 +19,7 @@ org.apache.nifi.processors.standard.ControlRate
 org.apache.nifi.processors.standard.ConvertCharacterSet
 org.apache.nifi.processors.standard.ConvertJSONToSQL
 org.apache.nifi.processors.standard.ConvertRecord
+org.apache.nifi.processors.standard.CountText
 org.apache.nifi.processors.standard.DebugFlow
 org.apache.nifi.processors.standard.DetectDuplicate
 org.apache.nifi.processors.standard.DistributeLoad

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7f1eb89/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/CountTextTest.groovy
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/CountTextTest.groovy b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/CountTextTest.groovy
new file mode 100644
index 0000000..52b6908
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/groovy/org/apache/nifi/processors/standard/CountTextTest.groovy
@@ -0,0 +1,332 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License") you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.processors.standard
+
+import org.apache.nifi.components.PropertyDescriptor
+import org.apache.nifi.flowfile.FlowFile
+import org.apache.nifi.util.MockProcessSession
+import org.apache.nifi.util.TestRunner
+import org.apache.nifi.util.TestRunners
+import org.bouncycastle.jce.provider.BouncyCastleProvider
+import org.junit.After
+import org.junit.Before
+import org.junit.BeforeClass
+import org.junit.Test
+import org.junit.runner.RunWith
+import org.junit.runners.JUnit4
+import org.mockito.Mockito
+import org.slf4j.Logger
+import org.slf4j.LoggerFactory
+
+import java.nio.charset.StandardCharsets
+import java.security.Security
+
+import static org.mockito.Matchers.anyBoolean
+import static org.mockito.Matchers.anyString
+import static org.mockito.Mockito.when
+
+@RunWith(JUnit4.class)
+class CountTextTest extends GroovyTestCase {
+    private static final Logger logger = LoggerFactory.getLogger(CountTextTest.class)
+
+    private static final String TLC = "text.line.count"
+    private static final String TLNEC = "text.line.nonempty.count"
+    private static final String TWC = "text.word.count"
+    private static final String TCC = "text.character.count"
+
+
+    @BeforeClass
+    static void setUpOnce() throws Exception {
+        Security.addProvider(new BouncyCastleProvider())
+
+        logger.metaClass.methodMissing = { String name, args ->
+            logger.info("[${name?.toUpperCase()}] ${(args as List).join(" ")}")
+        }
+    }
+
+    @Before
+    void setUp() throws Exception {
+    }
+
+    @After
+    void tearDown() throws Exception {
+    }
+
+    @Test
+    void testShouldCountAllMetrics() throws Exception {
+        // Arrange
+        final TestRunner runner = TestRunners.newTestRunner(CountText.class)
+
+        runner.setProperty(CountText.TEXT_LINE_COUNT_PD, "true")
+        runner.setProperty(CountText.TEXT_LINE_NONEMPTY_COUNT_PD, "true")
+        runner.setProperty(CountText.TEXT_WORD_COUNT_PD, "true")
+        runner.setProperty(CountText.TEXT_CHARACTER_COUNT_PD, "true")
+
+        // This text is the same as in src/test/resources/TestCountText/jabberwocky.txt but is copied here
+        // to ensure that reading from a file vs. static text doesn't cause line break issues
+        String INPUT_TEXT = """’Twas brillig, and the slithy toves
+Did gyre and gimble in the wade;
+All mimsy were the borogoves,
+And the mome raths outgrabe.
+
+"Beware the Jabberwock, my son!
+The jaws that bite, the claws that catch!
+Beware the Jubjub bird, and shun
+The frumious Bandersnatch!"
+
+He took his vorpal sword in hand:
+Long time the manxome foe he sought—
+So rested he by the Tumtum tree,
+And stood awhile in thought.
+
+And as in uffish thought he stood,
+The Jabberwock, with eyes of flame,
+Came whiffling through the tulgey wood.
+And burbled as it came!
+
+One, two! One, two! And through and through
+The vorpal blade went snicker-snack!
+He left it dead, and with its head
+He went galumphing back.
+
+"And hast thou slain the Jabberwock?
+Come to my arms, my beamish boy!
+O frabjous day! Callooh! Callay!"
+He chortled in his joy.
+
+’Twas brillig, and the slithy toves
+Did gyre and gimble in the wabe;
+All mimsy were the borogoves,
+And the mome raths outgrabe."""
+
+        runner.enqueue(INPUT_TEXT.bytes)
+
+        // Act
+        runner.run()
+
+        // Assert
+        runner.assertAllFlowFilesTransferred(CountText.REL_SUCCESS, 1)
+        FlowFile flowFile = runner.getFlowFilesForRelationship(CountText.REL_SUCCESS).first()
+        assert flowFile.attributes."$TLC" == 34 as String
+        assert flowFile.attributes."$TLNEC" == 28 as String
+        assert flowFile.attributes."$TWC" == 166 as String
+        assert flowFile.attributes."$TCC" == 900 as String
+    }
+
+    @Test
+    void testShouldCountEachMetric() throws Exception {
+        // Arrange
+        final TestRunner runner = TestRunners.newTestRunner(CountText.class)
+        String INPUT_TEXT = new File("src/test/resources/TestCountText/jabberwocky.txt").text
+
+        final def EXPECTED_VALUES = [
+                (TLC)  : 34,
+                (TLNEC): 28,
+                (TWC)  : 166,
+                (TCC)  : 900,
+        ]
+
+        def linesOnly = [(CountText.TEXT_LINE_COUNT_PD): "true"]
+        def linesNonEmptyOnly = [(CountText.TEXT_LINE_NONEMPTY_COUNT_PD): "true"]
+        def wordsOnly = [(CountText.TEXT_WORD_COUNT_PD): "true"]
+        def charactersOnly = [(CountText.TEXT_CHARACTER_COUNT_PD): "true"]
+
+        final List<Map<PropertyDescriptor, String>> SCENARIOS = [linesOnly, linesNonEmptyOnly, wordsOnly, charactersOnly]
+
+        SCENARIOS.each { map ->
+            // Reset the processor properties
+            runner.setProperty(CountText.TEXT_LINE_COUNT_PD, "false")
+            runner.setProperty(CountText.TEXT_LINE_NONEMPTY_COUNT_PD, "false")
+            runner.setProperty(CountText.TEXT_WORD_COUNT_PD, "false")
+            runner.setProperty(CountText.TEXT_CHARACTER_COUNT_PD, "false")
+
+            // Apply the scenario-specific properties
+            map.each { key, value ->
+                runner.setProperty(key, value)
+            }
+
+            runner.clearProvenanceEvents()
+            runner.clearTransferState()
+            runner.enqueue(INPUT_TEXT.bytes)
+
+            // Act
+            runner.run()
+
+            // Assert
+            runner.assertAllFlowFilesTransferred(CountText.REL_SUCCESS, 1)
+            FlowFile flowFile = runner.getFlowFilesForRelationship(CountText.REL_SUCCESS).first()
+            logger.info("Generated flowfile: ${flowFile} | ${flowFile.attributes}")
+            EXPECTED_VALUES.each { key, value ->
+                if (flowFile.attributes.containsKey(key)) {
+                    assert flowFile.attributes.get(key) == value as String
+                }
+            }
+        }
+    }
+
+    @Test
+    void testShouldCountWordsSplitOnSymbol() throws Exception {
+        // Arrange
+        final TestRunner runner = TestRunners.newTestRunner(CountText.class)
+        String INPUT_TEXT = new File("src/test/resources/TestCountText/jabberwocky.txt").text
+
+        final int EXPECTED_WORD_COUNT = 167
+
+        // Reset the processor properties
+        runner.setProperty(CountText.TEXT_LINE_COUNT_PD, "false")
+        runner.setProperty(CountText.TEXT_LINE_NONEMPTY_COUNT_PD, "false")
+        runner.setProperty(CountText.TEXT_WORD_COUNT_PD, "true")
+        runner.setProperty(CountText.TEXT_CHARACTER_COUNT_PD, "false")
+        runner.setProperty(CountText.SPLIT_WORDS_ON_SYMBOLS_PD, "true")
+
+        runner.clearProvenanceEvents()
+        runner.clearTransferState()
+        runner.enqueue(INPUT_TEXT.bytes)
+
+        // Act
+        runner.run()
+
+        // Assert
+        runner.assertAllFlowFilesTransferred(CountText.REL_SUCCESS, 1)
+        FlowFile flowFile = runner.getFlowFilesForRelationship(CountText.REL_SUCCESS).first()
+        logger.info("Generated flowfile: ${flowFile} | ${flowFile.attributes}")
+        assert flowFile.attributes.get(CountText.TEXT_WORD_COUNT) == EXPECTED_WORD_COUNT as String
+    }
+
+    @Test
+    void testShouldCountIndependentlyPerFlowFile() throws Exception {
+        // Arrange
+        final TestRunner runner = TestRunners.newTestRunner(CountText.class)
+        String INPUT_TEXT = new File("src/test/resources/TestCountText/jabberwocky.txt").text
+
+        final def EXPECTED_VALUES = [
+                (TLC)  : 34,
+                (TLNEC): 28,
+                (TWC)  : 166,
+                (TCC)  : 900,
+        ]
+
+        // Reset the processor properties
+        runner.setProperty(CountText.TEXT_LINE_COUNT_PD, "true")
+        runner.setProperty(CountText.TEXT_LINE_NONEMPTY_COUNT_PD, "true")
+        runner.setProperty(CountText.TEXT_WORD_COUNT_PD, "true")
+        runner.setProperty(CountText.TEXT_CHARACTER_COUNT_PD, "true")
+
+        2.times { int i ->
+            runner.clearProvenanceEvents()
+            runner.clearTransferState()
+            runner.enqueue(INPUT_TEXT.bytes)
+
+            // Act
+            runner.run()
+
+            // Assert
+            runner.assertAllFlowFilesTransferred(CountText.REL_SUCCESS, 1)
+            FlowFile flowFile = runner.getFlowFilesForRelationship(CountText.REL_SUCCESS).first()
+            logger.info("Generated flowfile: ${flowFile} | ${flowFile.attributes}")
+            EXPECTED_VALUES.each { key, value ->
+                if (flowFile.attributes.containsKey(key)) {
+                    assert flowFile.attributes.get(key) == value as String
+                }
+            }
+        }
+    }
+
+    @Test
+    void testShouldTrackSessionCountersAcrossMultipleFlowfiles() throws Exception {
+        // Arrange
+        final TestRunner runner = TestRunners.newTestRunner(CountText.class)
+        String INPUT_TEXT = new File("src/test/resources/TestCountText/jabberwocky.txt").text
+
+        final def EXPECTED_VALUES = [
+                (TLC)  : 34,
+                (TLNEC): 28,
+                (TWC)  : 166,
+                (TCC)  : 900,
+        ]
+
+        // Reset the processor properties
+        runner.setProperty(CountText.TEXT_LINE_COUNT_PD, "true")
+        runner.setProperty(CountText.TEXT_LINE_NONEMPTY_COUNT_PD, "true")
+        runner.setProperty(CountText.TEXT_WORD_COUNT_PD, "true")
+        runner.setProperty(CountText.TEXT_CHARACTER_COUNT_PD, "true")
+
+        MockProcessSession mockPS = runner.processSessionFactory.createSession()
+        def sessionCounters = mockPS.sharedState.counterMap
+        logger.info("Session counters (0): ${sessionCounters}")
+
+        int n = 2
+
+        n.times { int i ->
+            runner.clearTransferState()
+            runner.enqueue(INPUT_TEXT.bytes)
+
+            // Act
+            runner.run()
+            logger.info("Session counters (${i + 1}): ${sessionCounters}")
+
+            // Assert
+            runner.assertAllFlowFilesTransferred(CountText.REL_SUCCESS, 1)
+            FlowFile flowFile = runner.getFlowFilesForRelationship(CountText.REL_SUCCESS).first()
+            logger.info("Generated flowfile: ${flowFile} | ${flowFile.attributes}")
+            EXPECTED_VALUES.each { key, value ->
+                if (flowFile.attributes.containsKey(key)) {
+                    assert flowFile.attributes.get(key) == value as String
+                }
+            }
+        }
+
+        assert sessionCounters.get("Lines Counted").get() == EXPECTED_VALUES[TLC] * n as long
+        assert sessionCounters.get("Lines (non-empty) Counted").get() == EXPECTED_VALUES[TLNEC] * n as long
+        assert sessionCounters.get("Words Counted").get() == EXPECTED_VALUES[TWC] * n as long
+        assert sessionCounters.get("Characters Counted").get() == EXPECTED_VALUES[TCC] * n as long
+    }
+
+    @Test
+    void testShouldHandleInternalError() throws Exception {
+        // Arrange
+        CountText ct = new CountText()
+        ct.countLines = true
+        ct.countLinesNonEmpty = true
+        ct.countWords = true
+        ct.countCharacters = true
+
+        CountText ctSpy = Mockito.spy(ct)
+        when(ctSpy.countWordsInLine(anyString(), anyBoolean())).thenThrow(new IOException("Expected exception"))
+
+        final TestRunner runner = TestRunners.newTestRunner(ctSpy)
+        final String INPUT_TEXT = "This flowfile should throw an error"
+
+        // Reset the processor properties
+        runner.setProperty(CountText.TEXT_LINE_COUNT_PD, "true")
+        runner.setProperty(CountText.TEXT_LINE_NONEMPTY_COUNT_PD, "true")
+        runner.setProperty(CountText.TEXT_WORD_COUNT_PD, "true")
+        runner.setProperty(CountText.TEXT_CHARACTER_COUNT_PD, "true")
+        runner.setProperty(CountText.CHARACTER_ENCODING_PD, StandardCharsets.US_ASCII.displayName())
+
+        runner.enqueue(INPUT_TEXT.bytes)
+
+        // Act
+        // Need initialize = true to run #onScheduled()
+        runner.run(1, true, true)
+
+        // Assert
+        runner.assertAllFlowFilesTransferred(CountText.REL_FAILURE, 1)
+        FlowFile flowFile = runner.getFlowFilesForRelationship(CountText.REL_FAILURE).first()
+        logger.info("Generated flowfile: ${flowFile} | ${flowFile.attributes}")
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/a7f1eb89/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestCountText/jabberwocky.txt
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestCountText/jabberwocky.txt b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestCountText/jabberwocky.txt
new file mode 100644
index 0000000..9d76b37
--- /dev/null
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/resources/TestCountText/jabberwocky.txt
@@ -0,0 +1,34 @@
+’Twas brillig, and the slithy toves
+Did gyre and gimble in the wade;
+All mimsy were the borogoves,
+And the mome raths outgrabe.
+
+"Beware the Jabberwock, my son!
+The jaws that bite, the claws that catch!
+Beware the Jubjub bird, and shun
+The frumious Bandersnatch!"
+
+He took his vorpal sword in hand:
+Long time the manxome foe he sought—
+So rested he by the Tumtum tree,
+And stood awhile in thought.
+
+And as in uffish thought he stood,
+The Jabberwock, with eyes of flame,
+Came whiffling through the tulgey wood.
+And burbled as it came!
+
+One, two! One, two! And through and through
+The vorpal blade went snicker-snack!
+He left it dead, and with its head
+He went galumphing back.
+
+"And hast thou slain the Jabberwock?
+Come to my arms, my beamish boy!
+O frabjous day! Callooh! Callay!"
+He chortled in his joy.
+
+’Twas brillig, and the slithy toves
+Did gyre and gimble in the wabe;
+All mimsy were the borogoves,
+And the mome raths outgrabe.
\ No newline at end of file