You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2018/01/13 00:40:27 UTC
nifi git commit: NIFI-4768: Add exclusion filters to S2SProvenanceReportingTask
Repository: nifi
Updated Branches:
refs/heads/master 5e3867011 -> 83d293009
NIFI-4768: Add exclusion filters to S2SProvenanceReportingTask
NIFI-4768: Updated exclusion logic per review comments
This closes #2397.
Signed-off-by: Koji Kawamura <ij...@apache.org>
Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/83d29300
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/83d29300
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/83d29300
Branch: refs/heads/master
Commit: 83d29300953fa86e89cb30c59dcb86ed660557cc
Parents: 5e38670
Author: Matthew Burgess <ma...@apache.org>
Authored: Thu Jan 11 15:00:03 2018 -0500
Committer: Koji Kawamura <ij...@apache.org>
Committed: Sat Jan 13 09:47:21 2018 +0900
----------------------------------------------------------------------
.../provenance/ProvenanceEventConsumer.java | 68 ++++++++++++++---
.../SiteToSiteProvenanceReportingTask.java | 57 ++++++++++++++-
.../TestSiteToSiteProvenanceReportingTask.java | 77 ++++++++++++++++++++
3 files changed, 188 insertions(+), 14 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/nifi/blob/83d29300/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
index 18a8afe..3473475 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
@@ -32,6 +32,7 @@ import org.apache.nifi.reporting.ReportingContext;
import java.io.IOException;
import java.util.ArrayList;
+import java.util.Collections;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
@@ -66,8 +67,11 @@ public class ProvenanceEventConsumer {
private String startPositionValue = PROVENANCE_START_POSITION.getDefaultValue();
private Pattern componentTypeRegex;
- private List<ProvenanceEventType> eventTypes = new ArrayList<ProvenanceEventType>();
- private List<String> componentIds = new ArrayList<String>();
+ private Pattern componentTypeRegexExclude;
+ private List<ProvenanceEventType> eventTypes = new ArrayList<>();
+ private List<ProvenanceEventType> eventTypesExclude = new ArrayList<>();
+ private List<String> componentIds = new ArrayList<>();
+ private List<String> componentIdsExclude = new ArrayList<>();
private int batchSize = Integer.parseInt(PROVENANCE_BATCH_SIZE.getDefaultValue());
private volatile long firstEventId = -1L;
@@ -89,16 +93,26 @@ public class ProvenanceEventConsumer {
}
}
- public void addTargetEventType(final ProvenanceEventType... types) {
- for (ProvenanceEventType type : types) {
- eventTypes.add(type);
+ public void setComponentTypeRegexExclude(final String componentTypeRegex) {
+ if (!StringUtils.isBlank(componentTypeRegex)) {
+ this.componentTypeRegexExclude = Pattern.compile(componentTypeRegex);
}
}
+ public void addTargetEventType(final ProvenanceEventType... types) {
+ Collections.addAll(eventTypes, types);
+ }
+
+ public void addTargetEventTypeExclude(final ProvenanceEventType... types) {
+ Collections.addAll(eventTypesExclude, types);
+ }
+
public void addTargetComponentId(final String... ids) {
- for (String id : ids) {
- componentIds.add(id);
- }
+ Collections.addAll(componentIds, ids);
+ }
+
+ public void addTargetComponentIdExclude(final String... ids) {
+ Collections.addAll(componentIdsExclude, ids);
}
public void setScheduled(boolean scheduled) {
@@ -226,7 +240,8 @@ public class ProvenanceEventConsumer {
private boolean isFilteringEnabled() {
- return componentTypeRegex != null || !eventTypes.isEmpty() || !componentIds.isEmpty();
+ return componentTypeRegex != null || !eventTypes.isEmpty() || !componentIds.isEmpty()
+ || componentTypeRegexExclude != null || !eventTypesExclude.isEmpty() || !componentIdsExclude.isEmpty();
}
private List<ProvenanceEventRecord> filterEvents(ComponentMapHolder componentMapHolder, List<ProvenanceEventRecord> provenanceEvents) {
@@ -234,7 +249,38 @@ public class ProvenanceEventConsumer {
List<ProvenanceEventRecord> filteredEvents = new ArrayList<>();
for (ProvenanceEventRecord provenanceEventRecord : provenanceEvents) {
+ if (!eventTypesExclude.isEmpty() && eventTypesExclude.contains(provenanceEventRecord.getEventType())) {
+ continue;
+ }
+ if (!eventTypes.isEmpty() && !eventTypes.contains(provenanceEventRecord.getEventType())) {
+ continue;
+ }
final String componentId = provenanceEventRecord.getComponentId();
+ if (!componentIdsExclude.isEmpty()) {
+ if (componentIdsExclude.contains(componentId)) {
+ continue;
+ }
+ // If we aren't excluding it based on its own component ID, check whether the component belongs to a process group
+ // (or has an ancestor process group) whose ID is being excluded
+ if (componentMapHolder == null) {
+ continue;
+ }
+ final String processGroupId = componentMapHolder.getProcessGroupId(componentId, provenanceEventRecord.getComponentType());
+ if (!StringUtils.isEmpty(processGroupId)) {
+
+ // Check if the process group or any parent process group is specified as an excluded component ID.
+ if (componentIdsExclude.contains(processGroupId)) {
+ continue;
+ }
+ ParentProcessGroupSearchNode parentProcessGroup = componentMapHolder.getProcessGroupParent(processGroupId);
+ while (parentProcessGroup != null && !componentIdsExclude.contains(parentProcessGroup.getId())) {
+ parentProcessGroup = parentProcessGroup.getParent();
+ }
+ if (parentProcessGroup != null) {
+ continue;
+ }
+ }
+ }
if (!componentIds.isEmpty() && !componentIds.contains(componentId)) {
// If we aren't filtering it out based on component ID, let's see if this component has a parent process group IDs
// that is being filtered on
@@ -245,7 +291,6 @@ public class ProvenanceEventConsumer {
if (StringUtils.isEmpty(processGroupId)) {
continue;
}
- // Check if the process group or any parent process group is specified as a target component ID.
if (!componentIds.contains(processGroupId)) {
ParentProcessGroupSearchNode parentProcessGroup = componentMapHolder.getProcessGroupParent(processGroupId);
while (parentProcessGroup != null && !componentIds.contains(parentProcessGroup.getId())) {
@@ -256,7 +301,8 @@ public class ProvenanceEventConsumer {
}
}
}
- if (!eventTypes.isEmpty() && !eventTypes.contains(provenanceEventRecord.getEventType())) {
+
+ if (componentTypeRegexExclude != null && componentTypeRegexExclude.matcher(provenanceEventRecord.getComponentType()).matches()) {
continue;
}
if (componentTypeRegex != null && !componentTypeRegex.matcher(provenanceEventRecord.getComponentType()).matches()) {
http://git-wip-us.apache.org/repos/asf/nifi/blob/83d29300/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
index 61c8bc4..8331b46 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
@@ -87,7 +87,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
static final PropertyDescriptor FILTER_EVENT_TYPE = new PropertyDescriptor.Builder()
.name("s2s-prov-task-event-filter")
- .displayName("Event Type")
+ .displayName("Event Type to Include")
.description("Comma-separated list of event types that will be used to filter the provenance events sent by the reporting task. "
+ "Available event types are " + Arrays.deepToString(ProvenanceEventType.values()) + ". If no filter is set, all the events are sent. If "
+ "multiple filters are set, the filters are cumulative.")
@@ -95,24 +95,55 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
+ static final PropertyDescriptor FILTER_EVENT_TYPE_EXCLUDE = new PropertyDescriptor.Builder()
+ .name("s2s-prov-task-event-filter-exclude")
+ .displayName("Event Type to Exclude")
+ .description("Comma-separated list of event types that will be used to exclude the provenance events sent by the reporting task. "
+ + "Available event types are " + Arrays.deepToString(ProvenanceEventType.values()) + ". If no filter is set, all the events are sent. If "
+ + "multiple filters are set, the filters are cumulative. If an event type is included in Event Type to Include and excluded here, then the "
+ + "exclusion takes precedence and the event will not be sent.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
static final PropertyDescriptor FILTER_COMPONENT_TYPE = new PropertyDescriptor.Builder()
.name("s2s-prov-task-type-filter")
- .displayName("Component Type")
+ .displayName("Component Type to Include")
.description("Regular expression to filter the provenance events based on the component type. Only the events matching the regular "
+ "expression will be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.")
.required(false)
.addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
.build();
+ static final PropertyDescriptor FILTER_COMPONENT_TYPE_EXCLUDE = new PropertyDescriptor.Builder()
+ .name("s2s-prov-task-type-filter-exclude")
+ .displayName("Component Type to Exclude")
+ .description("Regular expression to exclude the provenance events based on the component type. The events matching the regular "
+ + "expression will not be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative. "
+ + "If a component type is included in Component Type to Include and excluded here, then the exclusion takes precedence and the event will not be sent.")
+ .required(false)
+ .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+ .build();
+
static final PropertyDescriptor FILTER_COMPONENT_ID = new PropertyDescriptor.Builder()
.name("s2s-prov-task-id-filter")
- .displayName("Component ID")
+ .displayName("Component ID to Include")
.description("Comma-separated list of component UUID that will be used to filter the provenance events sent by the reporting task. If no "
+ "filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.")
.required(false)
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
.build();
+ static final PropertyDescriptor FILTER_COMPONENT_ID_EXCLUDE = new PropertyDescriptor.Builder()
+ .name("s2s-prov-task-id-filter-exclude")
+ .displayName("Component ID to Exclude")
+ .description("Comma-separated list of component UUID that will be used to exclude the provenance events sent by the reporting task. If no "
+ + "filter is set, all the events are sent. If multiple filters are set, the filters are cumulative. If a component UUID is included in "
+ + "Component ID to Include and excluded here, then the exclusion takes precedence and the event will not be sent.")
+ .required(false)
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
static final PropertyDescriptor START_POSITION = new PropertyDescriptor.Builder()
.name("start-position")
.displayName("Start Position")
@@ -133,6 +164,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
// initialize component type filtering
consumer.setComponentTypeRegex(context.getProperty(FILTER_COMPONENT_TYPE).getValue());
+ consumer.setComponentTypeRegexExclude(context.getProperty(FILTER_COMPONENT_TYPE_EXCLUDE).getValue());
final String[] targetEventTypes = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_EVENT_TYPE).getValue(), ','));
if(targetEventTypes != null) {
@@ -145,12 +177,28 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
}
}
+ final String[] targetEventTypesExclude = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_EVENT_TYPE_EXCLUDE).getValue(), ','));
+ if(targetEventTypesExclude != null) {
+ for(String type : targetEventTypesExclude) {
+ try {
+ consumer.addTargetEventTypeExclude(ProvenanceEventType.valueOf(type));
+ } catch (Exception e) {
+ getLogger().warn(type + " is not a correct event type, removed from the exclude filtering.");
+ }
+ }
+ }
+
// initialize component ID filtering
final String[] targetComponentIds = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_COMPONENT_ID).getValue(), ','));
if(targetComponentIds != null) {
consumer.addTargetComponentId(targetComponentIds);
}
+ final String[] targetComponentIdsExclude = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_COMPONENT_ID_EXCLUDE).getValue(), ','));
+ if(targetComponentIdsExclude != null) {
+ consumer.addTargetComponentIdExclude(targetComponentIdsExclude);
+ }
+
consumer.setScheduled(true);
}
@@ -166,8 +214,11 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
properties.add(PLATFORM);
properties.add(FILTER_EVENT_TYPE);
+ properties.add(FILTER_EVENT_TYPE_EXCLUDE);
properties.add(FILTER_COMPONENT_TYPE);
+ properties.add(FILTER_COMPONENT_TYPE_EXCLUDE);
properties.add(FILTER_COMPONENT_ID);
+ properties.add(FILTER_COMPONENT_ID_EXCLUDE);
properties.add(START_POSITION);
return properties;
}
http://git-wip-us.apache.org/repos/asf/nifi/blob/83d29300/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
index 26d5877..31054c2 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
@@ -273,6 +273,25 @@ public class TestSiteToSiteProvenanceReportingTask {
}
@Test
+ public void testFilterComponentTypeExcludeSuccess() throws IOException, InitializationException {
+ final Map<PropertyDescriptor, String> properties = new HashMap<>();
+ for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
+ properties.put(descriptor, descriptor.getDefaultValue());
+ }
+ properties.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000");
+ properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_TYPE_EXCLUDE, "dummy.*");
+
+ ProvenanceEventRecord event = createProvenanceEventRecord();
+
+ MockSiteToSiteProvenanceReportingTask task = setup(event, properties);
+ task.initialize(initContext);
+ task.onScheduled(confContext);
+ task.onTrigger(context);
+
+ assertEquals(0, task.dataSent.size());
+ }
+
+ @Test
public void testFilterComponentTypeNoResult() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
@@ -292,6 +311,25 @@ public class TestSiteToSiteProvenanceReportingTask {
}
@Test
+ public void testFilterComponentTypeNoResultExcluded() throws IOException, InitializationException {
+ final Map<PropertyDescriptor, String> properties = new HashMap<>();
+ for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
+ properties.put(descriptor, descriptor.getDefaultValue());
+ }
+ properties.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000");
+ properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_TYPE_EXCLUDE, "proc.*");
+
+ ProvenanceEventRecord event = createProvenanceEventRecord();
+
+ MockSiteToSiteProvenanceReportingTask task = setup(event, properties);
+ task.initialize(initContext);
+ task.onScheduled(confContext);
+ task.onTrigger(context);
+
+ assertEquals(3, task.dataSent.size());
+ }
+
+ @Test
public void testFilterEventTypeSuccess() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
@@ -311,6 +349,25 @@ public class TestSiteToSiteProvenanceReportingTask {
}
@Test
+ public void testFilterEventTypeExcludeSuccess() throws IOException, InitializationException {
+ final Map<PropertyDescriptor, String> properties = new HashMap<>();
+ for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
+ properties.put(descriptor, descriptor.getDefaultValue());
+ }
+ properties.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000");
+ properties.put(SiteToSiteProvenanceReportingTask.FILTER_EVENT_TYPE_EXCLUDE, "RECEIVE, notExistingType, DROP");
+
+ ProvenanceEventRecord event = createProvenanceEventRecord();
+
+ MockSiteToSiteProvenanceReportingTask task = setup(event, properties);
+ task.initialize(initContext);
+ task.onScheduled(confContext);
+ task.onTrigger(context);
+
+ assertEquals(0, task.dataSent.size());
+ }
+
+ @Test
public void testFilterEventTypeNoResult() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
@@ -372,6 +429,26 @@ public class TestSiteToSiteProvenanceReportingTask {
}
@Test
+ public void testFilterMultiFilterExcludeTakesPrecedence() throws IOException, InitializationException {
+ final Map<PropertyDescriptor, String> properties = new HashMap<>();
+ for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
+ properties.put(descriptor, descriptor.getDefaultValue());
+ }
+ properties.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000");
+ properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_TYPE_EXCLUDE, "dummy.*");
+ properties.put(SiteToSiteProvenanceReportingTask.FILTER_EVENT_TYPE, "RECEIVE");
+
+ ProvenanceEventRecord event = createProvenanceEventRecord();
+
+ MockSiteToSiteProvenanceReportingTask task = setup(event, properties);
+ task.initialize(initContext);
+ task.onScheduled(confContext);
+ task.onTrigger(context);
+
+ assertEquals(0, task.dataSent.size());
+ }
+
+ @Test
public void testFilterProcessGroupId() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {