You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ij...@apache.org on 2018/01/13 00:40:27 UTC

nifi git commit: NIFI-4768: Add exclusion filters to S2SProvenanceReportingTask

Repository: nifi
Updated Branches:
  refs/heads/master 5e3867011 -> 83d293009


NIFI-4768: Add exclusion filters to S2SProvenanceReportingTask

NIFI-4768: Updated exclusion logic per review comments

This closes #2397.

Signed-off-by: Koji Kawamura <ij...@apache.org>


Project: http://git-wip-us.apache.org/repos/asf/nifi/repo
Commit: http://git-wip-us.apache.org/repos/asf/nifi/commit/83d29300
Tree: http://git-wip-us.apache.org/repos/asf/nifi/tree/83d29300
Diff: http://git-wip-us.apache.org/repos/asf/nifi/diff/83d29300

Branch: refs/heads/master
Commit: 83d29300953fa86e89cb30c59dcb86ed660557cc
Parents: 5e38670
Author: Matthew Burgess <ma...@apache.org>
Authored: Thu Jan 11 15:00:03 2018 -0500
Committer: Koji Kawamura <ij...@apache.org>
Committed: Sat Jan 13 09:47:21 2018 +0900

----------------------------------------------------------------------
 .../provenance/ProvenanceEventConsumer.java     | 68 ++++++++++++++---
 .../SiteToSiteProvenanceReportingTask.java      | 57 ++++++++++++++-
 .../TestSiteToSiteProvenanceReportingTask.java  | 77 ++++++++++++++++++++
 3 files changed, 188 insertions(+), 14 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/nifi/blob/83d29300/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
index 18a8afe..3473475 100644
--- a/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
+++ b/nifi-nar-bundles/nifi-extension-utils/nifi-reporting-utils/src/main/java/org/apache/nifi/reporting/util/provenance/ProvenanceEventConsumer.java
@@ -32,6 +32,7 @@ import org.apache.nifi.reporting.ReportingContext;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
@@ -66,8 +67,11 @@ public class ProvenanceEventConsumer {
 
     private String startPositionValue = PROVENANCE_START_POSITION.getDefaultValue();
     private Pattern componentTypeRegex;
-    private List<ProvenanceEventType> eventTypes = new ArrayList<ProvenanceEventType>();
-    private List<String> componentIds = new ArrayList<String>();
+    private Pattern componentTypeRegexExclude;
+    private List<ProvenanceEventType> eventTypes = new ArrayList<>();
+    private List<ProvenanceEventType> eventTypesExclude = new ArrayList<>();
+    private List<String> componentIds = new ArrayList<>();
+    private List<String> componentIdsExclude = new ArrayList<>();
     private int batchSize = Integer.parseInt(PROVENANCE_BATCH_SIZE.getDefaultValue());
 
     private volatile long firstEventId = -1L;
@@ -89,16 +93,26 @@ public class ProvenanceEventConsumer {
         }
     }
 
-    public void addTargetEventType(final ProvenanceEventType... types) {
-        for (ProvenanceEventType type : types) {
-            eventTypes.add(type);
+    public void setComponentTypeRegexExclude(final String componentTypeRegex) {
+        if (!StringUtils.isBlank(componentTypeRegex)) {
+            this.componentTypeRegexExclude = Pattern.compile(componentTypeRegex);
         }
     }
 
+    public void addTargetEventType(final ProvenanceEventType... types) {
+        Collections.addAll(eventTypes, types);
+    }
+
+    public void addTargetEventTypeExclude(final ProvenanceEventType... types) {
+        Collections.addAll(eventTypesExclude, types);
+    }
+
     public void addTargetComponentId(final String... ids) {
-        for (String id : ids) {
-            componentIds.add(id);
-        }
+        Collections.addAll(componentIds, ids);
+    }
+
+    public void addTargetComponentIdExclude(final String... ids) {
+        Collections.addAll(componentIdsExclude, ids);
     }
 
     public void setScheduled(boolean scheduled) {
@@ -226,7 +240,8 @@ public class ProvenanceEventConsumer {
 
 
     private boolean isFilteringEnabled() {
-        return componentTypeRegex != null || !eventTypes.isEmpty() || !componentIds.isEmpty();
+        return componentTypeRegex != null || !eventTypes.isEmpty() || !componentIds.isEmpty()
+                || componentTypeRegexExclude != null || !eventTypesExclude.isEmpty() || !componentIdsExclude.isEmpty();
     }
 
     private List<ProvenanceEventRecord> filterEvents(ComponentMapHolder componentMapHolder, List<ProvenanceEventRecord> provenanceEvents) {
@@ -234,7 +249,38 @@ public class ProvenanceEventConsumer {
             List<ProvenanceEventRecord> filteredEvents = new ArrayList<>();
 
             for (ProvenanceEventRecord provenanceEventRecord : provenanceEvents) {
+                if (!eventTypesExclude.isEmpty() && eventTypesExclude.contains(provenanceEventRecord.getEventType())) {
+                    continue;
+                }
+                if (!eventTypes.isEmpty() && !eventTypes.contains(provenanceEventRecord.getEventType())) {
+                    continue;
+                }
                 final String componentId = provenanceEventRecord.getComponentId();
+                if (!componentIdsExclude.isEmpty()) {
+                    if (componentIdsExclude.contains(componentId)) {
+                        continue;
+                    }
+                    // If we aren't excluding it based on component ID, let's see if this component has a parent process group ID
+                    // that is being excluded
+                    if (componentMapHolder == null) {
+                        continue;
+                    }
+                    final String processGroupId = componentMapHolder.getProcessGroupId(componentId, provenanceEventRecord.getComponentType());
+                    if (!StringUtils.isEmpty(processGroupId)) {
+
+                        // Check if the process group or any parent process group is specified as an excluded component ID.
+                        if (componentIdsExclude.contains(processGroupId)) {
+                            continue;
+                        }
+                        ParentProcessGroupSearchNode parentProcessGroup = componentMapHolder.getProcessGroupParent(processGroupId);
+                        while (parentProcessGroup != null && !componentIdsExclude.contains(parentProcessGroup.getId())) {
+                            parentProcessGroup = parentProcessGroup.getParent();
+                        }
+                        if (parentProcessGroup != null) {
+                            continue;
+                        }
+                    }
+                }
                 if (!componentIds.isEmpty() && !componentIds.contains(componentId)) {
                     // If we aren't filtering it out based on component ID, let's see if this component has a parent process group IDs
                     // that is being filtered on
@@ -245,7 +291,6 @@ public class ProvenanceEventConsumer {
                     if (StringUtils.isEmpty(processGroupId)) {
                         continue;
                     }
-                    // Check if the process group or any parent process group is specified as a target component ID.
                     if (!componentIds.contains(processGroupId)) {
                         ParentProcessGroupSearchNode parentProcessGroup = componentMapHolder.getProcessGroupParent(processGroupId);
                         while (parentProcessGroup != null && !componentIds.contains(parentProcessGroup.getId())) {
@@ -256,7 +301,8 @@ public class ProvenanceEventConsumer {
                         }
                     }
                 }
-                if (!eventTypes.isEmpty() && !eventTypes.contains(provenanceEventRecord.getEventType())) {
+
+                if (componentTypeRegexExclude != null && componentTypeRegexExclude.matcher(provenanceEventRecord.getComponentType()).matches()) {
                     continue;
                 }
                 if (componentTypeRegex != null && !componentTypeRegex.matcher(provenanceEventRecord.getComponentType()).matches()) {

http://git-wip-us.apache.org/repos/asf/nifi/blob/83d29300/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
index 61c8bc4..8331b46 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/main/java/org/apache/nifi/reporting/SiteToSiteProvenanceReportingTask.java
@@ -87,7 +87,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
 
     static final PropertyDescriptor FILTER_EVENT_TYPE = new PropertyDescriptor.Builder()
         .name("s2s-prov-task-event-filter")
-        .displayName("Event Type")
+        .displayName("Event Type to Include")
         .description("Comma-separated list of event types that will be used to filter the provenance events sent by the reporting task. "
                 + "Available event types are " + Arrays.deepToString(ProvenanceEventType.values()) + ". If no filter is set, all the events are sent. If "
                         + "multiple filters are set, the filters are cumulative.")
@@ -95,24 +95,55 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .build();
 
+    static final PropertyDescriptor FILTER_EVENT_TYPE_EXCLUDE = new PropertyDescriptor.Builder()
+            .name("s2s-prov-task-event-filter-exclude")
+            .displayName("Event Type to Exclude")
+            .description("Comma-separated list of event types that will be used to exclude the provenance events sent by the reporting task. "
+                    + "Available event types are " + Arrays.deepToString(ProvenanceEventType.values()) + ". If no filter is set, all the events are sent. If "
+                    + "multiple filters are set, the filters are cumulative. If an event type is included in Event Type to Include and excluded here, then the "
+                    + "exclusion takes precedence and the event will not be sent.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
     static final PropertyDescriptor FILTER_COMPONENT_TYPE = new PropertyDescriptor.Builder()
         .name("s2s-prov-task-type-filter")
-        .displayName("Component Type")
+        .displayName("Component Type to Include")
         .description("Regular expression to filter the provenance events based on the component type. Only the events matching the regular "
                 + "expression will be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.")
         .required(false)
         .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
         .build();
 
+    static final PropertyDescriptor FILTER_COMPONENT_TYPE_EXCLUDE = new PropertyDescriptor.Builder()
+            .name("s2s-prov-task-type-filter-exclude")
+            .displayName("Component Type to Exclude")
+            .description("Regular expression to exclude the provenance events based on the component type. The events matching the regular "
+                    + "expression will not be sent. If no filter is set, all the events are sent. If multiple filters are set, the filters are cumulative. "
+                    + "If a component type is included in Component Type to Include and excluded here, then the exclusion takes precedence and the event will not be sent.")
+            .required(false)
+            .addValidator(StandardValidators.REGULAR_EXPRESSION_VALIDATOR)
+            .build();
+
     static final PropertyDescriptor FILTER_COMPONENT_ID = new PropertyDescriptor.Builder()
         .name("s2s-prov-task-id-filter")
-        .displayName("Component ID")
+        .displayName("Component ID to Include")
         .description("Comma-separated list of component UUID that will be used to filter the provenance events sent by the reporting task. If no "
                 + "filter is set, all the events are sent. If multiple filters are set, the filters are cumulative.")
         .required(false)
         .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
         .build();
 
+    static final PropertyDescriptor FILTER_COMPONENT_ID_EXCLUDE = new PropertyDescriptor.Builder()
+            .name("s2s-prov-task-id-filter-exclude")
+            .displayName("Component ID to Exclude")
+            .description("Comma-separated list of component UUID that will be used to exclude the provenance events sent by the reporting task. If no "
+                    + "filter is set, all the events are sent. If multiple filters are set, the filters are cumulative. If a component UUID is included in "
+                    + "Component ID to Include and excluded here, then the exclusion takes precedence and the event will not be sent.")
+            .required(false)
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
     static final PropertyDescriptor START_POSITION = new PropertyDescriptor.Builder()
         .name("start-position")
         .displayName("Start Position")
@@ -133,6 +164,7 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
 
         // initialize component type filtering
         consumer.setComponentTypeRegex(context.getProperty(FILTER_COMPONENT_TYPE).getValue());
+        consumer.setComponentTypeRegexExclude(context.getProperty(FILTER_COMPONENT_TYPE_EXCLUDE).getValue());
 
         final String[] targetEventTypes = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_EVENT_TYPE).getValue(), ','));
         if(targetEventTypes != null) {
@@ -145,12 +177,28 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
             }
         }
 
+        final String[] targetEventTypesExclude = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_EVENT_TYPE_EXCLUDE).getValue(), ','));
+        if(targetEventTypesExclude != null) {
+            for(String type : targetEventTypesExclude) {
+                try {
+                    consumer.addTargetEventTypeExclude(ProvenanceEventType.valueOf(type));
+                } catch (Exception e) {
+                    getLogger().warn(type + " is not a correct event type, removed from the exclude filtering.");
+                }
+            }
+        }
+
         // initialize component ID filtering
         final String[] targetComponentIds = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_COMPONENT_ID).getValue(), ','));
         if(targetComponentIds != null) {
             consumer.addTargetComponentId(targetComponentIds);
         }
 
+        final String[] targetComponentIdsExclude = StringUtils.stripAll(StringUtils.split(context.getProperty(FILTER_COMPONENT_ID_EXCLUDE).getValue(), ','));
+        if(targetComponentIdsExclude != null) {
+            consumer.addTargetComponentIdExclude(targetComponentIdsExclude);
+        }
+
         consumer.setScheduled(true);
     }
 
@@ -166,8 +214,11 @@ public class SiteToSiteProvenanceReportingTask extends AbstractSiteToSiteReporti
         final List<PropertyDescriptor> properties = new ArrayList<>(super.getSupportedPropertyDescriptors());
         properties.add(PLATFORM);
         properties.add(FILTER_EVENT_TYPE);
+        properties.add(FILTER_EVENT_TYPE_EXCLUDE);
         properties.add(FILTER_COMPONENT_TYPE);
+        properties.add(FILTER_COMPONENT_TYPE_EXCLUDE);
         properties.add(FILTER_COMPONENT_ID);
+        properties.add(FILTER_COMPONENT_ID_EXCLUDE);
         properties.add(START_POSITION);
         return properties;
     }

http://git-wip-us.apache.org/repos/asf/nifi/blob/83d29300/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
----------------------------------------------------------------------
diff --git a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
index 26d5877..31054c2 100644
--- a/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
+++ b/nifi-nar-bundles/nifi-site-to-site-reporting-bundle/nifi-site-to-site-reporting-task/src/test/java/org/apache/nifi/reporting/TestSiteToSiteProvenanceReportingTask.java
@@ -273,6 +273,25 @@ public class TestSiteToSiteProvenanceReportingTask {
     }
 
     @Test
+    public void testFilterComponentTypeExcludeSuccess() throws IOException, InitializationException {
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
+            properties.put(descriptor, descriptor.getDefaultValue());
+        }
+        properties.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000");
+        properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_TYPE_EXCLUDE, "dummy.*");
+
+        ProvenanceEventRecord event = createProvenanceEventRecord();
+
+        MockSiteToSiteProvenanceReportingTask task = setup(event, properties);
+        task.initialize(initContext);
+        task.onScheduled(confContext);
+        task.onTrigger(context);
+
+        assertEquals(0, task.dataSent.size());
+    }
+
+    @Test
     public void testFilterComponentTypeNoResult() throws IOException, InitializationException {
         final Map<PropertyDescriptor, String> properties = new HashMap<>();
         for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
@@ -292,6 +311,25 @@ public class TestSiteToSiteProvenanceReportingTask {
     }
 
     @Test
+    public void testFilterComponentTypeNoResultExcluded() throws IOException, InitializationException {
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
+            properties.put(descriptor, descriptor.getDefaultValue());
+        }
+        properties.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000");
+        properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_TYPE_EXCLUDE, "proc.*");
+
+        ProvenanceEventRecord event = createProvenanceEventRecord();
+
+        MockSiteToSiteProvenanceReportingTask task = setup(event, properties);
+        task.initialize(initContext);
+        task.onScheduled(confContext);
+        task.onTrigger(context);
+
+        assertEquals(3, task.dataSent.size());
+    }
+
+    @Test
     public void testFilterEventTypeSuccess() throws IOException, InitializationException {
         final Map<PropertyDescriptor, String> properties = new HashMap<>();
         for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
@@ -311,6 +349,25 @@ public class TestSiteToSiteProvenanceReportingTask {
     }
 
     @Test
+    public void testFilterEventTypeExcludeSuccess() throws IOException, InitializationException {
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
+            properties.put(descriptor, descriptor.getDefaultValue());
+        }
+        properties.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000");
+        properties.put(SiteToSiteProvenanceReportingTask.FILTER_EVENT_TYPE_EXCLUDE, "RECEIVE, notExistingType, DROP");
+
+        ProvenanceEventRecord event = createProvenanceEventRecord();
+
+        MockSiteToSiteProvenanceReportingTask task = setup(event, properties);
+        task.initialize(initContext);
+        task.onScheduled(confContext);
+        task.onTrigger(context);
+
+        assertEquals(0, task.dataSent.size());
+    }
+
+    @Test
     public void testFilterEventTypeNoResult() throws IOException, InitializationException {
         final Map<PropertyDescriptor, String> properties = new HashMap<>();
         for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
@@ -372,6 +429,26 @@ public class TestSiteToSiteProvenanceReportingTask {
     }
 
     @Test
+    public void testFilterMultiFilterExcludeTakesPrecedence() throws IOException, InitializationException {
+        final Map<PropertyDescriptor, String> properties = new HashMap<>();
+        for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {
+            properties.put(descriptor, descriptor.getDefaultValue());
+        }
+        properties.put(SiteToSiteProvenanceReportingTask.BATCH_SIZE, "1000");
+        properties.put(SiteToSiteProvenanceReportingTask.FILTER_COMPONENT_TYPE_EXCLUDE, "dummy.*");
+        properties.put(SiteToSiteProvenanceReportingTask.FILTER_EVENT_TYPE, "RECEIVE");
+
+        ProvenanceEventRecord event = createProvenanceEventRecord();
+
+        MockSiteToSiteProvenanceReportingTask task = setup(event, properties);
+        task.initialize(initContext);
+        task.onScheduled(confContext);
+        task.onTrigger(context);
+
+        assertEquals(0, task.dataSent.size());
+    }
+
+    @Test
     public void testFilterProcessGroupId() throws IOException, InitializationException {
         final Map<PropertyDescriptor, String> properties = new HashMap<>();
         for (final PropertyDescriptor descriptor : new MockSiteToSiteProvenanceReportingTask().getSupportedPropertyDescriptors()) {