You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/07/27 13:14:02 UTC

nifi git commit: NIFI-5420: Allow StandardProcessSession to calculate duration for provenance events

Repository: nifi
Updated Branches:
  refs/heads/master b1f78d58a -> 473221368


NIFI-5420: Allow StandardProcessSession to calculate duration for provenance events

This closes #2886.

Signed-off-by: Mark Payne <ma...@hotmail.com>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/47322136
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/47322136
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/47322136

Branch: refs/heads/master
Commit: 473221368c75d371cbabf7e68b0bfb62fbe56c00
Parents: b1f78d5
Author: Matthew Burgess <ma...@apache.org>
Authored: Thu Jul 12 17:13:32 2018 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jul 27 09:13:50 2018 -0400

----------------------------------------------------------------------
 .../repository/ProvenanceEventEnricher.java     |  3 ++-
 .../repository/StandardProcessSession.java      | 28 +++++++++++---------
 .../repository/StandardProvenanceReporter.java  |  6 +++--
 .../repository/TestStandardProcessSession.java  | 23 ++++++++++++++++
 .../repository/StandardRepositoryRecord.java    |  5 ++++
 5 files changed, 49 insertions(+), 16 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/47322136/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProvenanceEventEnricher.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProvenanceEventEnricher.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProvenanceEventEnricher.java
index 323bfb0..f0f6bc4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProvenanceEventEnricher.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProvenanceEventEnricher.java
@@ -26,8 +26,9 @@ public interface ProvenanceEventEnricher {
      *
      * @param record record
      * @param flowFile flowfile
+     * @param commitNanos the time (in nanoseconds) when the associated session was committed
      * @return new event record
      */
-    ProvenanceEventRecord enrich(ProvenanceEventRecord record, FlowFile flowFile);
+    ProvenanceEventRecord enrich(ProvenanceEventRecord record, FlowFile flowFile, long commitNanos);
 
 }

http://git-wip-us.apache.org/repos/asf/nifi/blob/47322136/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index ea4969c..7255645 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -737,6 +737,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             flowFileRecordMap.put(flowFile.getAttribute(CoreAttributes.UUID.key()), flowFile);
         }
 
+        final long commitNanos = System.nanoTime();
         final List<ProvenanceEventRecord> autoTermEvents = checkpoint.autoTerminatedEvents;
         final Iterable<ProvenanceEventRecord> iterable = new Iterable<ProvenanceEventRecord>() {
             final Iterator<ProvenanceEventRecord> recordsToSubmitIterator = recordsToSubmit.iterator();
@@ -761,9 +762,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                             // the representation of the FlowFile as it is committed, as this is the only way in which it really
                             // exists in our system -- all other representations are volatile representations that have not been
                             // exposed.
-                            return enrich(rawEvent, flowFileRecordMap, checkpoint.records, rawEvent.getEventType() != ProvenanceEventType.SEND);
+                            return enrich(rawEvent, flowFileRecordMap, checkpoint.records, rawEvent.getEventType() != ProvenanceEventType.SEND, commitNanos);
                         } else if (autoTermIterator != null && autoTermIterator.hasNext()) {
-                            return enrich(autoTermIterator.next(), flowFileRecordMap, checkpoint.records, true);
+                            return enrich(autoTermIterator.next(), flowFileRecordMap, checkpoint.records, true, commitNanos);
                         }
 
                         throw new NoSuchElementException();
@@ -796,7 +797,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     }
 
     @Override
-    public StandardProvenanceEventRecord enrich(final ProvenanceEventRecord rawEvent, final FlowFile flowFile) {
+    public StandardProvenanceEventRecord enrich(final ProvenanceEventRecord rawEvent, final FlowFile flowFile, final long commitNanos) {
         verifyTaskActive();
 
         final StandardRepositoryRecord repoRecord = records.get(flowFile);
@@ -829,11 +830,15 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         }
 
         recordBuilder.setAttributes(repoRecord.getOriginalAttributes(), repoRecord.getUpdatedAttributes());
+        if (rawEvent.getEventDuration() < 0) {
+            recordBuilder.setEventDuration(TimeUnit.NANOSECONDS.toMillis(commitNanos - repoRecord.getStartNanos()));
+        }
         return recordBuilder.build();
     }
 
     private StandardProvenanceEventRecord enrich(
-        final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<FlowFileRecord, StandardRepositoryRecord> records, final boolean updateAttributes) {
+        final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<FlowFileRecord, StandardRepositoryRecord> records,
+        final boolean updateAttributes, final long commitNanos) {
         final StandardProvenanceEventRecord.Builder recordBuilder = new StandardProvenanceEventRecord.Builder().fromEvent(rawEvent);
         final FlowFileRecord eventFlowFile = flowFileRecordMap.get(rawEvent.getFlowFileUuid());
         if (eventFlowFile != null) {
@@ -861,18 +866,15 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             if (originalQueue != null) {
                 recordBuilder.setSourceQueueIdentifier(originalQueue.getIdentifier());
             }
-        }
 
-        if (updateAttributes) {
-            final FlowFileRecord flowFileRecord = flowFileRecordMap.get(rawEvent.getFlowFileUuid());
-            if (flowFileRecord != null) {
-                final StandardRepositoryRecord record = records.get(flowFileRecord);
-                if (record != null) {
-                    recordBuilder.setAttributes(record.getOriginalAttributes(), record.getUpdatedAttributes());
-                }
+            if (updateAttributes) {
+                recordBuilder.setAttributes(repoRecord.getOriginalAttributes(), repoRecord.getUpdatedAttributes());
             }
-        }
 
+            if (rawEvent.getEventDuration() < 0) {
+                recordBuilder.setEventDuration(TimeUnit.NANOSECONDS.toMillis(commitNanos - repoRecord.getStartNanos()));
+            }
+        }
         return recordBuilder.build();
     }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/47322136/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
index 7d14ee3..e956d50 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
@@ -209,7 +209,9 @@ public class StandardProvenanceReporter implements ProvenanceReporter {
     public void send(final FlowFile flowFile, final String transitUri, final String details, final long transmissionMillis, final boolean force) {
         try {
             final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.SEND).setTransitUri(transitUri).setEventDuration(transmissionMillis).setDetails(details).build();
-            final ProvenanceEventRecord enriched = eventEnricher == null ? record : eventEnricher.enrich(record, flowFile);
+            // Only read the clock when the duration must be derived at enrichment time (transmissionMillis < 0). Otherwise pass 0: commitNanos is ignored when a duration was supplied, so the relatively expensive System.nanoTime() call is skipped.
+            final long commitNanos = transmissionMillis < 0 ? System.nanoTime() : 0L;
+            final ProvenanceEventRecord enriched = eventEnricher == null ? record : eventEnricher.enrich(record, flowFile, commitNanos);
 
             if (force) {
                 repository.registerEvent(enriched);
@@ -226,7 +228,7 @@ public class StandardProvenanceReporter implements ProvenanceReporter {
 
     @Override
     public void send(final FlowFile flowFile, final String transitUri, final boolean force) {
-        send(flowFile, transitUri, -1L, true);
+        send(flowFile, transitUri, -1L, force);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/nifi/blob/47322136/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 4059557..5667094 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -929,6 +929,29 @@ public class TestStandardProcessSession {
     }
 
     @Test
+    public void testProvenanceEventsHaveDurationFromSession() throws IOException {
+        final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+                .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+                .entryDate(System.currentTimeMillis())
+                .build();
+
+        flowFileQueue.put(flowFileRecord);
+
+        final FlowFile orig = session.get();
+        final FlowFile newFlowFile = session.create(orig);
+        session.getProvenanceReporter().fork(orig, Collections.singletonList(newFlowFile), 0L);
+        session.getProvenanceReporter().fetch(newFlowFile, "nowhere://");
+        session.getProvenanceReporter().send(newFlowFile, "nowhere://");
+        session.transfer(newFlowFile, new Relationship.Builder().name("A").build());
+        session.commit();
+
+        List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 100000);
+        assertNotNull(events);
+        assertEquals(3, events.size()); // FETCH, SEND, and FORK
+        events.forEach((event) -> assertTrue(event.getEventDuration() > -1));
+    }
+
+    @Test
     public void testUuidAttributeCannotBeUpdated() {
         String originalUuid = "11111111-1111-1111-1111-111111111111";
         final FlowFileRecord flowFileRecord1 = new StandardFlowFileRecord.Builder()

http://git-wip-us.apache.org/repos/asf/nifi/blob/47322136/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
index 8aa1caf..6f045e5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
@@ -38,6 +38,7 @@ public class StandardRepositoryRecord implements RepositoryRecord {
     private final Map<String, String> updatedAttributes = new HashMap<>();
     private final Map<String, String> originalAttributes;
     private List<ContentClaim> transientClaims;
+    private final long startNanos = System.nanoTime();
 
     /**
      * Creates a new record which has no original claim or flow file - it is entirely new
@@ -218,4 +219,8 @@ public class StandardRepositoryRecord implements RepositoryRecord {
         }
         transientClaims.add(claim);
     }
+
+    public long getStartNanos() {
+        return startNanos;
+    }
 }