You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/07/27 13:14:02 UTC
nifi git commit: NIFI-5420: Allow StandardProcessSession to calculate
duration for provenance events
Repository: nifi
Updated Branches:
refs/heads/master b1f78d58a -> 473221368
NIFI-5420: Allow StandardProcessSession to calculate duration for provenance events
This closes #2886.
Signed-off-by: Mark Payne <ma...@hotmail.com>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/47322136
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/47322136
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/47322136
Branch: refs/heads/master
Commit: 473221368c75d371cbabf7e68b0bfb62fbe56c00
Parents: b1f78d5
Author: Matthew Burgess <ma...@apache.org>
Authored: Thu Jul 12 17:13:32 2018 -0400
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Jul 27 09:13:50 2018 -0400
----------------------------------------------------------------------
.../repository/ProvenanceEventEnricher.java | 3 ++-
.../repository/StandardProcessSession.java | 28 +++++++++++---------
.../repository/StandardProvenanceReporter.java | 6 +++--
.../repository/TestStandardProcessSession.java | 23 ++++++++++++++++
.../repository/StandardRepositoryRecord.java | 5 ++++
5 files changed, 49 insertions(+), 16 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/47322136/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProvenanceEventEnricher.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProvenanceEventEnricher.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProvenanceEventEnricher.java
index 323bfb0..f0f6bc4 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProvenanceEventEnricher.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/ProvenanceEventEnricher.java
@@ -26,8 +26,9 @@ public interface ProvenanceEventEnricher {
*
* @param record record
* @param flowFile flowfile
+ * @param commitNanos the time (in nanoseconds) when the associated session was committed
* @return new event record
*/
- ProvenanceEventRecord enrich(ProvenanceEventRecord record, FlowFile flowFile);
+ ProvenanceEventRecord enrich(ProvenanceEventRecord record, FlowFile flowFile, long commitNanos);
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/47322136/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index ea4969c..7255645 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -737,6 +737,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
flowFileRecordMap.put(flowFile.getAttribute(CoreAttributes.UUID.key()), flowFile);
}
+ final long commitNanos = System.nanoTime();
final List<ProvenanceEventRecord> autoTermEvents = checkpoint.autoTerminatedEvents;
final Iterable<ProvenanceEventRecord> iterable = new Iterable<ProvenanceEventRecord>() {
final Iterator<ProvenanceEventRecord> recordsToSubmitIterator = recordsToSubmit.iterator();
@@ -761,9 +762,9 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
// the representation of the FlowFile as it is committed, as this is the only way in which it really
// exists in our system -- all other representations are volatile representations that have not been
// exposed.
- return enrich(rawEvent, flowFileRecordMap, checkpoint.records, rawEvent.getEventType() != ProvenanceEventType.SEND);
+ return enrich(rawEvent, flowFileRecordMap, checkpoint.records, rawEvent.getEventType() != ProvenanceEventType.SEND, commitNanos);
} else if (autoTermIterator != null && autoTermIterator.hasNext()) {
- return enrich(autoTermIterator.next(), flowFileRecordMap, checkpoint.records, true);
+ return enrich(autoTermIterator.next(), flowFileRecordMap, checkpoint.records, true, commitNanos);
}
throw new NoSuchElementException();
@@ -796,7 +797,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
@Override
- public StandardProvenanceEventRecord enrich(final ProvenanceEventRecord rawEvent, final FlowFile flowFile) {
+ public StandardProvenanceEventRecord enrich(final ProvenanceEventRecord rawEvent, final FlowFile flowFile, final long commitNanos) {
verifyTaskActive();
final StandardRepositoryRecord repoRecord = records.get(flowFile);
@@ -829,11 +830,15 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
recordBuilder.setAttributes(repoRecord.getOriginalAttributes(), repoRecord.getUpdatedAttributes());
+ if (rawEvent.getEventDuration() < 0) {
+ recordBuilder.setEventDuration(TimeUnit.NANOSECONDS.toMillis(commitNanos - repoRecord.getStartNanos()));
+ }
return recordBuilder.build();
}
private StandardProvenanceEventRecord enrich(
- final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<FlowFileRecord, StandardRepositoryRecord> records, final boolean updateAttributes) {
+ final ProvenanceEventRecord rawEvent, final Map<String, FlowFileRecord> flowFileRecordMap, final Map<FlowFileRecord, StandardRepositoryRecord> records,
+ final boolean updateAttributes, final long commitNanos) {
final StandardProvenanceEventRecord.Builder recordBuilder = new StandardProvenanceEventRecord.Builder().fromEvent(rawEvent);
final FlowFileRecord eventFlowFile = flowFileRecordMap.get(rawEvent.getFlowFileUuid());
if (eventFlowFile != null) {
@@ -861,18 +866,15 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
if (originalQueue != null) {
recordBuilder.setSourceQueueIdentifier(originalQueue.getIdentifier());
}
- }
- if (updateAttributes) {
- final FlowFileRecord flowFileRecord = flowFileRecordMap.get(rawEvent.getFlowFileUuid());
- if (flowFileRecord != null) {
- final StandardRepositoryRecord record = records.get(flowFileRecord);
- if (record != null) {
- recordBuilder.setAttributes(record.getOriginalAttributes(), record.getUpdatedAttributes());
- }
+ if (updateAttributes) {
+ recordBuilder.setAttributes(repoRecord.getOriginalAttributes(), repoRecord.getUpdatedAttributes());
}
- }
+ if (rawEvent.getEventDuration() < 0) {
+ recordBuilder.setEventDuration(TimeUnit.NANOSECONDS.toMillis(commitNanos - repoRecord.getStartNanos()));
+ }
+ }
return recordBuilder.build();
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/47322136/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
index 7d14ee3..e956d50 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProvenanceReporter.java
@@ -209,7 +209,9 @@ public class StandardProvenanceReporter implements ProvenanceReporter {
public void send(final FlowFile flowFile, final String transitUri, final String details, final long transmissionMillis, final boolean force) {
try {
final ProvenanceEventRecord record = build(flowFile, ProvenanceEventType.SEND).setTransitUri(transitUri).setEventDuration(transmissionMillis).setDetails(details).build();
- final ProvenanceEventRecord enriched = eventEnricher == null ? record : eventEnricher.enrich(record, flowFile);
+ // If the transmissionMillis field has been populated, use zero as the value of commitNanos (the call to System.nanoTime() is expensive but the value will be ignored).
+ final long commitNanos = transmissionMillis < 0 ? System.nanoTime() : 0L;
+ final ProvenanceEventRecord enriched = eventEnricher == null ? record : eventEnricher.enrich(record, flowFile, commitNanos);
if (force) {
repository.registerEvent(enriched);
@@ -226,7 +228,7 @@ public class StandardProvenanceReporter implements ProvenanceReporter {
@Override
public void send(final FlowFile flowFile, final String transitUri, final boolean force) {
- send(flowFile, transitUri, -1L, true);
+ send(flowFile, transitUri, -1L, force);
}
@Override
http://git-wip-us.apache.org/repos/asf/nifi/blob/47322136/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 4059557..5667094 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -929,6 +929,29 @@ public class TestStandardProcessSession {
}
@Test
+ public void testProvenanceEventsHaveDurationFromSession() throws IOException {
+ final FlowFileRecord flowFileRecord = new StandardFlowFileRecord.Builder()
+ .addAttribute("uuid", "12345678-1234-1234-1234-123456789012")
+ .entryDate(System.currentTimeMillis())
+ .build();
+
+ flowFileQueue.put(flowFileRecord);
+
+ final FlowFile orig = session.get();
+ final FlowFile newFlowFile = session.create(orig);
+ session.getProvenanceReporter().fork(orig, Collections.singletonList(newFlowFile), 0L);
+ session.getProvenanceReporter().fetch(newFlowFile, "nowhere://");
+ session.getProvenanceReporter().send(newFlowFile, "nowhere://");
+ session.transfer(newFlowFile, new Relationship.Builder().name("A").build());
+ session.commit();
+
+ List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 100000);
+ assertNotNull(events);
+ assertEquals(3, events.size()); // FETCH, SEND, and FORK
+ events.forEach((event) -> assertTrue(event.getEventDuration() > -1));
+ }
+
+ @Test
public void testUuidAttributeCannotBeUpdated() {
String originalUuid = "11111111-1111-1111-1111-111111111111";
final FlowFileRecord flowFileRecord1 = new StandardFlowFileRecord.Builder()
http://git-wip-us.apache.org/repos/asf/nifi/blob/47322136/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
index 8aa1caf..6f045e5 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-repository-models/src/main/java/org/apache/nifi/controller/repository/StandardRepositoryRecord.java
@@ -38,6 +38,7 @@ public class StandardRepositoryRecord implements RepositoryRecord {
private final Map<String, String> updatedAttributes = new HashMap<>();
private final Map<String, String> originalAttributes;
private List<ContentClaim> transientClaims;
+ private final long startNanos = System.nanoTime();
/**
* Creates a new record which has no original claim or flow file - it is entirely new
@@ -218,4 +219,8 @@ public class StandardRepositoryRecord implements RepositoryRecord {
}
transientClaims.add(claim);
}
+
+ public long getStartNanos() {
+ return startNanos;
+ }
}