Posted to issues@nifi.apache.org by GitBox <gi...@apache.org> on 2019/04/22 14:20:40 UTC

[GitHub] [nifi] markap14 commented on a change in pull request #3438: NIFI-6220: If FlowFile is created by cloning a relationship, do not c…

markap14 commented on a change in pull request #3438: NIFI-6220: If FlowFile is created by cloning a relationship, do not c…
URL: https://github.com/apache/nifi/pull/3438#discussion_r277304397
 
 

 ##########
 File path: nifi-nar-bundles/nifi-framework-bundle/nifi-framework/nifi-framework-core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
 ##########
 @@ -1677,6 +1688,97 @@ public FlowFile create() {
         return fFile;
     }
 
+    @Override
+    public FlowFile create(FlowFile parent) {
+        verifyTaskActive();
+        parent = getMostRecent(parent);
+
+        final String uuid = UUID.randomUUID().toString();
+
+        final Map<String, String> newAttributes = new HashMap<>(3);
+        newAttributes.put(CoreAttributes.FILENAME.key(), uuid);
+        newAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH);
+        newAttributes.put(CoreAttributes.UUID.key(), uuid);
+
+        final StandardFlowFileRecord.Builder fFileBuilder = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence());
+
+        // copy all attributes from parent except for the "special" attributes. Copying the special attributes
+        // can cause problems -- especially the ALTERNATE_IDENTIFIER, because copying can cause Provenance Events
+        // to be incorrectly created.
+        for (final Map.Entry<String, String> entry : parent.getAttributes().entrySet()) {
+            final String key = entry.getKey();
+            final String value = entry.getValue();
+            if (CoreAttributes.ALTERNATE_IDENTIFIER.key().equals(key)
+                || CoreAttributes.DISCARD_REASON.key().equals(key)
+                || CoreAttributes.UUID.key().equals(key)) {
+                continue;
+            }
+            newAttributes.put(key, value);
+        }
+
+        fFileBuilder.lineageStart(parent.getLineageStartDate(), parent.getLineageStartIndex());
+        fFileBuilder.addAttributes(newAttributes);
+
+        final FlowFileRecord fFile = fFileBuilder.build();
+        final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
+        record.setWorking(fFile, newAttributes);
+        records.put(fFile.getId(), record);
+        createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key()));
+
+        registerForkEvent(parent, fFile);
+        return fFile;
+    }
+
+    @Override
+    public FlowFile create(Collection<FlowFile> parents) {
+        verifyTaskActive();
+
+        parents = parents.stream().map(this::getMostRecent).collect(Collectors.toList());
+
+        final Map<String, String> newAttributes = intersectAttributes(parents);
+        newAttributes.remove(CoreAttributes.UUID.key());
+        newAttributes.remove(CoreAttributes.ALTERNATE_IDENTIFIER.key());
+        newAttributes.remove(CoreAttributes.DISCARD_REASON.key());
+
+        // When creating a new FlowFile from multiple parents, we need to add all of the Lineage Identifiers
+        // and use the earliest lineage start date
+        long lineageStartDate = 0L;
+        for (final FlowFile parent : parents) {
+
+            final long parentLineageStartDate = parent.getLineageStartDate();
+            if (lineageStartDate == 0L || parentLineageStartDate < lineageStartDate) {
+                lineageStartDate = parentLineageStartDate;
+            }
+        }
+
+        // find the smallest lineage start index that has the same lineage start date as the one we've chosen.
+        long lineageStartIndex = 0L;
+        for (final FlowFile parent : parents) {
+            if (parent.getLineageStartDate() == lineageStartDate && parent.getLineageStartIndex() < lineageStartIndex) {
+                lineageStartIndex = parent.getLineageStartIndex();
+            }
+        }
+
+        final String uuid = UUID.randomUUID().toString();
+        newAttributes.put(CoreAttributes.FILENAME.key(), uuid);
+        newAttributes.put(CoreAttributes.PATH.key(), DEFAULT_FLOWFILE_PATH);
+        newAttributes.put(CoreAttributes.UUID.key(), uuid);
+
+        final FlowFileRecord fFile = new StandardFlowFileRecord.Builder().id(context.getNextFlowFileSequence())
+            .addAttributes(newAttributes)
+            .lineageStart(lineageStartDate, lineageStartIndex)
+            .build();
+
+        final StandardRepositoryRecord record = new StandardRepositoryRecord(null);
+        record.setWorking(fFile, newAttributes);
+        records.put(fFile.getId(), record);
+        createdFlowFiles.add(fFile.getAttribute(CoreAttributes.UUID.key()));
+
+        registerJoinEvent(fFile, parents);
+        return fFile;
+    }
+
+
 
 Review comment:
   No, they are not changed. I only moved them to clean up the code: we had several overloads of the `create` method spread throughout the class, so I colocated them.
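
   For anyone reading the moved `create(Collection<FlowFile> parents)` hunk, here is a minimal standalone sketch of the lineage-start selection that its comments describe: take the earliest lineage start date, then the smallest lineage start index among the parents that share that date. `LineageStartSketch`, `Parent`, and `selectLineageStart` are hypothetical names used only for illustration and are not part of NiFi. The sketch seeds the index from `Long.MAX_VALUE` instead of `0L`, which is an assumption about the intent stated in the code comment and not what the quoted hunk does.

```java
import java.util.Arrays;
import java.util.List;

// Illustration only: a standalone model of the lineage-start selection, not NiFi code.
final class LineageStartSketch {

    // Hypothetical stand-in for the two FlowFile getters used in the hunk.
    static final class Parent {
        final long lineageStartDate;
        final long lineageStartIndex;

        Parent(final long lineageStartDate, final long lineageStartIndex) {
            this.lineageStartDate = lineageStartDate;
            this.lineageStartIndex = lineageStartIndex;
        }
    }

    // Returns {earliest start date, smallest start index among parents with that date}.
    static long[] selectLineageStart(final List<Parent> parents) {
        if (parents.isEmpty()) {
            throw new IllegalArgumentException("at least one parent is required");
        }

        // Pass 1: earliest lineage start date across all parents.
        long date = Long.MAX_VALUE;
        for (final Parent parent : parents) {
            date = Math.min(date, parent.lineageStartDate);
        }

        // Pass 2: smallest index among the parents that share the chosen date.
        // Seeding with Long.MAX_VALUE (an assumption) lets non-negative indexes win.
        long index = Long.MAX_VALUE;
        for (final Parent parent : parents) {
            if (parent.lineageStartDate == date) {
                index = Math.min(index, parent.lineageStartIndex);
            }
        }
        return new long[] {date, index};
    }

    public static void main(final String[] args) {
        final long[] start = selectLineageStart(Arrays.asList(
            new Parent(200L, 1L), new Parent(100L, 7L), new Parent(100L, 3L)));
        System.out.println(start[0] + " / " + start[1]);
    }
}
```

   At least one parent always matches the chosen date, so the second pass always replaces the sentinel with a real index.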

----------------------------------------------------------------
This is an automated message from the Apache Git Service.
To respond to the message, please log on to GitHub and use the
URL above to go to the specific comment.
 
For queries about this service, please contact Infrastructure at:
users@infra.apache.org


With regards,
Apache Git Services