You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2019/11/08 15:26:58 UTC

[nifi] branch master updated: NIFI-6842 - Introduce MetricsEventReportingTask

This is an automated email from the ASF dual-hosted git repository.

mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git


The following commit(s) were added to refs/heads/master by this push:
     new c998a72  NIFI-6842 - Introduce MetricsEventReportingTask
c998a72 is described below

commit c998a7259aca73e716433d0582b30d7ed986c4b2
Author: Yolanda M. Davis <yo...@gmail.com>
AuthorDate: Tue Nov 5 14:25:58 2019 -0500

    NIFI-6842 - Introduce MetricsEventReportingTask
    
    NIFI-6842 - Added AlertHandler for bulletin reporting. Update ReportingTask meta data.
    
    NIFI-6842 - corrected display names in action handlers, included metrics option for alert handlers, small refactor in reporting task
    
    NIFI-6842 - updated docs and tags
    
    NIFI-6842 - Added documentation for handlers.
    
    Signed-off-by: Matthew Burgess <ma...@apache.org>
    
    This closes #3874
---
 nifi-assembly/pom.xml                              |  12 +
 .../apache/nifi/rules/handlers/AlertHandler.java   | 169 +++++++++++++
 .../nifi/rules/handlers/ExpressionHandler.java     |  10 +-
 .../org/apache/nifi/rules/handlers/LogHandler.java |  29 ++-
 .../nifi/rules/handlers/RecordSinkHandler.java     |   5 +-
 .../org.apache.nifi.controller.ControllerService   |   3 +-
 .../additionalDetails.html                         |  39 +++
 .../additionalDetails.html                         |  38 +++
 .../additionalDetails.html                         |  38 +++
 .../additionalDetails.html                         |  37 +++
 .../nifi/rules/handlers/TestAlertHandler.java      | 264 +++++++++++++++++++++
 .../nifi-sql-reporting-tasks/pom.xml               |   6 +
 .../reporting/sql/MetricsEventReportingTask.java   | 105 ++++++++
 .../nifi/reporting/sql/QueryNiFiReportingTask.java |  84 +------
 .../nifi/reporting/sql/util/QueryMetricsUtil.java  | 114 +++++++++
 .../org.apache.nifi.reporting.ReportingTask        |   3 +-
 .../additionalDetails.html                         |  34 +++
 ...ask.java => TestMetricsEventReportingTask.java} | 176 +++++---------
 .../reporting/sql/TestQueryNiFiReportingTask.java  |  21 +-
 .../rules/MockPropertyContextActionHandler.java    |  76 ++++++
 .../nifi/rules/engine/MockRulesEngineService.java  |  47 ++++
 21 files changed, 1094 insertions(+), 216 deletions(-)

diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index f3381a7..b036e65 100755
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -878,6 +878,18 @@ language governing permissions and limitations under the License. -->
                     <version>1.11.0-SNAPSHOT</version>
                     <type>nar</type>
                 </dependency>
+                <dependency>
+                    <groupId>org.apache.nifi</groupId>
+                    <artifactId>nifi-easyrules-nar</artifactId>
+                    <version>1.11.0-SNAPSHOT</version>
+                    <type>nar</type>
+                </dependency>
+                <dependency>
+                    <groupId>org.apache.nifi</groupId>
+                    <artifactId>nifi-rules-action-handler-nar</artifactId>
+                    <version>1.11.0-SNAPSHOT</version>
+                    <type>nar</type>
+                </dependency>
             </dependencies>
         </profile>
         <profile>
diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/AlertHandler.java b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/AlertHandler.java
new file mode 100644
index 0000000..234ea3c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/AlertHandler.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.rules.handlers;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.rules.Action;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@Tags({"rules", "rules engine", "action", "action handler", "logging", "alerts", "bulletins"})
+@CapabilityDescription("Creates alerts as bulletins based on a provided action (usually created by a rules engine).  " +
+        "Action objects executed with this Handler should contain \"category\", \"message\", and \"logLevel\" attributes.")
+public class AlertHandler extends AbstractActionHandlerService {
+
+    public static final PropertyDescriptor DEFAULT_LOG_LEVEL = new PropertyDescriptor.Builder()
+            .name("alert-default-log-level")
+            .displayName("Default Alert Log Level")
+            .required(true)
+            .description("The default Log Level that will be used to log an alert message" +
+                    " if a log level was not provided in the received action's attributes.")
+            .allowableValues(DebugLevels.values())
+            .defaultValue("info")
+            .build();
+
+    public static final PropertyDescriptor DEFAULT_CATEGORY = new PropertyDescriptor.Builder()
+            .name("alert-default-category")
+            .displayName("Default Category")
+            .required(true)
+            .description("The default category to use when logging alert message "+
+                    " if a category was not provided in the received action's attributes.")
+            .defaultValue("Rules Triggered Alert")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    public static final PropertyDescriptor DEFAULT_MESSAGE = new PropertyDescriptor.Builder()
+            .name("alert-default-message")
+            .displayName("Default Message")
+            .required(true)
+            .description("The default message to include in alert if an alert message was " +
+                    "not provided in the received action's attributes")
+            .defaultValue("An alert was triggered by a rules-based action.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .build();
+
+    private static final PropertyDescriptor INCLUDE_FACTS = new PropertyDescriptor.Builder()
+            .name("alert-include-facts")
+            .displayName("Include Fact Data")
+            .required(true)
+            .description("If true, the alert message will include the facts which triggered this action. Default is false.")
+            .defaultValue("true")
+            .allowableValues("true", "false")
+            .build();
+
+    private List<PropertyDescriptor> properties;
+    private String defaultCategory;
+    private String defaultLogLevel;
+    private String defaultMessage;
+    private Boolean includeFacts;
+
+    @Override
+    protected void init(ControllerServiceInitializationContext config) throws InitializationException {
+        super.init(config);
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(DEFAULT_LOG_LEVEL);
+        properties.add(DEFAULT_CATEGORY);
+        properties.add(DEFAULT_MESSAGE);
+        properties.add(INCLUDE_FACTS);
+        this.properties = Collections.unmodifiableList(properties);
+    }
+
+    @OnEnabled
+    public void onEnabled(final ConfigurationContext context) throws InitializationException {
+        defaultLogLevel = context.getProperty(DEFAULT_LOG_LEVEL).getValue().toUpperCase();
+        defaultCategory = context.getProperty(DEFAULT_CATEGORY).getValue();
+        defaultMessage = context.getProperty(DEFAULT_MESSAGE).getValue();
+        includeFacts = context.getProperty(INCLUDE_FACTS).asBoolean();
+    }
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    public void execute(Action action, Map<String, Object> facts) {
+        throw new UnsupportedOperationException("This method is not supported.  The AlertHandler requires a Reporting Context");
+    }
+
+    @Override
+    public void execute(PropertyContext propertyContext, Action action, Map<String, Object> facts) {
+        ComponentLog logger = getLogger();
+
+        if (propertyContext instanceof ReportingContext) {
+
+            ReportingContext context = (ReportingContext) propertyContext;
+            Map<String, String> attributes = action.getAttributes();
+            if (context.getBulletinRepository() != null) {
+                final String category = attributes.getOrDefault("category", defaultCategory);
+                final String message = getMessage(attributes.getOrDefault("message", defaultMessage), facts);
+                final String level = attributes.getOrDefault("severity", attributes.getOrDefault("logLevel", defaultLogLevel));
+                Severity severity;
+                try {
+                    severity = Severity.valueOf(level.toUpperCase());
+                } catch (IllegalArgumentException iae) {
+                    severity = Severity.INFO;
+                }
+                BulletinRepository bulletinRepository = context.getBulletinRepository();
+                bulletinRepository.addBulletin(context.createBulletin(category, severity, message));
+
+            } else {
+                logger.warn("Bulletin Repository is not available which is unusual. Cannot send a bulletin.");
+            }
+
+        } else {
+            logger.warn("Reporting context was not provided to create bulletins.");
+        }
+
+    }
+
+    protected String getMessage(String alertMessage, Map<String, Object> facts){
+        if (includeFacts) {
+            final StringBuilder message = new StringBuilder(alertMessage);
+            final Set<String> fields = facts.keySet();
+            message.append("\n");
+            message.append("Alert Facts:\n");
+            fields.forEach(field -> {
+                message.append("Field: ");
+                message.append(field);
+                message.append(", Value: ");
+                message.append(facts.get(field));
+                message.append("\n");
+            });
+            return message.toString();
+        }else{
+            return alertMessage;
+        }
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/ExpressionHandler.java b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/ExpressionHandler.java
index be41a5c..711c0ec 100644
--- a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/ExpressionHandler.java
+++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/ExpressionHandler.java
@@ -39,7 +39,7 @@ import java.util.Map;
 
 @Tags({"rules", "rules engine", "action", "action handler", "expression language","MVEL","SpEL"})
 @CapabilityDescription("Executes an action containing an expression written in MVEL or SpEL. The action " +
-"is usually created by a rules engine. ")
+"is usually created by a rules engine. Action objects executed with this Handler should contain \"command\" and \"type\" attributes.")
 public class ExpressionHandler extends AbstractActionHandlerService {
 
     enum ExpresssionType {
@@ -47,9 +47,11 @@ public class ExpressionHandler extends AbstractActionHandlerService {
     }
 
     public static final PropertyDescriptor DEFAULT_EXPRESSION_LANGUAGE_TYPE = new PropertyDescriptor.Builder()
-            .name("Expression Language Type")
+            .name("default-expression-language-type")
+            .displayName("Default Expression Language Type")
             .required(true)
-            .description("The expression language that should be used to compile and execute action. Supported languages are MVEL and Spring Expression Language (SpEL).")
+            .description("If an expression language type is not provided as an attribute within an Action, the default expression language that " +
+                    "should be used to compile and execute action. Supported languages are MVEL and Spring Expression Language (SpEL).")
             .allowableValues(ExpresssionType.values())
             .defaultValue("MVEL")
             .build();
@@ -82,7 +84,7 @@ public class ExpressionHandler extends AbstractActionHandlerService {
         final String command = attributes.get("command");
         if(StringUtils.isNotEmpty(command)) {
             try {
-                final String type = attributes.get("type");
+                final String type = attributes.getOrDefault("type",this.type.toString());
                 ExpresssionType expresssionType = ExpresssionType.valueOf(type);
                 if (expresssionType.equals(ExpresssionType.MVEL)) {
                     executeMVEL(command, facts);
diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/LogHandler.java b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/LogHandler.java
index 666afbc..20e9351 100644
--- a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/LogHandler.java
+++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/LogHandler.java
@@ -38,19 +38,32 @@ import java.util.Map;
 import java.util.Set;
 
 @Tags({"rules", "rules engine", "action", "action handler", "logging"})
-@CapabilityDescription("Logs messages and fact information based on a provided action (usually created by a rules engine)")
+@CapabilityDescription("Logs messages and fact information based on a provided action (usually created by a rules engine).  " +
+        " Action objects executed with this Handler should contain \"logLevel\" and \"message\" attributes.")
 public class LogHandler extends AbstractActionHandlerService {
 
     public static final PropertyDescriptor DEFAULT_LOG_LEVEL = new PropertyDescriptor.Builder()
-            .name("Log Level")
+            .name("logger-default-log-level")
+            .displayName("Default Log Level")
             .required(true)
-            .description("The Log Level to use when logging the Attributes")
+            .description("If a log level is not provided as an attribute within an Action, the default log level will be used.")
             .allowableValues(DebugLevels.values())
             .defaultValue("info")
             .build();
 
+    public static final PropertyDescriptor DEFAULT_LOG_MESSAGE = new PropertyDescriptor.Builder()
+            .name("logger-default-log-message")
+            .displayName("Default Log Message")
+            .required(true)
+            .description("If a log message is not provided as an attribute within an Action, the default log message will be used.")
+            .defaultValue("Rules Action Triggered Log.")
+            .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .build();
+
     private static final PropertyDescriptor LOG_FACTS = new PropertyDescriptor.Builder()
-            .name("Log Facts")
+            .name("log-facts")
+            .displayName("Log Facts")
             .required(true)
             .description("If true, the log message will include the facts which triggered this log action.")
             .defaultValue("true")
@@ -58,7 +71,8 @@ public class LogHandler extends AbstractActionHandlerService {
             .build();
 
     private static final PropertyDescriptor LOG_PREFIX = new PropertyDescriptor.Builder()
-            .name("Log prefix")
+            .name("log-prefix")
+            .displayName("Log Prefix")
             .required(false)
             .description("Log prefix appended to the log lines. It helps to distinguish the output of multiple LogAttribute processors.")
             .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@@ -69,6 +83,7 @@ public class LogHandler extends AbstractActionHandlerService {
     private String logPrefix;
     private Boolean logFacts;
     private String defaultLogLevel;
+    private String defaultLogMessage;
 
     @Override
     protected void init(ControllerServiceInitializationContext config) throws InitializationException {
@@ -77,6 +92,7 @@ public class LogHandler extends AbstractActionHandlerService {
         properties.add(LOG_PREFIX);
         properties.add(LOG_FACTS);
         properties.add(DEFAULT_LOG_LEVEL);
+        properties.add(DEFAULT_LOG_MESSAGE);
         this.properties = Collections.unmodifiableList(properties);
     }
 
@@ -85,6 +101,7 @@ public class LogHandler extends AbstractActionHandlerService {
         logPrefix = context.getProperty(LOG_PREFIX).evaluateAttributeExpressions().getValue();
         logFacts = context.getProperty(LOG_FACTS).asBoolean();
         defaultLogLevel = context.getProperty(DEFAULT_LOG_LEVEL).getValue().toUpperCase();
+        defaultLogMessage = context.getProperty(DEFAULT_LOG_MESSAGE).evaluateAttributeExpressions().getValue();
     }
 
     @Override
@@ -98,7 +115,7 @@ public class LogHandler extends AbstractActionHandlerService {
         Map<String, String> attributes = action.getAttributes();
         final String logLevel = attributes.get("logLevel");
         final LogLevel level = getLogLevel(logLevel, LogLevel.valueOf(defaultLogLevel));
-        final String eventMessage = StringUtils.isNotEmpty(attributes.get("message")) ? attributes.get("message") : "Rules Action Triggered Log.";
+        final String eventMessage = StringUtils.isNotEmpty(attributes.get("message")) ? attributes.get("message") : defaultLogMessage;
         final String factsMessage = createFactsLogMessage(facts, eventMessage);
         logger.log(level, factsMessage);
     }
diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/RecordSinkHandler.java b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/RecordSinkHandler.java
index 760315e..ee3ca25 100644
--- a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/RecordSinkHandler.java
+++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/RecordSinkHandler.java
@@ -43,8 +43,9 @@ import java.util.List;
 import java.util.Map;
 import java.util.stream.Collectors;
 
-@Tags({"rules", "rules engine", "action", "action handler", "logging"})
-@CapabilityDescription("Logs messages and fact information based on a provided action (usually created by a rules engine)")
+@Tags({"rules", "rules engine", "action", "action handler", "record", "record sink"})
+@CapabilityDescription("Sends fact information to sink based on a provided action (usually created by a rules engine)." +
+        "  Action objects executed with this Handler should contain \"sendZeroResult\" attribute.")
 public class RecordSinkHandler extends AbstractActionHandlerService{
 
     static final PropertyDescriptor RECORD_SINK_SERVICE = new PropertyDescriptor.Builder()
diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 7d2967f..461f818 100644
--- a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -15,4 +15,5 @@
 org.apache.nifi.rules.handlers.ActionHandlerLookup
 org.apache.nifi.rules.handlers.ExpressionHandler
 org.apache.nifi.rules.handlers.LogHandler
-org.apache.nifi.rules.handlers.RecordSinkHandler
\ No newline at end of file
+org.apache.nifi.rules.handlers.RecordSinkHandler
+org.apache.nifi.rules.handlers.AlertHandler
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.AlertHandler/additionalDetails.html b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.AlertHandler/additionalDetails.html
new file mode 100644
index 0000000..ea1a498
--- /dev/null
+++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.AlertHandler/additionalDetails.html
@@ -0,0 +1,39 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>AlertHandler</title>
+    <!--link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /-->
+    <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+<h2>Summary</h2>
+<p>
+    The AlertHandler is used to broadcast alerts (bulletins) as dictated by the action object.  Action objects can include attributes to configure
+    the handler otherwise default values will be used.  Possible attribute values are listed below.
+</p>
+<h3>ExpressionHandler Service Attributes</h3>
+<table title="AlertHandler Attributes" border="1" width="500">
+    <tr><th>Attribute</th><th>Description</th></tr>
+    <tr><td>category</td><td>The category the alert should be grouped under.</td></tr>
+    <tr><td>logLevel</td><td>Log Level for the alert.  Possible values are  trace, debug, info, warn, error.</td></tr>
+    <tr><td>message</td><td>Message for the alert.</td></tr>
+</table>
+<br/>
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.ExpressionHandler/additionalDetails.html b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.ExpressionHandler/additionalDetails.html
new file mode 100644
index 0000000..8dff1bc
--- /dev/null
+++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.ExpressionHandler/additionalDetails.html
@@ -0,0 +1,38 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>ExpressionHandler</title>
+    <!--link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /-->
+    <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+<h2>Summary</h2>
+<p>
+    The ExpressionHandler is used to execute dynamic commands writtin in MVEL or SpEL expression language.  Action objects must include attributes to configure
+    the handler otherwise an exception will be thrown.  Possible attribute values are listed below.
+</p>
+<h3>ExpressionHandler Service Attributes</h3>
+<table title="ExpressionHandler Attributes" border="1" width="500">
+    <tr><th>Attribute</th><th>Description</th></tr>
+    <tr><td>type</td><td>The expression language type of the command to be executed.  Possible values are MVEL and SpEl (MVEL will be applied by default if type is not provided).</td></tr>
+    <tr><td>command</td><td>The expression language command that should be executed</td></tr>
+</table>
+<br/>
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.LogHandler/additionalDetails.html b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.LogHandler/additionalDetails.html
new file mode 100644
index 0000000..b9b487d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.LogHandler/additionalDetails.html
@@ -0,0 +1,38 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>LogHandler</title>
+    <!--link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /-->
+    <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+<h2>Summary</h2>
+<p>
+    The LogHandler is used to execute actions that dictate to log a message and/or metrics.  LogHandler can be invoked with any Action object.
+    Action objects can include attributes to configure the LogHandler or rely on the handler's default settings.  Possible attribute values are listed below.
+</p>
+<h3>LogHandler Service Attributes</h3>
+<table title="LogHandler Attributes" border="1" width="500">
+    <tr><th>Attribute</th><th>Description</th></tr>
+    <tr><td>logLevel</td><td>Log Level for logged message.  Possible values are trace, debug, info, warn, error.</td></tr>
+    <tr><td>message</td><td>Message for log.</td></tr>
+</table>
+<br/>
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.RecordSinkHandler/additionalDetails.html b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.RecordSinkHandler/additionalDetails.html
new file mode 100644
index 0000000..4633f37
--- /dev/null
+++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.RecordSinkHandler/additionalDetails.html
@@ -0,0 +1,37 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>RecordSinkHandler</title>
+    <!--link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /-->
+    <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+<h2>Summary</h2>
+<p>
+    The RecordSinkHandler is used to execute actions that send metrics information to a configured sink.  RecordSinkHandler can be invoked with any Action object.
+    Action objects can include attributes to configure the handler.  Possible attribute values are listed below.
+</p>
+<h3>RecordSinkHandler Service Attributes</h3>
+<table title="RecordSinkHandler Attributes" border="1" width="500">
+    <tr><th>Attribute</th><th>Description</th></tr>
+    <tr><td>sendZeroResults</td><td>Allow empty results to be sent to sink. Possible values are true and false (default is false).</td></tr>
+</table>
+<br/>
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/test/java/org/apache/nifi/rules/handlers/TestAlertHandler.java b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/test/java/org/apache/nifi/rules/handlers/TestAlertHandler.java
new file mode 100644
index 0000000..5fda9de
--- /dev/null
+++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/test/java/org/apache/nifi/rules/handlers/TestAlertHandler.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.rules.handlers;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.BulletinFactory;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.rules.Action;
+import org.apache.nifi.util.MockBulletinRepository;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static junit.framework.TestCase.assertEquals;
+import static junit.framework.TestCase.assertTrue;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.anyString;
+
+public class TestAlertHandler {
+
+    private TestRunner runner;
+    private MockComponentLog mockComponentLog;
+    private ReportingContext reportingContext;
+    private AlertHandler alertHandler;
+    private MockAlertBulletinRepository mockAlertBulletinRepository;
+
+    @Before
+    public void setup() throws InitializationException {
+        runner = TestRunners.newTestRunner(TestProcessor.class);
+        mockComponentLog = new MockComponentLog();
+        AlertHandler handler = new MockAlertHandler(mockComponentLog);
+        mockAlertBulletinRepository = new MockAlertBulletinRepository();
+        runner.addControllerService("MockAlertHandler", handler);
+        runner.enableControllerService(handler);
+        alertHandler = (AlertHandler) runner.getProcessContext()
+                .getControllerServiceLookup()
+                .getControllerService("MockAlertHandler");
+        reportingContext = Mockito.mock(ReportingContext.class);
+        Mockito.when(reportingContext.getBulletinRepository()).thenReturn(mockAlertBulletinRepository);
+        Mockito.when(reportingContext.createBulletin(anyString(), Mockito.any(Severity.class), anyString()))
+                .thenAnswer(invocation ->
+                BulletinFactory.createBulletin(invocation.getArgument(0), invocation.getArgument(1).toString(), invocation.getArgument(2)));
+    }
+
+    @Test
+    public void testValidService() {
+        runner.assertValid(alertHandler);
+        assertThat(alertHandler, instanceOf(AlertHandler.class));
+    }
+
+    @Test
+    public void testAlertNoReportingContext() {
+
+        final Map<String, String> attributes = new HashMap<>();
+        final Map<String, Object> metrics = new HashMap<>();
+
+        attributes.put("logLevel", "INFO");
+        attributes.put("message", "This should be not sent as an alert!");
+        metrics.put("jvmHeap", "1000000");
+        metrics.put("cpu", "90");
+
+        final Action action = new Action();
+        action.setType("ALERT");
+        action.setAttributes(attributes);
+        try {
+            alertHandler.execute(action, metrics);
+            fail();
+        } catch (UnsupportedOperationException ex) {
+            assertTrue(true);
+        }
+    }
+
+    @Test
+    public void testAlertWithBulletinLevel() {
+
+        final Map<String, String> attributes = new HashMap<>();
+        final Map<String, Object> metrics = new HashMap<>();
+
+        final String category = "Rules Alert";
+        final String message = "This should be sent as an alert!";
+        final String severity =  "INFO";
+        attributes.put("category", category);
+        attributes.put("message", message);
+        attributes.put("severity", severity);
+        metrics.put("jvmHeap", "1000000");
+        metrics.put("cpu", "90");
+
+        final String expectedOutput = "This should be sent as an alert!\n" +
+                "Alert Facts:\n" +
+                "Field: cpu, Value: 90\n" +
+                "Field: jvmHeap, Value: 1000000\n";
+
+        final Action action = new Action();
+        action.setType("ALERT");
+        action.setAttributes(attributes);
+        alertHandler.execute(reportingContext, action, metrics);
+        BulletinRepository bulletinRepository = reportingContext.getBulletinRepository();
+        List<Bulletin> bulletins = bulletinRepository.findBulletinsForController();
+        assertFalse(bulletins.isEmpty());
+        Bulletin bulletin = bulletins.get(0);
+        assertEquals(bulletin.getCategory(), category);
+        assertEquals(bulletin.getMessage(), expectedOutput);
+        assertEquals(bulletin.getLevel(), severity);
+    }
+
+    @Test
+    public void testAlertWithDefaultValues() {
+
+        final Map<String, String> attributes = new HashMap<>();
+        final Map<String, Object> metrics = new HashMap<>();
+
+        final String category = "Rules Triggered Alert";
+        final String message = "An alert was triggered by a rules based action.";
+        final String severity =  "INFO";
+        metrics.put("jvmHeap", "1000000");
+        metrics.put("cpu", "90");
+
+        final String expectedOutput = "An alert was triggered by a rules-based action.\n" +
+                "Alert Facts:\n" +
+                "Field: cpu, Value: 90\n" +
+                "Field: jvmHeap, Value: 1000000\n";
+
+        final Action action = new Action();
+        action.setType("ALERT");
+        action.setAttributes(attributes);
+        alertHandler.execute(reportingContext, action, metrics);
+        BulletinRepository bulletinRepository = reportingContext.getBulletinRepository();
+        List<Bulletin> bulletins = bulletinRepository.findBulletinsForController();
+        assertFalse(bulletins.isEmpty());
+        Bulletin bulletin = bulletins.get(0);
+        assertEquals(bulletin.getCategory(), category);
+        assertEquals(bulletin.getMessage(), expectedOutput);
+        assertEquals(bulletin.getLevel(), severity);
+    }
+
+    @Test
+    public void testInvalidContext(){
+        final Map<String, String> attributes = new HashMap<>();
+        final Map<String, Object> metrics = new HashMap<>();
+
+        final String category = "Rules Alert";
+        final String message = "This should be sent as an alert!";
+        final String severity =  "INFO";
+        attributes.put("category", category);
+        attributes.put("message", message);
+        attributes.put("severity", severity);
+        metrics.put("jvmHeap", "1000000");
+        metrics.put("cpu", "90");
+
+        final Action action = new Action();
+        action.setType("ALERT");
+        action.setAttributes(attributes);
+        PropertyContext fakeContext = new PropertyContext() {
+            @Override
+            public PropertyValue getProperty(PropertyDescriptor descriptor) {
+                return null;
+            }
+
+            @Override
+            public Map<String, String> getAllProperties() {
+                return null;
+            }
+        };
+        alertHandler.execute(fakeContext, action, metrics);
+        final String debugMessage = mockComponentLog.getWarnMessage();
+        assertTrue(StringUtils.isNotEmpty(debugMessage));
+        assertEquals(debugMessage,"Reporting context was not provided to create bulletins.");
+    }
+
+    @Test
+    public void testEmptyBulletinRepository(){
+        final Map<String, String> attributes = new HashMap<>();
+        final Map<String, Object> metrics = new HashMap<>();
+
+        final String category = "Rules Alert";
+        final String message = "This should be sent as an alert!";
+        final String severity =  "INFO";
+        attributes.put("category", category);
+        attributes.put("message", message);
+        attributes.put("severity", severity);
+        metrics.put("jvmHeap", "1000000");
+        metrics.put("cpu", "90");
+
+        final Action action = new Action();
+        action.setType("ALERT");
+        action.setAttributes(attributes);
+        ReportingContext fakeContext = Mockito.mock(ReportingContext.class);
+        Mockito.when(reportingContext.getBulletinRepository()).thenReturn(null);
+        alertHandler.execute(fakeContext, action, metrics);
+        final String debugMessage = mockComponentLog.getWarnMessage();
+        assertTrue(StringUtils.isNotEmpty(debugMessage));
+        assertEquals(debugMessage,"Bulletin Repository is not available which is unusual. Cannot send a bulletin.");
+    }
+
+    private static class MockAlertHandler extends AlertHandler {
+
+        private ComponentLog testLogger;
+
+        public MockAlertHandler(ComponentLog testLogger) {
+            this.testLogger = testLogger;
+        }
+
+        @Override
+        protected ComponentLog getLogger() {
+            return testLogger;
+        }
+
+    }
+
+    private static class MockAlertBulletinRepository extends MockBulletinRepository {
+
+        List<Bulletin> bulletinList;
+
+
+        public MockAlertBulletinRepository() {
+            bulletinList = new ArrayList<>();
+        }
+
+        @Override
+        public void addBulletin(Bulletin bulletin) {
+            bulletinList.add(bulletin);
+        }
+
+        @Override
+        public List<Bulletin> findBulletinsForController() {
+            return bulletinList;
+        }
+
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/pom.xml b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/pom.xml
index 1b00ce7..7cb3179 100644
--- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/pom.xml
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/pom.xml
@@ -81,6 +81,12 @@
             <version>1.11.0-SNAPSHOT</version>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.apache.nifi</groupId>
+            <artifactId>nifi-rules-engine-service-api</artifactId>
+            <version>1.11.0-SNAPSHOT</version>
+            <scope>provided</scope>
+        </dependency>
     </dependencies>
     <build>
         <plugins>
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsEventReportingTask.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsEventReportingTask.java
new file mode 100644
index 0000000..5254abe
--- /dev/null
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsEventReportingTask.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.sql;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.reporting.AbstractReportingTask;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.ReportingInitializationContext;
+import org.apache.nifi.reporting.sql.util.QueryMetricsUtil;
+import org.apache.nifi.rules.Action;
+import org.apache.nifi.rules.PropertyContextActionHandler;
+import org.apache.nifi.rules.engine.RulesEngineService;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.ResultSetRecordSet;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Tags({"reporting", "rules", "action", "action handler", "status", "connection", "processor", "jvm", "metrics", "history", "bulletin", "sql"})
+@CapabilityDescription("Triggers rules-driven actions based on metrics values ")
+public class MetricsEventReportingTask  extends AbstractReportingTask {
+
+    private List<PropertyDescriptor> properties;
+    private MetricsQueryService metricsQueryService;
+    private volatile RulesEngineService rulesEngineService;
+    private volatile PropertyContextActionHandler actionHandler;
+
+    @Override
+    protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+        return properties;
+    }
+
+    @Override
+    protected void init(final ReportingInitializationContext config) {
+        metricsQueryService = new MetricsSqlQueryService(getLogger());
+        final List<PropertyDescriptor> properties = new ArrayList<>();
+        properties.add(QueryMetricsUtil.QUERY);
+        properties.add(QueryMetricsUtil.RULES_ENGINE);
+        properties.add(QueryMetricsUtil.ACTION_HANDLER);
+        this.properties = Collections.unmodifiableList(properties);
+    }
+
+    @OnScheduled
+    public void setup(final ConfigurationContext context) throws IOException {
+        actionHandler = context.getProperty(QueryMetricsUtil.ACTION_HANDLER).asControllerService(PropertyContextActionHandler.class);
+        rulesEngineService = context.getProperty(QueryMetricsUtil.RULES_ENGINE).asControllerService(RulesEngineService.class);
+    }
+
+    @Override
+    public void onTrigger(ReportingContext context) {
+        try {
+            final String query = context.getProperty(QueryMetricsUtil.QUERY).evaluateAttributeExpressions().getValue();
+            fireRules(context, actionHandler, rulesEngineService, query);
+        } catch (Exception e) {
+            getLogger().error("Error opening loading rules: {}", new Object[]{e.getMessage()}, e);
+        }
+    }
+
+    private void fireRules(ReportingContext context, PropertyContextActionHandler actionHandler, RulesEngineService engine, String query) throws Exception {
+        QueryResult queryResult = metricsQueryService.query(context, query);
+        getLogger().debug("Executing query: {}", new Object[]{ query });
+        ResultSetRecordSet recordSet = metricsQueryService.getResultSetRecordSet(queryResult);
+        Record record;
+        try {
+            while ((record = recordSet.next()) != null) {
+                final Map<String, Object> facts = new HashMap<>();
+                for (String fieldName : record.getRawFieldNames()) {
+                    facts.put(fieldName, record.getValue(fieldName));
+                }
+                List<Action> actions = engine.fireRules(facts);
+                if(actions == null ||  actions.isEmpty()){
+                    getLogger().debug("No actions required for provided facts.");
+                } else {
+                    actions.forEach(action -> {
+                        actionHandler.execute(context, action,facts);
+                    });
+                }
+            }
+        } finally {
+            metricsQueryService.closeQuietly(recordSet);
+        }
+    }
+}
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java
index ae0e326..6f3aa9e 100644
--- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java
@@ -16,21 +16,16 @@
  */
 package org.apache.nifi.reporting.sql;
 
-import org.apache.calcite.config.Lex;
-import org.apache.calcite.sql.parser.SqlParser;
 import org.apache.nifi.annotation.documentation.CapabilityDescription;
 import org.apache.nifi.annotation.documentation.Tags;
 import org.apache.nifi.annotation.lifecycle.OnScheduled;
 import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
 import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.expression.ExpressionLanguageScope;
 import org.apache.nifi.record.sink.RecordSinkService;
 import org.apache.nifi.reporting.AbstractReportingTask;
 import org.apache.nifi.reporting.ReportingContext;
 import org.apache.nifi.reporting.ReportingInitializationContext;
+import org.apache.nifi.reporting.sql.util.QueryMetricsUtil;
 import org.apache.nifi.serialization.record.ResultSetRecordSet;
 import org.apache.nifi.util.StopWatch;
 
@@ -50,34 +45,6 @@ import java.util.concurrent.TimeUnit;
         + "query on the table when the capability is disabled will cause an error.")
 public class QueryNiFiReportingTask extends AbstractReportingTask {
 
-    static final PropertyDescriptor RECORD_SINK = new PropertyDescriptor.Builder()
-            .name("sql-reporting-record-sink")
-            .displayName("Record Destination Service")
-            .description("Specifies the Controller Service to use for writing out the query result records to some destination.")
-            .identifiesControllerService(RecordSinkService.class)
-            .required(true)
-            .build();
-
-    static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
-            .name("sql-reporting-query")
-            .displayName("SQL Query")
-            .description("SQL SELECT statement specifies which tables to query and how data should be filtered/transformed. "
-                    + "SQL SELECT can select from the CONNECTION_STATUS, PROCESSOR_STATUS, BULLETINS, PROCESS_GROUP_STATUS, JVM_METRICS, or CONNECTION_STATUS_PREDICTIONS tables. Note that the "
-                    + "CONNECTION_STATUS_PREDICTIONS table is not available for querying if analytics are not enabled).")
-            .required(true)
-            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
-            .addValidator(new SqlValidator())
-            .build();
-
-    static final PropertyDescriptor INCLUDE_ZERO_RECORD_RESULTS = new PropertyDescriptor.Builder()
-            .name("sql-reporting-include-zero-record-results")
-            .displayName("Include Zero Record Results")
-            .description("When running the SQL statement, if the result has no data, this property specifies whether or not the empty result set will be transmitted.")
-            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
-            .allowableValues("true", "false")
-            .defaultValue("false")
-            .required(true)
-            .build();
 
     private List<PropertyDescriptor> properties;
 
@@ -89,9 +56,9 @@ public class QueryNiFiReportingTask extends AbstractReportingTask {
     protected void init(final ReportingInitializationContext config) {
         metricsQueryService = new MetricsSqlQueryService(getLogger());
         final List<PropertyDescriptor> properties = new ArrayList<>();
-        properties.add(QUERY);
-        properties.add(RECORD_SINK);
-        properties.add(INCLUDE_ZERO_RECORD_RESULTS);
+        properties.add(QueryMetricsUtil.QUERY);
+        properties.add(QueryMetricsUtil.RECORD_SINK);
+        properties.add(QueryMetricsUtil.INCLUDE_ZERO_RECORD_RESULTS);
         this.properties = Collections.unmodifiableList(properties);
     }
 
@@ -102,7 +69,7 @@ public class QueryNiFiReportingTask extends AbstractReportingTask {
 
     @OnScheduled
     public void setup(final ConfigurationContext context) throws IOException {
-        recordSinkService = context.getProperty(RECORD_SINK).asControllerService(RecordSinkService.class);
+        recordSinkService = context.getProperty(QueryMetricsUtil.RECORD_SINK).asControllerService(RecordSinkService.class);
         recordSinkService.reset();
     }
 
@@ -110,7 +77,7 @@ public class QueryNiFiReportingTask extends AbstractReportingTask {
     public void onTrigger(ReportingContext context) {
         final StopWatch stopWatch = new StopWatch(true);
         try {
-            final String sql = context.getProperty(QUERY).evaluateAttributeExpressions().getValue();
+            final String sql = context.getProperty(QueryMetricsUtil.QUERY).evaluateAttributeExpressions().getValue();
             final QueryResult queryResult = metricsQueryService.query(context, sql);
             final ResultSetRecordSet recordSet;
 
@@ -129,7 +96,7 @@ public class QueryNiFiReportingTask extends AbstractReportingTask {
                 attributes.put("reporting.task.name", getName());
                 attributes.put("reporting.task.uuid", getIdentifier());
                 attributes.put("reporting.task.type", this.getClass().getSimpleName());
-                recordSinkService.sendData(recordSet, attributes, context.getProperty(INCLUDE_ZERO_RECORD_RESULTS).asBoolean());
+                recordSinkService.sendData(recordSet, attributes, context.getProperty(QueryMetricsUtil.INCLUDE_ZERO_RECORD_RESULTS).asBoolean());
             } catch (Exception e) {
                 getLogger().error("Error during transmission of query results due to {}", new Object[]{e.getMessage()}, e);
                 return;
@@ -143,41 +110,4 @@ public class QueryNiFiReportingTask extends AbstractReportingTask {
         }
     }
 
-    private static class SqlValidator implements Validator {
-        @Override
-        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
-            if (context.isExpressionLanguagePresent(input)) {
-                return new ValidationResult.Builder()
-                        .input(input)
-                        .subject(subject)
-                        .valid(true)
-                        .explanation("Expression Language Present")
-                        .build();
-            }
-
-            final String substituted = context.newPropertyValue(input).evaluateAttributeExpressions().getValue();
-
-            final SqlParser.Config config = SqlParser.configBuilder()
-                    .setLex(Lex.MYSQL_ANSI)
-                    .build();
-
-            final SqlParser parser = SqlParser.create(substituted, config);
-            try {
-                parser.parseStmt();
-                return new ValidationResult.Builder()
-                        .subject(subject)
-                        .input(input)
-                        .valid(true)
-                        .build();
-            } catch (final Exception e) {
-                return new ValidationResult.Builder()
-                        .subject(subject)
-                        .input(input)
-                        .valid(false)
-                        .explanation("Not a valid SQL Statement: " + e.getMessage())
-                        .build();
-            }
-        }
-    }
-
 }
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/QueryMetricsUtil.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/QueryMetricsUtil.java
new file mode 100644
index 0000000..159daec
--- /dev/null
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/QueryMetricsUtil.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.sql.util;
+
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.record.sink.RecordSinkService;
+import org.apache.nifi.rules.PropertyContextActionHandler;
+import org.apache.nifi.rules.engine.RulesEngineService;
+
+public class QueryMetricsUtil {
+
+    public static final PropertyDescriptor RECORD_SINK = new PropertyDescriptor.Builder()
+            .name("sql-reporting-record-sink")
+            .displayName("Record Destination Service")
+            .description("Specifies the Controller Service to use for writing out the query result records to some destination.")
+            .identifiesControllerService(RecordSinkService.class)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
+            .name("sql-reporting-query")
+            .displayName("SQL Query")
+            .description("SQL SELECT statement specifies which tables to query and how data should be filtered/transformed. "
+                    + "SQL SELECT can select from the CONNECTION_STATUS, PROCESSOR_STATUS, BULLETINS, PROCESS_GROUP_STATUS, JVM_METRICS, or CONNECTION_STATUS_PREDICTIONS tables. Note that the "
+                    + "CONNECTION_STATUS_PREDICTIONS table is not available for querying if analytics are not enabled).")
+            .required(true)
+            .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+            .addValidator(new SqlValidator())
+            .build();
+
+    public static final PropertyDescriptor INCLUDE_ZERO_RECORD_RESULTS = new PropertyDescriptor.Builder()
+            .name("sql-reporting-include-zero-record-results")
+            .displayName("Include Zero Record Results")
+            .description("When running the SQL statement, if the result has no data, this property specifies whether or not the empty result set will be transmitted.")
+            .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+            .allowableValues("true", "false")
+            .defaultValue("false")
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor RULES_ENGINE = new PropertyDescriptor.Builder()
+            .name("rules-engine-service")
+            .displayName("Rules Engine Service")
+            .description("Specifies the Controller Service to use for applying rules to metrics.")
+            .identifiesControllerService(RulesEngineService.class)
+            .required(true)
+            .build();
+
+    public static final PropertyDescriptor ACTION_HANDLER = new PropertyDescriptor.Builder()
+            .name("action-handler")
+            .displayName("Event Action Handler")
+            .description("Handler that will execute the defined action returned from rules engine (if Action type is supported by the handler)")
+            .identifiesControllerService(PropertyContextActionHandler.class)
+            .required(true)
+            .build();
+
+    public static class SqlValidator implements Validator {
+        @Override
+        public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+            if (context.isExpressionLanguagePresent(input)) {
+                return new ValidationResult.Builder()
+                        .input(input)
+                        .subject(subject)
+                        .valid(true)
+                        .explanation("Expression Language Present")
+                        .build();
+            }
+
+            final String substituted = context.newPropertyValue(input).evaluateAttributeExpressions().getValue();
+
+            final SqlParser.Config config = SqlParser.configBuilder()
+                    .setLex(Lex.MYSQL_ANSI)
+                    .build();
+
+            final SqlParser parser = SqlParser.create(substituted, config);
+            try {
+                parser.parseStmt();
+                return new ValidationResult.Builder()
+                        .subject(subject)
+                        .input(input)
+                        .valid(true)
+                        .build();
+            } catch (final Exception e) {
+                return new ValidationResult.Builder()
+                        .subject(subject)
+                        .input(input)
+                        .valid(false)
+                        .explanation("Not a valid SQL Statement: " + e.getMessage())
+                        .build();
+            }
+        }
+    }
+
+}
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
index ec340e8..c3f5883 100644
--- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
@@ -13,4 +13,5 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-org.apache.nifi.reporting.sql.QueryNiFiReportingTask
\ No newline at end of file
+org.apache.nifi.reporting.sql.QueryNiFiReportingTask
+org.apache.nifi.reporting.sql.MetricsEventReportingTask
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.MetricsEventReportingTask/additionalDetails.html b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.MetricsEventReportingTask/additionalDetails.html
new file mode 100644
index 0000000..2392aab
--- /dev/null
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.MetricsEventReportingTask/additionalDetails.html
@@ -0,0 +1,34 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+<head>
+    <meta charset="utf-8" />
+    <title>Metrics Event Reporting Task</title>
+    <!--link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /-->
+    <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+<h2>Summary</h2>
+<p>
+    This reporting task can be used to issue SQL queries against various NiFi metrics information,  submit returned data to a rules engine (which will determine if any actions should be performed)
+    and execute the prescribed actions using action handlers. This task requires a RulesEngineService (which will identify any actions that should be performed) and an ActionHandler which will execute the action(s).
+    A distinct ActionHandler can be used to service all events or an ActionHandlerLookup can be used for dynamic handler lookup. NOTE: Optimally action handler should be associated with the expected action types
+    returned from the rules engine.
+</p>
+<br/>
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestMetricsEventReportingTask.java
similarity index 55%
copy from nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
copy to nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestMetricsEventReportingTask.java
index 9f4cb0a..83992e4 100644
--- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestMetricsEventReportingTask.java
@@ -16,24 +16,32 @@
  */
 package org.apache.nifi.reporting.sql;
 
-
+import com.google.common.collect.Lists;
 import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
 import org.apache.nifi.components.PropertyDescriptor;
 import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.context.PropertyContext;
 import org.apache.nifi.controller.ConfigurationContext;
 import org.apache.nifi.controller.status.ConnectionStatus;
 import org.apache.nifi.controller.status.ProcessGroupStatus;
 import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.analytics.ConnectionStatusPredictions;
 import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.record.sink.MockRecordSinkService;
-import org.apache.nifi.record.sink.RecordSinkService;
+import org.apache.nifi.reporting.BulletinRepository;
 import org.apache.nifi.reporting.EventAccess;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.reporting.ReportingContext;
 import org.apache.nifi.reporting.ReportingInitializationContext;
-import org.apache.nifi.reporting.util.metrics.MetricNames;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.reporting.sql.util.QueryMetricsUtil;
+import org.apache.nifi.rules.Action;
+import org.apache.nifi.rules.MockPropertyContextActionHandler;
+import org.apache.nifi.rules.PropertyContextActionHandler;
+import org.apache.nifi.rules.engine.MockRulesEngineService;
+import org.apache.nifi.rules.engine.RulesEngineService;
 import org.apache.nifi.state.MockStateManager;
 import org.apache.nifi.util.MockPropertyValue;
+import org.apache.nifi.util.Tuple;
 import org.junit.Before;
 import org.junit.Test;
 import org.mockito.Mockito;
@@ -46,23 +54,23 @@ import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
 import java.util.UUID;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
 
 import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TestQueryNiFiReportingTask {
+import static org.junit.Assert.assertFalse;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
 
+public class TestMetricsEventReportingTask {
     private ReportingContext context;
-    private MockQueryNiFiReportingTask reportingTask;
-    private MockRecordSinkService mockRecordSinkService;
+    private MockMetricsEventReportingTask reportingTask;
+    private MockPropertyContextActionHandler actionHandler;
+    private MockRulesEngineService rulesEngineService;
     private ProcessGroupStatus status;
 
     @Before
     public void setup() {
-        mockRecordSinkService = new MockRecordSinkService();
         status = new ProcessGroupStatus();
+        actionHandler = new MockPropertyContextActionHandler();
         status.setId("1234");
         status.setFlowFilesReceived(5);
         status.setBytesReceived(10000);
@@ -83,15 +91,21 @@ public class TestQueryNiFiReportingTask {
         processorStatuses.add(procStatus);
         status.setProcessorStatus(processorStatuses);
 
+        ConnectionStatusPredictions connectionStatusPredictions = new ConnectionStatusPredictions();
+        connectionStatusPredictions.setPredictedTimeToCountBackpressureMillis(1000);
+        connectionStatusPredictions.setPredictedTimeToBytesBackpressureMillis(1000);
+        connectionStatusPredictions.setNextPredictedQueuedCount(1000000000);
+        connectionStatusPredictions.setNextPredictedQueuedBytes(1000000000000000L);
+
         ConnectionStatus root1ConnectionStatus = new ConnectionStatus();
         root1ConnectionStatus.setId("root1");
         root1ConnectionStatus.setQueuedCount(1000);
-        root1ConnectionStatus.setBackPressureObjectThreshold(1000);
+        root1ConnectionStatus.setPredictions(connectionStatusPredictions);
 
         ConnectionStatus root2ConnectionStatus = new ConnectionStatus();
         root2ConnectionStatus.setId("root2");
         root2ConnectionStatus.setQueuedCount(500);
-        root2ConnectionStatus.setBackPressureObjectThreshold(1000);
+        root2ConnectionStatus.setPredictions(connectionStatusPredictions);
 
         Collection<ConnectionStatus> rootConnectionStatuses = new ArrayList<>();
         rootConnectionStatuses.add(root1ConnectionStatus);
@@ -138,120 +152,42 @@ public class TestQueryNiFiReportingTask {
     @Test
     public void testConnectionStatusTable() throws IOException, InitializationException {
         final Map<PropertyDescriptor, String> properties = new HashMap<>();
-        properties.put(QueryNiFiReportingTask.RECORD_SINK, "mock-record-sink");
-        properties.put(QueryNiFiReportingTask.QUERY, "select id,queuedCount,isBackPressureEnabled from CONNECTION_STATUS order by queuedCount desc");
+        properties.put(QueryMetricsUtil.QUERY, "select connectionId, predictedQueuedCount, predictedTimeToBytesBackpressureMillis from CONNECTION_STATUS_PREDICTIONS");
         reportingTask = initTask(properties);
         reportingTask.onTrigger(context);
+        List<Map<String,Object>> metricsList = actionHandler.getRows();
+        List<Tuple<String, Action>> defaultLogActions = actionHandler.getDefaultActionsByType("LOG");
+        List<Tuple<String, Action>> defaultAlertActions = actionHandler.getDefaultActionsByType("ALERT");
+        List<PropertyContext> propertyContexts = actionHandler.getPropertyContexts();
+        assertFalse(metricsList.isEmpty());
+        assertEquals(2,defaultLogActions.size());
+        assertEquals(2,defaultAlertActions.size());
+        assertEquals(4,propertyContexts.size());
 
-        List<Map<String, Object>> rows = mockRecordSinkService.getRows();
-        assertEquals(4, rows.size());
-        // Validate the first row
-        Map<String, Object> row = rows.get(0);
-        assertEquals(3, row.size()); // Only projected 2 columns
-        Object id = row.get("id");
-        assertTrue(id instanceof String);
-        assertEquals("nested", id);
-        assertEquals(1001, row.get("queuedCount"));
-        // Validate the second row
-        row = rows.get(1);
-        id = row.get("id");
-        assertEquals("root1", id);
-        assertEquals(1000, row.get("queuedCount"));
-        assertEquals(true, row.get("isBackPressureEnabled"));
-        // Validate the third row
-        row = rows.get(2);
-        id = row.get("id");
-        assertEquals("root2", id);
-        assertEquals(500, row.get("queuedCount"));
-        assertEquals(false, row.get("isBackPressureEnabled"));
-        // Validate the fourth row
-        row = rows.get(3);
-        id = row.get("id");
-        assertEquals("nested2", id);
-        assertEquals(3, row.get("queuedCount"));
     }
 
-    @Test
-    public void testJvmMetricsTable() throws IOException, InitializationException {
-        final Map<PropertyDescriptor, String> properties = new HashMap<>();
-        properties.put(QueryNiFiReportingTask.RECORD_SINK, "mock-record-sink");
-        properties.put(QueryNiFiReportingTask.QUERY, "select "
-                + Stream.of(MetricNames.JVM_DAEMON_THREAD_COUNT,
-                MetricNames.JVM_THREAD_COUNT,
-                MetricNames.JVM_THREAD_STATES_BLOCKED,
-                MetricNames.JVM_THREAD_STATES_RUNNABLE,
-                MetricNames.JVM_THREAD_STATES_TERMINATED,
-                MetricNames.JVM_THREAD_STATES_TIMED_WAITING,
-                MetricNames.JVM_UPTIME,
-                MetricNames.JVM_HEAP_USED,
-                MetricNames.JVM_HEAP_USAGE,
-                MetricNames.JVM_NON_HEAP_USAGE,
-                MetricNames.JVM_FILE_DESCRIPTOR_USAGE).map((s) -> s.replace(".", "_")).collect(Collectors.joining(","))
-                + " from JVM_METRICS");
-        reportingTask = initTask(properties);
-        reportingTask.onTrigger(context);
-
-        List<Map<String, Object>> rows = mockRecordSinkService.getRows();
-        assertEquals(1, rows.size());
-        Map<String,Object> row = rows.get(0);
-        assertEquals(11, row.size());
-        assertTrue(row.get(MetricNames.JVM_DAEMON_THREAD_COUNT.replace(".","_")) instanceof Integer);
-        assertTrue(row.get(MetricNames.JVM_HEAP_USAGE.replace(".","_")) instanceof Double);
-    }
-
-    @Test
-    public void testProcessGroupStatusTable() throws IOException, InitializationException {
-        final Map<PropertyDescriptor, String> properties = new HashMap<>();
-        properties.put(QueryNiFiReportingTask.RECORD_SINK, "mock-record-sink");
-        properties.put(QueryNiFiReportingTask.QUERY, "select * from PROCESS_GROUP_STATUS order by bytesRead asc");
-        reportingTask = initTask(properties);
-        reportingTask.onTrigger(context);
-
-        List<Map<String, Object>> rows = mockRecordSinkService.getRows();
-        assertEquals(4, rows.size());
-        // Validate the first row
-        Map<String, Object> row = rows.get(0);
-        assertEquals(20, row.size());
-        assertEquals(1L, row.get("bytesRead"));
-        // Validate the second row
-        row = rows.get(1);
-        assertEquals(1234L, row.get("bytesRead"));
-        // Validate the third row
-        row = rows.get(2);
-        assertEquals(12345L, row.get("bytesRead"));
-        // Validate the fourth row
-        row = rows.get(3);
-        assertEquals(20000L, row.get("bytesRead"));
-    }
-
-    @Test
-    public void testNoResults() throws IOException, InitializationException {
-        final Map<PropertyDescriptor, String> properties = new HashMap<>();
-        properties.put(QueryNiFiReportingTask.RECORD_SINK, "mock-record-sink");
-        properties.put(QueryNiFiReportingTask.QUERY, "select * from CONNECTION_STATUS where queuedCount > 2000");
-        reportingTask = initTask(properties);
-        reportingTask.onTrigger(context);
-
-        List<Map<String, Object>> rows = mockRecordSinkService.getRows();
-        assertEquals(0, rows.size());
-    }
-
-    private MockQueryNiFiReportingTask initTask(Map<PropertyDescriptor, String> customProperties) throws InitializationException, IOException {
+    private MockMetricsEventReportingTask initTask(Map<PropertyDescriptor, String> customProperties) throws InitializationException, IOException {
 
         final ComponentLog logger = Mockito.mock(ComponentLog.class);
-        reportingTask = new MockQueryNiFiReportingTask();
+        final BulletinRepository bulletinRepository = Mockito.mock(BulletinRepository.class);
+        reportingTask = new MockMetricsEventReportingTask();
         final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class);
         Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
         Mockito.when(initContext.getLogger()).thenReturn(logger);
         reportingTask.initialize(initContext);
         Map<PropertyDescriptor, String> properties = new HashMap<>();
+
         for (final PropertyDescriptor descriptor : reportingTask.getSupportedPropertyDescriptors()) {
             properties.put(descriptor, descriptor.getDefaultValue());
         }
         properties.putAll(customProperties);
 
         context = Mockito.mock(ReportingContext.class);
+        Mockito.when(context.isAnalyticsEnabled()).thenReturn(true);
         Mockito.when(context.getStateManager()).thenReturn(new MockStateManager(reportingTask));
+        Mockito.when(context.getBulletinRepository()).thenReturn(bulletinRepository);
+        Mockito.when(context.createBulletin(anyString(),any(Severity.class), anyString())).thenReturn(null);
+
         Mockito.doAnswer((Answer<PropertyValue>) invocation -> {
             final PropertyDescriptor descriptor = invocation.getArgument(0, PropertyDescriptor.class);
             return new MockPropertyValue(properties.get(descriptor));
@@ -262,17 +198,27 @@ public class TestQueryNiFiReportingTask {
         Mockito.when(eventAccess.getControllerStatus()).thenReturn(status);
 
         final PropertyValue pValue = Mockito.mock(StandardPropertyValue.class);
-        mockRecordSinkService = new MockRecordSinkService();
-        Mockito.when(context.getProperty(QueryNiFiReportingTask.RECORD_SINK)).thenReturn(pValue);
-        Mockito.when(pValue.asControllerService(RecordSinkService.class)).thenReturn(mockRecordSinkService);
+        actionHandler = new MockPropertyContextActionHandler();
+        Mockito.when(pValue.asControllerService(PropertyContextActionHandler.class)).thenReturn(actionHandler);
+
+        Action action1 = new Action();
+        action1.setType("LOG");
+        Action action2 = new Action();
+        action2.setType("ALERT");
+
+        final PropertyValue resValue = Mockito.mock(StandardPropertyValue.class);
+        rulesEngineService = new MockRulesEngineService(Lists.newArrayList(action1,action2));
+        Mockito.when(resValue.asControllerService(RulesEngineService.class)).thenReturn(rulesEngineService);
 
         ConfigurationContext configContext = Mockito.mock(ConfigurationContext.class);
-        Mockito.when(configContext.getProperty(QueryNiFiReportingTask.RECORD_SINK)).thenReturn(pValue);
+        Mockito.when(configContext.getProperty(QueryMetricsUtil.RULES_ENGINE)).thenReturn(resValue);
+        Mockito.when(configContext.getProperty(QueryMetricsUtil.ACTION_HANDLER)).thenReturn(pValue);
         reportingTask.setup(configContext);
 
         return reportingTask;
     }
 
-    private static final class MockQueryNiFiReportingTask extends QueryNiFiReportingTask {
+    private static final class MockMetricsEventReportingTask extends MetricsEventReportingTask {
+
     }
-}
\ No newline at end of file
+}
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
index 9f4cb0a..eae9e91 100644
--- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
@@ -31,6 +31,7 @@ import org.apache.nifi.reporting.EventAccess;
 import org.apache.nifi.reporting.InitializationException;
 import org.apache.nifi.reporting.ReportingContext;
 import org.apache.nifi.reporting.ReportingInitializationContext;
+import org.apache.nifi.reporting.sql.util.QueryMetricsUtil;
 import org.apache.nifi.reporting.util.metrics.MetricNames;
 import org.apache.nifi.state.MockStateManager;
 import org.apache.nifi.util.MockPropertyValue;
@@ -138,8 +139,8 @@ public class TestQueryNiFiReportingTask {
     @Test
     public void testConnectionStatusTable() throws IOException, InitializationException {
         final Map<PropertyDescriptor, String> properties = new HashMap<>();
-        properties.put(QueryNiFiReportingTask.RECORD_SINK, "mock-record-sink");
-        properties.put(QueryNiFiReportingTask.QUERY, "select id,queuedCount,isBackPressureEnabled from CONNECTION_STATUS order by queuedCount desc");
+        properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
+        properties.put(QueryMetricsUtil.QUERY, "select id,queuedCount,isBackPressureEnabled from CONNECTION_STATUS order by queuedCount desc");
         reportingTask = initTask(properties);
         reportingTask.onTrigger(context);
 
@@ -174,8 +175,8 @@ public class TestQueryNiFiReportingTask {
     @Test
     public void testJvmMetricsTable() throws IOException, InitializationException {
         final Map<PropertyDescriptor, String> properties = new HashMap<>();
-        properties.put(QueryNiFiReportingTask.RECORD_SINK, "mock-record-sink");
-        properties.put(QueryNiFiReportingTask.QUERY, "select "
+        properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
+        properties.put(QueryMetricsUtil.QUERY, "select "
                 + Stream.of(MetricNames.JVM_DAEMON_THREAD_COUNT,
                 MetricNames.JVM_THREAD_COUNT,
                 MetricNames.JVM_THREAD_STATES_BLOCKED,
@@ -202,8 +203,8 @@ public class TestQueryNiFiReportingTask {
     @Test
     public void testProcessGroupStatusTable() throws IOException, InitializationException {
         final Map<PropertyDescriptor, String> properties = new HashMap<>();
-        properties.put(QueryNiFiReportingTask.RECORD_SINK, "mock-record-sink");
-        properties.put(QueryNiFiReportingTask.QUERY, "select * from PROCESS_GROUP_STATUS order by bytesRead asc");
+        properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
+        properties.put(QueryMetricsUtil.QUERY, "select * from PROCESS_GROUP_STATUS order by bytesRead asc");
         reportingTask = initTask(properties);
         reportingTask.onTrigger(context);
 
@@ -227,8 +228,8 @@ public class TestQueryNiFiReportingTask {
     @Test
     public void testNoResults() throws IOException, InitializationException {
         final Map<PropertyDescriptor, String> properties = new HashMap<>();
-        properties.put(QueryNiFiReportingTask.RECORD_SINK, "mock-record-sink");
-        properties.put(QueryNiFiReportingTask.QUERY, "select * from CONNECTION_STATUS where queuedCount > 2000");
+        properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
+        properties.put(QueryMetricsUtil.QUERY, "select * from CONNECTION_STATUS where queuedCount > 2000");
         reportingTask = initTask(properties);
         reportingTask.onTrigger(context);
 
@@ -263,11 +264,11 @@ public class TestQueryNiFiReportingTask {
 
         final PropertyValue pValue = Mockito.mock(StandardPropertyValue.class);
         mockRecordSinkService = new MockRecordSinkService();
-        Mockito.when(context.getProperty(QueryNiFiReportingTask.RECORD_SINK)).thenReturn(pValue);
+        Mockito.when(context.getProperty(QueryMetricsUtil.RECORD_SINK)).thenReturn(pValue);
         Mockito.when(pValue.asControllerService(RecordSinkService.class)).thenReturn(mockRecordSinkService);
 
         ConfigurationContext configContext = Mockito.mock(ConfigurationContext.class);
-        Mockito.when(configContext.getProperty(QueryNiFiReportingTask.RECORD_SINK)).thenReturn(pValue);
+        Mockito.when(configContext.getProperty(QueryMetricsUtil.RECORD_SINK)).thenReturn(pValue);
         reportingTask.setup(configContext);
 
         return reportingTask;
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/MockPropertyContextActionHandler.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/MockPropertyContextActionHandler.java
new file mode 100644
index 0000000..323317d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/MockPropertyContextActionHandler.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.rules;
+
+import org.apache.nifi.components.AbstractConfigurableComponent;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.Tuple;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class MockPropertyContextActionHandler extends AbstractConfigurableComponent implements PropertyContextActionHandler{
+
+    private List<Map<String, Object>> rows = new ArrayList<>();
+    private List<Tuple<String,Action>> defaultActions = new ArrayList<>();
+    private List<PropertyContext> propertyContexts = new ArrayList<>();
+
+
+    @Override
+    public void execute(PropertyContext context, Action action, Map<String, Object> facts) {
+        propertyContexts.add(context);
+        execute(action, facts);
+    }
+
+    @Override
+    public void execute(Action action, Map<String, Object> facts) {
+        rows.add(facts);
+        defaultActions.add( new Tuple<>(action.getType(),action));
+    }
+
+
+    @Override
+    public void initialize(ControllerServiceInitializationContext context) throws InitializationException {
+
+    }
+
+    public List<Map<String, Object>> getRows() {
+        return rows;
+    }
+
+    public List<Tuple<String, Action>> getDefaultActions() {
+        return defaultActions;
+    }
+
+    public List<Tuple<String,Action>> getDefaultActionsByType(final String type){
+        return defaultActions.stream().filter(stringActionTuple -> stringActionTuple
+                .getKey().equalsIgnoreCase(type)).collect(Collectors.toList());
+    }
+
+    public List<PropertyContext> getPropertyContexts() {
+        return propertyContexts;
+    }
+
+    @Override
+    public String getIdentifier() {
+        return "MockPropertyContextActionHandler";
+    }
+}
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/engine/MockRulesEngineService.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/engine/MockRulesEngineService.java
new file mode 100644
index 0000000..e3ccc73
--- /dev/null
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/engine/MockRulesEngineService.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.rules.engine;
+
+import org.apache.nifi.components.AbstractConfigurableComponent;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.rules.Action;
+
+import java.util.List;
+import java.util.Map;
+
+public class MockRulesEngineService extends AbstractConfigurableComponent implements RulesEngineService {
+    private List<Action> actions;
+
+    public MockRulesEngineService(List<Action> actions) {
+        this.actions = actions;
+    }
+
+    @Override
+    public List<Action> fireRules(Map<String, Object> facts) {
+        return actions;
+    }
+
+    @Override
+    public void initialize(ControllerServiceInitializationContext context) throws InitializationException {
+    }
+
+    @Override
+    public String getIdentifier() {
+        return "MockRulesEngineService";
+    }
+}