You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2019/11/05 12:01:08 UTC

[nifi] branch master updated: NIFI-6395: Thread-safety bug fixed and added new flag property to handle the way to adjust the counters

This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 7f96fa1  NIFI-6395: Thread-safety bug fixed and added new flag property to handle the way to adjust the counters
7f96fa1 is described below

commit 7f96fa1d0dbcfd061fba1e061c891ac35da4333d
Author: Ivan Ezequiel Rodriguez <iv...@claro.com.ar>
AuthorDate: Wed Jun 26 02:10:13 2019 -0300

    NIFI-6395: Thread-safety bug fixed and added new flag property to handle the way to adjust the counters
    
    Update CountText.java
    
    Replace the shared volatile counter fields with local AtomicInteger variables
    
    NIFI-6395 - Fix line longer than 200 characters
    
    This closes #3552.
    
    Signed-off-by: Koji Kawamura <ij...@apache.org>
---
 .../apache/nifi/processors/standard/CountText.java | 59 +++++++++++++---------
 1 file changed, 34 insertions(+), 25 deletions(-)

diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java
index 20195bd..d624580 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/CountText.java
@@ -31,6 +31,7 @@ import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 import org.apache.nifi.annotation.behavior.EventDriven;
@@ -141,7 +142,16 @@ public class CountText extends AbstractProcessor {
             .allowableValues(getStandardCharsetNames())
             .defaultValue(StandardCharsets.UTF_8.displayName())
             .build();
-
+    public static final PropertyDescriptor ADJUST_IMMEDIATELY = new PropertyDescriptor.Builder()
+            .name("ajust-immediately")
+            .displayName("Call Immediate Adjustment")
+            .description("If true, the counter will be updated immediately, without regard to whether the ProcessSession is commit or rolled back;" +
+                    "otherwise, the counter will be incremented only if and when the ProcessSession is committed.")
+            .required(true)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .addValidator(StandardValidators.BOOLEAN_VALIDATOR)
+            .build();
     private static Set<String> getStandardCharsetNames() {
         return STANDARD_CHARSETS.stream().map(c -> c.displayName()).collect(Collectors.toSet());
     }
@@ -164,7 +174,8 @@ public class CountText extends AbstractProcessor {
                 TEXT_WORD_COUNT_PD,
                 TEXT_CHARACTER_COUNT_PD,
                 SPLIT_WORDS_ON_SYMBOLS_PD,
-                CHARACTER_ENCODING_PD));
+                CHARACTER_ENCODING_PD,
+                ADJUST_IMMEDIATELY));
 
         relationships = Collections.unmodifiableSet(new HashSet<>(Arrays.asList(REL_SUCCESS,
                 REL_FAILURE)));
@@ -175,13 +186,9 @@ public class CountText extends AbstractProcessor {
     private volatile boolean countWords;
     private volatile boolean countCharacters;
     private volatile boolean splitWordsOnSymbols;
+    private volatile boolean adjustImmediately;
     private volatile String characterEncoding = StandardCharsets.UTF_8.name();
 
-    private volatile int lineCount;
-    private volatile int lineNonEmptyCount;
-    private volatile int wordCount;
-    private volatile int characterCount;
-
     @Override
     public Set<Relationship> getRelationships() {
         return relationships;
@@ -199,6 +206,8 @@ public class CountText extends AbstractProcessor {
                 ? context.getProperty(TEXT_CHARACTER_COUNT_PD).asBoolean() : false;
         this.splitWordsOnSymbols = context.getProperty(SPLIT_WORDS_ON_SYMBOLS_PD).isSet()
                 ? context.getProperty(SPLIT_WORDS_ON_SYMBOLS_PD).asBoolean() : false;
+        this.adjustImmediately = context.getProperty(ADJUST_IMMEDIATELY).isSet()
+                ? context.getProperty(ADJUST_IMMEDIATELY).asBoolean() : false;
         this.characterEncoding = context.getProperty(CHARACTER_ENCODING_PD).getValue();
     }
 
@@ -213,10 +222,10 @@ public class CountText extends AbstractProcessor {
         }
         AtomicBoolean error = new AtomicBoolean();
 
-        lineCount = 0;
-        lineNonEmptyCount = 0;
-        wordCount = 0;
-        characterCount = 0;
+        final AtomicInteger lineCount = new AtomicInteger(0);
+        final AtomicInteger lineNonEmptyCount = new AtomicInteger(0);
+        final AtomicInteger wordCount = new AtomicInteger(0);
+        final AtomicInteger characterCount = new AtomicInteger(0);
 
         processSession.read(sourceFlowFile, in -> {
             long start = System.nanoTime();
@@ -227,21 +236,21 @@ public class CountText extends AbstractProcessor {
                 String line;
                 while ((line = bufferedReader.readLine()) != null) {
                     if (countLines) {
-                        lineCount++;
+                        lineCount.incrementAndGet();
                     }
 
                     if (countLinesNonEmpty) {
                         if (line.trim().length() > 0) {
-                            lineNonEmptyCount++;
+                            lineNonEmptyCount.incrementAndGet();
                         }
                     }
 
                     if (countWords) {
-                        wordCount += countWordsInLine(line, splitWordsOnSymbols);
+                        wordCount.addAndGet(countWordsInLine(line, splitWordsOnSymbols));
                     }
 
                     if (countCharacters) {
-                        characterCount += line.length();
+                        characterCount.addAndGet(line.length());
                     }
                 }
                 long stop = System.nanoTime();
@@ -251,15 +260,15 @@ public class CountText extends AbstractProcessor {
                     getLogger().debug("Computed metrics in " + durationNanos + " nanoseconds (" + df.format(durationNanos / 1_000_000_000.0) + " seconds).");
                 }
                 if (getLogger().isInfoEnabled()) {
-                    String message = generateMetricsMessage();
+                    String message = generateMetricsMessage(lineCount.get(), lineNonEmptyCount.get(), wordCount.get(), characterCount.get());
                     getLogger().info(message);
                 }
 
                 // Update session counters
-                processSession.adjustCounter("Lines Counted", (long) lineCount, false);
-                processSession.adjustCounter("Lines (non-empty) Counted", (long) lineNonEmptyCount, false);
-                processSession.adjustCounter("Words Counted", (long) wordCount, false);
-                processSession.adjustCounter("Characters Counted", (long) characterCount, false);
+                processSession.adjustCounter("Lines Counted", (long) lineCount.get(), adjustImmediately);
+                processSession.adjustCounter("Lines (non-empty) Counted", (long) lineNonEmptyCount.get(), adjustImmediately);
+                processSession.adjustCounter("Words Counted", (long) wordCount.get(), adjustImmediately);
+                processSession.adjustCounter("Characters Counted", (long) characterCount.get(), adjustImmediately);
             } catch (IOException e) {
                 error.set(true);
                 getLogger().error(e.getMessage() + " Routing to failure.", e);
@@ -271,23 +280,23 @@ public class CountText extends AbstractProcessor {
         } else {
             Map<String, String> metricAttributes = new HashMap<>();
             if (countLines) {
-                metricAttributes.put(TEXT_LINE_COUNT, String.valueOf(lineCount));
+                metricAttributes.put(TEXT_LINE_COUNT, String.valueOf(lineCount.get()));
             }
             if (countLinesNonEmpty) {
-                metricAttributes.put(TEXT_LINE_NONEMPTY_COUNT, String.valueOf(lineNonEmptyCount));
+                metricAttributes.put(TEXT_LINE_NONEMPTY_COUNT, String.valueOf(lineNonEmptyCount.get()));
             }
             if (countWords) {
-                metricAttributes.put(TEXT_WORD_COUNT, String.valueOf(wordCount));
+                metricAttributes.put(TEXT_WORD_COUNT, String.valueOf(wordCount.get()));
             }
             if (countCharacters) {
-                metricAttributes.put(TEXT_CHARACTER_COUNT, String.valueOf(characterCount));
+                metricAttributes.put(TEXT_CHARACTER_COUNT, String.valueOf(characterCount.get()));
             }
             FlowFile updatedFlowFile = processSession.putAllAttributes(sourceFlowFile, metricAttributes);
             processSession.transfer(updatedFlowFile, REL_SUCCESS);
         }
     }
 
-    private String generateMetricsMessage() {
+    private String generateMetricsMessage(int lineCount, int lineNonEmptyCount, int wordCount, int characterCount) {
         StringBuilder sb = new StringBuilder("Counted ");
         List<String> metrics = new ArrayList<>();
         if (countLines) {