You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2020/01/06 20:01:54 UTC
[nifi] branch master updated: NIFI-6822: Ensure that when we manage
a Map of ID -> Count,
that we properly merge those maps during a checkpoint instead of overwriting
existing values
This is an automated email from the ASF dual-hosted git repository.
markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new c958deb NIFI-6822: Ensure that when we manage a Map of ID -> Count, that we properly merge those maps during a checkpoint instead of overwriting existing values
new e4bdc79 Merge pull request #3853 from markap14/NIFI-6822
c958deb is described below
commit c958deb5b01bf26c856b5d6a2b93cee87163c10f
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Tue Oct 29 12:23:49 2019 -0400
NIFI-6822: Ensure that when we manage a Map of ID -> Count, that we properly merge those maps during a checkpoint instead of overwriting existing values
---
.../repository/StandardProcessSession.java | 58 +++++++++++++++-------
.../repository/metrics/StandardFlowFileEvent.java | 28 +++++++++++
.../repository/TestStandardProcessSession.java | 48 ++++++++++++++++--
3 files changed, 111 insertions(+), 23 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index c930b36..da7a6ee 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -86,6 +86,8 @@ import java.util.UUID;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
import java.util.regex.Pattern;
import java.util.stream.Collectors;
@@ -3365,7 +3367,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
private final Map<Long, StandardRepositoryRecord> records = new ConcurrentHashMap<>();
private final Map<String, StandardFlowFileEvent> connectionCounts = new ConcurrentHashMap<>();
- private final Map<FlowFileQueue, Set<FlowFileRecord>> unacknowledgedFlowFiles = new ConcurrentHashMap<>();
private Map<String, Long> countersOnCommit = new HashMap<>();
private Map<String, Long> immediateCounters = new HashMap<>();
@@ -3392,24 +3393,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
this.reportedEvents.addAll(session.provenanceReporter.getEvents());
this.records.putAll(session.records);
- this.connectionCounts.putAll(session.connectionCounts);
- this.unacknowledgedFlowFiles.putAll(session.unacknowledgedFlowFiles);
- if (session.countersOnCommit != null) {
- if (this.countersOnCommit.isEmpty()) {
- this.countersOnCommit.putAll(session.countersOnCommit);
- } else {
- session.countersOnCommit.forEach((key, value) -> this.countersOnCommit.merge(key, value, Long::sum));
- }
- }
-
- if (session.immediateCounters != null) {
- if (this.immediateCounters.isEmpty()) {
- this.immediateCounters.putAll(session.immediateCounters);
- } else {
- session.immediateCounters.forEach((key, value) -> this.immediateCounters.merge(key, value, Long::sum));
- }
- }
+ mergeMapsWithMutableValue(this.connectionCounts, session.connectionCounts, (destination, toMerge) -> destination.add(toMerge));
+ mergeMaps(this.countersOnCommit, session.countersOnCommit, Long::sum);
+ mergeMaps(this.immediateCounters, session.immediateCounters, Long::sum);
this.deleteOnCommit.putAll(session.deleteOnCommit);
this.removedFlowFiles.addAll(session.removedFlowFiles);
@@ -3425,6 +3412,41 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
this.contentSizeOut += session.contentSizeOut;
}
+ private <K, V> void mergeMaps(final Map<K, V> destination, final Map<K, V> toMerge, final BiFunction<? super V, ? super V, ? extends V> merger) { // Folds every entry of toMerge into destination; values for keys present in both maps are combined via merger.
+ if (toMerge == null) { // nothing to merge; destination is left untouched
+ return;
+ }
+
+ if (destination.isEmpty()) { // no existing key can collide, so a bulk copy is enough and merger is never invoked
+ destination.putAll(toMerge);
+ } else {
+ toMerge.forEach((key, value) -> destination.merge(key, value, merger)); // Map.merge stores value for an absent key, otherwise replaces the existing value with merger(existing, value)
+ }
+ }
+
+ private <K, V> void mergeMapsWithMutableValue(final Map<K, V> destination, final Map<K, V> toMerge, final BiConsumer<? super V, ? super V> merger) { // Merges toMerge into destination for mutable value types: on a key collision, merger updates destination's existing value in place instead of producing a replacement.
+ if (toMerge == null) { // nothing to merge; destination is left untouched
+ return;
+ }
+
+ if (destination.isEmpty()) { // no collisions possible; the value objects themselves (not copies) are placed into destination
+ destination.putAll(toMerge);
+ return;
+ }
+
+ for (final Map.Entry<K, V> entry : toMerge.entrySet()) {
+ final K key = entry.getKey();
+ final V value = entry.getValue();
+
+ final V destinationValue = destination.get(key);
+ if (destinationValue == null) { // get() found no value for this key: adopt the incoming value object as-is
+ destination.put(key, value);
+ } else {
+ merger.accept(destinationValue, value); // argument order is (existing destination value, incoming value); the caller's merger is expected to fold the second into the first
+ }
+ }
+ }
+
private StandardRepositoryRecord getRecord(final FlowFile flowFile) { // Looks up the entry of the records map keyed by the given FlowFile's ID and returns whatever records.get yields for it.
return records.get(flowFile.getId());
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java
index fc00675..a6e12c9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java
@@ -18,6 +18,7 @@ package org.apache.nifi.controller.repository.metrics;
import org.apache.nifi.controller.repository.FlowFileEvent;
+import java.util.HashMap;
import java.util.Map;
public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable {
@@ -194,4 +195,31 @@ public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable {
public void setCounters(final Map<String, Long> counters) { // Replaces this event's counters with the given map; the reference is stored directly, no copy is made.
this.counters = counters;
}
+
+ public void add(final FlowFileEvent event) { // Accumulates the given event into this one: each statistic below is increased by the event's value, and the event's counters are merged in per key.
+ flowFilesIn += event.getFlowFilesIn();
+ flowFilesOut += event.getFlowFilesOut();
+ flowFilesRemoved += event.getFlowFilesRemoved();
+ contentSizeIn += event.getContentSizeIn();
+ contentSizeOut += event.getContentSizeOut();
+ contentSizeRemoved += event.getContentSizeRemoved();
+ bytesRead += event.getBytesRead();
+ bytesWritten += event.getBytesWritten();
+ processingNanos += event.getProcessingNanoseconds();
+ aggregateLineageMillis += event.getAggregateLineageMillis();
+ flowFilesReceived += event.getFlowFilesReceived();
+ bytesReceived += event.getBytesReceived();
+ flowFilesSent += event.getFlowFilesSent();
+ bytesSent += event.getBytesSent();
+ invocations += event.getInvocations();
+
+ final Map<String, Long> eventCounters = event.getCounters();
+ if (eventCounters != null) { // an event without counters contributes nothing to this event's counters
+ if (counters == null) { // create the map lazily the first time counters are merged in
+ counters = new HashMap<>();
+ }
+
+ eventCounters.forEach((k, v) -> counters.merge(k, v, Long::sum)); // same-named counters are summed, new names are inserted; NOTE: this mutates the current counters map in place, which may be the instance handed to setCounters
+ }
+ }
}
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 445a48a..5e3b1b6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -115,7 +115,7 @@ public class TestStandardProcessSession {
private ProvenanceEventRepository provenanceRepo;
private MockFlowFileRepository flowFileRepo;
private CounterRepository counterRepository;
- private FlowFileEventRepository flowFileEventRepo;
+ private FlowFileEventRepository flowFileEventRepository;
private final Relationship FAKE_RELATIONSHIP = new Relationship.Builder().name("FAKE").build();
private static StandardResourceClaimManager resourceClaimManager;
@@ -157,7 +157,7 @@ public class TestStandardProcessSession {
resourceClaimManager = new StandardResourceClaimManager();
System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, TestStandardProcessSession.class.getResource("/conf/nifi.properties").getFile());
- flowFileEventRepo = new RingBufferEventRepository(1);
+ flowFileEventRepository = new RingBufferEventRepository(1);
counterRepository = new StandardCounterRepository();
provenanceRepo = new MockProvenanceRepository();
@@ -198,7 +198,7 @@ public class TestStandardProcessSession {
contentRepo.initialize(new StandardResourceClaimManager());
flowFileRepo = new MockFlowFileRepository(contentRepo);
- context = new RepositoryContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepo, counterRepository, provenanceRepo);
+ context = new RepositoryContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepository, counterRepository, provenanceRepo);
session = new StandardProcessSession(context, () -> false);
}
@@ -399,7 +399,7 @@ public class TestStandardProcessSession {
session.transfer(flowFile);
session.commit();
- final RepositoryStatusReport report = flowFileEventRepo.reportTransferEvents(0L);
+ final RepositoryStatusReport report = flowFileEventRepository.reportTransferEvents(0L);
final long bytesRead = report.getReportEntry("connectable-1").getBytesRead();
assertEquals(1, bytesRead);
}
@@ -430,11 +430,49 @@ public class TestStandardProcessSession {
session.transfer(flowFile);
session.commit();
- final RepositoryStatusReport report = flowFileEventRepo.reportTransferEvents(0L);
+ final RepositoryStatusReport report = flowFileEventRepository.reportTransferEvents(0L);
final long bytesRead = report.getReportEntry("connectable-1").getBytesRead();
assertEquals(1, bytesRead);
}
+ // Verifies that repeated checkpoints accumulate connection events and counters
+ // instead of overwriting the values recorded by an earlier checkpoint.
+ // The @Test annotation is required: without it JUnit never executes this method.
+ @Test
+ public void testCheckpointMergesMaps() {
+ // Enqueue two empty FlowFiles so that each iteration of the second loop has one to pull.
+ for (int i=0; i < 2; i++) {
+ final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+ .id(i)
+ .entryDate(System.currentTimeMillis())
+ .size(0L)
+ .build();
+
+ flowFileQueue.put(flowFileRecord);
+ }
+
+ final Relationship relationship = new Relationship.Builder().name("A").build();
+
+ // Checkpoint after every FlowFile so the second checkpoint has to merge into the state left by the first.
+ for (int i=0; i < 2; i++) {
+ FlowFile ff1 = session.get();
+ assertNotNull(ff1);
+ session.transfer(ff1, relationship);
+ session.adjustCounter("counter", 1, false);
+ session.adjustCounter("counter", 1, true);
+ session.checkpoint();
+ }
+
+ session.commit();
+
+ final RepositoryStatusReport report = flowFileEventRepository.reportTransferEvents(0L);
+ final FlowFileEvent queueFlowFileEvent = report.getReportEntry("conn-uuid");
+ assertNotNull(queueFlowFileEvent);
+ assertEquals(2, queueFlowFileEvent.getFlowFilesOut());
+ assertEquals(0L, queueFlowFileEvent.getContentSizeOut());
+
+ final FlowFileEvent componentFlowFileEvent = report.getReportEntry("connectable-1");
+ final Map<String, Long> counters = componentFlowFileEvent.getCounters();
+ assertNotNull(counters);
+ assertEquals(1, counters.size());
+ assertTrue(counters.containsKey("counter"));
+ assertEquals(4L, counters.get("counter").longValue()); // increment twice for each FlowFile, once immediate, once not.
+ }
+
@Test
public void testHandlingOfMultipleFlowFilesWithSameId() {
// Add two FlowFiles with the same ID