You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/01/02 19:49:32 UTC
[4/4] nifi git commit: NIFI-4707: Fixed ProcessGroup tree
NIFI-4707: Fixed ProcessGroup tree
- Removed duplicated creation of a ParentProcessGroupSearchNode for the
root ProcessGroup.
- Removed duplicated creation of a ParentProcessGroupSearchNode for each
component inside a ProcessGroup.
- Fixed ProcessGroup id hierarchy.
- Fixed filtering logic.
- Added unit tests for filtering by ProcessGroupId and Remote
Input/Output ports.
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #2351
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/84cecfbe
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/84cecfbe
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/84cecfbe
Branch: refs/heads/master
Commit: 84cecfbeea82e81faef77c5b9de76f54bd965316
Parents: 97dc20e
Author: Koji Kawamura <ij...@apache.org>
Authored: Mon Dec 25 16:17:27 2017 +0900
Committer: Matthew Burgess <ma...@apache.org>
Committed: Tue Jan 2 14:49:00 2018 -0500
----------------------------------------------------------------------
.../util/provenance/ComponentMapHolder.java | 28 ++--
.../provenance/ProvenanceEventConsumer.java | 19 ++-
.../TestSiteToSiteProvenanceReportingTask.java | 163 ++++++++++++++++++-
3 files changed, 180 insertions(+), 30 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/84cecfbe/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java
index 43372e1..e85eca3 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java
@@ -60,7 +60,11 @@ public class ComponentMapHolder {
return componentToParentGroupMap.get(componentId);
}
- public static ComponentMapHolder createComponentMap(final ProcessGroupStatus status, final ParentProcessGroupSearchNode thisNode) {
+ public static ComponentMapHolder createComponentMap(final ProcessGroupStatus status) {
+ return createComponentMap(status, new ParentProcessGroupSearchNode(status.getId(), null));
+ }
+
+ private static ComponentMapHolder createComponentMap(final ProcessGroupStatus status, final ParentProcessGroupSearchNode thisProcessGroupNode) {
final ComponentMapHolder holder = new ComponentMapHolder();
final Map<String,String> componentNameMap = holder.componentNameMap;
final Map<String,ParentProcessGroupSearchNode> componentToParentGroupMap = holder.componentToParentGroupMap;
@@ -68,37 +72,31 @@ public class ComponentMapHolder {
final Map<String,String> destinationToConnectionParentGroupMap = holder.destinationToConnectionParentGroupMap;
if (status != null) {
- ParentProcessGroupSearchNode parentNode = thisNode;
componentNameMap.put(status.getId(), status.getName());
- // Put a root entry in if one does not yet exist
- if (parentNode == null) {
- parentNode = new ParentProcessGroupSearchNode(status.getId(), null);
- componentToParentGroupMap.put(status.getId(), parentNode);
- }
for (final ProcessorStatus procStatus : status.getProcessorStatus()) {
componentNameMap.put(procStatus.getId(), procStatus.getName());
- componentToParentGroupMap.put(procStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode));
+ componentToParentGroupMap.put(procStatus.getId(), thisProcessGroupNode);
}
for (final PortStatus portStatus : status.getInputPortStatus()) {
componentNameMap.put(portStatus.getId(), portStatus.getName());
- componentToParentGroupMap.put(portStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode));
+ componentToParentGroupMap.put(portStatus.getId(), thisProcessGroupNode);
}
for (final PortStatus portStatus : status.getOutputPortStatus()) {
componentNameMap.put(portStatus.getId(), portStatus.getName());
- componentToParentGroupMap.put(portStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode));
+ componentToParentGroupMap.put(portStatus.getId(), thisProcessGroupNode);
}
for (final RemoteProcessGroupStatus rpgStatus : status.getRemoteProcessGroupStatus()) {
componentNameMap.put(rpgStatus.getId(), rpgStatus.getName());
- componentToParentGroupMap.put(rpgStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode));
+ componentToParentGroupMap.put(rpgStatus.getId(), thisProcessGroupNode);
}
for (final ConnectionStatus connectionStatus : status.getConnectionStatus()) {
componentNameMap.put(connectionStatus.getId(), connectionStatus.getName());
- componentToParentGroupMap.put(connectionStatus.getId(), new ParentProcessGroupSearchNode(status.getId(), parentNode));
+ componentToParentGroupMap.put(connectionStatus.getId(), thisProcessGroupNode);
// Add source and destination for Remote Input/Output Ports because metadata for those are only available at ConnectionStatus.
componentNameMap.computeIfAbsent(connectionStatus.getSourceId(), k -> connectionStatus.getSourceName());
componentNameMap.computeIfAbsent(connectionStatus.getDestinationId(), k -> connectionStatus.getDestinationName());
@@ -108,9 +106,9 @@ public class ComponentMapHolder {
for (final ProcessGroupStatus childGroup : status.getProcessGroupStatus()) {
componentNameMap.put(childGroup.getId(), childGroup.getName());
- ParentProcessGroupSearchNode node = new ParentProcessGroupSearchNode(status.getId(), parentNode);
- componentToParentGroupMap.put(childGroup.getId(), node);
- holder.putAll(createComponentMap(childGroup, node));
+ ParentProcessGroupSearchNode childProcessGroupNode = new ParentProcessGroupSearchNode(childGroup.getId(), thisProcessGroupNode);
+ componentToParentGroupMap.put(childGroup.getId(), thisProcessGroupNode);
+ holder.putAll(createComponentMap(childGroup, childProcessGroupNode));
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/84cecfbe/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
index feb302a..18a8afe 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
@@ -122,8 +122,7 @@ public class ProvenanceEventConsumer {
}
final EventAccess eventAccess = context.getEventAccess();
final ProcessGroupStatus procGroupStatus = eventAccess.getControllerStatus();
- final ParentProcessGroupSearchNode rootNode = new ParentProcessGroupSearchNode(procGroupStatus.getId(), null);
- final ComponentMapHolder componentMapHolder = ComponentMapHolder.createComponentMap(procGroupStatus, rootNode);
+ final ComponentMapHolder componentMapHolder = ComponentMapHolder.createComponentMap(procGroupStatus);
final StateManager stateManager = context.getStateManager();
Long currMaxId = eventAccess.getProvenanceRepository().getMaxEventId();
@@ -246,13 +245,15 @@ public class ProvenanceEventConsumer {
if (StringUtils.isEmpty(processGroupId)) {
continue;
}
- // Check if any parent process group has the specified component ID
- ParentProcessGroupSearchNode matchedComponent = componentMapHolder.getProcessGroupParent(componentId);
- while (matchedComponent != null && !matchedComponent.getId().equals(processGroupId) && !componentIds.contains(matchedComponent.getId())) {
- matchedComponent = matchedComponent.getParent();
- }
- if (matchedComponent == null) {
- continue;
+ // Check if the process group or any parent process group is specified as a target component ID.
+ if (!componentIds.contains(processGroupId)) {
+ ParentProcessGroupSearchNode parentProcessGroup = componentMapHolder.getProcessGroupParent(processGroupId);
+ while (parentProcessGroup != null && !componentIds.contains(parentProcessGroup.getId())) {
+ parentProcessGroup = parentProcessGroup.getParent();
+ }
+ if (parentProcessGroup == null) {
+ continue;
+ }
}
}
if (!eventTypes.isEmpty() && !eventTypes.contains(provenanceEventRecord.getEventType())) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/84cecfbe/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
index 201361f..26d5877 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
@@ -18,11 +18,15 @@
package org.apache.nifi.reporting;
+import com.fasterxml.jackson.databind.JsonNode;
+import com.fasterxml.jackson.databind.ObjectMapper;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
import org.apache.nifi.components.state.Scope;
import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
+import org.apache.nifi.controller.status.ProcessorStatus;
import org.apache.nifi.flowfile.FlowFile;
import org.apache.nifi.logging.ComponentLog;
import org.apache.nifi.provenance.ProvenanceEventBuilder;
@@ -49,6 +53,7 @@ import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.nio.charset.StandardCharsets;
import java.util.ArrayList;
+import java.util.Collection;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -65,6 +70,10 @@ public class TestSiteToSiteProvenanceReportingTask {
private final ConfigurationContext confContext = Mockito.mock(ConfigurationContext.class);
private MockSiteToSiteProvenanceReportingTask setup(ProvenanceEventRecord event, Map<PropertyDescriptor, String> properties) throws IOException {
+ return setup(event, properties, 2500);
+ }
+
+ private MockSiteToSiteProvenanceReportingTask setup(ProvenanceEventRecord event, Map<PropertyDescriptor, String> properties, long maxEventId) throws IOException {
final MockSiteToSiteProvenanceReportingTask task = new MockSiteToSiteProvenanceReportingTask();
when(context.getStateManager())
@@ -85,7 +94,6 @@ public class TestSiteToSiteProvenanceReportingTask {
}
}).when(confContext).getProperty(Mockito.any(PropertyDescriptor.class));
- final long maxEventId = 2500;
final AtomicInteger totalEvents = new AtomicInteger(0);
final EventAccess eventAccess = Mockito.mock(EventAccess.class);
@@ -106,9 +114,65 @@ public class TestSiteToSiteProvenanceReportingTask {
return eventsToReturn;
}
}).when(eventAccess).getProvenanceEvents(Mockito.anyLong(), Mockito.anyInt());
- ProcessGroupStatus processGroupStatus = new ProcessGroupStatus();
- processGroupStatus.setId("root");
- when(eventAccess.getControllerStatus()).thenReturn(processGroupStatus);
+ ProcessGroupStatus pgRoot = new ProcessGroupStatus();
+ pgRoot.setId("root");
+ when(eventAccess.getControllerStatus()).thenReturn(pgRoot);
+
+ // Add child Process Groups.
+ // Root -> (A, B -> (B2 -> (B3)))
+ final ProcessGroupStatus pgA = new ProcessGroupStatus();
+ pgA.setId("pgA");
+ final ProcessGroupStatus pgB = new ProcessGroupStatus();
+ pgB.setId("pgB");
+ final ProcessGroupStatus pgB2 = new ProcessGroupStatus();
+ pgB2.setId("pgB2");
+ final ProcessGroupStatus pgB3 = new ProcessGroupStatus();
+ pgB3.setId("pgB3");
+ final Collection<ProcessGroupStatus> childPGs = pgRoot.getProcessGroupStatus();
+ childPGs.add(pgA);
+ childPGs.add(pgB);
+ pgB.getProcessGroupStatus().add(pgB2);
+ pgB2.getProcessGroupStatus().add(pgB3);
+
+ // Add Processors.
+ final ProcessorStatus prcRoot = new ProcessorStatus();
+ prcRoot.setId("1234");
+ pgRoot.getProcessorStatus().add(prcRoot);
+
+ final ProcessorStatus prcA = new ProcessorStatus();
+ prcA.setId("A001");
+ prcA.setName("Processor in PGA");
+ pgA.getProcessorStatus().add(prcA);
+
+ final ProcessorStatus prcB = new ProcessorStatus();
+ prcB.setId("B001");
+ prcB.setName("Processor in PGB");
+ pgB.getProcessorStatus().add(prcB);
+
+ final ProcessorStatus prcB2 = new ProcessorStatus();
+ prcB2.setId("B201");
+ prcB2.setName("Processor in PGB2");
+ pgB2.getProcessorStatus().add(prcB2);
+
+ final ProcessorStatus prcB3 = new ProcessorStatus();
+ prcB3.setId("B301");
+ prcB3.setName("Processor in PGB3");
+ pgB3.getProcessorStatus().add(prcB3);
+
+ // Add connection status to test Remote Input/Output Ports
+ final ConnectionStatus b2RemoteInputPort = new ConnectionStatus();
+ b2RemoteInputPort.setGroupId("pgB2");
+ b2RemoteInputPort.setSourceId("B201");
+ b2RemoteInputPort.setDestinationId("riB2");
+ b2RemoteInputPort.setDestinationName("Remote Input Port name");
+ pgB2.getConnectionStatus().add(b2RemoteInputPort);
+
+ final ConnectionStatus b3RemoteOutputPort = new ConnectionStatus();
+ b3RemoteOutputPort.setGroupId("pgB3");
+ b3RemoteOutputPort.setSourceId("roB3");
+ b3RemoteOutputPort.setSourceName("Remote Output Port name");
+ b3RemoteOutputPort.setDestinationId("B301");
+ pgB3.getConnectionStatus().add(b3RemoteOutputPort);
final ProvenanceEventRepository provenanceRepository = Mockito.mock(ProvenanceEventRepository.class);
Mockito.doAnswer(new Answer<Long>() {
@@ -308,6 +372,90 @@ public class TestSiteToSiteProvenanceReportingTask {
}
@Test
+ public void testFilterProcessGroupId() throws IOException, InitializationException {
+ final Map<PropertyDescriptor, String> properties = new HashMap<>();
+ for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
+ properties.put(descriptor, descriptor.getDefaultValue());
+ }
+ properties.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000");
+ properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_ID, "pgB2");
+
+
+ // B201 belongs to ProcessGroup B2, so it should be picked.
+ ProvenanceEventRecord event = createProvenanceEventRecord("B201", "dummy");
+ MockSiteToSiteProvenanceReportingTask task = setup(event, properties, 1);
+ task.initialize(initContext);
+ task.onScheduled(confContext);
+ task.onTrigger(context);
+
+ assertEquals(1, task.dataSent.size());
+ JsonNode reportedEvent = new ObjectMapper().readTree(task.dataSent.get(0)).get(0);
+ assertEquals("B201", reportedEvent.get("componentId").asText());
+ assertEquals("Processor in PGB2", reportedEvent.get("componentName").asText());
+
+
+ // B301 belongs to PG B3, whose parent is PGB2, so it should be picked, too.
+ event = createProvenanceEventRecord("B301", "dummy");
+ task = setup(event, properties, 1);
+ task.initialize(initContext);
+ task.onScheduled(confContext);
+ task.onTrigger(context);
+
+ assertEquals(1, task.dataSent.size());
+ reportedEvent = new ObjectMapper().readTree(task.dataSent.get(0)).get(0);
+ assertEquals("B301", reportedEvent.get("componentId").asText());
+ assertEquals("Processor in PGB3", reportedEvent.get("componentName").asText());
+
+ // A001 belongs to PG A, whose parent is the root PG, so it should be filtered out.
+ event = createProvenanceEventRecord("A001", "dummy");
+ task = setup(event, properties, 1);
+ task.initialize(initContext);
+ task.onScheduled(confContext);
+ task.onTrigger(context);
+
+ assertEquals(0, task.dataSent.size());
+ }
+
+ @Test
+ public void testRemotePorts() throws IOException, InitializationException {
+ final Map<PropertyDescriptor, String> properties = new HashMap<>();
+ for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
+ properties.put(descriptor, descriptor.getDefaultValue());
+ }
+ properties.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000");
+ properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_ID, "riB2,roB3");
+
+
+ // riB2 is a Remote Input Port in Process Group B2.
+ ProvenanceEventRecord event = createProvenanceEventRecord("riB2", "Remote Input Port");
+ MockSiteToSiteProvenanceReportingTask task = setup(event, properties, 1);
+ task.initialize(initContext);
+ task.onScheduled(confContext);
+ task.onTrigger(context);
+
+ assertEquals(1, task.dataSent.size());
+ JsonNode reportedEvent = new ObjectMapper().readTree(task.dataSent.get(0)).get(0);
+ assertEquals("riB2", reportedEvent.get("componentId").asText());
+ assertEquals("Remote Input Port name", reportedEvent.get("componentName").asText());
+ assertEquals("pgB2", reportedEvent.get("processGroupId").asText());
+
+
+ // roB3 is a Remote Output Port in Process Group B3.
+ event = createProvenanceEventRecord("roB3", "Remote Output Port");
+ task = setup(event, properties, 1);
+ task.initialize(initContext);
+ task.onScheduled(confContext);
+ task.onTrigger(context);
+
+ assertEquals(1, task.dataSent.size());
+ reportedEvent = new ObjectMapper().readTree(task.dataSent.get(0)).get(0);
+ assertEquals("roB3", reportedEvent.get("componentId").asText());
+ assertEquals("Remote Output Port name", reportedEvent.get("componentName").asText());
+ assertEquals("pgB3", reportedEvent.get("processGroupId").asText());
+
+ }
+
+ @Test
public void testWhenProvenanceMaxIdEqualToLastEventIdInStateManager() throws IOException, InitializationException {
final long maxEventId = 2500;
@@ -353,6 +501,9 @@ public class TestSiteToSiteProvenanceReportingTask {
}
private ProvenanceEventRecord createProvenanceEventRecord() {
+ return createProvenanceEventRecord("1234", "dummy processor");
+ }
+ private ProvenanceEventRecord createProvenanceEventRecord(final String componentId, final String componentType) {
final String uuid = "10000000-0000-0000-0000-000000000000";
final Map<String, String> attributes = new HashMap<>();
attributes.put("abc", "xyz");
@@ -369,8 +520,8 @@ public class TestSiteToSiteProvenanceReportingTask {
attributes.put("uuid", uuid);
builder.fromFlowFile(createFlowFile(3L, attributes));
builder.setAttributes(prevAttrs, attributes);
- builder.setComponentId("1234");
- builder.setComponentType("dummy processor");
+ builder.setComponentId(componentId);
+ builder.setComponentType(componentType);
return builder.build();
}