You are viewing a plain-text version of this content; the hyperlink to the canonical version did not survive the plain-text conversion.
Posted to commits@nifi.apache.org by ij...@apache.org on 2019/04/23 08:08:46 UTC
[nifi] branch master updated: NIFI-6220: If FlowFile is created by cloning a relationship, do not create an ATTRIBUTES_MODIFIED provenance event for it.
This is an automated email from the ASF dual-hosted git repository.
ijokarumawak pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new 055b3ca NIFI-6220: If FlowFile is created by cloning a relationship, do not create an ATTRIBUTES_MODIFIED provenance event for it.
055b3ca is described below
commit 055b3cac54e3e234c0ee07189b7b7b85e0582d58
Author: Mark Payne <ma...@hotmail.com>
AuthorDate: Tue Apr 16 14:52:34 2019 -0400
NIFI-6220: If FlowFile is created by cloning a relationship, do not create an ATTRIBUTES_MODIFIED provenance event for it.
NIFI-6220: Updated test name and fixed typo
This closes #3438.
Signed-off-by: Koji Kawamura <ij...@apache.org>
---
.../repository/StandardProcessSession.java | 191 +++++++++++----------
.../integration/provenance/ProvenanceEventsIT.java | 58 +++++++
2 files changed, 160 insertions(+), 89 deletions(-)
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index 14cb70a..c2502d3 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -325,6 +325,8 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
newRecord.setTransferRelationship(record.getTransferRelationship());
// put the mapping into toAdd because adding to records now will cause a ConcurrentModificationException
toAdd.put(clone.getId(), newRecord);
+
+ createdFlowFiles.add(newUuid);
}
}
}
@@ -639,6 +641,13 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
recordsToSubmit.add(event);
addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType());
+
+ final List<String> childUuids = event.getChildUuids();
+ if (childUuids != null) {
+ for (final String childUuid : childUuids) {
+ addEventType(eventTypesPerFlowFileId, childUuid, event.getEventType());
+ }
+ }
}
// Finally, add any other events that we may have generated.
@@ -684,6 +693,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
if (registeredTypes != null) {
if (registeredTypes.get(ProvenanceEventType.CREATE.ordinal())
|| registeredTypes.get(ProvenanceEventType.FORK.ordinal())
+ || registeredTypes.get(ProvenanceEventType.CLONE.ordinal())
|| registeredTypes.get(ProvenanceEventType.JOIN.ordinal())
|| registeredTypes.get(ProvenanceEventType.RECEIVE.ordinal())
|| registeredTypes.get(ProvenanceEventType.FETCH.ordinal())) {
@@ -771,6 +781,7 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
provenanceRepo.registerEvents(iterable);
}
+
private void updateEventContentClaims(final ProvenanceEventBuilder builder, final FlowFile flowFile, final StandardRepositoryRecord repoRecord) {
final ContentClaim originalClaim = repoRecord.getOriginalClaim();
if (originalClaim == null) {
@@ -1678,6 +1689,97 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
}
+ /**
+  * Creates a new FlowFile derived from {@code parent} and registers it via {@code registerForkEvent}.
+  * <p>
+  * The child gets a freshly generated UUID. It inherits every parent attribute except UUID,
+  * ALTERNATE_IDENTIFIER and DISCARD_REASON. FILENAME and PATH default to the new UUID and
+  * DEFAULT_FLOWFILE_PATH, but are overwritten by the parent's values when the parent defines them.
+  * The child also inherits the parent's lineage start date and lineage start index.
+  *
+  * @param parent the FlowFile to derive from; resolved through {@code getMostRecent} first
+  * @return the newly created FlowFile
+  */
@Override
+ public FlowFile create(FlowFile parent) {
+     verifyTaskActive();
+     parent = getMostRecent(parent);
+
+     final String uuid = UUID.randomUUID().toString();
+     final Map<String, String> parentAttributes = parent.getAttributes();
+
+     // Presize for the parent's attributes plus the three defaults below (the old capacity of 3 forced rehashing).
+     // The defaults are written first so that the parent's FILENAME/PATH, if any, overwrite them.
+     final Map<String, String> newAttributes = new HashMap<>(parentAttributes.size() + 3);
+     newAttributes.put(CoreAttributes.FILENAME.key(), uuid);
+     newAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH);
+     newAttributes.put(CoreAttributes.UUID.key(), uuid);
+
+     final StandardFlowFileRecord.Builder fFileBuilder = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence());
+
+     // Copy all attributes from the parent except the "special" ones. Copying the special attributes
+     // can cause problems -- especially ALTERNATE_IDENTIFIER, because copying it can cause Provenance Events
+     // to be incorrectly created.
+     for (final Map.Entry<String, String> entry : parentAttributes.entrySet()) {
+         final String key = entry.getKey();
+         if (CoreAttributes.ALTERNATE_IDENTIFIER.key().equals(key)
+                 || CoreAttributes.DISCARD_REASON.key().equals(key)
+                 || CoreAttributes.UUID.key().equals(key)) {
+             continue;
+         }
+         newAttributes.put(key, entry.getValue());
+     }
+
+     fFileBuilder.lineageStart(parent.getLineageStartDate(), parent.getLineageStartIndex());
+     fFileBuilder.addAttributes(newAttributes);
+
+     final FlowFileRecord fFile = fFileBuilder.build();
+     final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
+     record.setWorking(fFile, newAttributes);
+     records.put(fFile.getId(), record);
+     // newAttributes maps UUID -> uuid, so this is the child's UUID attribute.
+     createdFlowFiles.add(uuid);
+
+     registerForkEvent(parent, fFile);
+     return fFile;
+ }
+
+ /**
+  * Creates a new FlowFile whose parents are all of the given FlowFiles and registers it via {@code registerJoinEvent}.
+  * <p>
+  * The child receives the attributes common to every parent (see {@code intersectAttributes}), minus UUID,
+  * ALTERNATE_IDENTIFIER and DISCARD_REASON. It then gets a fresh UUID, with FILENAME set to that UUID and
+  * PATH set to DEFAULT_FLOWFILE_PATH. Its lineage start date is the earliest parent lineage start date.
+  * Its lineage start index is the smallest index among the parents sharing that date. Both are 0 when
+  * {@code parents} is empty.
+  *
+  * @param parents the FlowFiles to join; each is resolved through {@code getMostRecent} first
+  * @return the newly created FlowFile
+  */
+ @Override
+ public FlowFile create(Collection<FlowFile> parents) {
+     verifyTaskActive();
+
+     parents = parents.stream().map(this::getMostRecent).collect(Collectors.toList());
+
+     final Map<String, String> newAttributes = intersectAttributes(parents);
+     newAttributes.remove(CoreAttributes.UUID.key());
+     newAttributes.remove(CoreAttributes.ALTERNATE_IDENTIFIER.key());
+     newAttributes.remove(CoreAttributes.DISCARD_REASON.key());
+
+     // Use the earliest lineage start date of all parents; 0 acts as the "not yet chosen" sentinel.
+     long lineageStartDate = 0L;
+     for (final FlowFile parent : parents) {
+         final long parentLineageStartDate = parent.getLineageStartDate();
+         if (lineageStartDate == 0L || parentLineageStartDate < lineageStartDate) {
+             lineageStartDate = parentLineageStartDate;
+         }
+     }
+
+     // Find the smallest lineage start index among the parents that share the chosen lineage start date.
+     // The first matching parent seeds the minimum: seeding with 0 (as before) meant no non-negative
+     // index could ever be selected, so the result was always 0.
+     long lineageStartIndex = 0L;
+     boolean lineageStartIndexFound = false;
+     for (final FlowFile parent : parents) {
+         if (parent.getLineageStartDate() != lineageStartDate) {
+             continue;
+         }
+
+         final long parentLineageStartIndex = parent.getLineageStartIndex();
+         if (!lineageStartIndexFound || parentLineageStartIndex < lineageStartIndex) {
+             lineageStartIndex = parentLineageStartIndex;
+             lineageStartIndexFound = true;
+         }
+     }
+
+     final String uuid = UUID.randomUUID().toString();
+     newAttributes.put(CoreAttributes.FILENAME.key(), uuid);
+     newAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH);
+     newAttributes.put(CoreAttributes.UUID.key(), uuid);
+
+     final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence())
+             .addAttributes(newAttributes)
+             .lineageStart(lineageStartDate, lineageStartIndex)
+             .build();
+
+     final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
+     record.setWorking(fFile, newAttributes);
+     records.put(fFile.getId(), record);
+     // newAttributes maps UUID -> uuid, so this is the child's UUID attribute.
+     createdFlowFiles.add(uuid);
+
+     registerJoinEvent(fFile, parents);
+     return fFile;
+ }
+
+
+ @Override
public FlowFile clone(FlowFile example) {
verifyTaskActive();
example = validateRecordState(example);
@@ -3171,95 +3273,6 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
return existingRecord == null ? flowFile : existingRecord.getCurrent();
}
- @Override
- public FlowFile create(FlowFile parent) {
- verifyTaskActive();
- parent = getMostRecent(parent);
-
- final String uuid = UUID.randomUUID().toString();
-
- final Map<String, String> newAttributes = new HashMap<>(3);
- newAttributes.put(CoreAttributes.FILENAME.key(), uuid);
- newAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH);
- newAttributes.put(CoreAttributes.UUID.key(), uuid);
-
- final StandardFlowFileRecord.Builder fFileBuilder = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence());
-
- // copy all attributes from parent except for the "special" attributes. Copying the special attributes
- // can cause problems -- especially the ALTERNATE_IDENTIFIER, because copying can cause Provenance Events
- // to be incorrectly created.
- for (final Map.Entry<String, String> entry : parent.getAttributes().entrySet()) {
- final String key = entry.getKey();
- final String value = entry.getValue();
- if (CoreAttributes.ALTERNATE_IDENTIFIER.key().equals(key)
- || CoreAttributes.DISCARD_REASON.key().equals(key)
- || CoreAttributes.UUID.key().equals(key)) {
- continue;
- }
- newAttributes.put(key, value);
- }
-
- fFileBuilder.lineageStart(parent.getLineageStartDate(), parent.getLineageStartIndex());
- fFileBuilder.addAttributes(newAttributes);
-
- final FlowFileRecord fFile = fFileBuilder.build();
- final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
- record.setWorking(fFile, newAttributes);
- records.put(fFile.getId(), record);
- createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key()));
-
- registerForkEvent(parent, fFile);
- return fFile;
- }
-
- @Override
- public FlowFile create(Collection<FlowFile> parents) {
- verifyTaskActive();
-
- parents = parents.stream().map(this::getMostRecent).collect(Collectors.toList());
-
- final Map<String, String> newAttributes = intersectAttributes(parents);
- newAttributes.remove(CoreAttributes.UUID.key());
- newAttributes.remove(CoreAttributes.ALTERNATE_IDENTIFIER.key());
- newAttributes.remove(CoreAttributes.DISCARD_REASON.key());
-
- // When creating a new FlowFile from multiple parents, we need to add all of the Lineage Identifiers
- // and use the earliest lineage start date
- long lineageStartDate = 0L;
- for (final FlowFile parent : parents) {
-
- final long parentLineageStartDate = parent.getLineageStartDate();
- if (lineageStartDate == 0L || parentLineageStartDate < lineageStartDate) {
- lineageStartDate = parentLineageStartDate;
- }
- }
-
- // find the smallest lineage start index that has the same lineage start date as the one we've chosen.
- long lineageStartIndex = 0L;
- for (final FlowFile parent : parents) {
- if (parent.getLineageStartDate() == lineageStartDate && parent.getLineageStartIndex() < lineageStartIndex) {
- lineageStartIndex = parent.getLineageStartIndex();
- }
- }
-
- final String uuid = UUID.randomUUID().toString();
- newAttributes.put(CoreAttributes.FILENAME.key(), uuid);
- newAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH);
- newAttributes.put(CoreAttributes.UUID.key(), uuid);
-
- final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence())
- .addAttributes(newAttributes)
- .lineageStart(lineageStartDate, lineageStartIndex)
- .build();
-
- final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
- record.setWorking(fFile, newAttributes);
- records.put(fFile.getId(), record);
- createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key()));
-
- registerJoinEvent(fFile, parents);
- return fFile;
- }
/**
* Returns the attributes that are common to every FlowFile given. The key
diff --git a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/provenance/ProvenanceEventsIT.java b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/provenance/ProvenanceEventsIT.java
index eb710fb..6ef7bbc 100644
--- a/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/provenance/ProvenanceEventsIT.java
+++ b/nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/test/java/org/apache/nifi/integration/provenance/ProvenanceEventsIT.java
@@ -420,4 +420,62 @@ public class ProvenanceEventsIT extends FrameworkIntegrationTest {
final ProvenanceEventRecord thirdEvent = provRepo.getEvent(2L);
assertEquals(ProvenanceEventType.DROP, thirdEvent.getEventType());
}
+
+ /**
+  * Verifies that routing one FlowFile to a relationship wired to two connections yields exactly a CREATE
+  * event followed by a single CLONE event (one parent, one child) and nothing else.
+  */
+ @Test
+ public void testCloneOnMultipleConnectionsForRelationship() throws ExecutionException, InterruptedException, IOException {
+     final ProcessorNode generator = createGenerateProcessor(0);
+     // Forwards every FlowFile unchanged to 'success'.
+     final ProcessorNode forwarder = createProcessorNode((context, session) -> session.transfer(session.get(), REL_SUCCESS), REL_SUCCESS);
+
+     // 'success' of the forwarder feeds two separate connections, forcing the framework to clone.
+     connect(generator, forwarder, REL_SUCCESS);
+     connect(forwarder, getTerminateProcessor(), REL_SUCCESS);
+     connect(forwarder, getTerminateAllProcessor(), REL_SUCCESS);
+
+     triggerOnce(generator);
+     triggerOnce(forwarder);
+
+     // A max event id of 1 means only events 0 and 1 were recorded.
+     final ProvenanceEventRepository repository = getProvenanceRepository();
+     assertEquals(1L, repository.getMaxEventId().longValue());
+
+     assertEquals(ProvenanceEventType.CREATE, repository.getEvent(0L).getEventType());
+
+     final ProvenanceEventRecord cloneEvent = repository.getEvent(1L);
+     assertEquals(ProvenanceEventType.CLONE, cloneEvent.getEventType());
+     assertEquals(1, cloneEvent.getParentUuids().size());
+     assertEquals(1, cloneEvent.getChildUuids().size());
+ }
+
+ /**
+  * Verifies that when a processor updates an attribute and then routes the FlowFile to a relationship
+  * wired to two connections, only CREATE and CLONE events are recorded, and the CLONE event carries
+  * the updated attribute.
+  */
+ @Test
+ public void testCloneOnMultipleConnectionsForRelationshipIncludesUpdatedAttributes() throws ExecutionException, InterruptedException, IOException {
+     final ProcessorNode generator = createGenerateProcessor(0);
+     // Sets test=integration on each FlowFile, then forwards it to 'success'.
+     final ProcessorNode attributeSetter = createProcessorNode((context, session) -> {
+         final FlowFile updated = session.putAttribute(session.get(), "test", "integration");
+         session.transfer(updated, REL_SUCCESS);
+     }, REL_SUCCESS);
+
+     // 'success' of the attribute setter feeds two separate connections, forcing the framework to clone.
+     connect(generator, attributeSetter, REL_SUCCESS);
+     connect(attributeSetter, getTerminateProcessor(), REL_SUCCESS);
+     connect(attributeSetter, getTerminateAllProcessor(), REL_SUCCESS);
+
+     triggerOnce(generator);
+     triggerOnce(attributeSetter);
+
+     // A max event id of 1 means only events 0 and 1 were recorded.
+     final ProvenanceEventRepository repository = getProvenanceRepository();
+     assertEquals(1L, repository.getMaxEventId().longValue());
+
+     assertEquals(ProvenanceEventType.CREATE, repository.getEvent(0L).getEventType());
+
+     final ProvenanceEventRecord cloneEvent = repository.getEvent(1L);
+     assertEquals(ProvenanceEventType.CLONE, cloneEvent.getEventType());
+     assertEquals(1, cloneEvent.getParentUuids().size());
+     assertEquals(1, cloneEvent.getChildUuids().size());
+     assertEquals("integration", cloneEvent.getAttribute("test"));
+ }
+
}