You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/01/02 19:49:32 UTC

[4/4] nifi git commit: NIFI-4707: Fixed ProcessGroup tree

NIFI-4707: Fixed ProcessGroup tree

- Removed duplicated creation of a ParentProcessGroupSearchNode for the
root ProcessGroup.
- Removed duplicated creation of a ParentProcessGroupSearchNode for each
component inside a ProcessGroup.
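- Fixed ProcessGroup id hierarchy: the search node created for a child
ProcessGroup now carries the child's own id and links to its parent's
node, instead of reusing the parent's id.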
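- Fixed filtering logic: the ProcessGroup check now matches an event
when its own ProcessGroup, or any ancestor ProcessGroup, is one of the
target component IDs.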
- Added unit tests for filtering by ProcessGroupId and Remote
Input/Output ports.

Signed-off-by: Matthew Burgess <ma...@apache.org>

This closes #2351


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/84cecfbe
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/84cecfbe
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/84cecfbe

Branch: refs/heads/master
Commit: 84cecfbeea82e81faef77c5b9de76f54bd965316
Parents: 97dc20e
Author: Koji Kawamura <ij...@apache.org>
Authored: Mon Dec 25 16:17:27 2017 +0900
Committer: Matthew Burgess <ma...@apache.org>
Committed: Tue Jan 2 14:49:00 2018 -0500

----------------------------------------------------------------------
 .../util/provenance/ComponentMapHolder.java     |  28 ++--
 .../provenance/ProvenanceEventConsumer.java     |  19 ++-
 .../TestSiteToSiteProvenanceReportingTask.java  | 163 ++++++++++++++++++-
 3 files changed, 180 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/84cecfbe/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java
index 43372e1..e85eca3 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java
@@ -60,7 +60,11 @@ public class ComponentMapHolder {
         return componentToParentGroupMap.get(componentId);
     }
 
-    public static ComponentMapHolder createComponentMap(final ProcessGroupStatus status, final ParentProcessGroupSearchNode thisNode) {
+    public static ComponentMapHolder createComponentMap(final ProcessGroupStatus status) {
+        return createComponentMap(status, new ParentProcessGroupSearchNode(status.getId(), null));
+    }
+
+    private static ComponentMapHolder createComponentMap(final ProcessGroupStatus status, final ParentProcessGroupSearchNode thisProcessGroupNode) {
         final ComponentMapHolder holder = new ComponentMapHolder();
         final Map<String,String> componentNameMap = holder.componentNameMap;
         final Map<String,ParentProcessGroupSearchNode> componentToParentGroupMap = holder.componentToParentGroupMap;
@@ -68,37 +72,31 @@ public class ComponentMapHolder {
         final Map<String,String> destinationToConnectionParentGroupMap = holder.destinationToConnectionParentGroupMap;
 
         if (status != null) {
-            ParentProcessGroupSearchNode parentNode = thisNode;
             componentNameMap.put(status.getId(), status.getName());
-            // Put a root entry in if one does not yet exist
-            if (parentNode == null) {
-                parentNode = new ParentProcessGroupSearchNode(status.getId(), null);
-                componentToParentGroupMap.put(status.getId(), parentNode);
-            }
 
             for (final ProcessorStatus procStatus : status.getProcessorStatus()) {
                 componentNameMap.put(procStatus.getId(), procStatus.getName());
-                componentToParentGroupMap.put(procStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode));
+                componentToParentGroupMap.put(procStatus.getId(), thisProcessGroupNode);
             }
 
             for (final PortStatus portStatus : status.getInputPortStatus()) {
                 componentNameMap.put(portStatus.getId(), portStatus.getName());
-                componentToParentGroupMap.put(portStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode));
+                componentToParentGroupMap.put(portStatus.getId(), thisProcessGroupNode);
             }
 
             for (final PortStatus portStatus : status.getOutputPortStatus()) {
                 componentNameMap.put(portStatus.getId(), portStatus.getName());
-                componentToParentGroupMap.put(portStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode));
+                componentToParentGroupMap.put(portStatus.getId(), thisProcessGroupNode);
             }
 
             for (final RemoteProcessGroupStatus rpgStatus : status.getRemoteProcessGroupStatus()) {
                 componentNameMap.put(rpgStatus.getId(), rpgStatus.getName());
-                componentToParentGroupMap.put(rpgStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode));
+                componentToParentGroupMap.put(rpgStatus.getId(), thisProcessGroupNode);
             }
 
             for (final ConnectionStatus connectionStatus : status.getConnectionStatus()) {
                 componentNameMap.put(connectionStatus.getId(), connectionStatus.getName());
-                componentToParentGroupMap.put(connectionStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode));
+                componentToParentGroupMap.put(connectionStatus.getId(), thisProcessGroupNode);
                 // Add source and destination for Remote Input/Output Ports because metadata for those are only available at ConnectionStatus.
                 componentNameMap.computeIfAbsent(connectionStatus.getSourceId(), k -> connectionStatus.getSourceName());
                 componentNameMap.computeIfAbsent(connectionStatus.getDestinationId(), k -> connectionStatus.getDestinationName());
@@ -108,9 +106,9 @@ public class ComponentMapHolder {
 
             for (final ProcessGroupStatus childGroup : status.getProcessGroupStatus()) {
                 componentNameMap.put(childGroup.getId(), childGroup.getName());
-                ParentProcessGroupSearchNode node = new ParentProcessGroupSearchNode(status.getId(), parentNode);
-                componentToParentGroupMap.put(childGroup.getId(), node);
-                holder.putAll(createComponentMap(childGroup, node));
+                ParentProcessGroupSearchNode childProcessGroupNode = new ParentProcessGroupSearchNode(childGroup.getId(), thisProcessGroupNode);
+                componentToParentGroupMap.put(childGroup.getId(), thisProcessGroupNode);
+                holder.putAll(createComponentMap(childGroup, childProcessGroupNode));
             }
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/84cecfbe/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
index feb302a..18a8afe 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
@@ -122,8 +122,7 @@ public class ProvenanceEventConsumer {
         }
         final EventAccess eventAccess = context.getEventAccess();
         final ProcessGroupStatus procGroupStatus = eventAccess.getControllerStatus();
-        final ParentProcessGroupSearchNode rootNode = new ParentProcessGroupSearchNode(procGroupStatus.getId(), null);
-        final ComponentMapHolder componentMapHolder = ComponentMapHolder.createComponentMap(procGroupStatus, rootNode);
+        final ComponentMapHolder componentMapHolder = ComponentMapHolder.createComponentMap(procGroupStatus);
         final StateManager stateManager = context.getStateManager();
 
         Long currMaxId = eventAccess.getProvenanceRepository().getMaxEventId();
@@ -246,13 +245,15 @@ public class ProvenanceEventConsumer {
                     if (StringUtils.isEmpty(processGroupId)) {
                         continue;
                     }
-                    // Check if any parent process group has the specified component ID
-                    ParentProcessGroupSearchNode matchedComponent = componentMapHolder.getProcessGroupParent(componentId);
-                    while (matchedComponent != null && !matchedComponent.getId().equals(processGroupId) && !componentIds.contains(matchedComponent.getId())) {
-                        matchedComponent = matchedComponent.getParent();
-                    }
-                    if (matchedComponent == null) {
-                        continue;
+                    // Check if the process group or any parent process group is specified as a target component ID.
+                    if (!componentIds.contains(processGroupId)) {
+                        ParentProcessGroupSearchNode parentProcessGroup = componentMapHolder.getProcessGroupParent(processGroupId);
+                        while (parentProcessGroup != null && !componentIds.contains(parentProcessGroup.getId())) {
+                            parentProcessGroup = parentProcessGroup.getParent();
+                        }
+                        if (parentProcessGroup == null) {
+                            continue;
+                        }
                     }
                 }
                 if (!eventTypes.isEmpty() && !eventTypes.contains(provenanceEventRecord.getEventType())) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/84cecfbe/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
index 201361f..26d5877 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
@@ -18,11 +18,15 @@
 package org.apache.nifi.reporting;
 
 
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.status.ConnectionStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.provenance.ProvenanceEventBuilder;
@@ -49,6 +53,7 @@ import java.io.ByteArrayInputStream;
 import java.io.IOException;
 import java.nio.charset.StandardCharsets;
 import java.util.ArrayList;
+import java.util.Collection;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -65,6 +70,10 @@ public class TestSiteToSiteProvenanceReportingTask {
     private final ConfigurationContext confContext = Mockito.mock(ConfigurationContext.class);
 
     private MockSiteToSiteProvenanceReportingTask setup(ProvenanceEventRecord event, Map<PropertyDescriptor, String> properties) throws IOException {
+        return setup(event, properties, 2500);
+    }
+
+    private MockSiteToSiteProvenanceReportingTask setup(ProvenanceEventRecord event, Map<PropertyDescriptor, String> properties, long maxEventId) throws IOException {
         final MockSiteToSiteProvenanceReportingTask task = new MockSiteToSiteProvenanceReportingTask();
 
         when(context.getStateManager())
@@ -85,7 +94,6 @@ public class TestSiteToSiteProvenanceReportingTask {
             }
         }).when(confContext).getProperty(Mockito.any(PropertyDescriptor.class));
 
-        final long maxEventId = 2500;
         final AtomicInteger totalEvents = new AtomicInteger(0);
 
         final EventAccess eventAccess = Mockito.mock(EventAccess.class);
@@ -106,9 +114,65 @@ public class TestSiteToSiteProvenanceReportingTask {
                 return eventsToReturn;
             }
         }).when(eventAccess).getProvenanceEvents(Mockito.anyLong(), Mockito.anyInt());
-        ProcessGroupStatus processGroupStatus = new ProcessGroupStatus();
-        processGroupStatus.setId("root");
-        when(eventAccess.getControllerStatus()).thenReturn(processGroupStatus);
+        ProcessGroupStatus pgRoot = new ProcessGroupStatus();
+        pgRoot.setId("root");
+        when(eventAccess.getControllerStatus()).thenReturn(pgRoot);
+
+        // Add child Process Groups.
+        // Root -> (A, B -> (B2 -> (B3)))
+        final ProcessGroupStatus pgA = new ProcessGroupStatus();
+        pgA.setId("pgA");
+        final ProcessGroupStatus pgB = new ProcessGroupStatus();
+        pgB.setId("pgB");
+        final ProcessGroupStatus pgB2 = new ProcessGroupStatus();
+        pgB2.setId("pgB2");
+        final ProcessGroupStatus pgB3 = new ProcessGroupStatus();
+        pgB3.setId("pgB3");
+        final Collection<ProcessGroupStatus> childPGs = pgRoot.getProcessGroupStatus();
+        childPGs.add(pgA);
+        childPGs.add(pgB);
+        pgB.getProcessGroupStatus().add(pgB2);
+        pgB2.getProcessGroupStatus().add(pgB3);
+
+        // Add Processors.
+        final ProcessorStatus prcRoot = new ProcessorStatus();
+        prcRoot.setId("1234");
+        pgRoot.getProcessorStatus().add(prcRoot);
+
+        final ProcessorStatus prcA = new ProcessorStatus();
+        prcA.setId("A001");
+        prcA.setName("Processor in PGA");
+        pgA.getProcessorStatus().add(prcA);
+
+        final ProcessorStatus prcB = new ProcessorStatus();
+        prcB.setId("B001");
+        prcB.setName("Processor in PGB");
+        pgB.getProcessorStatus().add(prcB);
+
+        final ProcessorStatus prcB2 = new ProcessorStatus();
+        prcB2.setId("B201");
+        prcB2.setName("Processor in PGB2");
+        pgB2.getProcessorStatus().add(prcB2);
+
+        final ProcessorStatus prcB3 = new ProcessorStatus();
+        prcB3.setId("B301");
+        prcB3.setName("Processor in PGB3");
+        pgB3.getProcessorStatus().add(prcB3);
+
+        // Add connection status to test Remote Input/Output Ports
+        final ConnectionStatus b2RemoteInputPort = new ConnectionStatus();
+        b2RemoteInputPort.setGroupId("pgB2");
+        b2RemoteInputPort.setSourceId("B201");
+        b2RemoteInputPort.setDestinationId("riB2");
+        b2RemoteInputPort.setDestinationName("Remote Input Port name");
+        pgB2.getConnectionStatus().add(b2RemoteInputPort);
+
+        final ConnectionStatus b3RemoteOutputPort = new ConnectionStatus();
+        b3RemoteOutputPort.setGroupId("pgB3");
+        b3RemoteOutputPort.setSourceId("roB3");
+        b3RemoteOutputPort.setSourceName("Remote Output Port name");
+        b3RemoteOutputPort.setDestinationId("B301");
+        pgB3.getConnectionStatus().add(b3RemoteOutputPort);
 
         final ProvenanceEventRepository provenanceRepository = Mockito.mock(ProvenanceEventRepository.class);
         Mockito.doAnswer(new Answer<Long>() {
@@ -308,6 +372,90 @@ public class TestSiteToSiteProvenanceReportingTask {
     }
 
     @Test
+    public void testFilterProcessGroupId() throws IOException, InitializationException {
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
+            properties.put(descriptor, descriptor.getDefaultValue());
+        }
+        properties.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000");
+        properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_ID, "pgB2");
+
+
+        // B201 belongs to ProcessGroup B2, so it should be picked.
+        ProvenanceEventRecord event = createProvenanceEventRecord("B201", "dummy");
+        MockSiteToSiteProvenanceReportingTask task = setup(event, properties, 1);
+        task.initialize(initContext);
+        task.onScheduled(confContext);
+        task.onTrigger(context);
+
+        assertEquals(1, task.dataSent.size());
+        JsonNode reportedEvent = new ObjectMapper().readTree(task.dataSent.get(0)).get(0);
+        assertEquals("B201", reportedEvent.get("componentId").asText());
+        assertEquals("Processor in PGB2", reportedEvent.get("componentName").asText());
+
+
+        // B301 belongs to PG B3, whose parent is PGB2, so it should be picked, too.
+        event = createProvenanceEventRecord("B301", "dummy");
+        task = setup(event, properties, 1);
+        task.initialize(initContext);
+        task.onScheduled(confContext);
+        task.onTrigger(context);
+
+        assertEquals(1, task.dataSent.size());
+        reportedEvent = new ObjectMapper().readTree(task.dataSent.get(0)).get(0);
+        assertEquals("B301", reportedEvent.get("componentId").asText());
+        assertEquals("Processor in PGB3", reportedEvent.get("componentName").asText());
+
+        // A001 belongs to PG A, whose parent is the root PG, so it should be filtered out.
+        event = createProvenanceEventRecord("A001", "dummy");
+        task = setup(event, properties, 1);
+        task.initialize(initContext);
+        task.onScheduled(confContext);
+        task.onTrigger(context);
+
+        assertEquals(0, task.dataSent.size());
+    }
+
+    @Test
+    public void testRemotePorts() throws IOException, InitializationException {
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
+            properties.put(descriptor, descriptor.getDefaultValue());
+        }
+        properties.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000");
+        properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_ID, "riB2,roB3");
+
+
+        // riB2 is a Remote Input Port in Process Group B2.
+        ProvenanceEventRecord event = createProvenanceEventRecord("riB2", "Remote Input Port");
+        MockSiteToSiteProvenanceReportingTask task = setup(event, properties, 1);
+        task.initialize(initContext);
+        task.onScheduled(confContext);
+        task.onTrigger(context);
+
+        assertEquals(1, task.dataSent.size());
+        JsonNode reportedEvent = new ObjectMapper().readTree(task.dataSent.get(0)).get(0);
+        assertEquals("riB2", reportedEvent.get("componentId").asText());
+        assertEquals("Remote Input Port name", reportedEvent.get("componentName").asText());
+        assertEquals("pgB2", reportedEvent.get("processGroupId").asText());
+
+
+        // roB3 is a Remote Output Port in Process Group B3.
+        event = createProvenanceEventRecord("roB3", "Remote Output Port");
+        task = setup(event, properties, 1);
+        task.initialize(initContext);
+        task.onScheduled(confContext);
+        task.onTrigger(context);
+
+        assertEquals(1, task.dataSent.size());
+        reportedEvent = new ObjectMapper().readTree(task.dataSent.get(0)).get(0);
+        assertEquals("roB3", reportedEvent.get("componentId").asText());
+        assertEquals("Remote Output Port name", reportedEvent.get("componentName").asText());
+        assertEquals("pgB3", reportedEvent.get("processGroupId").asText());
+
+    }
+
+    @Test
     public void testWhenProvenanceMaxIdEqualToLastEventIdInStateManager() throws IOException, InitializationException {
         final long maxEventId = 2500;
 
@@ -353,6 +501,9 @@ public class TestSiteToSiteProvenanceReportingTask {
     }
 
     private ProvenanceEventRecord createProvenanceEventRecord() {
+        return createProvenanceEventRecord("1234", "dummy processor");
+    }
+    private ProvenanceEventRecord createProvenanceEventRecord(final String componentId, final String componentType) {
         final String uuid = "10000000-0000-0000-0000-000000000000";
         final Map<String, String> attributes = new HashMap<>();
         attributes.put("abc", "xyz");
@@ -369,8 +520,8 @@ public class TestSiteToSiteProvenanceReportingTask {
         attributes.put("uuid", uuid);
         builder.fromFlowFile(createFlowFile(3L, attributes));
         builder.setAttributes(prevAttrs, attributes);
-        builder.setComponentId("1234");
-        builder.setComponentType("dummy processor");
+        builder.setComponentId(componentId);
+        builder.setComponentType(componentType);
         return builder.build();
     }