You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@nifi.apache.org by ma...@apache.org on 2019/11/08 15:26:58 UTC
[nifi] branch master updated: NIFI-6842 - Introduce
MetricsEventReportingTask
This is an automated email from the ASF dual-hosted git repository.
mattyb149 pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/nifi.git
The following commit(s) were added to refs/heads/master by this push:
new c998a72 NIFI-6842 - Introduce MetricsEventReportingTask
c998a72 is described below
commit c998a7259aca73e716433d0582b30d7ed986c4b2
Author: Yolanda M. Davis <yo...@gmail.com>
AuthorDate: Tue Nov 5 14:25:58 2019 -0500
NIFI-6842 - Introduce MetricsEventReportingTask
NIFI-6842 - Added AlertHandler for bulletin reporting. Update ReportingTask meta data.
NIFI-6842 - corrected display names in action handlers, included metrics option for alert handlers, small refactor in reporting task
NIFI-6842 - updated docs and tags
NIFI-6842 - Added documentation for handlers.
Signed-off-by: Matthew Burgess <ma...@apache.org>
This closes #3874
---
nifi-assembly/pom.xml | 12 +
.../apache/nifi/rules/handlers/AlertHandler.java | 169 +++++++++++++
.../nifi/rules/handlers/ExpressionHandler.java | 10 +-
.../org/apache/nifi/rules/handlers/LogHandler.java | 29 ++-
.../nifi/rules/handlers/RecordSinkHandler.java | 5 +-
.../org.apache.nifi.controller.ControllerService | 3 +-
.../additionalDetails.html | 39 +++
.../additionalDetails.html | 38 +++
.../additionalDetails.html | 38 +++
.../additionalDetails.html | 37 +++
.../nifi/rules/handlers/TestAlertHandler.java | 264 +++++++++++++++++++++
.../nifi-sql-reporting-tasks/pom.xml | 6 +
.../reporting/sql/MetricsEventReportingTask.java | 105 ++++++++
.../nifi/reporting/sql/QueryNiFiReportingTask.java | 84 +------
.../nifi/reporting/sql/util/QueryMetricsUtil.java | 114 +++++++++
.../org.apache.nifi.reporting.ReportingTask | 3 +-
.../additionalDetails.html | 34 +++
...ask.java => TestMetricsEventReportingTask.java} | 176 +++++---------
.../reporting/sql/TestQueryNiFiReportingTask.java | 21 +-
.../rules/MockPropertyContextActionHandler.java | 76 ++++++
.../nifi/rules/engine/MockRulesEngineService.java | 47 ++++
21 files changed, 1094 insertions(+), 216 deletions(-)
diff --git a/nifi-assembly/pom.xml b/nifi-assembly/pom.xml
index f3381a7..b036e65 100755
--- a/nifi-assembly/pom.xml
+++ b/nifi-assembly/pom.xml
@@ -878,6 +878,18 @@ language governing permissions and limitations under the License. -->
<version>1.11.0-SNAPSHOT</version>
<type>nar</type>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-easyrules-nar</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-rules-action-handler-nar</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ <type>nar</type>
+ </dependency>
</dependencies>
</profile>
<profile>
diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/AlertHandler.java b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/AlertHandler.java
new file mode 100644
index 0000000..234ea3c
--- /dev/null
+++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/AlertHandler.java
@@ -0,0 +1,169 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.rules.handlers;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnEnabled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.processor.util.StandardValidators;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.rules.Action;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+@Tags({"rules", "rules engine", "action", "action handler", "logging", "alerts", "bulletins"})
+@CapabilityDescription("Creates alerts as bulletins based on a provided action (usually created by a rules engine). " +
+ "Action objects executed with this Handler should contain \"category\", \"message\", and \"logLevel\" attributes.")
+public class AlertHandler extends AbstractActionHandlerService {
+
+ public static final PropertyDescriptor DEFAULT_LOG_LEVEL = new PropertyDescriptor.Builder()
+ .name("alert-default-log-level")
+ .displayName("Default Alert Log Level")
+ .required(true)
+ .description("The default Log Level that will be used to log an alert message" +
+ " if a log level was not provided in the received action's attributes.")
+ .allowableValues(DebugLevels.values())
+ .defaultValue("info")
+ .build();
+
+ public static final PropertyDescriptor DEFAULT_CATEGORY = new PropertyDescriptor.Builder()
+ .name("alert-default-category")
+ .displayName("Default Category")
+ .required(true)
+ .description("The default category to use when logging alert message "+
+ " if a category was not provided in the received action's attributes.")
+ .defaultValue("Rules Triggered Alert")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ public static final PropertyDescriptor DEFAULT_MESSAGE = new PropertyDescriptor.Builder()
+ .name("alert-default-message")
+ .displayName("Default Message")
+ .required(true)
+ .description("The default message to include in alert if an alert message was " +
+ "not provided in the received action's attributes")
+ .defaultValue("An alert was triggered by a rules-based action.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .build();
+
+ private static final PropertyDescriptor INCLUDE_FACTS = new PropertyDescriptor.Builder()
+ .name("alert-include-facts")
+ .displayName("Include Fact Data")
+ .required(true)
+ .description("If true, the alert message will include the facts which triggered this action. Default is false.")
+ .defaultValue("true")
+ .allowableValues("true", "false")
+ .build();
+
+ private List<PropertyDescriptor> properties;
+ private String defaultCategory;
+ private String defaultLogLevel;
+ private String defaultMessage;
+ private Boolean includeFacts;
+
+ @Override
+ protected void init(ControllerServiceInitializationContext config) throws InitializationException {
+ super.init(config);
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(DEFAULT_LOG_LEVEL);
+ properties.add(DEFAULT_CATEGORY);
+ properties.add(DEFAULT_MESSAGE);
+ properties.add(INCLUDE_FACTS);
+ this.properties = Collections.unmodifiableList(properties);
+ }
+
+ @OnEnabled
+ public void onEnabled(final ConfigurationContext context) throws InitializationException {
+ defaultLogLevel = context.getProperty(DEFAULT_LOG_LEVEL).getValue().toUpperCase();
+ defaultCategory = context.getProperty(DEFAULT_CATEGORY).getValue();
+ defaultMessage = context.getProperty(DEFAULT_MESSAGE).getValue();
+ includeFacts = context.getProperty(INCLUDE_FACTS).asBoolean();
+ }
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ public void execute(Action action, Map<String, Object> facts) {
+ throw new UnsupportedOperationException("This method is not supported. The AlertHandler requires a Reporting Context");
+ }
+
+ @Override
+ public void execute(PropertyContext propertyContext, Action action, Map<String, Object> facts) {
+ ComponentLog logger = getLogger();
+
+ if (propertyContext instanceof ReportingContext) {
+
+ ReportingContext context = (ReportingContext) propertyContext;
+ Map<String, String> attributes = action.getAttributes();
+ if (context.getBulletinRepository() != null) {
+ final String category = attributes.getOrDefault("category", defaultCategory);
+ final String message = getMessage(attributes.getOrDefault("message", defaultMessage), facts);
+ final String level = attributes.getOrDefault("severity", attributes.getOrDefault("logLevel", defaultLogLevel));
+ Severity severity;
+ try {
+ severity = Severity.valueOf(level.toUpperCase());
+ } catch (IllegalArgumentException iae) {
+ severity = Severity.INFO;
+ }
+ BulletinRepository bulletinRepository = context.getBulletinRepository();
+ bulletinRepository.addBulletin(context.createBulletin(category, severity, message));
+
+ } else {
+ logger.warn("Bulletin Repository is not available which is unusual. Cannot send a bulletin.");
+ }
+
+ } else {
+ logger.warn("Reporting context was not provided to create bulletins.");
+ }
+
+ }
+
+ protected String getMessage(String alertMessage, Map<String, Object> facts){
+ if (includeFacts) {
+ final StringBuilder message = new StringBuilder(alertMessage);
+ final Set<String> fields = facts.keySet();
+ message.append("\n");
+ message.append("Alert Facts:\n");
+ fields.forEach(field -> {
+ message.append("Field: ");
+ message.append(field);
+ message.append(", Value: ");
+ message.append(facts.get(field));
+ message.append("\n");
+ });
+ return message.toString();
+ }else{
+ return alertMessage;
+ }
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/ExpressionHandler.java b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/ExpressionHandler.java
index be41a5c..711c0ec 100644
--- a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/ExpressionHandler.java
+++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/ExpressionHandler.java
@@ -39,7 +39,7 @@ import java.util.Map;
@Tags({"rules", "rules engine", "action", "action handler", "expression language","MVEL","SpEL"})
@CapabilityDescription("Executes an action containing an expression written in MVEL or SpEL. The action " +
-"is usually created by a rules engine. ")
+"is usually created by a rules engine. Action objects executed with this Handler should contain \"command\" and \"type\" attributes.")
public class ExpressionHandler extends AbstractActionHandlerService {
enum ExpresssionType {
@@ -47,9 +47,11 @@ public class ExpressionHandler extends AbstractActionHandlerService {
}
public static final PropertyDescriptor DEFAULT_EXPRESSION_LANGUAGE_TYPE = new PropertyDescriptor.Builder()
- .name("Expression Language Type")
+ .name("default-expression-language-type")
+ .displayName("Default Expression Language Type")
.required(true)
- .description("The expression language that should be used to compile and execute action. Supported languages are MVEL and Spring Expression Language (SpEL).")
+ .description("If an expression language type is not provided as an attribute within an Action, the default expression language that " +
+ "should be used to compile and execute action. Supported languages are MVEL and Spring Expression Language (SpEL).")
.allowableValues(ExpresssionType.values())
.defaultValue("MVEL")
.build();
@@ -82,7 +84,7 @@ public class ExpressionHandler extends AbstractActionHandlerService {
final String command = attributes.get("command");
if(StringUtils.isNotEmpty(command)) {
try {
- final String type = attributes.get("type");
+ final String type = attributes.getOrDefault("type",this.type.toString());
ExpresssionType expresssionType = ExpresssionType.valueOf(type);
if (expresssionType.equals(ExpresssionType.MVEL)) {
executeMVEL(command, facts);
diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/LogHandler.java b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/LogHandler.java
index 666afbc..20e9351 100644
--- a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/LogHandler.java
+++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/LogHandler.java
@@ -38,19 +38,32 @@ import java.util.Map;
import java.util.Set;
@Tags({"rules", "rules engine", "action", "action handler", "logging"})
-@CapabilityDescription("Logs messages and fact information based on a provided action (usually created by a rules engine)")
+@CapabilityDescription("Logs messages and fact information based on a provided action (usually created by a rules engine). " +
+ " Action objects executed with this Handler should contain \"logLevel\" and \"message\" attributes.")
public class LogHandler extends AbstractActionHandlerService {
public static final PropertyDescriptor DEFAULT_LOG_LEVEL = new PropertyDescriptor.Builder()
- .name("Log Level")
+ .name("logger-default-log-level")
+ .displayName("Default Log Level")
.required(true)
- .description("The Log Level to use when logging the Attributes")
+ .description("If a log level is not provided as an attribute within an Action, the default log level will be used.")
.allowableValues(DebugLevels.values())
.defaultValue("info")
.build();
+ public static final PropertyDescriptor DEFAULT_LOG_MESSAGE = new PropertyDescriptor.Builder()
+ .name("logger-default-log-message")
+ .displayName("Default Log Message")
+ .required(true)
+ .description("If a log message is not provided as an attribute within an Action, the default log message will be used.")
+ .defaultValue("Rules Action Triggered Log.")
+ .addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .build();
+
private static final PropertyDescriptor LOG_FACTS = new PropertyDescriptor.Builder()
- .name("Log Facts")
+ .name("log-facts")
+ .displayName("Log Facts")
.required(true)
.description("If true, the log message will include the facts which triggered this log action.")
.defaultValue("true")
@@ -58,7 +71,8 @@ public class LogHandler extends AbstractActionHandlerService {
.build();
private static final PropertyDescriptor LOG_PREFIX = new PropertyDescriptor.Builder()
- .name("Log prefix")
+ .name("log-prefix")
+ .displayName("Log Prefix")
.required(false)
.description("Log prefix appended to the log lines. It helps to distinguish the output of multiple LogAttribute processors.")
.addValidator(StandardValidators.NON_EMPTY_VALIDATOR)
@@ -69,6 +83,7 @@ public class LogHandler extends AbstractActionHandlerService {
private String logPrefix;
private Boolean logFacts;
private String defaultLogLevel;
+ private String defaultLogMessage;
@Override
protected void init(ControllerServiceInitializationContext config) throws InitializationException {
@@ -77,6 +92,7 @@ public class LogHandler extends AbstractActionHandlerService {
properties.add(LOG_PREFIX);
properties.add(LOG_FACTS);
properties.add(DEFAULT_LOG_LEVEL);
+ properties.add(DEFAULT_LOG_MESSAGE);
this.properties = Collections.unmodifiableList(properties);
}
@@ -85,6 +101,7 @@ public class LogHandler extends AbstractActionHandlerService {
logPrefix = context.getProperty(LOG_PREFIX).evaluateAttributeExpressions().getValue();
logFacts = context.getProperty(LOG_FACTS).asBoolean();
defaultLogLevel = context.getProperty(DEFAULT_LOG_LEVEL).getValue().toUpperCase();
+ defaultLogMessage = context.getProperty(DEFAULT_LOG_MESSAGE).evaluateAttributeExpressions().getValue();
}
@Override
@@ -98,7 +115,7 @@ public class LogHandler extends AbstractActionHandlerService {
Map<String, String> attributes = action.getAttributes();
final String logLevel = attributes.get("logLevel");
final LogLevel level = getLogLevel(logLevel, LogLevel.valueOf(defaultLogLevel));
- final String eventMessage = StringUtils.isNotEmpty(attributes.get("message")) ? attributes.get("message") : "Rules Action Triggered Log.";
+ final String eventMessage = StringUtils.isNotEmpty(attributes.get("message")) ? attributes.get("message") : defaultLogMessage;
final String factsMessage = createFactsLogMessage(facts, eventMessage);
logger.log(level, factsMessage);
}
diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/RecordSinkHandler.java b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/RecordSinkHandler.java
index 760315e..ee3ca25 100644
--- a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/RecordSinkHandler.java
+++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/java/org/apache/nifi/rules/handlers/RecordSinkHandler.java
@@ -43,8 +43,9 @@ import java.util.List;
import java.util.Map;
import java.util.stream.Collectors;
-@Tags({"rules", "rules engine", "action", "action handler", "logging"})
-@CapabilityDescription("Logs messages and fact information based on a provided action (usually created by a rules engine)")
+@Tags({"rules", "rules engine", "action", "action handler", "record", "record sink"})
+@CapabilityDescription("Sends fact information to sink based on a provided action (usually created by a rules engine)." +
+ " Action objects executed with this Handler should contain \"sendZeroResult\" attribute.")
public class RecordSinkHandler extends AbstractActionHandlerService{
static final PropertyDescriptor RECORD_SINK_SERVICE = new PropertyDescriptor.Builder()
diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
index 7d2967f..461f818 100644
--- a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
+++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/META-INF/services/org.apache.nifi.controller.ControllerService
@@ -15,4 +15,5 @@
org.apache.nifi.rules.handlers.ActionHandlerLookup
org.apache.nifi.rules.handlers.ExpressionHandler
org.apache.nifi.rules.handlers.LogHandler
-org.apache.nifi.rules.handlers.RecordSinkHandler
\ No newline at end of file
+org.apache.nifi.rules.handlers.RecordSinkHandler
+org.apache.nifi.rules.handlers.AlertHandler
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.AlertHandler/additionalDetails.html b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.AlertHandler/additionalDetails.html
new file mode 100644
index 0000000..ea1a498
--- /dev/null
+++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.AlertHandler/additionalDetails.html
@@ -0,0 +1,39 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<head>
+ <meta charset="utf-8" />
+ <title>AlertHandler</title>
+ <!--link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /-->
+ <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+<h2>Summary</h2>
+<p>
+ The AlertHandler is used to broadcast alerts (bulletins) as dictated by the action object. Action objects can include attributes to configure
+ the handler otherwise default values will be used. Possible attribute values are listed below.
+</p>
+<h3>ExpressionHandler Service Attributes</h3>
+<table title="AlertHandler Attributes" border="1" width="500">
+ <tr><th>Attribute</th><th>Description</th></tr>
+ <tr><td>category</td><td>The category the alert should be grouped under.</td></tr>
+ <tr><td>logLevel</td><td>Log Level for the alert. Possible values are trace, debug, info, warn, error.</td></tr>
+ <tr><td>message</td><td>Message for the alert.</td></tr>
+</table>
+<br/>
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.ExpressionHandler/additionalDetails.html b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.ExpressionHandler/additionalDetails.html
new file mode 100644
index 0000000..8dff1bc
--- /dev/null
+++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.ExpressionHandler/additionalDetails.html
@@ -0,0 +1,38 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<head>
+ <meta charset="utf-8" />
+ <title>ExpressionHandler</title>
+ <!--link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /-->
+ <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+<h2>Summary</h2>
+<p>
+ The ExpressionHandler is used to execute dynamic commands writtin in MVEL or SpEL expression language. Action objects must include attributes to configure
+ the handler otherwise an exception will be thrown. Possible attribute values are listed below.
+</p>
+<h3>ExpressionHandler Service Attributes</h3>
+<table title="ExpressionHandler Attributes" border="1" width="500">
+ <tr><th>Attribute</th><th>Description</th></tr>
+ <tr><td>type</td><td>The expression language type of the command to be executed. Possible values are MVEL and SpEl (MVEL will be applied by default if type is not provided).</td></tr>
+ <tr><td>command</td><td>The expression language command that should be executed</td></tr>
+</table>
+<br/>
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.LogHandler/additionalDetails.html b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.LogHandler/additionalDetails.html
new file mode 100644
index 0000000..b9b487d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.LogHandler/additionalDetails.html
@@ -0,0 +1,38 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<head>
+ <meta charset="utf-8" />
+ <title>LogHandler</title>
+ <!--link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /-->
+ <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+<h2>Summary</h2>
+<p>
+ The LogHandler is used to execute actions that dictate to log a message and/or metrics. LogHandler can be invoked with any Action object.
+ Action objects can include attributes to configure the LogHandler or rely on the handler's default settings. Possible attribute values are listed below.
+</p>
+<h3>LogHandler Service Attributes</h3>
+<table title="LogHandler Attributes" border="1" width="500">
+ <tr><th>Attribute</th><th>Description</th></tr>
+ <tr><td>logLevel</td><td>Log Level for logged message. Possible values are trace, debug, info, warn, error.</td></tr>
+ <tr><td>message</td><td>Message for log.</td></tr>
+</table>
+<br/>
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.RecordSinkHandler/additionalDetails.html b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.RecordSinkHandler/additionalDetails.html
new file mode 100644
index 0000000..4633f37
--- /dev/null
+++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/main/resources/docs/org.apache.nifi.rules.handlers.RecordSinkHandler/additionalDetails.html
@@ -0,0 +1,37 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<head>
+ <meta charset="utf-8" />
+ <title>RecordSinkHandler</title>
+ <!--link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /-->
+ <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+<h2>Summary</h2>
+<p>
+ The RecordSinkHandler is used to execute actions that send metrics information to a configured sink. RecordSinkHandler can be invoked with any Action object.
+ Action objects can include attributes to configure the handler. Possible attribute values are listed below.
+</p>
+<h3>RecordSinkHandler Service Attributes</h3>
+<table title="RecordSinkHandler Attributes" border="1" width="500">
+ <tr><th>Attribute</th><th>Description</th></tr>
+ <tr><td>sendZeroResults</td><td>Allow empty results to be sent to sink. Possible values are true and false (default is false).</td></tr>
+</table>
+<br/>
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/test/java/org/apache/nifi/rules/handlers/TestAlertHandler.java b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/test/java/org/apache/nifi/rules/handlers/TestAlertHandler.java
new file mode 100644
index 0000000..5fda9de
--- /dev/null
+++ b/nifi-nar-bundles/nifi-rules-action-handler-bundle/nifi-rules-action-handler-service/src/test/java/org/apache/nifi/rules/handlers/TestAlertHandler.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.rules.handlers;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.logging.ComponentLog;
+import org.apache.nifi.reporting.Bulletin;
+import org.apache.nifi.reporting.BulletinFactory;
+import org.apache.nifi.reporting.BulletinRepository;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.rules.Action;
+import org.apache.nifi.util.MockBulletinRepository;
+import org.apache.nifi.util.TestRunner;
+import org.apache.nifi.util.TestRunners;
+import org.junit.Before;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import static junit.framework.TestCase.assertEquals;
+import static junit.framework.TestCase.assertTrue;
+import static org.hamcrest.core.IsInstanceOf.instanceOf;
+
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertThat;
+import static org.junit.Assert.fail;
+import static org.mockito.ArgumentMatchers.anyString;
+
+public class TestAlertHandler {
+
+ private TestRunner runner;
+ private MockComponentLog mockComponentLog;
+ private ReportingContext reportingContext;
+ private AlertHandler alertHandler;
+ private MockAlertBulletinRepository mockAlertBulletinRepository;
+
+ @Before
+ public void setup() throws InitializationException {
+ runner = TestRunners.newTestRunner(TestProcessor.class);
+ mockComponentLog = new MockComponentLog();
+ AlertHandler handler = new MockAlertHandler(mockComponentLog);
+ mockAlertBulletinRepository = new MockAlertBulletinRepository();
+ runner.addControllerService("MockAlertHandler", handler);
+ runner.enableControllerService(handler);
+ alertHandler = (AlertHandler) runner.getProcessContext()
+ .getControllerServiceLookup()
+ .getControllerService("MockAlertHandler");
+ reportingContext = Mockito.mock(ReportingContext.class);
+ Mockito.when(reportingContext.getBulletinRepository()).thenReturn(mockAlertBulletinRepository);
+ Mockito.when(reportingContext.createBulletin(anyString(), Mockito.any(Severity.class), anyString()))
+ .thenAnswer(invocation ->
+ BulletinFactory.createBulletin(invocation.getArgument(0), invocation.getArgument(1).toString(), invocation.getArgument(2)));
+ }
+
+ @Test
+ public void testValidService() {
+ runner.assertValid(alertHandler);
+ assertThat(alertHandler, instanceOf(AlertHandler.class));
+ }
+
+ @Test
+ public void testAlertNoReportingContext() {
+
+ final Map<String, String> attributes = new HashMap<>();
+ final Map<String, Object> metrics = new HashMap<>();
+
+ attributes.put("logLevel", "INFO");
+ attributes.put("message", "This should be not sent as an alert!");
+ metrics.put("jvmHeap", "1000000");
+ metrics.put("cpu", "90");
+
+ final Action action = new Action();
+ action.setType("ALERT");
+ action.setAttributes(attributes);
+ try {
+ alertHandler.execute(action, metrics);
+ fail();
+ } catch (UnsupportedOperationException ex) {
+ assertTrue(true);
+ }
+ }
+
+ @Test
+ public void testAlertWithBulletinLevel() {
+
+ final Map<String, String> attributes = new HashMap<>();
+ final Map<String, Object> metrics = new HashMap<>();
+
+ final String category = "Rules Alert";
+ final String message = "This should be sent as an alert!";
+ final String severity = "INFO";
+ attributes.put("category", category);
+ attributes.put("message", message);
+ attributes.put("severity", severity);
+ metrics.put("jvmHeap", "1000000");
+ metrics.put("cpu", "90");
+
+ final String expectedOutput = "This should be sent as an alert!\n" +
+ "Alert Facts:\n" +
+ "Field: cpu, Value: 90\n" +
+ "Field: jvmHeap, Value: 1000000\n";
+
+ final Action action = new Action();
+ action.setType("ALERT");
+ action.setAttributes(attributes);
+ alertHandler.execute(reportingContext, action, metrics);
+ BulletinRepository bulletinRepository = reportingContext.getBulletinRepository();
+ List<Bulletin> bulletins = bulletinRepository.findBulletinsForController();
+ assertFalse(bulletins.isEmpty());
+ Bulletin bulletin = bulletins.get(0);
+ assertEquals(bulletin.getCategory(), category);
+ assertEquals(bulletin.getMessage(), expectedOutput);
+ assertEquals(bulletin.getLevel(), severity);
+ }
+
+ @Test
+ public void testAlertWithDefaultValues() {
+
+ final Map<String, String> attributes = new HashMap<>();
+ final Map<String, Object> metrics = new HashMap<>();
+
+ final String category = "Rules Triggered Alert";
+ final String message = "An alert was triggered by a rules based action.";
+ final String severity = "INFO";
+ metrics.put("jvmHeap", "1000000");
+ metrics.put("cpu", "90");
+
+ final String expectedOutput = "An alert was triggered by a rules-based action.\n" +
+ "Alert Facts:\n" +
+ "Field: cpu, Value: 90\n" +
+ "Field: jvmHeap, Value: 1000000\n";
+
+ final Action action = new Action();
+ action.setType("ALERT");
+ action.setAttributes(attributes);
+ alertHandler.execute(reportingContext, action, metrics);
+ BulletinRepository bulletinRepository = reportingContext.getBulletinRepository();
+ List<Bulletin> bulletins = bulletinRepository.findBulletinsForController();
+ assertFalse(bulletins.isEmpty());
+ Bulletin bulletin = bulletins.get(0);
+ assertEquals(bulletin.getCategory(), category);
+ assertEquals(bulletin.getMessage(), expectedOutput);
+ assertEquals(bulletin.getLevel(), severity);
+ }
+
+ @Test
+ public void testInvalidContext(){
+ final Map<String, String> attributes = new HashMap<>();
+ final Map<String, Object> metrics = new HashMap<>();
+
+ final String category = "Rules Alert";
+ final String message = "This should be sent as an alert!";
+ final String severity = "INFO";
+ attributes.put("category", category);
+ attributes.put("message", message);
+ attributes.put("severity", severity);
+ metrics.put("jvmHeap", "1000000");
+ metrics.put("cpu", "90");
+
+ final Action action = new Action();
+ action.setType("ALERT");
+ action.setAttributes(attributes);
+ PropertyContext fakeContext = new PropertyContext() {
+ @Override
+ public PropertyValue getProperty(PropertyDescriptor descriptor) {
+ return null;
+ }
+
+ @Override
+ public Map<String, String> getAllProperties() {
+ return null;
+ }
+ };
+ alertHandler.execute(fakeContext, action, metrics);
+ final String debugMessage = mockComponentLog.getWarnMessage();
+ assertTrue(StringUtils.isNotEmpty(debugMessage));
+ assertEquals(debugMessage,"Reporting context was not provided to create bulletins.");
+ }
+
+ @Test
+ public void testEmptyBulletinRepository(){
+ final Map<String, String> attributes = new HashMap<>();
+ final Map<String, Object> metrics = new HashMap<>();
+
+ final String category = "Rules Alert";
+ final String message = "This should be sent as an alert!";
+ final String severity = "INFO";
+ attributes.put("category", category);
+ attributes.put("message", message);
+ attributes.put("severity", severity);
+ metrics.put("jvmHeap", "1000000");
+ metrics.put("cpu", "90");
+
+ final Action action = new Action();
+ action.setType("ALERT");
+ action.setAttributes(attributes);
+ ReportingContext fakeContext = Mockito.mock(ReportingContext.class);
+ Mockito.when(reportingContext.getBulletinRepository()).thenReturn(null);
+ alertHandler.execute(fakeContext, action, metrics);
+ final String debugMessage = mockComponentLog.getWarnMessage();
+ assertTrue(StringUtils.isNotEmpty(debugMessage));
+ assertEquals(debugMessage,"Bulletin Repository is not available which is unusual. Cannot send a bulletin.");
+ }
+
+ private static class MockAlertHandler extends AlertHandler {
+
+ private ComponentLog testLogger;
+
+ public MockAlertHandler(ComponentLog testLogger) {
+ this.testLogger = testLogger;
+ }
+
+ @Override
+ protected ComponentLog getLogger() {
+ return testLogger;
+ }
+
+ }
+
+ private static class MockAlertBulletinRepository extends MockBulletinRepository {
+
+ List<Bulletin> bulletinList;
+
+
+ public MockAlertBulletinRepository() {
+ bulletinList = new ArrayList<>();
+ }
+
+ @Override
+ public void addBulletin(Bulletin bulletin) {
+ bulletinList.add(bulletin);
+ }
+
+ @Override
+ public List<Bulletin> findBulletinsForController() {
+ return bulletinList;
+ }
+
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/pom.xml b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/pom.xml
index 1b00ce7..7cb3179 100644
--- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/pom.xml
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/pom.xml
@@ -81,6 +81,12 @@
<version>1.11.0-SNAPSHOT</version>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.apache.nifi</groupId>
+ <artifactId>nifi-rules-engine-service-api</artifactId>
+ <version>1.11.0-SNAPSHOT</version>
+ <scope>provided</scope>
+ </dependency>
</dependencies>
<build>
<plugins>
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsEventReportingTask.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsEventReportingTask.java
new file mode 100644
index 0000000..5254abe
--- /dev/null
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/MetricsEventReportingTask.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.sql;
+
+import org.apache.nifi.annotation.documentation.CapabilityDescription;
+import org.apache.nifi.annotation.documentation.Tags;
+import org.apache.nifi.annotation.lifecycle.OnScheduled;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.controller.ConfigurationContext;
+import org.apache.nifi.reporting.AbstractReportingTask;
+import org.apache.nifi.reporting.ReportingContext;
+import org.apache.nifi.reporting.ReportingInitializationContext;
+import org.apache.nifi.reporting.sql.util.QueryMetricsUtil;
+import org.apache.nifi.rules.Action;
+import org.apache.nifi.rules.PropertyContextActionHandler;
+import org.apache.nifi.rules.engine.RulesEngineService;
+import org.apache.nifi.serialization.record.Record;
+import org.apache.nifi.serialization.record.ResultSetRecordSet;
+
+import java.io.IOException;
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+@Tags({"reporting", "rules", "action", "action handler", "status", "connection", "processor", "jvm", "metrics", "history", "bulletin", "sql"})
+@CapabilityDescription("Triggers rules-driven actions based on metrics values ")
+public class MetricsEventReportingTask extends AbstractReportingTask {
+
+ private List<PropertyDescriptor> properties;
+ private MetricsQueryService metricsQueryService;
+ private volatile RulesEngineService rulesEngineService;
+ private volatile PropertyContextActionHandler actionHandler;
+
+ @Override
+ protected List<PropertyDescriptor> getSupportedPropertyDescriptors() {
+ return properties;
+ }
+
+ @Override
+ protected void init(final ReportingInitializationContext config) {
+ metricsQueryService = new MetricsSqlQueryService(getLogger());
+ final List<PropertyDescriptor> properties = new ArrayList<>();
+ properties.add(QueryMetricsUtil.QUERY);
+ properties.add(QueryMetricsUtil.RULES_ENGINE);
+ properties.add(QueryMetricsUtil.ACTION_HANDLER);
+ this.properties = Collections.unmodifiableList(properties);
+ }
+
+ @OnScheduled
+ public void setup(final ConfigurationContext context) throws IOException {
+ actionHandler = context.getProperty(QueryMetricsUtil.ACTION_HANDLER).asControllerService(PropertyContextActionHandler.class);
+ rulesEngineService = context.getProperty(QueryMetricsUtil.RULES_ENGINE).asControllerService(RulesEngineService.class);
+ }
+
+ @Override
+ public void onTrigger(ReportingContext context) {
+ try {
+ final String query = context.getProperty(QueryMetricsUtil.QUERY).evaluateAttributeExpressions().getValue();
+ fireRules(context, actionHandler, rulesEngineService, query);
+ } catch (Exception e) {
+ getLogger().error("Error opening loading rules: {}", new Object[]{e.getMessage()}, e);
+ }
+ }
+
+ private void fireRules(ReportingContext context, PropertyContextActionHandler actionHandler, RulesEngineService engine, String query) throws Exception {
+ QueryResult queryResult = metricsQueryService.query(context, query);
+ getLogger().debug("Executing query: {}", new Object[]{ query });
+ ResultSetRecordSet recordSet = metricsQueryService.getResultSetRecordSet(queryResult);
+ Record record;
+ try {
+ while ((record = recordSet.next()) != null) {
+ final Map<String, Object> facts = new HashMap<>();
+ for (String fieldName : record.getRawFieldNames()) {
+ facts.put(fieldName, record.getValue(fieldName));
+ }
+ List<Action> actions = engine.fireRules(facts);
+ if(actions == null || actions.isEmpty()){
+ getLogger().debug("No actions required for provided facts.");
+ } else {
+ actions.forEach(action -> {
+ actionHandler.execute(context, action,facts);
+ });
+ }
+ }
+ } finally {
+ metricsQueryService.closeQuietly(recordSet);
+ }
+ }
+}
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java
index ae0e326..6f3aa9e 100644
--- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/QueryNiFiReportingTask.java
@@ -16,21 +16,16 @@
*/
package org.apache.nifi.reporting.sql;
-import org.apache.calcite.config.Lex;
-import org.apache.calcite.sql.parser.SqlParser;
import org.apache.nifi.annotation.documentation.CapabilityDescription;
import org.apache.nifi.annotation.documentation.Tags;
import org.apache.nifi.annotation.lifecycle.OnScheduled;
import org.apache.nifi.components.PropertyDescriptor;
-import org.apache.nifi.components.ValidationContext;
-import org.apache.nifi.components.ValidationResult;
-import org.apache.nifi.components.Validator;
import org.apache.nifi.controller.ConfigurationContext;
-import org.apache.nifi.expression.ExpressionLanguageScope;
import org.apache.nifi.record.sink.RecordSinkService;
import org.apache.nifi.reporting.AbstractReportingTask;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.ReportingInitializationContext;
+import org.apache.nifi.reporting.sql.util.QueryMetricsUtil;
import org.apache.nifi.serialization.record.ResultSetRecordSet;
import org.apache.nifi.util.StopWatch;
@@ -50,34 +45,6 @@ import java.util.concurrent.TimeUnit;
+ "query on the table when the capability is disabled will cause an error.")
public class QueryNiFiReportingTask extends AbstractReportingTask {
- static final PropertyDescriptor RECORD_SINK = new PropertyDescriptor.Builder()
- .name("sql-reporting-record-sink")
- .displayName("Record Destination Service")
- .description("Specifies the Controller Service to use for writing out the query result records to some destination.")
- .identifiesControllerService(RecordSinkService.class)
- .required(true)
- .build();
-
- static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
- .name("sql-reporting-query")
- .displayName("SQL Query")
- .description("SQL SELECT statement specifies which tables to query and how data should be filtered/transformed. "
- + "SQL SELECT can select from the CONNECTION_STATUS, PROCESSOR_STATUS, BULLETINS, PROCESS_GROUP_STATUS, JVM_METRICS, or CONNECTION_STATUS_PREDICTIONS tables. Note that the "
- + "CONNECTION_STATUS_PREDICTIONS table is not available for querying if analytics are not enabled).")
- .required(true)
- .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
- .addValidator(new SqlValidator())
- .build();
-
- static final PropertyDescriptor INCLUDE_ZERO_RECORD_RESULTS = new PropertyDescriptor.Builder()
- .name("sql-reporting-include-zero-record-results")
- .displayName("Include Zero Record Results")
- .description("When running the SQL statement, if the result has no data, this property specifies whether or not the empty result set will be transmitted.")
- .expressionLanguageSupported(ExpressionLanguageScope.NONE)
- .allowableValues("true", "false")
- .defaultValue("false")
- .required(true)
- .build();
private List<PropertyDescriptor> properties;
@@ -89,9 +56,9 @@ public class QueryNiFiReportingTask extends AbstractReportingTask {
protected void init(final ReportingInitializationContext config) {
metricsQueryService = new MetricsSqlQueryService(getLogger());
final List<PropertyDescriptor> properties = new ArrayList<>();
- properties.add(QUERY);
- properties.add(RECORD_SINK);
- properties.add(INCLUDE_ZERO_RECORD_RESULTS);
+ properties.add(QueryMetricsUtil.QUERY);
+ properties.add(QueryMetricsUtil.RECORD_SINK);
+ properties.add(QueryMetricsUtil.INCLUDE_ZERO_RECORD_RESULTS);
this.properties = Collections.unmodifiableList(properties);
}
@@ -102,7 +69,7 @@ public class QueryNiFiReportingTask extends AbstractReportingTask {
@OnScheduled
public void setup(final ConfigurationContext context) throws IOException {
- recordSinkService = context.getProperty(RECORD_SINK).asControllerService(RecordSinkService.class);
+ recordSinkService = context.getProperty(QueryMetricsUtil.RECORD_SINK).asControllerService(RecordSinkService.class);
recordSinkService.reset();
}
@@ -110,7 +77,7 @@ public class QueryNiFiReportingTask extends AbstractReportingTask {
public void onTrigger(ReportingContext context) {
final StopWatch stopWatch = new StopWatch(true);
try {
- final String sql = context.getProperty(QUERY).evaluateAttributeExpressions().getValue();
+ final String sql = context.getProperty(QueryMetricsUtil.QUERY).evaluateAttributeExpressions().getValue();
final QueryResult queryResult = metricsQueryService.query(context, sql);
final ResultSetRecordSet recordSet;
@@ -129,7 +96,7 @@ public class QueryNiFiReportingTask extends AbstractReportingTask {
attributes.put("reporting.task.name", getName());
attributes.put("reporting.task.uuid", getIdentifier());
attributes.put("reporting.task.type", this.getClass().getSimpleName());
- recordSinkService.sendData(recordSet, attributes, context.getProperty(INCLUDE_ZERO_RECORD_RESULTS).asBoolean());
+ recordSinkService.sendData(recordSet, attributes, context.getProperty(QueryMetricsUtil.INCLUDE_ZERO_RECORD_RESULTS).asBoolean());
} catch (Exception e) {
getLogger().error("Error during transmission of query results due to {}", new Object[]{e.getMessage()}, e);
return;
@@ -143,41 +110,4 @@ public class QueryNiFiReportingTask extends AbstractReportingTask {
}
}
- private static class SqlValidator implements Validator {
- @Override
- public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
- if (context.isExpressionLanguagePresent(input)) {
- return new ValidationResult.Builder()
- .input(input)
- .subject(subject)
- .valid(true)
- .explanation("Expression Language Present")
- .build();
- }
-
- final String substituted = context.newPropertyValue(input).evaluateAttributeExpressions().getValue();
-
- final SqlParser.Config config = SqlParser.configBuilder()
- .setLex(Lex.MYSQL_ANSI)
- .build();
-
- final SqlParser parser = SqlParser.create(substituted, config);
- try {
- parser.parseStmt();
- return new ValidationResult.Builder()
- .subject(subject)
- .input(input)
- .valid(true)
- .build();
- } catch (final Exception e) {
- return new ValidationResult.Builder()
- .subject(subject)
- .input(input)
- .valid(false)
- .explanation("Not a valid SQL Statement: " + e.getMessage())
- .build();
- }
- }
- }
-
}
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/QueryMetricsUtil.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/QueryMetricsUtil.java
new file mode 100644
index 0000000..159daec
--- /dev/null
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/java/org/apache/nifi/reporting/sql/util/QueryMetricsUtil.java
@@ -0,0 +1,114 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.reporting.sql.util;
+
+import org.apache.calcite.config.Lex;
+import org.apache.calcite.sql.parser.SqlParser;
+import org.apache.nifi.components.PropertyDescriptor;
+import org.apache.nifi.components.ValidationContext;
+import org.apache.nifi.components.ValidationResult;
+import org.apache.nifi.components.Validator;
+import org.apache.nifi.expression.ExpressionLanguageScope;
+import org.apache.nifi.record.sink.RecordSinkService;
+import org.apache.nifi.rules.PropertyContextActionHandler;
+import org.apache.nifi.rules.engine.RulesEngineService;
+
+public class QueryMetricsUtil {
+
+ public static final PropertyDescriptor RECORD_SINK = new PropertyDescriptor.Builder()
+ .name("sql-reporting-record-sink")
+ .displayName("Record Destination Service")
+ .description("Specifies the Controller Service to use for writing out the query result records to some destination.")
+ .identifiesControllerService(RecordSinkService.class)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor QUERY = new PropertyDescriptor.Builder()
+ .name("sql-reporting-query")
+ .displayName("SQL Query")
+ .description("SQL SELECT statement specifies which tables to query and how data should be filtered/transformed. "
+ + "SQL SELECT can select from the CONNECTION_STATUS, PROCESSOR_STATUS, BULLETINS, PROCESS_GROUP_STATUS, JVM_METRICS, or CONNECTION_STATUS_PREDICTIONS tables. Note that the "
+ + "CONNECTION_STATUS_PREDICTIONS table is not available for querying if analytics are not enabled).")
+ .required(true)
+ .expressionLanguageSupported(ExpressionLanguageScope.VARIABLE_REGISTRY)
+ .addValidator(new SqlValidator())
+ .build();
+
+ public static final PropertyDescriptor INCLUDE_ZERO_RECORD_RESULTS = new PropertyDescriptor.Builder()
+ .name("sql-reporting-include-zero-record-results")
+ .displayName("Include Zero Record Results")
+ .description("When running the SQL statement, if the result has no data, this property specifies whether or not the empty result set will be transmitted.")
+ .expressionLanguageSupported(ExpressionLanguageScope.NONE)
+ .allowableValues("true", "false")
+ .defaultValue("false")
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor RULES_ENGINE = new PropertyDescriptor.Builder()
+ .name("rules-engine-service")
+ .displayName("Rules Engine Service")
+ .description("Specifies the Controller Service to use for applying rules to metrics.")
+ .identifiesControllerService(RulesEngineService.class)
+ .required(true)
+ .build();
+
+ public static final PropertyDescriptor ACTION_HANDLER = new PropertyDescriptor.Builder()
+ .name("action-handler")
+ .displayName("Event Action Handler")
+ .description("Handler that will execute the defined action returned from rules engine (if Action type is supported by the handler)")
+ .identifiesControllerService(PropertyContextActionHandler.class)
+ .required(true)
+ .build();
+
+ public static class SqlValidator implements Validator {
+ @Override
+ public ValidationResult validate(final String subject, final String input, final ValidationContext context) {
+ if (context.isExpressionLanguagePresent(input)) {
+ return new ValidationResult.Builder()
+ .input(input)
+ .subject(subject)
+ .valid(true)
+ .explanation("Expression Language Present")
+ .build();
+ }
+
+ final String substituted = context.newPropertyValue(input).evaluateAttributeExpressions().getValue();
+
+ final SqlParser.Config config = SqlParser.configBuilder()
+ .setLex(Lex.MYSQL_ANSI)
+ .build();
+
+ final SqlParser parser = SqlParser.create(substituted, config);
+ try {
+ parser.parseStmt();
+ return new ValidationResult.Builder()
+ .subject(subject)
+ .input(input)
+ .valid(true)
+ .build();
+ } catch (final Exception e) {
+ return new ValidationResult.Builder()
+ .subject(subject)
+ .input(input)
+ .valid(false)
+ .explanation("Not a valid SQL Statement: " + e.getMessage())
+ .build();
+ }
+ }
+ }
+
+}
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
index ec340e8..c3f5883 100644
--- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/META-INF/services/org.apache.nifi.reporting.ReportingTask
@@ -13,4 +13,5 @@
# See the License for the specific language governing permissions and
# limitations under the License.
-org.apache.nifi.reporting.sql.QueryNiFiReportingTask
\ No newline at end of file
+org.apache.nifi.reporting.sql.QueryNiFiReportingTask
+org.apache.nifi.reporting.sql.MetricsEventReportingTask
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.MetricsEventReportingTask/additionalDetails.html b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.MetricsEventReportingTask/additionalDetails.html
new file mode 100644
index 0000000..2392aab
--- /dev/null
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/main/resources/docs/org.apache.nifi.reporting.sql.MetricsEventReportingTask/additionalDetails.html
@@ -0,0 +1,34 @@
+<!DOCTYPE html>
+<html lang="en">
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+ http://www.apache.org/licenses/LICENSE-2.0
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<head>
+ <meta charset="utf-8" />
+ <title>Metrics Event Reporting Task</title>
+ <!--link rel="stylesheet" href="../../css/component-usage.css" type="text/css" /-->
+ <link rel="stylesheet" href="/nifi-docs/css/component-usage.css" type="text/css" />
+</head>
+
+<body>
+<h2>Summary</h2>
+<p>
+ This reporting task can be used to issue SQL queries against various NiFi metrics information, submit returned data to a rules engine (which will determine if any actions should be performed)
+ and execute the prescribed actions using action handlers. This task requires a RulesEngineService (which will identify any actions that should be performed) and an ActionHandler which will execute the action(s).
+ A distinct ActionHandler can be used to service all events or an ActionHandlerLookup can be used for dynamic handler lookup. NOTE: Optimally action handler should be associated with the expected action types
+ returned from the rules engine.
+</p>
+<br/>
+</body>
+</html>
\ No newline at end of file
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestMetricsEventReportingTask.java
similarity index 55%
copy from nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
copy to nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestMetricsEventReportingTask.java
index 9f4cb0a..83992e4 100644
--- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestMetricsEventReportingTask.java
@@ -16,24 +16,32 @@
*/
package org.apache.nifi.reporting.sql;
-
+import com.google.common.collect.Lists;
import org.apache.nifi.attribute.expression.language.StandardPropertyValue;
import org.apache.nifi.components.PropertyDescriptor;
import org.apache.nifi.components.PropertyValue;
+import org.apache.nifi.context.PropertyContext;
import org.apache.nifi.controller.ConfigurationContext;
import org.apache.nifi.controller.status.ConnectionStatus;
import org.apache.nifi.controller.status.ProcessGroupStatus;
import org.apache.nifi.controller.status.ProcessorStatus;
+import org.apache.nifi.controller.status.analytics.ConnectionStatusPredictions;
import org.apache.nifi.logging.ComponentLog;
-import org.apache.nifi.record.sink.MockRecordSinkService;
-import org.apache.nifi.record.sink.RecordSinkService;
+import org.apache.nifi.reporting.BulletinRepository;
import org.apache.nifi.reporting.EventAccess;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.ReportingInitializationContext;
-import org.apache.nifi.reporting.util.metrics.MetricNames;
+import org.apache.nifi.reporting.Severity;
+import org.apache.nifi.reporting.sql.util.QueryMetricsUtil;
+import org.apache.nifi.rules.Action;
+import org.apache.nifi.rules.MockPropertyContextActionHandler;
+import org.apache.nifi.rules.PropertyContextActionHandler;
+import org.apache.nifi.rules.engine.MockRulesEngineService;
+import org.apache.nifi.rules.engine.RulesEngineService;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockPropertyValue;
+import org.apache.nifi.util.Tuple;
import org.junit.Before;
import org.junit.Test;
import org.mockito.Mockito;
@@ -46,23 +54,23 @@ import java.util.HashMap;
import java.util.List;
import java.util.Map;
import java.util.UUID;
-import java.util.stream.Collectors;
-import java.util.stream.Stream;
import static org.junit.Assert.assertEquals;
-import static org.junit.Assert.assertTrue;
-
-public class TestQueryNiFiReportingTask {
+import static org.junit.Assert.assertFalse;
+import static org.mockito.ArgumentMatchers.any;
+import static org.mockito.ArgumentMatchers.anyString;
+public class TestMetricsEventReportingTask {
private ReportingContext context;
- private MockQueryNiFiReportingTask reportingTask;
- private MockRecordSinkService mockRecordSinkService;
+ private MockMetricsEventReportingTask reportingTask;
+ private MockPropertyContextActionHandler actionHandler;
+ private MockRulesEngineService rulesEngineService;
private ProcessGroupStatus status;
@Before
public void setup() {
- mockRecordSinkService = new MockRecordSinkService();
status = new ProcessGroupStatus();
+ actionHandler = new MockPropertyContextActionHandler();
status.setId("1234");
status.setFlowFilesReceived(5);
status.setBytesReceived(10000);
@@ -83,15 +91,21 @@ public class TestQueryNiFiReportingTask {
processorStatuses.add(procStatus);
status.setProcessorStatus(processorStatuses);
+ ConnectionStatusPredictions connectionStatusPredictions = new ConnectionStatusPredictions();
+ connectionStatusPredictions.setPredictedTimeToCountBackpressureMillis(1000);
+ connectionStatusPredictions.setPredictedTimeToBytesBackpressureMillis(1000);
+ connectionStatusPredictions.setNextPredictedQueuedCount(1000000000);
+ connectionStatusPredictions.setNextPredictedQueuedBytes(1000000000000000L);
+
ConnectionStatus root1ConnectionStatus = new ConnectionStatus();
root1ConnectionStatus.setId("root1");
root1ConnectionStatus.setQueuedCount(1000);
- root1ConnectionStatus.setBackPressureObjectThreshold(1000);
+ root1ConnectionStatus.setPredictions(connectionStatusPredictions);
ConnectionStatus root2ConnectionStatus = new ConnectionStatus();
root2ConnectionStatus.setId("root2");
root2ConnectionStatus.setQueuedCount(500);
- root2ConnectionStatus.setBackPressureObjectThreshold(1000);
+ root2ConnectionStatus.setPredictions(connectionStatusPredictions);
Collection<ConnectionStatus> rootConnectionStatuses = new ArrayList<>();
rootConnectionStatuses.add(root1ConnectionStatus);
@@ -138,120 +152,42 @@ public class TestQueryNiFiReportingTask {
@Test
public void testConnectionStatusTable() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
- properties.put(QueryNiFiReportingTask.RECORD_SINK, "mock-record-sink");
- properties.put(QueryNiFiReportingTask.QUERY, "select id,queuedCount,isBackPressureEnabled from CONNECTION_STATUS order by queuedCount desc");
+ properties.put(QueryMetricsUtil.QUERY, "select connectionId, predictedQueuedCount, predictedTimeToBytesBackpressureMillis from CONNECTION_STATUS_PREDICTIONS");
reportingTask = initTask(properties);
reportingTask.onTrigger(context);
+ List<Map<String,Object>> metricsList = actionHandler.getRows();
+ List<Tuple<String, Action>> defaultLogActions = actionHandler.getDefaultActionsByType("LOG");
+ List<Tuple<String, Action>> defaultAlertActions = actionHandler.getDefaultActionsByType("ALERT");
+ List<PropertyContext> propertyContexts = actionHandler.getPropertyContexts();
+ assertFalse(metricsList.isEmpty());
+ assertEquals(2,defaultLogActions.size());
+ assertEquals(2,defaultAlertActions.size());
+ assertEquals(4,propertyContexts.size());
- List<Map<String, Object>> rows = mockRecordSinkService.getRows();
- assertEquals(4, rows.size());
- // Validate the first row
- Map<String, Object> row = rows.get(0);
- assertEquals(3, row.size()); // Only projected 2 columns
- Object id = row.get("id");
- assertTrue(id instanceof String);
- assertEquals("nested", id);
- assertEquals(1001, row.get("queuedCount"));
- // Validate the second row
- row = rows.get(1);
- id = row.get("id");
- assertEquals("root1", id);
- assertEquals(1000, row.get("queuedCount"));
- assertEquals(true, row.get("isBackPressureEnabled"));
- // Validate the third row
- row = rows.get(2);
- id = row.get("id");
- assertEquals("root2", id);
- assertEquals(500, row.get("queuedCount"));
- assertEquals(false, row.get("isBackPressureEnabled"));
- // Validate the fourth row
- row = rows.get(3);
- id = row.get("id");
- assertEquals("nested2", id);
- assertEquals(3, row.get("queuedCount"));
}
- @Test
- public void testJvmMetricsTable() throws IOException, InitializationException {
- final Map<PropertyDescriptor, String> properties = new HashMap<>();
- properties.put(QueryNiFiReportingTask.RECORD_SINK, "mock-record-sink");
- properties.put(QueryNiFiReportingTask.QUERY, "select "
- + Stream.of(MetricNames.JVM_DAEMON_THREAD_COUNT,
- MetricNames.JVM_THREAD_COUNT,
- MetricNames.JVM_THREAD_STATES_BLOCKED,
- MetricNames.JVM_THREAD_STATES_RUNNABLE,
- MetricNames.JVM_THREAD_STATES_TERMINATED,
- MetricNames.JVM_THREAD_STATES_TIMED_WAITING,
- MetricNames.JVM_UPTIME,
- MetricNames.JVM_HEAP_USED,
- MetricNames.JVM_HEAP_USAGE,
- MetricNames.JVM_NON_HEAP_USAGE,
- MetricNames.JVM_FILE_DESCRIPTOR_USAGE).map((s) -> s.replace(".", "_")).collect(Collectors.joining(","))
- + " from JVM_METRICS");
- reportingTask = initTask(properties);
- reportingTask.onTrigger(context);
-
- List<Map<String, Object>> rows = mockRecordSinkService.getRows();
- assertEquals(1, rows.size());
- Map<String,Object> row = rows.get(0);
- assertEquals(11, row.size());
- assertTrue(row.get(MetricNames.JVM_DAEMON_THREAD_COUNT.replace(".","_")) instanceof Integer);
- assertTrue(row.get(MetricNames.JVM_HEAP_USAGE.replace(".","_")) instanceof Double);
- }
-
- @Test
- public void testProcessGroupStatusTable() throws IOException, InitializationException {
- final Map<PropertyDescriptor, String> properties = new HashMap<>();
- properties.put(QueryNiFiReportingTask.RECORD_SINK, "mock-record-sink");
- properties.put(QueryNiFiReportingTask.QUERY, "select * from PROCESS_GROUP_STATUS order by bytesRead asc");
- reportingTask = initTask(properties);
- reportingTask.onTrigger(context);
-
- List<Map<String, Object>> rows = mockRecordSinkService.getRows();
- assertEquals(4, rows.size());
- // Validate the first row
- Map<String, Object> row = rows.get(0);
- assertEquals(20, row.size());
- assertEquals(1L, row.get("bytesRead"));
- // Validate the second row
- row = rows.get(1);
- assertEquals(1234L, row.get("bytesRead"));
- // Validate the third row
- row = rows.get(2);
- assertEquals(12345L, row.get("bytesRead"));
- // Validate the fourth row
- row = rows.get(3);
- assertEquals(20000L, row.get("bytesRead"));
- }
-
- @Test
- public void testNoResults() throws IOException, InitializationException {
- final Map<PropertyDescriptor, String> properties = new HashMap<>();
- properties.put(QueryNiFiReportingTask.RECORD_SINK, "mock-record-sink");
- properties.put(QueryNiFiReportingTask.QUERY, "select * from CONNECTION_STATUS where queuedCount > 2000");
- reportingTask = initTask(properties);
- reportingTask.onTrigger(context);
-
- List<Map<String, Object>> rows = mockRecordSinkService.getRows();
- assertEquals(0, rows.size());
- }
-
- private MockQueryNiFiReportingTask initTask(Map<PropertyDescriptor, String> customProperties) throws InitializationException, IOException {
+ private MockMetricsEventReportingTask initTask(Map<PropertyDescriptor, String> customProperties) throws InitializationException, IOException {
final ComponentLog logger = Mockito.mock(ComponentLog.class);
- reportingTask = new MockQueryNiFiReportingTask();
+ final BulletinRepository bulletinRepository = Mockito.mock(BulletinRepository.class);
+ reportingTask = new MockMetricsEventReportingTask();
final ReportingInitializationContext initContext = Mockito.mock(ReportingInitializationContext.class);
Mockito.when(initContext.getIdentifier()).thenReturn(UUID.randomUUID().toString());
Mockito.when(initContext.getLogger()).thenReturn(logger);
reportingTask.initialize(initContext);
Map<PropertyDescriptor, String> properties = new HashMap<>();
+
for (final PropertyDescriptor descriptor : reportingTask.getSupportedPropertyDescriptors()) {
properties.put(descriptor, descriptor.getDefaultValue());
}
properties.putAll(customProperties);
context = Mockito.mock(ReportingContext.class);
+ Mockito.when(context.isAnalyticsEnabled()).thenReturn(true);
Mockito.when(context.getStateManager()).thenReturn(new MockStateManager(reportingTask));
+ Mockito.when(context.getBulletinRepository()).thenReturn(bulletinRepository);
+ Mockito.when(context.createBulletin(anyString(),any(Severity.class), anyString())).thenReturn(null);
+
Mockito.doAnswer((Answer<PropertyValue>) invocation -> {
final PropertyDescriptor descriptor = invocation.getArgument(0, PropertyDescriptor.class);
return new MockPropertyValue(properties.get(descriptor));
@@ -262,17 +198,27 @@ public class TestQueryNiFiReportingTask {
Mockito.when(eventAccess.getControllerStatus()).thenReturn(status);
final PropertyValue pValue = Mockito.mock(StandardPropertyValue.class);
- mockRecordSinkService = new MockRecordSinkService();
- Mockito.when(context.getProperty(QueryNiFiReportingTask.RECORD_SINK)).thenReturn(pValue);
- Mockito.when(pValue.asControllerService(RecordSinkService.class)).thenReturn(mockRecordSinkService);
+ actionHandler = new MockPropertyContextActionHandler();
+ Mockito.when(pValue.asControllerService(PropertyContextActionHandler.class)).thenReturn(actionHandler);
+
+ Action action1 = new Action();
+ action1.setType("LOG");
+ Action action2 = new Action();
+ action2.setType("ALERT");
+
+ final PropertyValue resValue = Mockito.mock(StandardPropertyValue.class);
+ rulesEngineService = new MockRulesEngineService(Lists.newArrayList(action1,action2));
+ Mockito.when(resValue.asControllerService(RulesEngineService.class)).thenReturn(rulesEngineService);
ConfigurationContext configContext = Mockito.mock(ConfigurationContext.class);
- Mockito.when(configContext.getProperty(QueryNiFiReportingTask.RECORD_SINK)).thenReturn(pValue);
+ Mockito.when(configContext.getProperty(QueryMetricsUtil.RULES_ENGINE)).thenReturn(resValue);
+ Mockito.when(configContext.getProperty(QueryMetricsUtil.ACTION_HANDLER)).thenReturn(pValue);
reportingTask.setup(configContext);
return reportingTask;
}
- private static final class MockQueryNiFiReportingTask extends QueryNiFiReportingTask {
+ private static final class MockMetricsEventReportingTask extends MetricsEventReportingTask {
+
}
-}
\ No newline at end of file
+}
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
index 9f4cb0a..eae9e91 100644
--- a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/reporting/sql/TestQueryNiFiReportingTask.java
@@ -31,6 +31,7 @@ import org.apache.nifi.reporting.EventAccess;
import org.apache.nifi.reporting.InitializationException;
import org.apache.nifi.reporting.ReportingContext;
import org.apache.nifi.reporting.ReportingInitializationContext;
+import org.apache.nifi.reporting.sql.util.QueryMetricsUtil;
import org.apache.nifi.reporting.util.metrics.MetricNames;
import org.apache.nifi.state.MockStateManager;
import org.apache.nifi.util.MockPropertyValue;
@@ -138,8 +139,8 @@ public class TestQueryNiFiReportingTask {
@Test
public void testConnectionStatusTable() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
- properties.put(QueryNiFiReportingTask.RECORD_SINK, "mock-record-sink");
- properties.put(QueryNiFiReportingTask.QUERY, "select id,queuedCount,isBackPressureEnabled from CONNECTION_STATUS order by queuedCount desc");
+ properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
+ properties.put(QueryMetricsUtil.QUERY, "select id,queuedCount,isBackPressureEnabled from CONNECTION_STATUS order by queuedCount desc");
reportingTask = initTask(properties);
reportingTask.onTrigger(context);
@@ -174,8 +175,8 @@ public class TestQueryNiFiReportingTask {
@Test
public void testJvmMetricsTable() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
- properties.put(QueryNiFiReportingTask.RECORD_SINK, "mock-record-sink");
- properties.put(QueryNiFiReportingTask.QUERY, "select "
+ properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
+ properties.put(QueryMetricsUtil.QUERY, "select "
+ Stream.of(MetricNames.JVM_DAEMON_THREAD_COUNT,
MetricNames.JVM_THREAD_COUNT,
MetricNames.JVM_THREAD_STATES_BLOCKED,
@@ -202,8 +203,8 @@ public class TestQueryNiFiReportingTask {
@Test
public void testProcessGroupStatusTable() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
- properties.put(QueryNiFiReportingTask.RECORD_SINK, "mock-record-sink");
- properties.put(QueryNiFiReportingTask.QUERY, "select * from PROCESS_GROUP_STATUS order by bytesRead asc");
+ properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
+ properties.put(QueryMetricsUtil.QUERY, "select * from PROCESS_GROUP_STATUS order by bytesRead asc");
reportingTask = initTask(properties);
reportingTask.onTrigger(context);
@@ -227,8 +228,8 @@ public class TestQueryNiFiReportingTask {
@Test
public void testNoResults() throws IOException, InitializationException {
final Map<PropertyDescriptor, String> properties = new HashMap<>();
- properties.put(QueryNiFiReportingTask.RECORD_SINK, "mock-record-sink");
- properties.put(QueryNiFiReportingTask.QUERY, "select * from CONNECTION_STATUS where queuedCount > 2000");
+ properties.put(QueryMetricsUtil.RECORD_SINK, "mock-record-sink");
+ properties.put(QueryMetricsUtil.QUERY, "select * from CONNECTION_STATUS where queuedCount > 2000");
reportingTask = initTask(properties);
reportingTask.onTrigger(context);
@@ -263,11 +264,11 @@ public class TestQueryNiFiReportingTask {
final PropertyValue pValue = Mockito.mock(StandardPropertyValue.class);
mockRecordSinkService = new MockRecordSinkService();
- Mockito.when(context.getProperty(QueryNiFiReportingTask.RECORD_SINK)).thenReturn(pValue);
+ Mockito.when(context.getProperty(QueryMetricsUtil.RECORD_SINK)).thenReturn(pValue);
Mockito.when(pValue.asControllerService(RecordSinkService.class)).thenReturn(mockRecordSinkService);
ConfigurationContext configContext = Mockito.mock(ConfigurationContext.class);
- Mockito.when(configContext.getProperty(QueryNiFiReportingTask.RECORD_SINK)).thenReturn(pValue);
+ Mockito.when(configContext.getProperty(QueryMetricsUtil.RECORD_SINK)).thenReturn(pValue);
reportingTask.setup(configContext);
return reportingTask;
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/MockPropertyContextActionHandler.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/MockPropertyContextActionHandler.java
new file mode 100644
index 0000000..323317d
--- /dev/null
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/MockPropertyContextActionHandler.java
@@ -0,0 +1,76 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.rules;
+
+import org.apache.nifi.components.AbstractConfigurableComponent;
+import org.apache.nifi.context.PropertyContext;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.util.Tuple;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.stream.Collectors;
+
+public class MockPropertyContextActionHandler extends AbstractConfigurableComponent implements PropertyContextActionHandler{
+
+ private List<Map<String, Object>> rows = new ArrayList<>();
+ private List<Tuple<String,Action>> defaultActions = new ArrayList<>();
+ private List<PropertyContext> propertyContexts = new ArrayList<>();
+
+
+ @Override
+ public void execute(PropertyContext context, Action action, Map<String, Object> facts) {
+ propertyContexts.add(context);
+ execute(action, facts);
+ }
+
+ @Override
+ public void execute(Action action, Map<String, Object> facts) {
+ rows.add(facts);
+ defaultActions.add( new Tuple<>(action.getType(),action));
+ }
+
+
+ @Override
+ public void initialize(ControllerServiceInitializationContext context) throws InitializationException {
+
+ }
+
+ public List<Map<String, Object>> getRows() {
+ return rows;
+ }
+
+ public List<Tuple<String, Action>> getDefaultActions() {
+ return defaultActions;
+ }
+
+ public List<Tuple<String,Action>> getDefaultActionsByType(final String type){
+ return defaultActions.stream().filter(stringActionTuple -> stringActionTuple
+ .getKey().equalsIgnoreCase(type)).collect(Collectors.toList());
+ }
+
+ public List<PropertyContext> getPropertyContexts() {
+ return propertyContexts;
+ }
+
+ @Override
+ public String getIdentifier() {
+ return "MockPropertyContextActionHandler";
+ }
+}
diff --git a/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/engine/MockRulesEngineService.java b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/engine/MockRulesEngineService.java
new file mode 100644
index 0000000..e3ccc73
--- /dev/null
+++ b/nifi-nar-bundles/nifi-sql-reporting-bundle/nifi-sql-reporting-tasks/src/test/java/org/apache/nifi/rules/engine/MockRulesEngineService.java
@@ -0,0 +1,47 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.nifi.rules.engine;
+
+import org.apache.nifi.components.AbstractConfigurableComponent;
+import org.apache.nifi.controller.ControllerServiceInitializationContext;
+import org.apache.nifi.reporting.InitializationException;
+import org.apache.nifi.rules.Action;
+
+import java.util.List;
+import java.util.Map;
+
+public class MockRulesEngineService extends AbstractConfigurableComponent implements RulesEngineService {
+ private List<Action> actions;
+
+ public MockRulesEngineService(List<Action> actions) {
+ this.actions = actions;
+ }
+
+ @Override
+ public List<Action> fireRules(Map<String, Object> facts) {
+ return actions;
+ }
+
+ @Override
+ public void initialize(ControllerServiceInitializationContext context) throws InitializationException {
+ }
+
+ @Override
+ public String getIdentifier() {
+ return "MockRulesEngineService";
+ }
+}