You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2018/01/02 19:49:30 UTC
[2/4] nifi git commit: NIFI-4707: Improved S2SProvenanceReportingTask
NIFI-4707: Improved S2SProvenanceReportingTask
- Simplified consumeEvents method signature
- Refactored ComponentMapHolder methods visibility
- Renamed componentMap to componentNameMap
- Map more metadata from ConnectionStatus for Remote Input/Output Ports
- Support Process Group hierarchy filtering
- Throw an exception when the reporting task fails to send provenance
data, so that the current provenance event index is not advanced and
the events can be consumed again
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/d65e6b25
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/d65e6b25
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/d65e6b25
Branch: refs/heads/master
Commit: d65e6b25630fa918ede2cd6922dc777e816679c3
Parents: 1f79392
Author: Koji Kawamura <ij...@apache.org>
Authored: Wed Dec 20 15:51:58 2017 +0900
Committer: Matthew Burgess <ma...@apache.org>
Committed: Tue Jan 2 14:46:42 2018 -0500
----------------------------------------------------------------------
.../atlas/reporting/ReportLineageToAtlas.java | 2 +-
.../util/provenance/ComponentMapHolder.java | 75 +++++++++++++-------
.../provenance/ProvenanceEventConsumer.java | 13 ++--
.../SiteToSiteProvenanceReportingTask.java | 10 +--
4 files changed, 66 insertions(+), 34 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/d65e6b25/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
index f722b9d..5bb6024 100644
--- a/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
+++ b/nifi-nar-bundles/nifi-atlas-bundle/nifi-atlas-reporting-task/src/main/java/org/apache/nifi/atlas/reporting/ReportLineageToAtlas.java
@@ -640,7 +640,7 @@ public class ReportLineageToAtlas extends AbstractReportingTask {
final AnalysisContext analysisContext = new StandardAnalysisContext(nifiFlow, clusterResolvers,
// FIXME: This class cast shouldn't be necessary to query lineage. Possible refactor target in next major update.
(ProvenanceRepository)eventAccess.getProvenanceRepository());
- consumer.consumeEvents(context, context.getStateManager(), (componentMapHolder, events) -> {
+ consumer.consumeEvents(context, (componentMapHolder, events) -> {
for (ProvenanceEventRecord event : events) {
try {
lineageStrategy.processEvent(analysisContext, nifiFlow, event);
http://git-wip-us.apache.org/repos/asf/nifi/blob/d65e6b25/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java
index 495968a..342b5a2 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ComponentMapHolder.java
@@ -24,68 +24,95 @@ import org.apache.nifi.controller.status.RemoteProcessGroupStatus;
import java.util.HashMap;
import java.util.Map;
+import java.util.Stack;
public class ComponentMapHolder {
- final Map<String,String> componentMap = new HashMap<>();
- final Map<String,String> componentToParentGroupMap = new HashMap<>();
-
- public ComponentMapHolder putAll(ComponentMapHolder holder) {
- this.componentMap.putAll(holder.getComponentMap());
- this.componentToParentGroupMap.putAll(holder.getComponentToParentGroupMap());
+ private static final String REMOTE_INPUT_PORT = "Remote Input Port";
+ private static final String REMOTE_OUTPUT_PORT = "Remote Output Port";
+ private final Map<String,String> componentNameMap = new HashMap<>();
+ private final Map<String,String> componentToParentGroupMap = new HashMap<>();
+ private final Map<String,String> sourceToConnectionParentGroupMap = new HashMap<>();
+ private final Map<String,String> destinationToConnectionParentGroupMap = new HashMap<>();
+
+ private ComponentMapHolder putAll(ComponentMapHolder holder) {
+ this.componentNameMap.putAll(holder.componentNameMap);
+ this.componentToParentGroupMap.putAll(holder.componentToParentGroupMap);
+ this.sourceToConnectionParentGroupMap.putAll(holder.sourceToConnectionParentGroupMap);
+ this.destinationToConnectionParentGroupMap.putAll(holder.destinationToConnectionParentGroupMap);
return this;
}
- public Map<String, String> getComponentMap() {
- return componentMap;
- }
-
- public Map<String, String> getComponentToParentGroupMap() {
- return componentToParentGroupMap;
+ public String getComponentName(final String componentId) {
+ return componentNameMap.get(componentId);
}
- public String getComponentName(final String componentId) {
- return componentMap.get(componentId);
+ public Stack<String> getProcessGroupIdStack(final String startingProcessGroupId) {
+ final Stack<String> stack = new Stack<>();
+ String processGroupId = startingProcessGroupId;
+ stack.push(startingProcessGroupId);
+ while (componentToParentGroupMap.containsKey(processGroupId)) {
+ final String parentGroupId = componentToParentGroupMap.get(processGroupId);
+ if (parentGroupId == null || parentGroupId.isEmpty()) {
+ break;
+ }
+ stack.push(parentGroupId);
+ processGroupId = parentGroupId;
+ }
+ return stack;
}
- public String getProcessGroupId(final String componentId) {
+ public String getProcessGroupId(final String componentId, final String componentType) {
+ // Where a Remote Input/Output Port resides is only available at ConnectionStatus.
+ if (REMOTE_INPUT_PORT.equals(componentType)) {
+ return destinationToConnectionParentGroupMap.get(componentId);
+ } else if (REMOTE_OUTPUT_PORT.equals(componentType)) {
+ return sourceToConnectionParentGroupMap.get(componentId);
+ }
return componentToParentGroupMap.get(componentId);
}
public static ComponentMapHolder createComponentMap(final ProcessGroupStatus status) {
final ComponentMapHolder holder = new ComponentMapHolder();
- final Map<String,String> componentMap = holder.getComponentMap();
- final Map<String,String> componentToParentGroupMap = holder.getComponentToParentGroupMap();
+ final Map<String,String> componentNameMap = holder.componentNameMap;
+ final Map<String,String> componentToParentGroupMap = holder.componentToParentGroupMap;
+ final Map<String,String> sourceToConnectionParentGroupMap = holder.sourceToConnectionParentGroupMap;
+ final Map<String,String> destinationToConnectionParentGroupMap = holder.destinationToConnectionParentGroupMap;
if (status != null) {
- componentMap.put(status.getId(), status.getName());
+ componentNameMap.put(status.getId(), status.getName());
for (final ProcessorStatus procStatus : status.getProcessorStatus()) {
- componentMap.put(procStatus.getId(), procStatus.getName());
+ componentNameMap.put(procStatus.getId(), procStatus.getName());
componentToParentGroupMap.put(procStatus.getId(), status.getId());
}
for (final PortStatus portStatus : status.getInputPortStatus()) {
- componentMap.put(portStatus.getId(), portStatus.getName());
+ componentNameMap.put(portStatus.getId(), portStatus.getName());
componentToParentGroupMap.put(portStatus.getId(), status.getId());
}
for (final PortStatus portStatus : status.getOutputPortStatus()) {
- componentMap.put(portStatus.getId(), portStatus.getName());
+ componentNameMap.put(portStatus.getId(), portStatus.getName());
componentToParentGroupMap.put(portStatus.getId(), status.getId());
}
for (final RemoteProcessGroupStatus rpgStatus : status.getRemoteProcessGroupStatus()) {
- componentMap.put(rpgStatus.getId(), rpgStatus.getName());
+ componentNameMap.put(rpgStatus.getId(), rpgStatus.getName());
componentToParentGroupMap.put(rpgStatus.getId(), status.getId());
}
for (final ConnectionStatus connectionStatus : status.getConnectionStatus()) {
- componentMap.put(connectionStatus.getId(), connectionStatus.getName());
+ componentNameMap.put(connectionStatus.getId(), connectionStatus.getName());
componentToParentGroupMap.put(connectionStatus.getId(), status.getId());
+ // Add source and destination for Remote Input/Output Ports because metadata for those are only available at ConnectionStatus.
+ componentNameMap.computeIfAbsent(connectionStatus.getSourceId(), k -> connectionStatus.getSourceName());
+ componentNameMap.computeIfAbsent(connectionStatus.getDestinationId(), k -> connectionStatus.getDestinationName());
+ sourceToConnectionParentGroupMap.put(connectionStatus.getSourceId(), connectionStatus.getGroupId());
+ destinationToConnectionParentGroupMap.put(connectionStatus.getDestinationId(), connectionStatus.getGroupId());
}
for (final ProcessGroupStatus childGroup : status.getProcessGroupStatus()) {
- componentMap.put(childGroup.getId(), childGroup.getName());
+ componentNameMap.put(childGroup.getId(), childGroup.getName());
componentToParentGroupMap.put(childGroup.getId(), status.getId());
holder.putAll(createComponentMap(childGroup));
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d65e6b25/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
index 8256626..75c1e60 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
@@ -113,7 +113,7 @@ public class ProvenanceEventConsumer {
this.logger = logger;
}
- public void consumeEvents(final ReportingContext context, final StateManager stateManager,
+ public void consumeEvents(final ReportingContext context,
final BiConsumer<ComponentMapHolder, List<ProvenanceEventRecord>> consumer) throws ProcessException {
if (context == null) {
@@ -123,6 +123,7 @@ public class ProvenanceEventConsumer {
final EventAccess eventAccess = context.getEventAccess();
final ProcessGroupStatus procGroupStatus = eventAccess.getControllerStatus();
final ComponentMapHolder componentMapHolder = ComponentMapHolder.createComponentMap(procGroupStatus);
+ final StateManager stateManager = context.getStateManager();
Long currMaxId = eventAccess.getProvenanceRepository().getMaxEventId();
@@ -234,12 +235,16 @@ public class ProvenanceEventConsumer {
for (ProvenanceEventRecord provenanceEventRecord : provenanceEvents) {
if(!componentIds.isEmpty() && !componentIds.contains(provenanceEventRecord.getComponentId())) {
- // If we aren't filtering it out based on component ID, let's see if this component has a parent process group ID
+ // If we aren't filtering it out based on component ID, let's see if this component has a parent (or ancestor) process group
// that is being filtered on
- if (componentMapHolder == null || componentMapHolder.getComponentToParentGroupMap().isEmpty()) {
+ if (componentMapHolder == null) {
continue;
}
- if (!componentIds.contains(componentMapHolder.getComponentToParentGroupMap().get(provenanceEventRecord.getComponentId()))) {
+ final String processGroupId = componentMapHolder.getProcessGroupId(provenanceEventRecord.getComponentId(), provenanceEventRecord.getComponentType());
+ if (processGroupId == null || processGroupId.isEmpty()) {
+ continue;
+ }
+ if (componentMapHolder.getProcessGroupIdStack(processGroupId).stream().noneMatch(pgid -> componentIds.contains(pgid))) {
continue;
}
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/d65e6b25/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
index c99e9d8..61c8bc4 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
@@ -203,14 +203,14 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
final DateFormat df = new SimpleDateFormat(TIMESTAMP_FORMAT);
df.setTimeZone(TimeZone.getTimeZone("Z"));
- consumer.consumeEvents(context, context.getStateManager(), (mapHolder, events) -> {
+ consumer.consumeEvents(context, (mapHolder, events) -> {
final long start = System.nanoTime();
// Create a JSON array of all the events in the current batch
final JsonArrayBuilder arrayBuilder = factory.createArrayBuilder();
for (final ProvenanceEventRecord event : events) {
final String componentName = mapHolder.getComponentName(event.getComponentId());
- final String processGroupId = mapHolder.getProcessGroupId(event.getComponentId());
- final String processGroupName = mapHolder.getComponentMap().get(processGroupId);
+ final String processGroupId = mapHolder.getProcessGroupId(event.getComponentId(), event.getComponentType());
+ final String processGroupName = mapHolder.getComponentName(processGroupId);
arrayBuilder.add(serialize(factory, builder, event, df, componentName, processGroupId, processGroupName, hostname, url, rootGroupName, platform, nodeId));
}
final JsonArray jsonArray = arrayBuilder.build();
@@ -219,8 +219,8 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
try {
final Transaction transaction = getClient().createTransaction(TransferDirection.SEND);
if (transaction == null) {
- getLogger().debug("All destination nodes are penalized; will attempt to send data later");
- return;
+ // Throw an exception so that the provenance event index does not advance and these events can be consumed again.
+ throw new ProcessException("All destination nodes are penalized; will attempt to send data later");
}
final Map<String, String> attributes = new HashMap<>();