You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/01/02 19:49:31 UTC

[3/4] nifi git commit: NIFI-4707: Changed process group parent stack to tree

NIFI-4707: Changed process group parent stack to tree


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/97dc20e2
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/97dc20e2
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/97dc20e2

Branch: refs/heads/master
Commit: 97dc20e2d95057b890b3fecbe6f6e8877923e6b3
Parents: d65e6b2
Author: Matthew Burgess <ma...@apache.org>
Authored: Thu Dec 21 15:06:51 2017 -0500
Committer: Matthew Burgess <ma...@apache.org>
Committed: Tue Jan 2 14:46:48 2018 -0500

----------------------------------------------------------------------
 .../util/provenance/ComponentMapHolder.java     | 48 +++++++++-----------
 .../ParentProcessGroupSearchNode.java           | 37 +++++++++++++++
 .../provenance/ProvenanceEventConsumer.java     | 33 ++++++++------
 .../TestSiteToSiteProvenanceReportingTask.java  | 19 +++++---
 4 files changed, 91 insertions(+), 46 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/97dc20e2/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java
index 342b5a2..43372e1 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java
@@ -24,13 +24,12 @@ import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
 
 import java.util.HashMap;
 import java.util.Map;
-import java.util.Stack;
 
 public class ComponentMapHolder {
     private static final String REMOTE_INPUT_PORT = "Remote Input Port";
     private static final String REMOTE_OUTPUT_PORT = "Remote Output Port";
     private final Map<String,String> componentNameMap = new HashMap<>();
-    private final Map<String,String> componentToParentGroupMap = new HashMap<>();
+    private final Map<String,ParentProcessGroupSearchNode> componentToParentGroupMap = new HashMap<>();
     private final Map<String,String> sourceToConnectionParentGroupMap = new HashMap<>();
     private final Map<String,String> destinationToConnectionParentGroupMap = new HashMap<>();
 
@@ -46,21 +45,6 @@ public class ComponentMapHolder {
         return componentNameMap.get(componentId);
     }
 
-    public Stack<String> getProcessGroupIdStack(final String startingProcessGroupId) {
-        final Stack<String> stack = new Stack<>();
-        String processGroupId = startingProcessGroupId;
-        stack.push(startingProcessGroupId);
-        while (componentToParentGroupMap.containsKey(processGroupId)) {
-            final String parentGroupId = componentToParentGroupMap.get(processGroupId);
-            if (parentGroupId == null || parentGroupId.isEmpty()) {
-                break;
-            }
-            stack.push(parentGroupId);
-            processGroupId = parentGroupId;
-        }
-        return stack;
-    }
-
     public String getProcessGroupId(final String componentId, final String componentType) {
         // Where a Remote Input/Output Port resides is only available at ConnectionStatus.
         if (REMOTE_INPUT_PORT.equals(componentType)) {
@@ -68,42 +52,53 @@ public class ComponentMapHolder {
         } else if (REMOTE_OUTPUT_PORT.equals(componentType)) {
             return sourceToConnectionParentGroupMap.get(componentId);
         }
+        ParentProcessGroupSearchNode parentNode = componentToParentGroupMap.get(componentId);
+        return parentNode == null ? null : parentNode.getId();
+    }
+
+    public ParentProcessGroupSearchNode getProcessGroupParent(final String componentId) {
         return componentToParentGroupMap.get(componentId);
     }
 
-    public static ComponentMapHolder createComponentMap(final ProcessGroupStatus status) {
+    public static ComponentMapHolder createComponentMap(final ProcessGroupStatus status, final ParentProcessGroupSearchNode thisNode) {
         final ComponentMapHolder holder = new ComponentMapHolder();
         final Map<String,String> componentNameMap = holder.componentNameMap;
-        final Map<String,String> componentToParentGroupMap = holder.componentToParentGroupMap;
+        final Map<String,ParentProcessGroupSearchNode> componentToParentGroupMap = holder.componentToParentGroupMap;
         final Map<String,String> sourceToConnectionParentGroupMap = holder.sourceToConnectionParentGroupMap;
         final Map<String,String> destinationToConnectionParentGroupMap = holder.destinationToConnectionParentGroupMap;
 
         if (status != null) {
+            ParentProcessGroupSearchNode parentNode = thisNode;
             componentNameMap.put(status.getId(), status.getName());
+            // No parent node was supplied, so this group is treated as the root: create and register a root entry for it
+            if (parentNode == null) {
+                parentNode = new ParentProcessGroupSearchNode(status.getId(), null);
+                componentToParentGroupMap.put(status.getId(), parentNode);
+            }
 
             for (final ProcessorStatus procStatus : status.getProcessorStatus()) {
                 componentNameMap.put(procStatus.getId(), procStatus.getName());
-                componentToParentGroupMap.put(procStatus.getId(), status.getId());
+                componentToParentGroupMap.put(procStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode));
             }
 
             for (final PortStatus portStatus : status.getInputPortStatus()) {
                 componentNameMap.put(portStatus.getId(), portStatus.getName());
-                componentToParentGroupMap.put(portStatus.getId(), status.getId());
+                componentToParentGroupMap.put(portStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode));
             }
 
             for (final PortStatus portStatus : status.getOutputPortStatus()) {
                 componentNameMap.put(portStatus.getId(), portStatus.getName());
-                componentToParentGroupMap.put(portStatus.getId(), status.getId());
+                componentToParentGroupMap.put(portStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode));
             }
 
             for (final RemoteProcessGroupStatus rpgStatus : status.getRemoteProcessGroupStatus()) {
                 componentNameMap.put(rpgStatus.getId(), rpgStatus.getName());
-                componentToParentGroupMap.put(rpgStatus.getId(), status.getId());
+                componentToParentGroupMap.put(rpgStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode));
             }
 
             for (final ConnectionStatus connectionStatus : status.getConnectionStatus()) {
                 componentNameMap.put(connectionStatus.getId(), connectionStatus.getName());
-                componentToParentGroupMap.put(connectionStatus.getId(), status.getId());
+                componentToParentGroupMap.put(connectionStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode));
                 // Add source and destination for Remote Input/Output Ports because metadata for those are only available at ConnectionStatus.
                 componentNameMap.computeIfAbsent(connectionStatus.getSourceId(), k -> connectionStatus.getSourceName());
                 componentNameMap.computeIfAbsent(connectionStatus.getDestinationId(), k -> connectionStatus.getDestinationName());
@@ -113,8 +108,9 @@ public class ComponentMapHolder {
 
             for (final ProcessGroupStatus childGroup : status.getProcessGroupStatus()) {
                 componentNameMap.put(childGroup.getId(), childGroup.getName());
-                componentToParentGroupMap.put(childGroup.getId(), status.getId());
-                holder.putAll(createComponentMap(childGroup));
+                ParentProcessGroupSearchNode node = new ParentProcessGroupSearchNode(status.getId(), parentNode);
+                componentToParentGroupMap.put(childGroup.getId(), node);
+                holder.putAll(createComponentMap(childGroup, node));
             }
         }
 

http://git-wip-us.apache.org/repos/asf/nifi/blob/97dc20e2/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ParentProcessGroupSearchNode.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ParentProcessGroupSearchNode.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ParentProcessGroupSearchNode.java
new file mode 100644
index 0000000..2cf1f48
--- /dev/null
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ParentProcessGroupSearchNode.java
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.util.provenance;
+
+
+public class ParentProcessGroupSearchNode {
+
+    private final String id;
+    private final ParentProcessGroupSearchNode parent;
+
+    public ParentProcessGroupSearchNode(String id, ParentProcessGroupSearchNode parent) {
+        this.id = id;
+        this.parent = parent;
+    }
+
+    public ParentProcessGroupSearchNode getParent() {
+        return parent;
+    }
+
+    public String getId() {
+        return id;
+    }
+}

http://git-wip-us.apache.org/repos/asf/nifi/blob/97dc20e2/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
index 75c1e60..feb302a 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
@@ -89,13 +89,13 @@ public class ProvenanceEventConsumer {
         }
     }
 
-    public void addTargetEventType(final ProvenanceEventType ... types) {
+    public void addTargetEventType(final ProvenanceEventType... types) {
         for (ProvenanceEventType type : types) {
             eventTypes.add(type);
         }
     }
 
-    public void addTargetComponentId(final String ... ids) {
+    public void addTargetComponentId(final String... ids) {
         for (String id : ids) {
             componentIds.add(id);
         }
@@ -122,12 +122,13 @@ public class ProvenanceEventConsumer {
         }
         final EventAccess eventAccess = context.getEventAccess();
         final ProcessGroupStatus procGroupStatus = eventAccess.getControllerStatus();
-        final ComponentMapHolder componentMapHolder = ComponentMapHolder.createComponentMap(procGroupStatus);
+        final ParentProcessGroupSearchNode rootNode = new ParentProcessGroupSearchNode(procGroupStatus.getId(), null);
+        final ComponentMapHolder componentMapHolder = ComponentMapHolder.createComponentMap(procGroupStatus, rootNode);
         final StateManager stateManager = context.getStateManager();
 
         Long currMaxId = eventAccess.getProvenanceRepository().getMaxEventId();
 
-        if(currMaxId == null) {
+        if (currMaxId == null) {
             logger.debug("No events to send because no events have been created yet.");
             return;
         }
@@ -156,7 +157,7 @@ public class ProvenanceEventConsumer {
                     firstEventId = -1;
                 } else {
                     logger.warn("Current provenance max id is {} which is less than what was stored in state as the last queried event, which was {}. This means the provenance restarted its " +
-                            "ids. Restarting querying from the latest event in the Provenance Repository.", new Object[] {currMaxId, firstEventId});
+                            "ids. Restarting querying from the latest event in the Provenance Repository.", new Object[]{currMaxId, firstEventId});
                     firstEventId = currMaxId;
                 }
             }
@@ -218,7 +219,7 @@ public class ProvenanceEventConsumer {
             stateManager.setState(newMapOfState, Scope.LOCAL);
         } catch (final IOException ioe) {
             logger.error("Failed to update state to {} due to {}; this could result in events being re-sent after a restart. The message of {} was: {}",
-                    new Object[] {lastEventId, ioe, ioe, ioe.getMessage()}, ioe);
+                    new Object[]{lastEventId, ioe, ioe, ioe.getMessage()}, ioe);
         }
 
         return lastEvent.getEventId() + 1;
@@ -230,28 +231,34 @@ public class ProvenanceEventConsumer {
     }
 
     private List<ProvenanceEventRecord> filterEvents(ComponentMapHolder componentMapHolder, List<ProvenanceEventRecord> provenanceEvents) {
-        if(isFilteringEnabled()) {
+        if (isFilteringEnabled()) {
             List<ProvenanceEventRecord> filteredEvents = new ArrayList<>();
 
             for (ProvenanceEventRecord provenanceEventRecord : provenanceEvents) {
-                if(!componentIds.isEmpty() && !componentIds.contains(provenanceEventRecord.getComponentId())) {
+                final String componentId = provenanceEventRecord.getComponentId();
+                if (!componentIds.isEmpty() && !componentIds.contains(componentId)) {
                     // If we aren't filtering it out based on component ID, let's see if this component has a parent process group IDs
                     // that is being filtered on
                     if (componentMapHolder == null) {
                         continue;
                     }
-                    final String processGroupId = componentMapHolder.getProcessGroupId(provenanceEventRecord.getComponentId(), provenanceEventRecord.getComponentType());
-                    if (processGroupId == null || processGroupId.isEmpty()) {
+                    final String processGroupId = componentMapHolder.getProcessGroupId(componentId, provenanceEventRecord.getComponentType());
+                    if (StringUtils.isEmpty(processGroupId)) {
                         continue;
                     }
-                    if (componentMapHolder.getProcessGroupIdStack(processGroupId).stream().noneMatch(pgid -> componentIds.contains(pgid))) {
+                    // Check whether any enclosing (parent) process group has an ID that is one of the component IDs being filtered on
+                    ParentProcessGroupSearchNode matchedComponent = componentMapHolder.getProcessGroupParent(componentId);
+                    while (matchedComponent != null && !matchedComponent.getId().equals(processGroupId) && !componentIds.contains(matchedComponent.getId())) {
+                        matchedComponent = matchedComponent.getParent();
+                    }
+                    if (matchedComponent == null) {
                         continue;
                     }
                 }
-                if(!eventTypes.isEmpty() && !eventTypes.contains(provenanceEventRecord.getEventType())) {
+                if (!eventTypes.isEmpty() && !eventTypes.contains(provenanceEventRecord.getEventType())) {
                     continue;
                 }
-                if(componentTypeRegex != null && !componentTypeRegex.matcher(provenanceEventRecord.getComponentType()).matches()) {
+                if (componentTypeRegex != null && !componentTypeRegex.matcher(provenanceEventRecord.getComponentType()).matches()) {
                     continue;
                 }
                 filteredEvents.add(provenanceEventRecord);

http://git-wip-us.apache.org/repos/asf/nifi/blob/97dc20e2/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
index ec2e301..201361f 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
@@ -22,6 +22,7 @@ import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
 import org.apache.nifi.components.state.Scope;
 import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.flowfile.FlowFile;
 import org.apache.nifi.logging.ComponentLog;
 import org.apache.nifi.provenance.ProvenanceEventBuilder;
@@ -55,6 +56,7 @@ import java.util.UUID;
 import java.util.concurrent.atomic.AtomicInteger;
 
 import static org.junit.Assert.assertEquals;
+import static org.mockito.Mockito.when;
 
 public class TestSiteToSiteProvenanceReportingTask {
 
@@ -65,7 +67,7 @@ public class TestSiteToSiteProvenanceReportingTask {
     private MockSiteToSiteProvenanceReportingTask setup(ProvenanceEventRecord event, Map<PropertyDescriptor, String> properties) throws IOException {
         final MockSiteToSiteProvenanceReportingTask task = new MockSiteToSiteProvenanceReportingTask();
 
-        Mockito.when(context.getStateManager())
+        when(context.getStateManager())
                 .thenReturn(new MockStateManager(task));
         Mockito.doAnswer(new Answer<PropertyValue>() {
             @Override
@@ -104,6 +106,9 @@ public class TestSiteToSiteProvenanceReportingTask {
                 return eventsToReturn;
             }
         }).when(eventAccess).getProvenanceEvents(Mockito.anyLong(), Mockito.anyInt());
+        ProcessGroupStatus processGroupStatus = new ProcessGroupStatus();
+        processGroupStatus.setId("root");
+        when(eventAccess.getControllerStatus()).thenReturn(processGroupStatus);
 
         final ProvenanceEventRepository provenanceRepository = Mockito.mock(ProvenanceEventRepository.class);
         Mockito.doAnswer(new Answer<Long>() {
@@ -113,12 +118,12 @@ public class TestSiteToSiteProvenanceReportingTask {
             }
         }).when(provenanceRepository).getMaxEventId();
 
-        Mockito.when(context.getEventAccess()).thenReturn(eventAccess);
-        Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository);
+        when(context.getEventAccess()).thenReturn(eventAccess);
+        when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository);
 
         final ComponentLog logger = Mockito.mock(ComponentLog.class);
-        Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
-        Mockito.when(initContext.getLogger()).thenReturn(logger);
+        when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
+        when(initContext.getLogger()).thenReturn(logger);
 
         return task;
     }
@@ -331,7 +336,7 @@ public class TestSiteToSiteProvenanceReportingTask {
 
         // setup the mock EventAccess to return the mock provenance repository
         final EventAccess eventAccess = Mockito.mock(EventAccess.class);
-        Mockito.when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository);
+        when(eventAccess.getProvenanceRepository()).thenReturn(provenanceRepository);
 
         task.initialize(initContext);
 
@@ -388,7 +393,7 @@ public class TestSiteToSiteProvenanceReportingTask {
                     }
                 }).when(transaction).send(Mockito.any(byte[].class), Mockito.any(Map.class));
 
-                Mockito.when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
+                when(client.createTransaction(Mockito.any(TransferDirection.class))).thenReturn(transaction);
             } catch (final Exception e) {
                 e.printStackTrace();
                 Assert.fail(e.toString());