You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2019/10/29 14:38:24 UTC

[nifi] branch master updated (dccbde4 -> ce5eae5)

This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git.


    from dccbde4  NIFI-5452: Allow ignore block locality in HDFS Fixed formatting. Fallback to autoboxing
     new 67f1677  NIFI-6800 - Fix hashmap counter overwritten in highThroughputSession
     new ce5eae5  NIFI-6800: Added unit test to verify behavior of contribution

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../repository/StandardProcessSession.java         |  6 ++-
 .../repository/TestStandardProcessSession.java     | 48 +++++++++++++++++++++-
 2 files changed, 51 insertions(+), 3 deletions(-)


[nifi] 02/02: NIFI-6800: Added unit test to verify behavior of contribution

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit ce5eae5b2cd5cf49a90d2980c58cd6647f4af292
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Tue Oct 29 10:17:27 2019 -0400

    NIFI-6800: Added unit test to verify behavior of contribution
---
 .../repository/TestStandardProcessSession.java     | 48 +++++++++++++++++++++-
 1 file changed, 46 insertions(+), 2 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index b2e1870..267f22d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -19,6 +19,7 @@ package org.apache.nifi.controller.repository;
 import org.apache.nifi.connectable.Connectable;
 import org.apache.nifi.connectable.ConnectableType;
 import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.Counter;
 import org.apache.nifi.controller.ProcessScheduler;
 import org.apache.nifi.controller.queue.FlowFileQueue;
 import org.apache.nifi.controller.queue.NopConnectionEventListener;
@@ -112,6 +113,7 @@ public class TestStandardProcessSession {
 
     private ProvenanceEventRepository provenanceRepo;
     private MockFlowFileRepository flowFileRepo;
+    private CounterRepository counterRepository;
     private final Relationship FAKE_RELATIONSHIP = new Relationship.Builder().name("FAKE").build();
     private static StandardResourceClaimManager resourceClaimManager;
 
@@ -154,7 +156,7 @@ public class TestStandardProcessSession {
 
         System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, TestStandardProcessSession.class.getResource("/conf/nifi.properties").getFile());
         final FlowFileEventRepository flowFileEventRepo = Mockito.mock(FlowFileEventRepository.class);
-        final CounterRepository counterRepo = Mockito.mock(CounterRepository.class);
+        counterRepository = new StandardCounterRepository();
         provenanceRepo = new MockProvenanceRepository();
 
         final Connection connection = createConnection();
@@ -194,7 +196,7 @@ public class TestStandardProcessSession {
         contentRepo.initialize(new StandardResourceClaimManager());
         flowFileRepo = new MockFlowFileRepository(contentRepo);
 
-        context = new RepositoryContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, provenanceRepo);
+        context = new RepositoryContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepo, counterRepository, provenanceRepo);
         session = new StandardProcessSession(context, () -> false);
     }
 
@@ -297,6 +299,48 @@ public class TestStandardProcessSession {
     }
 
     @Test
+    public void testCheckpointMergesCounters() {
+        final Relationship relationship = new Relationship.Builder().name("A").build();
+
+        FlowFile flowFile = session.create();
+        session.transfer(flowFile, relationship);
+        session.adjustCounter("a", 1, false);
+        session.checkpoint();
+
+        flowFile = session.create();
+        session.transfer(flowFile, relationship);
+        session.adjustCounter("a", 1, false);
+        session.adjustCounter("b", 3, false);
+        session.checkpoint();
+
+        assertEquals(0, counterRepository.getCounters().size());
+        session.commit();
+
+        // For each counter name ("a" and "b") we should have 2 distinct counters:
+        // one for the "All Instances" context and one for the individual instance's context.
+        final List<Counter> counters = counterRepository.getCounters();
+        assertEquals(4, counters.size());
+
+        int aCounters = 0;
+        int bCounters = 0;
+        for (final Counter counter : counters) {
+            switch (counter.getName()) {
+                case "a":
+                    assertEquals(2, counter.getValue());
+                    aCounters++;
+                    break;
+                case "b":
+                    assertEquals(3, counter.getValue());
+                    bCounters++;
+                    break;
+            }
+        }
+
+        assertEquals(2, aCounters);
+        assertEquals(2, bCounters);
+    }
+
+    @Test
     public void testHandlingOfMultipleFlowFilesWithSameId() {
         // Add two FlowFiles with the same ID
         for (int i=0; i < 2; i++) {


[nifi] 01/02: NIFI-6800 - Fix hashmap counter overwritten in highThroughputSession

Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git

commit 67f1677212368290228b373fdbeb73f0265e2211
Author: Ivan Ezequiel Rodriguez <iv...@claro.com.ar>
AuthorDate: Wed Oct 23 10:07:05 2019 -0300

    NIFI-6800 - Fix hashmap counter overwritten in highThroughputSession
    
    This closes #3837.
    
    Signed-off-by: Mark Payne <ma...@hotmail.com>
---
 .../apache/nifi/controller/repository/StandardProcessSession.java   | 6 +++++-
 1 file changed, 5 insertions(+), 1 deletion(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index c2502d3..5684ab6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -3381,7 +3381,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             this.unacknowledgedFlowFiles.putAll(session.unacknowledgedFlowFiles);
 
             if (session.countersOnCommit != null) {
-                this.countersOnCommit.putAll(session.countersOnCommit);
+                if (this.countersOnCommit.isEmpty()) {
+                    this.countersOnCommit.putAll(session.countersOnCommit);
+                } else {
+                    session.countersOnCommit.forEach((key, value) -> this.countersOnCommit.merge(key, value, (v1, v2) -> v1 + v2));
+                }
             }
 
             if (session.immediateCounters != null) {