You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2017/08/17 00:22:12 UTC

nifi git commit: NIFI-4028 - fix cache update when Wait releases flow files

Repository: nifi
Updated Branches:
  refs/heads/master 9f23ec360 -> c1b99d584


NIFI-4028 - fix cache update when Wait releases flow files

NIFI-4028: Refactored Wait processor.

- Consolidated the implementation for the cases where releasableFlowCount is 1 or more, in order to reduce complexity and behavioral differences
- Added 'consumed' counter when total counter is used to release incoming FlowFiles
- Fixed method name typo, releaseCandidates

This closes #2055.

Signed-off-by: Koji Kawamura <ij...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/c1b99d58
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/c1b99d58
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/c1b99d58

Branch: refs/heads/master
Commit: c1b99d584d93bb6f26ba0986f2fcaf663b0caef3
Parents: 9f23ec3
Author: Pierre Villard <pi...@gmail.com>
Authored: Thu Aug 3 22:56:27 2017 +0200
Committer: Koji Kawamura <ij...@apache.org>
Committed: Thu Aug 17 08:30:07 2017 +0900

----------------------------------------------------------------------
 .../apache/nifi/processors/standard/Wait.java   |  29 ++---
 .../processors/standard/WaitNotifyProtocol.java |  36 ++++--
 .../nifi/processors/standard/TestWait.java      |  60 +++++++++-
 .../standard/TestWaitNotifyProtocol.java        | 109 ++++++++++++++++++-
 4 files changed, 203 insertions(+), 31 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/c1b99d58/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
index fccd443..4e5ae5d 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/Wait.java
@@ -305,6 +305,8 @@ public class Wait extends AbstractProcessor {
         final String attributeCopyMode = context.getProperty(ATTRIBUTE_COPY_MODE).getValue();
         final boolean replaceOriginalAttributes = ATTRIBUTE_COPY_REPLACE.getValue().equals(attributeCopyMode);
         final AtomicReference<Signal> signalRef = new AtomicReference<>();
+        // This map contains original counts before those are consumed to release incoming FlowFiles.
+        final HashMap<String, Long> originalSignalCounts = new HashMap<>();
 
         final Consumer<FlowFile> transferToFailure = flowFile -> {
             flowFile = session.penalize(flowFile);
@@ -324,7 +326,7 @@ public class Wait extends AbstractProcessor {
             }
 
             final List<FlowFile> flowFilesWithSignalAttributes = routedFlowFiles.getValue().stream()
-                    .map(f -> copySignalAttributes(session, f, signalRef.get(), replaceOriginalAttributes)).collect(Collectors.toList());
+                    .map(f -> copySignalAttributes(session, f, signalRef.get(), originalSignalCounts, replaceOriginalAttributes)).collect(Collectors.toList());
             session.transfer(flowFilesWithSignalAttributes, relationship);
         };
 
@@ -349,6 +351,9 @@ public class Wait extends AbstractProcessor {
         // get notifying signal
         try {
             signal = protocol.getSignal(signalId);
+            if (signal != null) {
+                originalSignalCounts.putAll(signal.getCounts());
+            }
             signalRef.set(signal);
         } catch (final IOException e) {
             throw new ProcessException(String.format("Failed to get signal for %s due to %s", signalId, e), e);
@@ -423,29 +428,20 @@ public class Wait extends AbstractProcessor {
         boolean waitProgressed = false;
         if (signal != null && !candidates.isEmpty()) {
 
-            if (releasableFlowFileCount > 1) {
-                signal.releaseCandidatese(targetCounterName, targetCount, releasableFlowFileCount, candidates,
+            if (releasableFlowFileCount > 0) {
+                signal.releaseCandidates(targetCounterName, targetCount, releasableFlowFileCount, candidates,
                         released -> getFlowFilesFor.apply(REL_SUCCESS).addAll(released),
                         waiting -> getFlowFilesFor.apply(REL_WAIT).addAll(waiting));
+                waitCompleted = signal.getTotalCount() == 0 && signal.getReleasableCount() == 0;
                 waitProgressed = !getFlowFilesFor.apply(REL_SUCCESS).isEmpty();
 
             } else {
-                // releasableFlowFileCount = 0 or 1
                 boolean reachedTargetCount = StringUtils.isBlank(targetCounterName)
                         ? signal.isTotalCountReached(targetCount)
                         : signal.isCountReached(targetCounterName, targetCount);
 
                 if (reachedTargetCount) {
-                    if (releasableFlowFileCount == 0) {
-                        getFlowFilesFor.apply(REL_SUCCESS).addAll(candidates);
-                    } else {
-                        // releasableFlowFileCount = 1
-                        getFlowFilesFor.apply(REL_SUCCESS).add(candidates.remove(0));
-                        getFlowFilesFor.apply(REL_WAIT).addAll(candidates);
-                        // If releasableFlowFileCount == 0, leave signal as it is,
-                        // so that any number of FlowFile can be released as long as target count condition matches.
-                        waitCompleted = true;
-                    }
+                    getFlowFilesFor.apply(REL_SUCCESS).addAll(candidates);
                 } else {
                     getFlowFilesFor.apply(REL_WAIT).addAll(candidates);
                 }
@@ -470,7 +466,7 @@ public class Wait extends AbstractProcessor {
 
     }
 
-    private FlowFile copySignalAttributes(final ProcessSession session, final FlowFile flowFile, final Signal signal, final boolean replaceOriginal) {
+    private FlowFile copySignalAttributes(final ProcessSession session, final FlowFile flowFile, final Signal signal, final Map<String, Long> originalCount, final boolean replaceOriginal) {
         if (signal == null) {
             return flowFile;
         }
@@ -488,8 +484,7 @@ public class Wait extends AbstractProcessor {
         }
 
         // Copy counter attributes
-        final Map<String, Long> counts = signal.getCounts();
-        final long totalCount = counts.entrySet().stream().mapToLong(e -> {
+        final long totalCount = originalCount.entrySet().stream().mapToLong(e -> {
             final Long count = e.getValue();
             attributesToCopy.put("wait.counter." + e.getKey(), String.valueOf(count));
             return count;

http://git-wip-us.apache.org/repos/asf/nifi/blob/c1b99d58/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java
index 6183455..2c9c9fd 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/main/java/org/apache/nifi/processors/standard/WaitNotifyProtocol.java
@@ -16,6 +16,7 @@
  */
 package org.apache.nifi.processors.standard;
 
+import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.core.JsonParseException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.nifi.distributed.cache.client.AtomicCacheEntry;
@@ -45,6 +46,7 @@ public class WaitNotifyProtocol {
     private static final Logger logger = LoggerFactory.getLogger(WaitNotifyProtocol.class);
 
     public static final String DEFAULT_COUNT_NAME = "default";
+    public static final String CONSUMED_COUNT_NAME = "consumed";
     private static final int MAX_REPLACE_RETRY_COUNT = 5;
     private static final int REPLACE_RETRY_WAIT_MILLIS = 10;
 
@@ -86,9 +88,13 @@ public class WaitNotifyProtocol {
             this.attributes = attributes;
         }
 
+        @JsonIgnore
+        public long getTotalCount() {
+            return counts.values().stream().mapToLong(Long::longValue).sum();
+        }
+
         public boolean isTotalCountReached(final long targetCount) {
-            final long totalCount = counts.values().stream().mapToLong(Long::longValue).sum();
-            return totalCount >= targetCount;
+            return getTotalCount() >= targetCount;
         }
 
         public boolean isCountReached(final String counterName, final long targetCount) {
@@ -96,6 +102,10 @@ public class WaitNotifyProtocol {
         }
 
         public long getCount(final String counterName) {
+            if (counterName == null || counterName.isEmpty()) {
+                return getTotalCount();
+            }
+
             final Long count = counts.get(counterName);
             return count != null ? count : 0;
         }
@@ -115,7 +125,7 @@ public class WaitNotifyProtocol {
          * Caller of this method is responsible for updating cache storage after processing released and waiting candidates
          * by calling {@link #replace(Signal)}. Caller should rollback what it processed with these candidates if complete call failed.</p>
          *
-         * @param _counterName signal counter name to consume from.
+         * @param counterName signal counter name to consume from. If not specified, total counter is used, and 'consumed' counter is added to subtract consumed counters from total counter.
          * @param requiredCountForPass number of required signals to acquire a pass.
          * @param releasableCandidateCountPerPass number of releasable candidate per pass.
          * @param candidates candidates waiting for being allowed to pass.
@@ -123,12 +133,9 @@ public class WaitNotifyProtocol {
          * @param waiting function to process candidates those should remain in waiting queue.
          * @param <E> Type of candidate
          */
-        public <E> void releaseCandidatese(final String _counterName, final long requiredCountForPass,
-                                           final int releasableCandidateCountPerPass, final List<E> candidates,
-                                           final Consumer<List<E>> released, final Consumer<List<E>> waiting) {
-
-            // counterName is mandatory otherwise, we can't decide which counter to convert into pass count.
-            final String counterName = _counterName == null || _counterName.length() == 0 ? DEFAULT_COUNT_NAME : _counterName;
+        public <E> void releaseCandidates(final String counterName, final long requiredCountForPass,
+                                          final int releasableCandidateCountPerPass, final List<E> candidates,
+                                          final Consumer<List<E>> released, final Consumer<List<E>> waiting) {
 
             final int candidateSize = candidates.size();
             if (releasableCount < candidateSize) {
@@ -137,11 +144,18 @@ public class WaitNotifyProtocol {
                 final long signalCount = getCount(counterName);
                 releasableCount += (signalCount / requiredCountForPass) * releasableCandidateCountPerPass;
                 final long reducedSignalCount = signalCount % requiredCountForPass;
-                counts.put(counterName, reducedSignalCount);
+                if (counterName != null && !counterName.isEmpty()) {
+                    // Update target counter with reduced count.
+                    counts.put(counterName, reducedSignalCount);
+                } else {
+                    // If target counter name is not specified, add consumed count to subtract from accumulated total count.
+                    Long consumedCount = counts.getOrDefault(CONSUMED_COUNT_NAME, 0L);
+                    consumedCount -= signalCount - reducedSignalCount;
+                    counts.put(CONSUMED_COUNT_NAME, consumedCount);
+                }
             }
 
             int releaseCount = Math.min(releasableCount, candidateSize);
-
             released.accept(candidates.subList(0, releaseCount));
             waiting.accept(candidates.subList(releaseCount, candidateSize));
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/c1b99d58/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
index 71187b6..a4df2f3 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWait.java
@@ -25,6 +25,7 @@ import static org.junit.Assert.fail;
 import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
+import java.util.ConcurrentModificationException;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -459,8 +460,65 @@ public class TestWait {
 
         runner.clearTransferState();
 
-        assertNull("The key no longer exist", protocol.getSignal("key"));
+    }
+
+    @Test
+    public void testDecrementCache() throws ConcurrentModificationException, IOException {
+        Map<String, String> cachedAttributes = new HashMap<>();
+        cachedAttributes.put("both", "notifyValue");
+        cachedAttributes.put("uuid", "notifyUuid");
+        cachedAttributes.put("notify.only", "notifyValue");
+
+        // Setup existing cache entry.
+        final WaitNotifyProtocol protocol = new WaitNotifyProtocol(service);
+
+        // A flow file arrives at Notify and increments the counter
+        protocol.notify("key", "counter", 1, cachedAttributes);
+
+        // Another flow file arrives at Notify and increments the counter
+        protocol.notify("key", "counter", 1, cachedAttributes);
+
+        runner.setProperty(Wait.RELEASE_SIGNAL_IDENTIFIER, "${releaseSignalAttribute}");
+        runner.setProperty(Wait.SIGNAL_COUNTER_NAME, "${signalCounterName}");
+        runner.setProperty(Wait.TARGET_SIGNAL_COUNT, "1");
+        runner.assertValid();
+
+        final Map<String, String> waitAttributes = new HashMap<>();
+        waitAttributes.put("releaseSignalAttribute", "key");
+        waitAttributes.put("signalCounterName", "counter");
+        waitAttributes.put("wait.only", "waitValue");
+        waitAttributes.put("both", "waitValue");
+        waitAttributes.put("uuid", UUID.randomUUID().toString());
+        String flowFileContent = "content";
+        runner.enqueue(flowFileContent.getBytes("UTF-8"), waitAttributes);
+
+        /*
+         * 1st iteration
+         */
+        runner.run();
+        runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
+        MockFlowFile outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
+        outputFlowFile.assertAttributeEquals("wait.counter.counter", "2");
+
+        // expect counter to be decremented to 0 and releasable count remains 1.
+        assertEquals("0", Long.toString(protocol.getSignal("key").getCount("counter")));
+        assertEquals("1", Long.toString(protocol.getSignal("key").getReleasableCount()));
 
+        // introduce a second flow file with the same signal attribute
+        runner.enqueue(flowFileContent.getBytes("UTF-8"), waitAttributes);
+
+        /*
+         * 2nd iteration
+         */
+        runner.clearTransferState();
+        runner.run();
+        runner.assertAllFlowFilesTransferred(Wait.REL_SUCCESS, 1);
+        outputFlowFile = runner.getFlowFilesForRelationship(Wait.REL_SUCCESS).get(0);
+        // All counters are consumed.
+        outputFlowFile.assertAttributeEquals("wait.counter.counter", "0");
+
+        assertNull("The key no longer exist", protocol.getSignal("key"));
+        runner.clearTransferState();
     }
 
     private class TestIteration {

http://git-wip-us.apache.org/repos/asf/nifi/blob/c1b99d58/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java
index e3f982c..01983d5 100644
--- a/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java
+++ b/nifi-nar-bundles/nifi-standard-bundle/nifi-standard-processors/src/test/java/org/apache/nifi/processors/standard/TestWaitNotifyProtocol.java
@@ -37,6 +37,7 @@ import java.util.function.BiConsumer;
 import java.util.stream.Collectors;
 import java.util.stream.IntStream;
 
+import static org.apache.nifi.processors.standard.WaitNotifyProtocol.CONSUMED_COUNT_NAME;
 import static org.apache.nifi.processors.standard.WaitNotifyProtocol.DEFAULT_COUNT_NAME;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertFalse;
@@ -267,12 +268,12 @@ public class TestWaitNotifyProtocol {
         final List<Integer> waiting = new ArrayList<>();
 
         // Test default name.
-        final String counterName = null;
+        final String counterName = DEFAULT_COUNT_NAME;
 
         final BiConsumer<Long, Integer> releaseCandidate = (requiredCountForPass, releasableCandidatePerPass) -> {
             released.clear();
             waiting.clear();
-            signal.releaseCandidatese(counterName, requiredCountForPass, releasableCandidatePerPass, candidates,
+            signal.releaseCandidates(counterName, requiredCountForPass, releasableCandidatePerPass, candidates,
                     r -> released.addAll(r), w -> waiting.addAll(w));
         };
 
@@ -336,4 +337,108 @@ public class TestWaitNotifyProtocol {
 
     }
 
+
+    @Test
+    public void testReleaseCandidateTotal() throws Exception {
+        final List<Integer> candidates = IntStream.range(0, 10).boxed().collect(Collectors.toList());
+        final Signal signal = new Signal();
+        final List<Integer> released = new ArrayList<>();
+        final List<Integer> waiting = new ArrayList<>();
+
+        // Test empty counter name, should use total counters.
+        final String emptyCounterName = null;
+
+        final BiConsumer<Long, Integer> releaseCandidate = (requiredCountForPass, releasableCandidatePerPass) -> {
+            released.clear();
+            waiting.clear();
+            signal.releaseCandidates(emptyCounterName, requiredCountForPass, releasableCandidatePerPass, candidates,
+                    r -> released.addAll(r), w -> waiting.addAll(w));
+        };
+
+        final String counterA = "counterA";
+        final String counterB = "counterB";
+        final String counterC = "counterC";
+
+        final Field releasableCount = Signal.class.getDeclaredField("releasableCount");
+        releasableCount.setAccessible(true);
+
+        // No counter, should wait.
+        releaseCandidate.accept(3L, 1);
+        assertEquals(0, released.size());
+        assertEquals(10, waiting.size());
+        assertEquals(0, signal.getCount(emptyCounterName));
+        assertEquals(0, signal.getCount(CONSUMED_COUNT_NAME));
+        assertEquals(0, releasableCount.getInt(signal));
+
+        // Counter is not enough yet.
+        signal.getCounts().put(counterA, 1L);
+        signal.getCounts().remove(CONSUMED_COUNT_NAME);
+        releaseCandidate.accept(3L, 1);
+        assertEquals(0, released.size());
+        assertEquals(10, waiting.size());
+        assertEquals(1, signal.getCount(emptyCounterName)); // Counter incremented, but not enough
+        assertEquals(0, signal.getCount(CONSUMED_COUNT_NAME));
+        assertEquals(0, releasableCount.getInt(signal));
+
+        // Counter reached the target.
+        signal.getCounts().put(counterA, 1L);
+        signal.getCounts().put(counterB, 1L);
+        signal.getCounts().put(counterC, 1L);
+        signal.getCounts().remove(CONSUMED_COUNT_NAME);
+        releaseCandidate.accept(3L, 1);
+        assertEquals(1, released.size());
+        assertEquals(9, waiting.size());
+        assertEquals(0, signal.getCount(emptyCounterName)); // Counter 3 was converted into 1 release
+        assertEquals(-3, signal.getCount(CONSUMED_COUNT_NAME));
+        assertEquals(0, releasableCount.getInt(signal));
+
+        // Counter reached the target for two candidates.
+        signal.getCounts().put(counterA, 1L);
+        signal.getCounts().put(counterB, 2L);
+        signal.getCounts().put(counterC, 3L);
+        signal.getCounts().remove(CONSUMED_COUNT_NAME);
+        releaseCandidate.accept(3L, 1);
+        assertEquals(2, released.size());
+        assertEquals(8, waiting.size());
+        assertEquals(0, signal.getCount(emptyCounterName)); // Counter 6 was converted into 2 releases
+        assertEquals(-6, signal.getCount(CONSUMED_COUNT_NAME));
+        assertEquals(0, releasableCount.getInt(signal));
+
+        // Counter reached the target for three candidates, and remainder is 2.
+        signal.getCounts().put(counterA, 3L);
+        signal.getCounts().put(counterB, 3L);
+        signal.getCounts().put(counterC, 5L);
+        signal.getCounts().remove(CONSUMED_COUNT_NAME);
+        releaseCandidate.accept(3L, 1);
+        assertEquals(3, released.size()); // 11 / 3 = 3
+        assertEquals(7, waiting.size());
+        assertEquals(2, signal.getCount(emptyCounterName));
+        assertEquals(-9, signal.getCount(CONSUMED_COUNT_NAME));
+        assertEquals(0, releasableCount.getInt(signal));
+
+        // Counter reached the target for two pass count and each pass can release 2 candidates.
+        signal.getCounts().put(counterA, 1L);
+        signal.getCounts().put(counterB, 2L);
+        signal.getCounts().put(counterC, 3L);
+        signal.getCounts().remove(CONSUMED_COUNT_NAME);
+        releaseCandidate.accept(3L, 2);
+        assertEquals(4, released.size()); // (6 / 3) * 2 = 4
+        assertEquals(6, waiting.size());
+        assertEquals(0, signal.getCount(emptyCounterName));
+        assertEquals(-6, signal.getCount(CONSUMED_COUNT_NAME));
+        assertEquals(0, releasableCount.getInt(signal));
+
+        // If there are counts more than enough to release current candidates, unused releasableCount should remain.
+        signal.getCounts().put(counterA, 10L);
+        signal.getCounts().put(counterB, 20L);
+        signal.getCounts().put(counterC, 20L);
+        signal.getCounts().remove(CONSUMED_COUNT_NAME);
+        releaseCandidate.accept(3L, 2);
+        assertEquals(10, released.size()); // (50 / 3) * 2 = 32. Used 10.
+        assertEquals(0, waiting.size());
+        assertEquals(2, signal.getCount(emptyCounterName)); // 50 % 3 = 2.
+        assertEquals(-48, signal.getCount(CONSUMED_COUNT_NAME)); // 50 - (50 % 3) = 48 consumed.
+        assertEquals(22, releasableCount.getInt(signal)); // 32 - 10 = 22.
+
+    }
 }
\ No newline at end of file