You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2019/04/23 08:08:46 UTC

[nifi] branch master updated: NIFI-6220: If FlowFile is created by cloning a relationship, do not create an ATTRIBUTES_MODIFIED provenance event for it.

This is an automated email from the ASF dual-hosted git repository.

ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new 055b3ca  NIFI-6220: If FlowFile is created by cloning a relationship, do not create an ATTRIBUTES_MODIFIED provenance event for it.
055b3ca is described below

commit 055b3cac54e3e234c0ee07189b7b7b85e0582d58
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Tue Apr 16 14:52:34 2019 -0400

    NIFI-6220: If FlowFile is created by cloning a relationship, do not create an ATTRIBUTES_MODIFIED provenance event for it.
    
    NIFI-6220: Updated test name and fixed typo
    
    This closes #3438.
    
    Signed-off-by: Koji Kawamura <ij...@apache.org>
---
 .../repository/StandardProcessSession.java         | 191 +++++++++++----------
 .../integration/provenance/ProvenanceEventsIT.java |  58 +++++++
 2 files changed, 160 insertions(+), 89 deletions(-)

diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 14cb70a..c2502d3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -325,6 +325,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                     newRecord.setTransferRelationship(record.getTransferRelationship());
                     // put the mapping into toAdd because adding to records now will cause a ConcurrentModificationException
                     toAdd.put(clone.getId(), newRecord);
+
+                    createdFlowFiles.add(newUuid);
                 }
             }
         }
@@ -639,6 +641,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
 
             recordsToSubmit.add(event);
             addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType());
+
+            final List<String> childUuids = event.getChildUuids();
+            if (childUuids != null) {
+                for (final String childUuid : childUuids) {
+                    addEventType(eventTypesPerFlowFileId, childUuid, event.getEventType());
+                }
+            }
         }
 
         // Finally, add any other events that we may have generated.
@@ -684,6 +693,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
                 if (registeredTypes != null) {
                     if (registeredTypes.get(ProvenanceEventType.CREATE.ordinal())
                         || registeredTypes.get(ProvenanceEventType.FORK.ordinal())
+                        || registeredTypes.get(ProvenanceEventType.CLONE.ordinal())
                         || registeredTypes.get(ProvenanceEventType.JOIN.ordinal())
                         || registeredTypes.get(ProvenanceEventType.RECEIVE.ordinal())
                         || registeredTypes.get(ProvenanceEventType.FETCH.ordinal())) {
@@ -771,6 +781,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         provenanceRepo.registerEvents(iterable);
     }
 
+
     private void updateEventContentClaims(final ProvenanceEventBuilder builder, final FlowFile flowFile, final StandardRepositoryRecord repoRecord) {
         final ContentClaim originalClaim = repoRecord.getOriginalClaim();
         if (originalClaim == null) {
@@ -1678,6 +1689,97 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
     }
 
     @Override
+    public FlowFile create(FlowFile parent) {
+        verifyTaskActive();
+        parent = getMostRecent(parent);
+
+        final String uuid = UUID.randomUUID().toString();
+
+        final Map<String, String> newAttributes = new HashMap<>(3);
+        newAttributes.put(CoreAttributes.FILENAME.key(), uuid);
+        newAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH);
+        newAttributes.put(CoreAttributes.UUID.key(), uuid);
+
+        final StandardFlowFileRecord.Builder fFileBuilder = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence());
+
+        // copy all attributes from parent except for the "special" attributes. Copying the special attributes
+        // can cause problems -- especially the ALTERNATE_IDENTIFIER, because copying can cause Provenance Events
+        // to be incorrectly created.
+        for (final Map.Entry<String, String> entry : parent.getAttributes().entrySet()) {
+            final String key = entry.getKey();
+            final String value = entry.getValue();
+            if (CoreAttributes.ALTERNATE_IDENTIFIER.key().equals(key)
+                || CoreAttributes.DISCARD_REASON.key().equals(key)
+                || CoreAttributes.UUID.key().equals(key)) {
+                continue;
+            }
+            newAttributes.put(key, value);
+        }
+
+        fFileBuilder.lineageStart(parent.getLineageStartDate(), parent.getLineageStartIndex());
+        fFileBuilder.addAttributes(newAttributes);
+
+        final FlowFileRecord fFile = fFileBuilder.build();
+        final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
+        record.setWorking(fFile, newAttributes);
+        records.put(fFile.getId(), record);
+        createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key()));
+
+        registerForkEvent(parent, fFile);
+        return fFile;
+    }
+
+    @Override
+    public FlowFile create(Collection<FlowFile> parents) {
+        verifyTaskActive();
+
+        parents = parents.stream().map(this::getMostRecent).collect(Collectors.toList());
+
+        final Map<String, String> newAttributes = intersectAttributes(parents);
+        newAttributes.remove(CoreAttributes.UUID.key());
+        newAttributes.remove(CoreAttributes.ALTERNATE_IDENTIFIER.key());
+        newAttributes.remove(CoreAttributes.DISCARD_REASON.key());
+
+        // When creating a new FlowFile from multiple parents, we need to add all of the Lineage Identifiers
+        // and use the earliest lineage start date
+        long lineageStartDate = 0L;
+        for (final FlowFile parent : parents) {
+
+            final long parentLineageStartDate = parent.getLineageStartDate();
+            if (lineageStartDate == 0L || parentLineageStartDate < lineageStartDate) {
+                lineageStartDate = parentLineageStartDate;
+            }
+        }
+
+        // find the smallest lineage start index that has the same lineage start date as the one we've chosen.
+        long lineageStartIndex = 0L;
+        for (final FlowFile parent : parents) {
+            if (parent.getLineageStartDate() == lineageStartDate && parent.getLineageStartIndex() < lineageStartIndex) {
+                lineageStartIndex = parent.getLineageStartIndex();
+            }
+        }
+
+        final String uuid = UUID.randomUUID().toString();
+        newAttributes.put(CoreAttributes.FILENAME.key(), uuid);
+        newAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH);
+        newAttributes.put(CoreAttributes.UUID.key(), uuid);
+
+        final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence())
+            .addAttributes(newAttributes)
+            .lineageStart(lineageStartDate, lineageStartIndex)
+            .build();
+
+        final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
+        record.setWorking(fFile, newAttributes);
+        records.put(fFile.getId(), record);
+        createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key()));
+
+        registerJoinEvent(fFile, parents);
+        return fFile;
+    }
+
+
+    @Override
     public FlowFile clone(FlowFile example) {
         verifyTaskActive();
         example = validateRecordState(example);
@@ -3171,95 +3273,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         return existingRecord == null ? flowFile : existingRecord.getCurrent();
     }
 
-    @Override
-    public FlowFile create(FlowFile parent) {
-        verifyTaskActive();
-        parent = getMostRecent(parent);
-
-        final String uuid = UUID.randomUUID().toString();
-
-        final Map<String, String> newAttributes = new HashMap<>(3);
-        newAttributes.put(CoreAttributes.FILENAME.key(), uuid);
-        newAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH);
-        newAttributes.put(CoreAttributes.UUID.key(), uuid);
-
-        final StandardFlowFileRecord.Builder fFileBuilder = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence());
-
-        // copy all attributes from parent except for the "special" attributes. Copying the special attributes
-        // can cause problems -- especially the ALTERNATE_IDENTIFIER, because copying can cause Provenance Events
-        // to be incorrectly created.
-        for (final Map.Entry<String, String> entry : parent.getAttributes().entrySet()) {
-            final String key = entry.getKey();
-            final String value = entry.getValue();
-            if (CoreAttributes.ALTERNATE_IDENTIFIER.key().equals(key)
-                || CoreAttributes.DISCARD_REASON.key().equals(key)
-                || CoreAttributes.UUID.key().equals(key)) {
-                continue;
-            }
-            newAttributes.put(key, value);
-        }
-
-        fFileBuilder.lineageStart(parent.getLineageStartDate(), parent.getLineageStartIndex());
-        fFileBuilder.addAttributes(newAttributes);
-
-        final FlowFileRecord fFile = fFileBuilder.build();
-        final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
-        record.setWorking(fFile, newAttributes);
-        records.put(fFile.getId(), record);
-        createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key()));
-
-        registerForkEvent(parent, fFile);
-        return fFile;
-    }
-
-    @Override
-    public FlowFile create(Collection<FlowFile> parents) {
-        verifyTaskActive();
-
-        parents = parents.stream().map(this::getMostRecent).collect(Collectors.toList());
-
-        final Map<String, String> newAttributes = intersectAttributes(parents);
-        newAttributes.remove(CoreAttributes.UUID.key());
-        newAttributes.remove(CoreAttributes.ALTERNATE_IDENTIFIER.key());
-        newAttributes.remove(CoreAttributes.DISCARD_REASON.key());
-
-        // When creating a new FlowFile from multiple parents, we need to add all of the Lineage Identifiers
-        // and use the earliest lineage start date
-        long lineageStartDate = 0L;
-        for (final FlowFile parent : parents) {
-
-            final long parentLineageStartDate = parent.getLineageStartDate();
-            if (lineageStartDate == 0L || parentLineageStartDate < lineageStartDate) {
-                lineageStartDate = parentLineageStartDate;
-            }
-        }
-
-        // find the smallest lineage start index that has the same lineage start date as the one we've chosen.
-        long lineageStartIndex = 0L;
-        for (final FlowFile parent : parents) {
-            if (parent.getLineageStartDate() == lineageStartDate && parent.getLineageStartIndex() < lineageStartIndex) {
-                lineageStartIndex = parent.getLineageStartIndex();
-            }
-        }
-
-        final String uuid = UUID.randomUUID().toString();
-        newAttributes.put(CoreAttributes.FILENAME.key(), uuid);
-        newAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH);
-        newAttributes.put(CoreAttributes.UUID.key(), uuid);
-
-        final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence())
-            .addAttributes(newAttributes)
-            .lineageStart(lineageStartDate, lineageStartIndex)
-            .build();
-
-        final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
-        record.setWorking(fFile, newAttributes);
-        records.put(fFile.getId(), record);
-        createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key()));
-
-        registerJoinEvent(fFile, parents);
-        return fFile;
-    }
 
     /**
      * Returns the attributes that are common to every FlowFile given. The key
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/provenance/ProvenanceEventsIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/provenance/ProvenanceEventsIT.java
index eb710fb..6ef7bbc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/provenance/ProvenanceEventsIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/provenance/ProvenanceEventsIT.java
@@ -420,4 +420,62 @@ public class ProvenanceEventsIT extends FrameworkIntegrationTest {
         final ProvenanceEventRecord thirdEvent = provRepo.getEvent(2L);
         assertEquals(ProvenanceEventType.DROP, thirdEvent.getEventType());
     }
+
+    @Test
+    public void testCloneOnMultipleConnectionsForRelationship() throws ExecutionException, InterruptedException, IOException {
+        final ProcessorNode generateProcessor = createGenerateProcessor(0);
+        final ProcessorNode passThroughProcessor = createProcessorNode((context, session) -> {
+            FlowFile original = session.get();
+            session.transfer(original, REL_SUCCESS);
+        }, REL_SUCCESS);
+
+        connect(generateProcessor, passThroughProcessor, REL_SUCCESS);
+        connect(passThroughProcessor, getTerminateProcessor(), REL_SUCCESS);
+        connect(passThroughProcessor, getTerminateAllProcessor(), REL_SUCCESS);
+
+        triggerOnce(generateProcessor);
+        triggerOnce(passThroughProcessor);
+
+        final ProvenanceEventRepository provRepo = getProvenanceRepository();
+        assertEquals(1L, provRepo.getMaxEventId().longValue());
+
+        final ProvenanceEventRecord firstEvent = provRepo.getEvent(0L);
+        assertEquals(ProvenanceEventType.CREATE, firstEvent.getEventType());
+
+        final ProvenanceEventRecord secondEvent = provRepo.getEvent(1L);
+        assertEquals(ProvenanceEventType.CLONE, secondEvent.getEventType());
+        assertEquals(1, secondEvent.getParentUuids().size());
+        assertEquals(1, secondEvent.getChildUuids().size());
+    }
+
+    @Test
+    public void testCloneOnMultipleConnectionsForRelationshipIncludesUpdatedAttributes() throws ExecutionException, InterruptedException, IOException {
+        final ProcessorNode generateProcessor = createGenerateProcessor(0);
+        final ProcessorNode passThroughProcessor = createProcessorNode((context, session) -> {
+            FlowFile original = session.get();
+            original = session.putAttribute(original, "test", "integration");
+
+            session.transfer(original, REL_SUCCESS);
+        }, REL_SUCCESS);
+
+        connect(generateProcessor, passThroughProcessor, REL_SUCCESS);
+        connect(passThroughProcessor, getTerminateProcessor(), REL_SUCCESS);
+        connect(passThroughProcessor, getTerminateAllProcessor(), REL_SUCCESS);
+
+        triggerOnce(generateProcessor);
+        triggerOnce(passThroughProcessor);
+
+        final ProvenanceEventRepository provRepo = getProvenanceRepository();
+        assertEquals(1L, provRepo.getMaxEventId().longValue());
+
+        final ProvenanceEventRecord firstEvent = provRepo.getEvent(0L);
+        assertEquals(ProvenanceEventType.CREATE, firstEvent.getEventType());
+
+        final ProvenanceEventRecord secondEvent = provRepo.getEvent(1L);
+        assertEquals(ProvenanceEventType.CLONE, secondEvent.getEventType());
+        assertEquals(1, secondEvent.getParentUuids().size());
+        assertEquals(1, secondEvent.getChildUuids().size());
+        assertEquals("integration", secondEvent.getAttribute("test"));
+    }
+
 }