You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2014/12/16 17:22:00 UTC
[05/27] incubator-nifi git commit: NIFI-72: Added unit tests and
fixed bug that caused CONTENT_MODIFIED to be emitted for newly created
FlowFiles
NIFI-72: Added unit tests and fixed bug that caused CONTENT_MODIFIED to be emitted for newly created FlowFiles
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/cbea1f19
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/cbea1f19
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/cbea1f19
Branch: refs/heads/nifi-27
Commit: cbea1f193620f934d6186d167d0c0fe0723fad7c
Parents: 6b0a5e8
Author: Mark Payne <ma...@hotmail.com>
Authored: Thu Dec 11 09:16:48 2014 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Thu Dec 11 09:16:48 2014 -0500
----------------------------------------------------------------------
.../repository/StandardProcessSession.java | 3 +-
.../repository/TestStandardProcessSession.java | 68 ++++++++++++++++++++
2 files changed, 70 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cbea1f19/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 4ba45aa..11172a8 100644
--- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -579,7 +579,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
continue;
}
- if ( contentChanged ) {
+ final boolean newFlowFile = repoRecord.getOriginal() == null;
+ if ( contentChanged && !newFlowFile ) {
recordsToSubmit.add(provenanceReporter.build(curFlowFile, ProvenanceEventType.CONTENT_MODIFIED).build());
addEventType(eventTypesPerFlowFileId, flowFileId, ProvenanceEventType.CONTENT_MODIFIED);
eventAdded = true;
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/cbea1f19/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java b/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
index 3dbbcf3..060bbd9 100644
--- a/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
+++ b/nar-bundles/framework-bundle/framework/core/src/test/java/org/apache/nifi/controller/repository/TestStandardProcessSession.java
@@ -846,6 +846,74 @@ public class TestStandardProcessSession {
assertEquals(ProvenanceEventType.CREATE, event.getEventType());
}
+ @Test
+ public void testContentModifiedNotEmittedForCreate() throws IOException { // create + write + transfer + commit must yield a single CREATE event
+ FlowFile newFlowFile = session.create(); // FlowFile is brand new in this session (not pulled from the queue)
+ newFlowFile = session.write(newFlowFile, new OutputStreamCallback() {
+ @Override
+ public void process(OutputStream out) throws IOException {
+ } // empty callback: no bytes are written to the stream
+ });
+ session.transfer(newFlowFile, new Relationship.Builder().name("A").build());
+ session.commit();
+
+ final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000);
+ assertFalse(events.isEmpty());
+ assertEquals(1, events.size()); // exactly one event, i.e. nothing besides the CREATE checked below
+
+ final ProvenanceEventRecord event = events.get(0);
+ assertEquals(ProvenanceEventType.CREATE, event.getEventType()); // the write() above must not surface as CONTENT_MODIFIED
+ }
+
+ @Test
+ public void testContentModifiedEmittedAndNotAttributesModified() throws IOException { // write + putAttribute on a queued FlowFile must yield a single CONTENT_MODIFIED event
+ final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
+ .id(1L)
+ .addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
+ .build();
+ this.flowFileQueue.put(flowFile); // pre-existing record; presumably what session.get() returns below — confirm against test setup
+
+ FlowFile existingFlowFile = session.get();
+ existingFlowFile = session.write(existingFlowFile, new OutputStreamCallback() {
+ @Override
+ public void process(OutputStream out) throws IOException {
+ } // empty callback: no bytes are written to the stream
+ });
+ existingFlowFile = session.putAttribute(existingFlowFile, "attr", "a"); // attribute is changed in addition to the write() above
+ session.transfer(existingFlowFile, new Relationship.Builder().name("A").build());
+ session.commit();
+
+ final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000);
+ assertFalse(events.isEmpty());
+ assertEquals(1, events.size()); // exactly one event, so no separate ATTRIBUTES_MODIFIED is expected
+
+ final ProvenanceEventRecord event = events.get(0);
+ assertEquals(ProvenanceEventType.CONTENT_MODIFIED, event.getEventType());
+ }
+
+ @Test
+ public void testAttributesModifiedEmitted() throws IOException { // putAttribute alone on a queued FlowFile must yield a single ATTRIBUTES_MODIFIED event
+ final FlowFileRecord flowFile = new StandardFlowFileRecord.Builder()
+ .id(1L)
+ .addAttribute("uuid", "000000000000-0000-0000-0000-00000000")
+ .build();
+ this.flowFileQueue.put(flowFile); // pre-existing record; presumably what session.get() returns below — confirm against test setup
+
+ FlowFile existingFlowFile = session.get();
+ existingFlowFile = session.putAttribute(existingFlowFile, "attr", "a"); // only an attribute is changed; session.write() is never called in this test
+ session.transfer(existingFlowFile, new Relationship.Builder().name("A").build());
+ session.commit();
+
+ final List<ProvenanceEventRecord> events = provenanceRepo.getEvents(0L, 10000);
+ assertFalse(events.isEmpty());
+ assertEquals(1, events.size()); // exactly one event is expected for the attribute change
+
+ final ProvenanceEventRecord event = events.get(0);
+ assertEquals(ProvenanceEventType.ATTRIBUTES_MODIFIED, event.getEventType());
+ }
+
+
+
private static class MockFlowFileRepository implements FlowFileRepository {
private final AtomicLong idGenerator = new AtomicLong(0L);