You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2014/12/12 18:02:55 UTC

incubator-nifi git commit: NIFI-70: If ROUTE indicates FlowFile routed to same Connection it came from (and only that connection!) then drop the redundant ROUTE event

Repository: incubator-nifi
Updated Branches:
  refs/heads/develop a8722317a -> bc9462514


NIFI-70: If ROUTE indicates FlowFile routed to same Connection it came from (and only that connection!) then drop the redundant ROUTE event


Project: http://git-wip-us.apache.org/repos/asf/incubator-nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-nifi/commit/bc946251
Tree: http://git-wip-us.apache.org/repos/asf/incubator-nifi/tree/bc946251
Diff: http://git-wip-us.apache.org/repos/asf/incubator-nifi/diff/bc946251

Branch: refs/heads/develop
Commit: bc94625142163d60af7aad27ec9de11b79c2ba21
Parents: a872231
Author: Mark Payne <ma...@hotmail.com>
Authored: Fri Dec 12 12:02:49 2014 -0500
Committer: Mark Payne <ma...@hotmail.com>
Committed: Fri Dec 12 12:02:49 2014 -0500

----------------------------------------------------------------------
 .../repository/StandardProcessSession.java      | 48 +++++++++++++++++++-
 1 file changed, 47 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-nifi/blob/bc946251/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
----------------------------------------------------------------------
diff --git a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
index fbbb29b..60dcdb3 100644
--- a/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
+++ b/nar-bundles/framework-bundle/framework/core/src/main/java/org/apache/nifi/controller/repository/StandardProcessSession.java
@@ -538,7 +538,14 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
             if (isSpuriousForkEvent(event, checkpoint.removedFlowFiles)) {
                 continue;
             }
-
+            if ( isSpuriousRouteEvent(event, checkpoint.records) ) {
+                continue;
+            }
+            
+            // Check if the event indicates that the FlowFile was routed to the same 
+            // connection from which it was pulled (and only this connection). If so, discard the event.
+            isSpuriousRouteEvent(event, checkpoint.records);
+            
             recordsToSubmit.add(event);
             addEventType(eventTypesPerFlowFileId, event.getFlowFileUuid(), event.getEventType());
         }
@@ -776,6 +783,45 @@ public final class StandardProcessSession implements ProcessSession, ProvenanceE
         return false;
     }
 
+    
+    /**
+     * Checks if the given event is a spurious ROUTE, meaning that the ROUTE indicates that a FlowFile
+     * was routed to a relationship with only 1 connection and that Connection is the Connection from which
+     * the FlowFile was pulled. I.e., the FlowFile was really routed nowhere.
+     * 
+     * @param event the provenance event to inspect; any event whose type is not ROUTE is never considered spurious
+     * @param records repository records keyed by FlowFile; searched for the record whose UUID attribute matches the event's FlowFile UUID
+     * @return <code>true</code> if the sole destination queue has the same identifier as the FlowFile's original queue; <code>false</code> in every other case
+     */
+    private boolean isSpuriousRouteEvent(final ProvenanceEventRecord event, final Map<FlowFileRecord, StandardRepositoryRecord> records) {
+        if ( event.getEventType() == ProvenanceEventType.ROUTE ) {
+            final String relationshipName = event.getRelationship();
+            final Relationship relationship = new Relationship.Builder().name(relationshipName).build(); // rebuilt from its name only, to look up connections
+            final Collection<Connection> connectionsForRelationship = this.context.getConnections(relationship);
+            
+            // If the number of connections for this relationship is not 1, then we can't ignore this ROUTE event,
+            // as it may be cloning the FlowFile and adding to multiple connections.
+            if ( connectionsForRelationship.size() == 1 ) {
+                for ( final Map.Entry<FlowFileRecord, StandardRepositoryRecord> entry : records.entrySet() ) { // linear scan for the event's FlowFile
+                    final FlowFileRecord flowFileRecord = entry.getKey();
+                    if ( event.getFlowFileUuid().equals(flowFileRecord.getAttribute(CoreAttributes.UUID.key())) ) {
+                        final StandardRepositoryRecord repoRecord = entry.getValue();
+                        if ( repoRecord.getOriginalQueue() == null ) { // no source queue to compare against, so the event is kept
+                            return false;
+                        }
+                        
+                        final String originalQueueId = repoRecord.getOriginalQueue().getIdentifier();
+                        final Connection destinationConnection = connectionsForRelationship.iterator().next(); // the only connection (size == 1 checked above)
+                        final String destinationQueueId = destinationConnection.getFlowFileQueue().getIdentifier();
+                        return originalQueueId.equals(destinationQueueId); // same queue in and out => the ROUTE moved the FlowFile nowhere
+                    }
+                }
+            }
+        }
+        
+        return false; // not a ROUTE, not exactly one connection, or no record matched the event's FlowFile UUID
+    }
+    
     @Override
     public void rollback() {
         rollback(false);