You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by mc...@apache.org on 2014/12/16 17:22:09 UTC
[14/27] incubator-nifi git commit: NIFI-70: If ROUTE indicates
FlowFile routed to same Connection it came from (and only that connection!)
then drop the redundant ROUTE event
NIFI-70: If ROUTE indicates FlowFile routed to same Connection it came from (and only that connection!) then drop the redundant ROUTE event
Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/bc946251
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/bc946251
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/bc946251
Branch: refs/heads/nifi-27
Commit: bc94625142163d60af7aad27ec9de11b79c2ba21
Parents: a872231
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Dec 12 12:02:49 2014 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Dec 12 12:02:49 2014 -0500
----------------------------------------------------------------------
.../repository/StandardProcessSession.java | 48 +++++++++++++++++++-
1 file changed, 47 insertions(+), 1 deletion(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bc946251/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index fbbb29b..60dcdb3 100644
--- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -538,7 +538,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
if (isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) {
continue;
}
-
+
+ // Check if the event indicates that the FlowFile was routed to the same
+ // connection from which it was pulled (and only this connection). If so, discard the event.
+ // Such a ROUTE carries no information, because the FlowFile was really routed nowhere.
+ if ( isSpuriousRouteEvent(event, checkpoint.records) ) {
+ continue;
+ }
+
recordsToSubmit.add(event);
addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType());
}
@@ -776,6 +783,45 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
return false;
}
+
+ /**
+ * Checks if the given event is a spurious ROUTE, meaning that the ROUTE indicates that a FlowFile
+ * was routed to a relationship with only 1 connection and that Connection is the Connection from which
+ * the FlowFile was pulled. I.e., the FlowFile was really routed nowhere.
+ *
+ * @param event the provenance event to examine; an event whose type is not ROUTE is never considered spurious
+ * @param records repository records keyed by FlowFileRecord; searched for the entry whose UUID attribute equals the event's FlowFile UUID
+ * @return <code>true</code> only if the event is a ROUTE, its relationship has exactly 1 connection, and the matching record's original queue has the same identifier as that connection's queue; <code>false</code> otherwise (including when no record matches or the record has no original queue)
+ */
+ private boolean isSpuriousRouteEvent(final ProvenanceEventRecord event, final Map<FlowFileRecord, StandardRepositoryRecord> records) {
+ if ( event.getEventType() == ProvenanceEventType.ROUTE ) {
+ final String relationshipName = event.getRelationship();
+ final Relationship relationship = new Relationship.Builder().name(relationshipName).build();
+ final Collection<Connection> connectionsForRelationship = this.context.getConnections(relationship);
+
+ // If the number of connections for this relationship is not 1, then we can't ignore this ROUTE event,
+ // as it may be cloning the FlowFile and adding to multiple connections.
+ if ( connectionsForRelationship.size() == 1 ) {
+ for ( final Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : records.entrySet() ) {
+ final FlowFileRecord flowFileRecord = entry.getKey();
+ if ( event.getFlowFileUuid().equals(flowFileRecord.getAttribute(CoreAttributes.UUID.key())) ) {
+ final StandardRepositoryRecord repoRecord = entry.getValue();
+ if ( repoRecord.getOriginalQueue() == null ) {
+ return false;
+ }
+
+ final String originalQueueId = repoRecord.getOriginalQueue().getIdentifier();
+ final Connection destinationConnection = connectionsForRelationship.iterator().next();
+ final String destinationQueueId = destinationConnection.getFlowFileQueue().getIdentifier();
+ return originalQueueId.equals(destinationQueueId);
+ }
+ }
+ }
+ }
+
+ return false;
+ }
+
@Override
public void rollback() {
rollback(false);