You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2020/01/06 20:01:54 UTC

[nifi] branch master updated: NIFI-6822: Ensure that when we manage a Map of ID -> Count, that we properly merge those maps during a checkpoint instead of overwriting existing values

This is an automated email from the ASF dual-hosted git repository.

markap14 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new c958deb  NIFI-6822: Ensure that when we manage a Map of ID -> Count, that we properly merge those maps during a checkpoint instead of overwriting existing values
     new e4bdc79  Merge pull request #3853 from markap14/NIFI-6822
c958deb is described below

commit c958deb5b01bf26c856b5d6a2b93cee87163c10f
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Tue Oct 29 12:23:49 2019 -0400

    NIFI-6822: Ensure that when we manage a Map of ID -> Count, that we properly merge those maps during a checkpoint instead of overwriting existing values
---
 .../repository/StandardProcessSession.java         | 58 +++++++++++++++-------
 .../repository/metrics/StandardFlowFileEvent.java  | 28 +++++++++++
 .../repository/TestStandardProcessSession.java     | 48 ++++++++++++++++--
 3 files changed, 111 insertions(+), 23 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index c930b36..da7a6ee 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -86,6 +86,8 @@ import java.util.UUID;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicLong;
+import java.util.function.BiConsumer;
+import java.util.function.BiFunction;
 import java.util.regex.Pattern;
 import java.util.stream.Collectors;
 
@@ -3365,7 +3367,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
         private final Map<Long, StandardRepositoryRecord> records = new ConcurrentHashMap<>();
         private final Map<String, StandardFlowFileEvent> connectionCounts = new ConcurrentHashMap<>();
-        private final Map<FlowFileQueue, Set<FlowFileRecord>> unacknowledgedFlowFiles = new ConcurrentHashMap<>();
 
         private Map<String, Long> countersOnCommit = new HashMap<>();
         private Map<String, Long> immediateCounters = new HashMap<>();
@@ -3392,24 +3393,10 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             this.reportedEvents.addAll(session.provenanceReporter.getEvents());
 
             this.records.putAll(session.records);
-            this.connectionCounts.putAll(session.connectionCounts);
-            this.unacknowledgedFlowFiles.putAll(session.unacknowledgedFlowFiles);
 
-            if (session.countersOnCommit != null) {
-                if (this.countersOnCommit.isEmpty()) {
-                    this.countersOnCommit.putAll(session.countersOnCommit);
-                } else {
-                    session.countersOnCommit.forEach((key, value) -> this.countersOnCommit.merge(key, value, Long::sum));
-                }
-            }
-
-            if (session.immediateCounters != null) {
-                if (this.immediateCounters.isEmpty()) {
-                    this.immediateCounters.putAll(session.immediateCounters);
-                } else {
-                    session.immediateCounters.forEach((key, value) -> this.immediateCounters.merge(key, value, Long::sum));
-                }
-            }
+            mergeMapsWithMutableValue(this.connectionCounts, session.connectionCounts, (destination, toMerge) -> destination.add(toMerge));
+            mergeMaps(this.countersOnCommit, session.countersOnCommit, Long::sum);
+            mergeMaps(this.immediateCounters, session.immediateCounters, Long::sum);
 
             this.deleteOnCommit.putAll(session.deleteOnCommit);
             this.removedFlowFiles.addAll(session.removedFlowFiles);
@@ -3425,6 +3412,41 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             this.contentSizeOut += session.contentSizeOut;
         }
 
+        private <K, V> void mergeMaps(final Map<K, V> destination, final Map<K, V> toMerge, final BiFunction<? super V, ? super V, ? extends V> merger) {
+            if (toMerge == null) {
+                return; // nothing to merge; destination is left untouched
+            }
+            // Folds toMerge into destination: copied wholesale when destination is empty, otherwise values of keys present in both maps are combined by merger.
+            if (destination.isEmpty()) {
+                destination.putAll(toMerge);
+            } else {
+                toMerge.forEach((key, value) -> destination.merge(key, value, merger));
+            }
+        }
+
+        private <K, V> void mergeMapsWithMutableValue(final Map<K, V> destination, final Map<K, V> toMerge, final BiConsumer<? super V, ? super V> merger) {
+            if (toMerge == null) {
+                return; // nothing to merge; destination is left untouched
+            }
+            // With no existing entries there is nothing to combine, so the entries of toMerge are stored directly (same value instances, not copies).
+            if (destination.isEmpty()) {
+                destination.putAll(toMerge);
+                return;
+            }
+            // Otherwise fold each entry in individually so that a value already held for a key is kept rather than overwritten.
+            for (final Map.Entry<K, V> entry : toMerge.entrySet()) {
+                final K key = entry.getKey();
+                final V value = entry.getValue();
+
+                final V destinationValue = destination.get(key);
+                if (destinationValue == null) {
+                    destination.put(key, value); // stores the instance held by toMerge, not a copy
+                } else {
+                    merger.accept(destinationValue, value); // BiConsumer returns nothing to re-insert, so merger must update destinationValue in place
+                }
+            }
+        }
+
         private StandardRepositoryRecord getRecord(final FlowFile flowFile) {
             return records.get(flowFile.getId());
         }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java
index fc00675..a6e12c9 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/metrics/StandardFlowFileEvent.java
@@ -18,6 +18,7 @@ package org.apache.nifi.controller.repository.metrics;
 
 import org.apache.nifi.controller.repository.FlowFileEvent;
 
+import java.util.HashMap;
 import java.util.Map;
 
 public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable {
@@ -194,4 +195,31 @@ public final class StandardFlowFileEvent implements FlowFileEvent, Cloneable {
     public void setCounters(final Map<String, Long> counters) {
         this.counters = counters;
     }
+
+    public void add(final FlowFileEvent event) { // adds each value read from event below, plus its counters, to this instance's fields
+        flowFilesIn += event.getFlowFilesIn();
+        flowFilesOut += event.getFlowFilesOut();
+        flowFilesRemoved += event.getFlowFilesRemoved();
+        contentSizeIn += event.getContentSizeIn();
+        contentSizeOut += event.getContentSizeOut();
+        contentSizeRemoved += event.getContentSizeRemoved();
+        bytesRead += event.getBytesRead();
+        bytesWritten += event.getBytesWritten();
+        processingNanos += event.getProcessingNanoseconds();
+        aggregateLineageMillis += event.getAggregateLineageMillis();
+        flowFilesReceived += event.getFlowFilesReceived();
+        bytesReceived += event.getBytesReceived();
+        flowFilesSent += event.getFlowFilesSent();
+        bytesSent += event.getBytesSent();
+        invocations += event.getInvocations();
+        // Counters are summed per name; a null counters map on event is skipped, and this instance's map is created lazily.
+        final Map<String, Long> eventCounters = event.getCounters();
+        if (eventCounters != null) {
+            if (counters == null) {
+                counters = new HashMap<>();
+            }
+            // NOTE(review): merges into the existing map in place; setCounters stores the caller's map by reference, so this assumes that map is mutable - confirm.
+            eventCounters.forEach((k, v) -> counters.merge(k, v, Long::sum));
+        }
+    }
 }
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 445a48a..5e3b1b6 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -115,7 +115,7 @@ public class TestStandardProcessSession {
     private ProvenanceEventRepository provenanceRepo;
     private MockFlowFileRepository flowFileRepo;
     private CounterRepository counterRepository;
-    private FlowFileEventRepository flowFileEventRepo;
+    private FlowFileEventRepository flowFileEventRepository;
     private final Relationship FAKE_RELATIONSHIP = new Relationship.Builder().name("FAKE").build();
     private static StandardResourceClaimManager resourceClaimManager;
 
@@ -157,7 +157,7 @@ public class TestStandardProcessSession {
         resourceClaimManager = new StandardResourceClaimManager();
 
         System.setProperty(NiFiProperties.PROPERTIES_FILE_PATH, TestStandardProcessSession.class.getResource("/conf/nifi.properties").getFile());
-        flowFileEventRepo = new RingBufferEventRepository(1);
+        flowFileEventRepository = new RingBufferEventRepository(1);
         counterRepository = new StandardCounterRepository();
         provenanceRepo = new MockProvenanceRepository();
 
@@ -198,7 +198,7 @@ public class TestStandardProcessSession {
         contentRepo.initialize(new StandardResourceClaimManager());
         flowFileRepo = new MockFlowFileRepository(contentRepo);
 
-        context = new RepositoryContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepo, counterRepository, provenanceRepo);
+        context = new RepositoryContext(connectable, new AtomicLong(0L), contentRepo, flowFileRepo, flowFileEventRepository, counterRepository, provenanceRepo);
         session = new StandardProcessSession(context, () -> false);
     }
 
@@ -399,7 +399,7 @@ public class TestStandardProcessSession {
         session.transfer(flowFile);
         session.commit();
 
-        final RepositoryStatusReport report = flowFileEventRepo.reportTransferEvents(0L);
+        final RepositoryStatusReport report = flowFileEventRepository.reportTransferEvents(0L);
         final long bytesRead = report.getReportEntry("connectable-1").getBytesRead();
         assertEquals(1, bytesRead);
     }
@@ -430,11 +430,49 @@ public class TestStandardProcessSession {
         session.transfer(flowFile);
         session.commit();
 
-        final RepositoryStatusReport report = flowFileEventRepo.reportTransferEvents(0L);
+        final RepositoryStatusReport report = flowFileEventRepository.reportTransferEvents(0L);
         final long bytesRead = report.getReportEntry("connectable-1").getBytesRead();
         assertEquals(1, bytesRead);
     }
 
+    @Test
+    public void testCheckpointMergesMaps() {
+        for (int i = 0; i < 2; i++) {
+            final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+                .id(i)
+                .entryDate(System.currentTimeMillis())
+                .size(0L)
+                .build();
+            flowFileQueue.put(flowFileRecord);
+        }
+
+        final Relationship relationship = new Relationship.Builder().name("A").build();
+        // Checkpoint after every FlowFile so the per-connection counts and both kinds of counter must be merged across checkpoints, not overwritten.
+        for (int i = 0; i < 2; i++) {
+            final FlowFile flowFile = session.get();
+            assertNotNull(flowFile);
+            session.transfer(flowFile, relationship);
+            session.adjustCounter("counter", 1, false);
+            session.adjustCounter("counter", 1, true);
+            session.checkpoint();
+        }
+
+        session.commit();
+
+        final RepositoryStatusReport report = flowFileEventRepository.reportTransferEvents(0L);
+        final FlowFileEvent queueFlowFileEvent = report.getReportEntry("conn-uuid");
+        assertNotNull(queueFlowFileEvent);
+        assertEquals(2, queueFlowFileEvent.getFlowFilesOut());
+        assertEquals(0L, queueFlowFileEvent.getContentSizeOut());
+
+        final FlowFileEvent componentFlowFileEvent = report.getReportEntry("connectable-1");
+        final Map<String, Long> counters = componentFlowFileEvent.getCounters();
+        assertNotNull(counters);
+        assertEquals(1, counters.size());
+        assertTrue(counters.containsKey("counter"));
+        assertEquals(4L, counters.get("counter").longValue()); // increment twice for each FlowFile, once immediate, once not.
+    }
+
     @Test
     public void testHandlingOfMultipleFlowFilesWithSameId() {
         // Add two FlowFiles with the same ID