You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2019/10/29 14:38:24 UTC
[nifi] branch master updated (dccbde4 -> ce5eae5)
This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git.
from dccbde4 NIFI-5452: Allow ignore block locality in HDFS Fixed formatting. Fallback to autoboxing
new 67f1677 NIFI-6800 - Fix hashmap counter overwritten in highThroughputSession
new ce5eae5 NIFI-6800: Added unit test to verify behavior of contribution
The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails. The revisions
listed as "add" were already present in the repository and have only
been added to this reference.
Summary of changes:
.../repository/StandardProcessSession.java | 6 ++-
.../repository/TestStandardProcessSession.java | 48 +++++++++++++++++++++-
2 files changed, 51 insertions(+), 3 deletions(-)
[nifi] 02/02: NIFI-6800: Added unit test to verify behavior of contribution
Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit ce5eae5b2cd5cf49a90d2980c58cd6647f4af292
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Tue Oct 29 10:17:27 2019 -0400
NIFI-6800: Added unit test to verify behavior of contribution
---
.../repository/TestStandardProcessSession.java | 48 +++++++++++++++++++++-
1 file changed, 46 insertions(+), 2 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index b2e1870..267f22d 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -19,6 +19,7 @@ package org.apache.nifi.controller.repository;
import org.apache.nifi.connectable.Connectable;
import org.apache.nifi.connectable.ConnectableType;
import org.apache.nifi.connectable.Connection;
+import org.apache.nifi.controller.Counter;
import org.apache.nifi.controller.ProcessScheduler;
import org.apache.nifi.controller.queue.FlowFileQueue;
import org.apache.nifi.controller.queue.NopConnectionEventListener;
@@ -112,6 +113,7 @@ public class TestStandardProcessSession {
private ProvenanceEventRepository provenanceRepo;
private MockFlowFileRepository flowFileRepo;
+ private CounterRepository counterRepository;
private final Relationship FAKE_RELATIONSHIP = new Relationship.Builder().name("FAKE").build();
private static StandardResourceClaimManager resourceClaimManager;
@@ -154,7 +156,7 @@ public class TestStandardProcessSession {
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, TestStandardProcessSession.class.getResource("/conf/nifi.properties").getFile());
final FlowFileEventRepository flowFileEventRepo = Mockito.mock(FlowFileEventRepository.class);
- final CounterRepository counterRepo = Mockito.mock(CounterRepository.class);
+ counterRepository = new StandardCounterRepository();
provenanceRepo = new MockProvenanceRepository();
final Connection connection = createConnection();
@@ -194,7 +196,7 @@ public class TestStandardProcessSession {
contentRepo.initialize(new StandardResourceClaimManager());
flowFileRepo = new MockFlowFileRepository(contentRepo);
- context = new RepositoryContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepo, counterRepo, provenanceRepo);
+ context = new RepositoryContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepo, counterRepository, provenanceRepo);
session = new StandardProcessSession(context, () -> false);
}
@@ -297,6 +299,48 @@ public class TestStandardProcessSession {
}
@Test
+ public void testCheckpointMergesCounters() {
+ final Relationship relationship = new Relationship.Builder().name("A").build();
+
+ FlowFile flowFile = session.create();
+ session.transfer(flowFile, relationship);
+ session.adjustCounter("a", 1, false);
+ session.checkpoint();
+
+ flowFile = session.create();
+ session.transfer(flowFile, relationship);
+ session.adjustCounter("a", 1, false);
+ session.adjustCounter("b", 3, false);
+ session.checkpoint();
+
+ assertEquals(0, counterRepository.getCounters().size());
+ session.commit();
+
+ // We should have 2 different counters with the name "a" and 2 different counters with the name "b" -
+ // one for the "All Instances" context and one for the individual instance's context.
+ final List<Counter> counters = counterRepository.getCounters();
+ assertEquals(4, counters.size());
+
+ int aCounters = 0;
+ int bCounters = 0;
+ for (final Counter counter : counters) {
+ switch (counter.getName()) {
+ case "a":
+ assertEquals(2, counter.getValue());
+ aCounters++;
+ break;
+ case "b":
+ assertEquals(3, counter.getValue());
+ bCounters++;
+ break;
+ }
+ }
+
+ assertEquals(2, aCounters);
+ assertEquals(2, bCounters);
+ }
+
+ @Test
public void testHandlingOfMultipleFlowFilesWithSameId() {
// Add two FlowFiles with the same ID
for (int i=0; i < 2; i++) {
[nifi] 01/02: NIFI-6800 - Fix hashmap counter overwritten in highThroughputSession
Posted by ma...@apache.org.
This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
commit 67f1677212368290228b373fdbeb73f0265e2211
Author: Ivan Ezequiel Rodriguez <iv...@claro.com.ar>
AuthorDate: Wed Oct 23 10:07:05 2019 -0300
NIFI-6800 - Fix hashmap counter overwritten in highThroughputSession
This closes #3837.
Signed-off-by: Mark Payne <ma...@hotmail.com>
---
.../apache/nifi/controller/repository/StandardProcessSession.java | 6 +++++-
1 file changed, 5 insertions(+), 1 deletion(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index c2502d3..5684ab6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -3381,7 +3381,11 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
this.unacknowledgedFlowFiles.putAll(session.unacknowledgedFlowFiles);
if (session.countersOnCommit != null) {
- this.countersOnCommit.putAll(session.countersOnCommit);
+ if (this.countersOnCommit.isEmpty()) {
+ this.countersOnCommit.putAll(session.countersOnCommit);
+ } else {
+ session.countersOnCommit.forEach((key, value) -> this.countersOnCommit.merge(key, value, (v1, v2) -> v1 + v2));
+ }
}
if (session.immediateCounters != null) {