You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/12/05 06:13:02 UTC

[1/3] incubator-eagle git commit: [EAGLE-815] Add eagle alert template, severity and category support

Repository: incubator-eagle
Updated Branches:
  refs/heads/master e5e215e0b -> 1c81c0865


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/SendSampleDataToKafka.java
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/SendSampleDataToKafka.java b/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/SendSampleDataToKafka.java
index 67b94f8..026f60b 100644
--- a/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/SendSampleDataToKafka.java
+++ b/eagle-hadoop-metric/src/test/java/org/apache/eagle/metric/SendSampleDataToKafka.java
@@ -16,21 +16,25 @@
  */
 package org.apache.eagle.metric;
 
-import com.google.common.base.Preconditions;
 import com.typesafe.config.ConfigFactory;
 import kafka.producer.KeyedMessage;
 import kafka.producer.ProducerConfig;
-import org.apache.commons.io.IOUtils;
 import org.apache.eagle.app.messaging.KafkaStreamProvider;
 import org.apache.eagle.app.messaging.KafkaStreamSinkConfig;
 
 import java.io.IOException;
-import java.io.InputStream;
 import java.net.URISyntaxException;
 import java.util.Properties;
 
 public class SendSampleDataToKafka {
     public static void main(String[] args) throws URISyntaxException, IOException {
+        String data = "{" +
+            "\"host\":\"localhost\", " +
+            "\"timestamp\": 1480319109000, " +
+            "\"metric\": \"hadoop.cpu.usage\", " +
+            "\"component\": \"namenode\", " +
+            "\"site\": \"test\", " +
+            "\"value\": 0.98}";
         KafkaStreamSinkConfig config = new KafkaStreamProvider().getSinkConfig("HADOOP_JMX_METRIC_STREAM",ConfigFactory.load());
         Properties properties = new Properties();
         properties.put("metadata.broker.list", config.getBrokerList());
@@ -44,10 +48,7 @@ public class SendSampleDataToKafka {
         ProducerConfig producerConfig = new ProducerConfig(properties);
         kafka.javaapi.producer.Producer producer = new kafka.javaapi.producer.Producer(producerConfig);
         try {
-            InputStream is = SendSampleDataToKafka.class.getResourceAsStream("hadoop_jmx_metric_sample.json");
-            Preconditions.checkNotNull(is, "hadoop_jmx_metric_sample.json");
-            String value = IOUtils.toString(is);
-            producer.send(new KeyedMessage(config.getTopicId(), value));
+            producer.send(new KeyedMessage(config.getTopicId(), data));
         } finally {
             producer.close();
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-hadoop-metric/src/test/resources/hadoop_jmx_metric_sample.json
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/test/resources/hadoop_jmx_metric_sample.json b/eagle-hadoop-metric/src/test/resources/hadoop_jmx_metric_sample.json
index f0f62f2..68472a0 100644
--- a/eagle-hadoop-metric/src/test/resources/hadoop_jmx_metric_sample.json
+++ b/eagle-hadoop-metric/src/test/resources/hadoop_jmx_metric_sample.json
@@ -1,6 +1,6 @@
 {
   "host":"localhost",
-  "timestamp": 1480319107,
+  "timestamp": 1480319107000,
   "metric": "hadoop.cpu.usage",
   "component": "namenode",
   "site": "test",

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-hadoop-metric/src/test/resources/integrate_test_policy.json
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/test/resources/integrate_test_policy.json b/eagle-hadoop-metric/src/test/resources/integrate_test_policy.json
new file mode 100644
index 0000000..e9de11c
--- /dev/null
+++ b/eagle-hadoop-metric/src/test/resources/integrate_test_policy.json
@@ -0,0 +1,37 @@
+{
+  "name": "TEST_POLICY",
+  "description": "from HADOOP_JMX_METRIC_STREAM_SANDBOX[site==\"test\"] insert into ALERT_STREAM;",
+  "inputStreams": [
+    "HADOOP_JMX_METRIC_STREAM_SANDBOX"
+  ],
+  "outputStreams": [
+    "ALERT_STREAM"
+  ],
+  "definition": {
+    "type": "siddhi",
+    "value": "from HADOOP_JMX_METRIC_STREAM_SANDBOX[site==\"test\"] select component,value, name insert into ALERT_STREAM;",
+    "handlerClass": null,
+    "properties": {},
+    "inputStreams": [],
+    "outputStreams": []
+  },
+  "stateDefinition": null,
+  "policyStatus": "ENABLED",
+  "alertDefinition": {
+    "templateType": "TEXT",
+    "subject": "$component Disk Usage $value",
+    "body": "#set($usage_percentage = $value * 100)\r\n\r\nDisk Usage (<strong>$metric</strong>) reached <span style=\"color:red\">$usage_percentage %</span> (Threshold: 90%) on <strong>$component</strong> of cluster: <strong>$site</strong> on:\r\n<ul>\r\n    <li><strong>Hostname</strong>: $host</li>\r\n    <li><strong>When</strong>: $ALERT_TIME</li>\r\n    <li><strong>Root Cause</strong>: UNKNOWN</li>\r\n    <li><strong>Action Required</strong>: N/A</li>\r\n</ul>",
+    "severity": "CRITICAL",
+    "category": "HDFS"
+  },
+  "partitionSpec": [
+    {
+      "streamId": "HADOOP_JMX_METRIC_STREAM_SANDBOX",
+      "type": "SHUFFLE",
+      "columns": [],
+      "sortSpec": null
+    }
+  ],
+  "dedicated": false,
+  "parallelismHint": 5
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
----------------------------------------------------------------------
diff --git a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
index 16f1144..f0fb6b5 100644
--- a/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
+++ b/eagle-jpm/eagle-jpm-spark-history/src/main/java/org/apache/eagle/jpm/spark/history/storm/SparkHistoryJobParseBolt.java
@@ -134,7 +134,7 @@ public class SparkHistoryJobParseBolt extends BaseRichBolt {
         List<String> attempts = new ArrayList<String>();
         SparkApplication app = null;
         /*try {
-            List apps = this.historyServerFetcher.getResource(Constants.ResourceType.SPARK_JOB_DETAIL, appId);
+            List apps = this.historyServerFetcher.getBodyTemplate(Constants.ResourceType.SPARK_JOB_DETAIL, appId);
             if (apps != null) {
                 app = (SparkApplication) apps.get(0);
                 attempts = app.getAttempts();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-server/src/main/webapp/app/dev/partials/alert/policyEdit/advancedMode.html
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/webapp/app/dev/partials/alert/policyEdit/advancedMode.html b/eagle-server/src/main/webapp/app/dev/partials/alert/policyEdit/advancedMode.html
index 1d38f2e..d5829c5 100644
--- a/eagle-server/src/main/webapp/app/dev/partials/alert/policyEdit/advancedMode.html
+++ b/eagle-server/src/main/webapp/app/dev/partials/alert/policyEdit/advancedMode.html
@@ -143,7 +143,6 @@
 					<p class="text-danger">{{definitionMessage}}</p>
 				</div>
 
-
 				<ul class="sm-padding">
 					<li class="text-danger" ng-if="policy.outputStreams.length === 0"><i class="fa fa-fw fa-warning"></i> No alert stream defined</li>
 					<li ng-repeat="stream in outputStreams track by $index">
@@ -154,6 +153,30 @@
 					</li>
 				</ul>
 
+				<div class="form-group">
+					<label>Alert Definition*</label><br/>
+					<span class="text-muted">Category</span>
+					<input class="form-control" type="text" ng-model="policy.alertDefinition.category" ng-disabled="policyLock" />
+					<span class="text-muted">Severity</span>
+					<select class="form-control" ng-model="policy.alertDefinition.severity" ng-disabled="policyLock" >
+						<option value="WARNING" class="text-warning">WARNING</option>
+						<option value="CRITICAL" class="text-danger">CRITICAL</option>
+						<option value="FATAL" class="text-danger">FATAL</option>
+						<option value="OK" class="text-success">OK</option>
+					</select>
+					<span class="text-muted">Alert Subject</span>
+					<span class="fa fa-question-circle ng-scope"
+						  uib-tooltip="Alert subject, support template with alert stream fields and built-in context like: STREAM_ID, ALERT_ID, CREATED_BY, POLICY_ID, CREATED_TIMESTAMP, CREATED_TIME, ALERT_TIMESTAMP, ALERT_TIME, ALERT_SCHEMA, POLICY_DESC, POLICY_TYPE, POLICY_DEFINITION, POLICY_HANDLER">
+					</span>
+					<input type="text" class="form-control" placeholder="Please input alert subject (support template)" ng-model="policy.alertDefinition.subject" ng-disabled="policyLock"/>
+					<span class="text-muted">Alert Body</span>
+					<span class="fa fa-question-circle ng-scope"
+						  uib-tooltip="Alert body, support template with alert stream fields and built-in  context like: STREAM_ID, ALERT_ID, CREATED_BY, POLICY_ID, CREATED_TIMESTAMP, CREATED_TIME, ALERT_TIMESTAMP, ALERT_TIME, ALERT_SCHEMA, POLICY_DESC, POLICY_TYPE, POLICY_DEFINITION, POLICY_HANDLER">
+					</span>
+					<!--<textarea class="form-control" rows="3" placeholder="Please input alert body (support template)" ng-model="policy.alertDefinition.body" ng-disabled="policyLock"></textarea>-->
+					<div editor placeholder="Please input alert body (support template)" ng-model="policy.alertDefinition.body" ng-disabled="policyLock"></div>
+				</div>
+
 				<label>
 					Publish Alerts
 				</label>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertEditCtrl.js
----------------------------------------------------------------------
diff --git a/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertEditCtrl.js b/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertEditCtrl.js
index e202450..3bb5609 100644
--- a/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertEditCtrl.js
+++ b/eagle-server/src/main/webapp/app/dev/public/js/ctrls/alertEditCtrl.js
@@ -69,6 +69,12 @@
 				type: "siddhi",
 				value: ""
 			},
+			alertDefinition: {
+				subject: "",
+				body: "",
+				severity: "WARNING",
+				category: "DEFAULT"
+			},
 			partitionSpec: [],
 			parallelismHint: 5
 		}, $scope.policy);


[2/3] incubator-eagle git commit: [EAGLE-815] Add eagle alert template, severity and category support

Posted by ha...@apache.org.
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_DEFAULT_TEMPLATE.vm
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_DEFAULT_TEMPLATE.vm b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_DEFAULT_TEMPLATE.vm
new file mode 100644
index 0000000..3926cc8
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_DEFAULT_TEMPLATE.vm
@@ -0,0 +1,301 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
+<html xmlns="http://www.w3.org/1999/xhtml">
+    #set ( $alert = $alertList[0] )
+<head>
+    <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
+    <meta name="viewport" content="width=device-width"/>
+    <title>$alert["alertSubject"]</title>
+    <style>
+        body {
+            width: 100% !important;
+            min-width: 100%;
+            -webkit-text-size-adjust: 100%;
+            -ms-text-size-adjust: 100%;
+            margin: 0;
+            padding: 0;
+        }
+
+        table {
+            border-spacing: 0;
+            border-collapse: collapse;
+        }
+
+        table th,
+        table td {
+            padding: 3px 0 3px 0;
+        }
+
+        .body {
+            width: 100%;
+        }
+
+        p, a, h1, h2, h3, ul, ol, li {
+            font-family: Helvetica, Arial, sans-serif;
+            font-weight: normal;
+            margin: 0;
+            padding: 0;
+        }
+
+        p {
+            font-size: 14px;
+            line-height: 19px;
+        }
+
+        a {
+            color: #3294b1;
+        }
+
+        h1 {
+            font-size: 36px;
+            margin: 15px 0 5px 0;
+        }
+
+        h2 {
+            font-size: 32px;
+        }
+
+        h3 {
+            font-size: 28px;
+        }
+
+        ul, ol {
+            margin: 0 0 0 25px;
+            padding: 0;
+        }
+
+        .btn {
+            background: #2ba6cb !important;
+            border: 1px solid #2284a1;
+            padding: 10px 20px 10px 20px;
+            text-align: center;
+        }
+
+        .btn:hover {
+            background: #2795b6 !important;
+        }
+
+        .btn a {
+            color: #FFFFFF;
+            text-decoration: none;
+            font-weight: bold;
+            padding: 10px 20px 10px 20px;
+        }
+
+        .tableBordered {
+            border-top: 1px solid #b9e5ff;
+        }
+
+        .tableBordered th {
+            background: #ECF8FF;
+        }
+
+        .tableBordered th p {
+            font-weight: bold;
+            color: #3294b1;
+        }
+
+        .tableBordered th,
+        .tableBordered td {
+            color: #333333;
+            border-bottom: 1px solid #b9e5ff;
+            text-align: center;
+            padding-bottom: 5px;
+        }
+
+        .panel {
+            height: 100px;
+        }
+    </style>
+</head>
+<body>
+<table class="body">
+    <tr>
+        <td align="center" valign="top" style="background: #999999; padding: 0 0 0 0;">
+            <!-- Header -->
+            <table width="580">
+                <tr>
+                    <td style="padding: 0 0 0 0;" align="left">
+                        <p style="color:#FFFFFF;font-weight: bold; font-size: 22px">Eagle Alert Notification</p>
+                    </td>
+                </tr>
+            </table>
+        </td>
+    </tr>
+    <tr>
+        <td align="center" valign="top">
+            <!-- Eagle Body -->
+            <table width="580">
+                <tr>
+                    <!-- Title -->
+                    <td align="center">
+                        <h2>Warning: $alert["alertSubject"]</h2>
+                    </td>
+                </tr>
+                <tr>
+                    <!-- Time -->
+                    <td>
+                        <table width="580">
+                            <tr>
+                                <td>
+                                    <p><b>Detected Time: $alert["alertTime"]</b></p>
+                                </td>
+                                #set ( $alertSeverity = $alert["alertSeverity"] )
+                                #if (!$alertSeverity || ("$alertSeverity" == ""))
+                                    #set ( $alert["alertSeverity"] = "WARNING")
+                                #end
+                                <td align="right">
+                                    <p><b>
+                                        Severity:
+                                        #if ($alert["alertSeverity"] == "WARNING")
+                                            <span>$alert["alertSeverity"]</span>
+                                        #else
+                                            <span style="color: #FF0000;">$alert["alertSeverity"]</span>
+                                        #end
+                                    </b></p>
+                                </td>
+                            </tr>
+                        </table>
+                    </td>
+                </tr>
+
+                <tr>
+                    <!-- Basic Information -->
+                    <td style="padding: 20px 0 10px 0;">
+                        <p><b>Alert Message </b></p>
+                    </td>
+                </tr>
+                <tr>
+                    <!-- Description -->
+                    <td valign="top"
+                        style="background: #ECF8FF; border: 1px solid #b9e5ff; padding: 10px 10px 12px 20px;">
+                        <p>$alert["alertBody"]</p>
+                    </td>
+                </tr>
+                <tr>
+                    <!-- Basic Information -->
+                    <td style="padding: 20px 0 10px 0;">
+                        <p><b>Alert Detail</b></p>
+                    </td>
+                </tr>
+                <tr>
+                    <!-- Basic Information Content -->
+                    <td>
+                        <table class="tableBordered" width="580">
+                            <tr>
+                                <th>
+                                    <p>Policy Name</p>
+                                </th>
+                                <td>
+                                    <p><a href="$alert["policyDetailUrl"]">$alert["policyId"]</a></p>
+                                </td>
+                            </tr>
+                            <tr>
+                                <th>
+                                    <p>Severity Level</p>
+                                </th>
+                                <td>
+                                    <p>$alert["alertSeverity"]</p>
+                                </td>
+                            </tr>
+                            <tr>
+                                <th>
+                                    <p>Alert Stream</p>
+                                </th>
+                                <td>
+                                    <p>$alert["streamId"]</p>
+                                </td>
+                            </tr>
+                            <tr>
+                                <th>
+                                    <p>Created Time</p>
+                                </th>
+                                <td>
+                                    <p>$alert["alertTime"]</p>
+                                </td>
+                            </tr>
+                            <tr>
+                                <th>
+                                    <p>Created By</p>
+                                </th>
+                                <td>
+                                    <p>$alert["creator"]</p>
+                                </td>
+                            </tr>
+                        </table>
+                    </td>
+                </tr>
+##                <tr>
+##                    <!-- View Detail -->
+##                    <td align="center" style="padding: 10px 0 0 0;">
+##                        <table width="580">
+##                            <tr>
+##                                <td class="btn">
+##                                    <a href="$alert["policyDetailUrl"]">View Policy Details</a>
+##                                </td>
+##                            </tr>
+##                        </table>
+##                    </td>
+##                </tr>
+
+                <tr>
+                    <!-- View Detail -->
+                    <td align="center" style="padding: 10px 0 0 0;">
+                        <table width="580">
+                            <tr>
+                                <td class="btn">
+                                    <a href="$alert["alertDetailUrl"]">View Alert on Eagle</a>
+                                </td>
+                            </tr>
+                        </table>
+                    </td>
+                </tr>
+                <tr>
+                    <!-- Actions Required -->
+                    <td style="padding: 20px 0 10px 0;">
+                        <p><b>Actions Required</b></p>
+                    </td>
+                </tr>
+                <tr>
+                    <!-- Possible Root Causes Content -->
+                    <td class="panel" valign="top"
+                        style="background: #F4F4F4; border: 1px solid #AAAAAA; padding: 10px 10px 12px 10px;">
+                        <p>
+                            The alert notification was automatically detected and sent by Eagle according to policy: $alert["policyId"].
+                            To follow-up on this, please verify the alert and diagnose the root cause with Eagle:
+                        </p>
+                        <p></p>
+                        <ul>
+                            <li><p><a href="$alert["alertDetailUrl"]">View alert detail</a></p></li>
+                            <li><p><a href="$alert["policyDetailUrl"]">View policy detail</a></p></li>
+                            <li><p><a href="$alert["homeUrl"]">View eagle home</a></p></li>
+                        </ul>
+                    </td>
+                </tr>
+                <tr>
+                    <!-- Copyright -->
+                    <td align="center">
+                        <p><i>Powered by <a href="http://eagle.incubator.apache.org">Apache Eagle</a> (version: $alert["version"])</i></p>
+                    </td>
+                </tr>
+            </table>
+        </td>
+    </tr>
+</table>
+</body>
+</html>
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_INLINED_TEMPLATE.vm
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_INLINED_TEMPLATE.vm b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_INLINED_TEMPLATE.vm
new file mode 100644
index 0000000..0e3d5fe
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_INLINED_TEMPLATE.vm
@@ -0,0 +1,259 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN"
+        "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
+
+#set ( $alert = $alertList[0] )
+
+## Choose the banner color from the alert severity (WARNING, CRITICAL/FATAL, OK); any other value keeps the default set below.
+#set($alertColor = "#337ab7")
+#if($alert["alertSeverity"] == "WARNING")
+    #set($alertColor = "#FF9F00")
+#elseif($alert["alertSeverity"] == "CRITICAL" || $alert["alertSeverity"] == "FATAL")
+    #set($alertColor = "#d43f3a")
+#elseif ($alert["alertSeverity"] == "OK")
+    #set($alertColor = "#68B90F")
+#end
+
+<html xmlns="http://www.w3.org/1999/xhtml"
+      style="font-family: 'Helvetica Neue', Helvetica, Arial, sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;">
+<head>
+    <meta name="viewport" content="width=device-width"/>
+    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8"/>
+    <title>[$alert["alertSeverity"]] $alert["alertSubject"]</title>
+    <style type="text/css">
+        img {
+            max-width: 100%;
+        }
+
+        body {
+            -webkit-font-smoothing: antialiased;
+            -webkit-text-size-adjust: none;
+            width: 100% !important;
+            height: 100%;
+            line-height: 1.6em;
+        }
+
+        body {
+            background-color: #f6f6f6;
+        }
+
+        @media only screen and (max-width: 640px) {
+            body {
+                padding: 0 !important;
+            }
+
+            h1 {
+                font-weight: 800 !important;
+                margin: 20px 0 5px !important;
+            }
+
+            h2 {
+                font-weight: 800 !important;
+                margin: 20px 0 5px !important;
+            }
+
+            h3 {
+                font-weight: 800 !important;
+                margin: 20px 0 5px !important;
+            }
+
+            h4 {
+                font-weight: 800 !important;
+                margin: 20px 0 5px !important;
+            }
+
+            h1 {
+                font-size: 22px !important;
+            }
+
+            h2 {
+                font-size: 18px !important;
+            }
+
+            h3 {
+                font-size: 16px !important;
+            }
+
+            .container {
+                padding: 0 !important;
+                width: 100% !important;
+            }
+
+            .content {
+                padding: 0 !important;
+            }
+
+            .content-wrap {
+                padding: 10px !important;
+            }
+
+            .invoice {
+                width: 100% !important;
+            }
+        }
+    </style>
+</head>
+
+<body itemscope itemtype="http://schema.org/EmailMessage"
+      style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; -webkit-font-smoothing: antialiased; -webkit-text-size-adjust: none; width: 100% !important; height: 100%; line-height: 1.6em; background-color: #f6f6f6; margin: 0;"
+      bgcolor="#f6f6f6">
+
+<table class="body-wrap"
+       style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; width: 100%; background-color: #f6f6f6; margin: 0;"
+       bgcolor="#f6f6f6">
+    <tr style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;">
+        <td style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; vertical-align: top; margin: 0;"
+            valign="top"></td>
+        <td class="container" width="600"
+            style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; vertical-align: top; display: block !important; max-width: 600px !important; clear: both !important; margin: 0 auto;"
+            valign="top">
+            <div class="content"
+                 style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; max-width: 600px; display: block; margin: 0 auto; padding: 20px;">
+                <table class="main" width="100%" cellpadding="0" cellspacing="0"
+                       style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; border-radius: 3px; background-color: #fff; margin: 0; border: 1px solid #e9e9e9;"
+                       bgcolor="#fff">
+                    <tr style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;">
+                        <td class="alert alert-warning"
+                            style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 16px; vertical-align: top; color: #fff; font-weight: 500; text-align: center; border-radius: 3px 3px 0 0; background-color: $alertColor; margin: 0; padding: 20px;"
+                            align="center" bgcolor="$alertColor" valign="top">
+                            <strong >$alert["alertSeverity"]: </strong>
+                            $alert["alertSubject"]
+                        </td>
+                    </tr>
+                    <tr style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;">
+                        <td class="content-wrap"
+                            style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; vertical-align: top; margin: 0; padding: 20px;"
+                            valign="top">
+                            <table width="100%" cellpadding="0" cellspacing="0"
+                                   style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;">
+                            <tr style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;">
+                                <td class="content-block"
+                                    style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; vertical-align: top; margin: 0; padding: 0 0 20px;"
+                                    valign="top">
+                                    <small>CATEGORY:</small> <strong style="color: $alertColor">#if($alert["alertCategory"]) $alert["alertCategory"] #else N/A #end</strong> <small>TIME:</small> <strong>$alert["alertTime"]</strong>
+                                </td>
+                            </tr>
+                                <tr style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;">
+                                    <td class="content-block"
+                                        style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; vertical-align: top; margin: 0 4px; padding: 10px 10px; background-color: #eee;"
+                                        valign="top">
+                                        <div style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif;  box-sizing: border-box; font-size: 14px; vertical-align: top;">
+                                            $alert["alertBody"]
+                                        </div>
+                                    </td>
+                                </tr>
+
+                                <tr style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;">
+                                    <td class="content-block aligncenter"
+                                        style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; vertical-align: top; text-align: center; margin: 0; padding: 0 0 10px;"
+                                        align="center" valign="top">
+                                        <table class="invoice"
+                                               style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; text-align: left; width: 90%; margin: 10px auto;">
+                                            <tr style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;">
+                                                <td style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; vertical-align: top; margin: 0; padding: 5px 0;"
+                                                    valign="top">
+                                                    <table class="invoice-items" cellpadding="0" cellspacing="0"
+                                                           style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; width: 100%; margin: 0;">
+                                                        <tr style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;">
+                                                            <td style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; vertical-align: top; margin: 0; padding: 5px 0;"
+                                                                valign="top">
+                                                                Severity
+                                                            </td>
+                                                            <td style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; vertical-align: top; margin: 0; padding: 5px 0;"
+                                                                valign="top"> $alert["alertSeverity"]
+                                                            </td>
+                                                        </tr>
+                                                        <tr style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;">
+                                                            <td style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; vertical-align: top; border-top-width: 1px; border-top-color: #eee; border-top-style: solid; margin: 0; padding: 5px 0;"
+                                                                valign="top">Category
+                                                            </td>
+                                                            <td class="alignright"
+                                                                style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; vertical-align: top; text-align: left; border-top-width: 1px; border-top-color: #eee; border-top-style: solid; margin: 0; padding: 5px 0;"
+                                                                align="right" valign="top">
+                                                                #if($alert["alertCategory"])
+                                                                    $alert["alertCategory"]
+                                                                #else
+                                                                    N/A
+                                                                #end
+                                                            </td>
+                                                        </tr>
+                                                        <tr style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;">
+                                                            <td style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; vertical-align: top; border-top-width: 1px; border-top-color: #eee; border-top-style: solid; margin: 0; padding: 5px 0;"
+                                                                valign="top">Cluster
+                                                            </td>
+                                                            <td class="alignright"
+                                                                style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; vertical-align: top; text-align: left; border-top-width: 1px; border-top-color: #eee; border-top-style: solid; margin: 0; padding: 5px 0;"
+                                                                align="right" valign="top">$alert["siteId"]
+                                                            </td>
+                                                        </tr>
+                                                        <tr style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;">
+                                                            <td style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; vertical-align: top; border-top-width: 1px; border-top-color: #eee; border-top-style: solid; margin: 0; padding: 5px 0;"
+                                                                valign="top">Policy
+                                                            </td>
+                                                            <td class="alignright"
+                                                                style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; vertical-align: top; text-align: left; border-top-width: 1px; border-top-color: #eee; border-top-style: solid; margin: 0; padding: 5px 0;"
+                                                                align="right" valign="top"><a
+                                                                    href="$alert["policyDetailUrl"]"
+                                                                    style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; color: #999; text-decoration: underline; margin: 0;"
+                                                                >$alert["policyId"]</a>
+                                                            </td>
+                                                        </tr>
+                                                    </table>
+                                                </td>
+                                            </tr>
+                                        </table>
+                                    </td>
+                                </tr>
+
+                                <tr style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;">
+                                    <td class="content-block"
+                                        style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; vertical-align: top; margin: 0; padding: 0 0 20px;"
+                                        valign="top">
+                                        <a href="$alert["alertDetailUrl"]" class="btn-primary"
+                                           style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; color: #FFF; text-decoration: none; line-height: 2em; font-weight: bold; text-align: center; cursor: pointer; display: inline-block; border-radius: 5px; text-transform: capitalize; background-color: $alertColor; margin: 0; border-color: $alertColor; border-style: solid; border-width: 10px 20px;">
+                                            View Alert Details
+                                        </a>
+                                    </td>
+                                </tr>
+                            </table>
+                        </td>
+                    </tr>
+                </table>
+                <div class="footer"
+                     style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; width: 100%; clear: both; color: #999; margin: 0; padding: 20px;">
+                    <table width="100%"
+                           style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;">
+                        <tr style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; margin: 0;">
+                            <td class="aligncenter content-block"
+                                style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 12px; vertical-align: top; color: #999; text-align: center; margin: 0; padding: 0 0 20px;"
+                                align="center" valign="top">
+                                Powered by <a href="http://eagle.incubator.apache.org"
+                                              style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 12px; color: #999; text-decoration: underline; margin: 0;">Apache
+                                Eagle</a> (version: $alert["version"])
+                            </td>
+                        </tr>
+                    </table>
+                </div>
+            </div>
+        </td>
+        <td style="font-family: 'Helvetica Neue',Helvetica,Arial,sans-serif; box-sizing: border-box; font-size: 14px; vertical-align: top; margin: 0;"
+            valign="top"></td>
+    </tr>
+</table>
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_LIGHT_TEMPLATE.vm
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_LIGHT_TEMPLATE.vm b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_LIGHT_TEMPLATE.vm
new file mode 100644
index 0000000..0eb5efc
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_LIGHT_TEMPLATE.vm
@@ -0,0 +1,495 @@
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Transitional//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-transitional.dtd">
+<html xmlns="http://www.w3.org/1999/xhtml">
+
+    #set ( $alert = $alertList[0] )
+<head>
+    <meta name="viewport" content="width=device-width" />
+    <meta http-equiv="Content-Type" content="text/html; charset=UTF-8" />
+    <title>[$alert["alertSeverity"]] $alert["alertSubject"]</title>
+    <style rel="stylesheet" type="text/css">
+        /* -------------------------------------
+            GLOBAL
+            A very basic CSS reset
+        ------------------------------------- */
+        * {
+            margin: 0;
+            font-family: "Helvetica Neue", Helvetica, Arial, sans-serif;
+            box-sizing: border-box;
+            font-size: 14px;
+        }
+
+        img {
+            max-width: 100%;
+        }
+
+        body {
+            -webkit-font-smoothing: antialiased;
+            -webkit-text-size-adjust: none;
+            width: 100% !important;
+            height: 100%;
+            line-height: 1.6em;
+            /* 1.6em * 14px = 22.4px, use px to get airier line-height also in Thunderbird, and Yahoo!, Outlook.com, AOL webmail clients */
+            /*line-height: 22px;*/
+        }
+
+        /* Let's make sure all tables have defaults */
+        table td {
+            vertical-align: top;
+        }
+
+        /* -------------------------------------
+            BODY & CONTAINER
+        ------------------------------------- */
+        body {
+            background-color: #f6f6f6;
+        }
+
+        .body-wrap {
+            background-color: #f6f6f6;
+            width: 100%;
+        }
+
+        .container {
+            display: block !important;
+            max-width: 600px !important;
+            margin: 0 auto !important;
+            /* makes it centered */
+            clear: both !important;
+        }
+
+        .content {
+            max-width: 600px;
+            margin: 0 auto;
+            display: block;
+            padding: 20px;
+        }
+
+        /* -------------------------------------
+            HEADER, FOOTER, MAIN
+        ------------------------------------- */
+        .main {
+            background-color: #fff;
+            border: 1px solid #e9e9e9;
+            border-radius: 3px;
+        }
+
+        .content-wrap {
+            padding: 20px;
+        }
+
+        .content-block {
+            padding: 0 0 20px;
+        }
+
+        .header {
+            width: 100%;
+            margin-bottom: 20px;
+        }
+
+        .footer {
+            width: 100%;
+            clear: both;
+            color: #999;
+            padding: 20px;
+        }
+        .footer p, .footer a, .footer td {
+            color: #999;
+            font-size: 12px;
+        }
+
+        /* -------------------------------------
+            TYPOGRAPHY
+        ------------------------------------- */
+        h1, h2, h3 {
+            font-family: "Helvetica Neue", Helvetica, Arial, "Lucida Grande", sans-serif;
+            color: #000;
+            margin: 40px 0 0;
+            line-height: 1.2em;
+            font-weight: 400;
+        }
+
+        h1 {
+            font-size: 32px;
+            font-weight: 500;
+            /* 1.2em * 32px = 38.4px, use px to get airier line-height also in Thunderbird, and Yahoo!, Outlook.com, AOL webmail clients */
+            /*line-height: 38px;*/
+        }
+
+        h2 {
+            font-size: 24px;
+            /* 1.2em * 24px = 28.8px, use px to get airier line-height also in Thunderbird, and Yahoo!, Outlook.com, AOL webmail clients */
+            /*line-height: 29px;*/
+        }
+
+        h3 {
+            font-size: 18px;
+            /* 1.2em * 18px = 21.6px, use px to get airier line-height also in Thunderbird, and Yahoo!, Outlook.com, AOL webmail clients */
+            /*line-height: 22px;*/
+        }
+
+        h4 {
+            font-size: 14px;
+            font-weight: 600;
+        }
+
+        p, ul, ol {
+            margin-bottom: 10px;
+            font-weight: normal;
+        }
+        p li, ul li, ol li {
+            margin-left: 5px;
+            list-style-position: inside;
+        }
+
+
+        /* -------------------------------------
+            LINKS & BUTTONS
+        ------------------------------------- */
+        a {
+            color: #348eda;
+            text-decoration: underline;
+        }
+
+        .btn-primary {
+            text-decoration: none;
+            color: #FFF;
+            background-color: #348eda;
+            border: solid #348eda;
+            border-width: 10px 20px;
+            line-height: 2em;
+            /* 2em * 14px = 28px, use px to get airier line-height also in Thunderbird, and Yahoo!, Outlook.com, AOL webmail clients */
+            /*line-height: 28px;*/
+            font-weight: bold;
+            text-align: center;
+            cursor: pointer;
+            display: inline-block;
+            border-radius: 5px;
+            text-transform: capitalize;
+        }
+
+        .btn-warning {
+            text-decoration: none;
+            color: #FFF;
+            background-color: #f0ad4e;
+            border: solid #eea236; /* shorthand, as in .btn-primary: "solid" is a border-style, which border-color rejects */
+            /*background-color: #348eda;*/
+            /*border: solid #348eda;*/
+            border-width: 10px 20px; /* must follow the shorthand, which resets the width */
+            line-height: 2em;
+            /* 2em * 14px = 28px, use px to get airier line-height also in Thunderbird, and Yahoo!, Outlook.com, AOL webmail clients */
+            /*line-height: 28px;*/
+            font-weight: bold;
+            text-align: center;
+            cursor: pointer;
+            display: inline-block;
+            border-radius: 5px;
+            text-transform: capitalize;
+        }
+
+        .btn-danger {
+            text-decoration: none;
+            color: #fff;
+            background-color: #d9534f;
+            border: solid #d43f3a; /* shorthand, as in .btn-primary: "solid" is a border-style, which border-color rejects */
+            /*color: #FFF;*/
+            /*background-color: #348eda;*/
+            /*border: solid #348eda;*/
+            border-width: 10px 20px; /* must follow the shorthand, which resets the width */
+            line-height: 2em;
+            /* 2em * 14px = 28px, use px to get airier line-height also in Thunderbird, and Yahoo!, Outlook.com, AOL webmail clients */
+            /*line-height: 28px;*/
+            font-weight: bold;
+            text-align: center;
+            cursor: pointer;
+            display: inline-block;
+            border-radius: 5px;
+            text-transform: capitalize;
+        }
+
+        .text-light {
+            color: #eee;
+        }
+
+        .text-primary {
+            color: #348eda;
+        }
+
+        .text-warning {
+            color: #eea236
+        }
+
+        .text-danger {
+            color: #d43f3a;
+        }
+
+        .label {
+            display: inline;
+            padding: .2em .6em .3em;
+            font-size: 75%;
+            font-weight: 700;
+            line-height: 1;
+            color: #fff;
+            text-align: center;
+            white-space: nowrap;
+            vertical-align: baseline;
+            border-radius: .25em;
+        }
+
+        .label-ok {
+            background-color: #777;
+        }
+        .label-default {
+            background-color: #777;
+        }
+        .label-primary {
+            background-color: #337ab7;
+        }
+        .label-critical {
+            background-color: #d9534f;
+        }
+        .label-warning {
+            background-color: #f0ad4e;
+        }
+
+        /* -------------------------------------
+            OTHER STYLES THAT MIGHT BE USEFUL
+        ------------------------------------- */
+        .last {
+            margin-bottom: 0;
+        }
+
+        .first {
+            margin-top: 0;
+        }
+
+        .aligncenter {
+            text-align: center;
+        }
+
+        .alignright {
+            text-align: right;
+        }
+
+        .alignleft {
+            text-align: left;
+        }
+
+        .clear {
+            clear: both;
+        }
+
+        /* -------------------------------------
+            ALERTS
+            Change the class depending on warning email, good email or bad email
+        ------------------------------------- */
+        .alert {
+            font-size: 16px;
+            color: #fff;
+            font-weight: 500;
+            padding: 20px;
+            text-align: center;
+            border-radius: 3px 3px 0 0;
+        }
+        .alert a {
+            color: #fff;
+            text-decoration: none;
+            font-weight: 500;
+            font-size: 16px;
+        }
+        .alert.alert-warning {
+            background-color: #FF9F00;
+        }
+        .alert.alert-bad {
+            background-color: #D0021B;
+        }
+        .alert.alert-critical {
+            background-color: #D0021B;
+        }
+        .alert.alert-fatal { /* class is "alert-" + lower-cased severity; presumably FATAL - confirm against AlertSeverity */
+            background-color: #D0021B;
+        }
+        .alert.alert-ok {
+            background-color: #68B90F;
+        }
+
+        .alert-body {
+            margin-bottom: 10px;
+            padding: 8px;
+            background-color: #fff;
+            border: 1px solid transparent;
+            border-radius: 4px;
+            -webkit-box-shadow: 0 1px 1px rgba(0,0,0,.05);
+            box-shadow: 0 1px 1px rgba(0,0,0,.05);
+        }
+        .alert-body-default {
+            border-color: #eee;
+        }
+        .alert-body-primary {
+            border-color: #337ab7;
+        }
+        .alert-body-warning {
+            border-color: #faebcc;
+        }
+        .alert-body-danger {
+            border-color: #ebccd1;
+        }
+        /* -------------------------------------
+            INVOICE
+            Styles for the billing table
+        ------------------------------------- */
+        .invoice {
+            margin: 40px auto;
+            text-align: left;
+            width: 80%;
+        }
+        .invoice td {
+            padding: 5px 0;
+        }
+        .invoice .invoice-items {
+            width: 100%;
+        }
+        .invoice .invoice-items td {
+            border-bottom: #eee 1px solid;
+        }
+        .invoice .invoice-items .total td {
+            border-top: 2px solid #333;
+            border-bottom: 2px solid #333;
+            font-weight: 700;
+        }
+
+        /* -------------------------------------
+            RESPONSIVE AND MOBILE FRIENDLY STYLES
+        ------------------------------------- */
+        @media only screen and (max-width: 640px) {
+            body {
+                padding: 0 !important;
+            }
+            h1, h2, h3, h4 {
+                font-weight: 800 !important;
+                margin: 20px 0 5px !important;
+            }
+            h1 {
+                font-size: 22px !important;
+            }
+
+            h2 {
+                font-size: 18px !important;
+            }
+            h3 {
+                font-size: 16px !important;
+            }
+            .container {
+                padding: 0 !important;
+                width: 100% !important;
+            }
+            .content {
+                padding: 0 !important;
+            }
+            .content-wrap {
+                padding: 10px !important;
+            }
+            .invoice {
+                width: 100% !important;
+            }
+        }
+        /*# sourceMappingURL=styles.css.map */
+    </style>
+</head>
+
+<body itemscope itemtype="http://schema.org/EmailMessage">
+
+<table class="body-wrap">
+    <tr>
+        <td></td>
+        <td class="container" width="600">
+            <div class="content">
+                <table class="main" width="100%" cellpadding="0" cellspacing="0">
+                    <tr>
+                        <td class="alert alert-$alert["alertSeverity"].toLowerCase()">
+                            <strong>$alert["alertSeverity"]</strong> $alert["alertSubject"]
+                        </td>
+                    </tr>
+                    <tr>
+                        <td class="content-wrap">
+                            <table width="100%" cellpadding="0" cellspacing="0">
+                                <tr>
+                                    <td class="content-block">
+                                        <table>
+                                            <tbody>
+                                                <tr>
+                                                    <td>Category:</td>
+                                                    <td><strong>#if($alert["alertCategory"]) $alert["alertCategory"] #else N/A #end</strong></td>
+                                                    <td>Time:</td>
+                                                    <td><strong>$alert["alertTime"]</strong></td>
+                                                </tr>
+                                            </tbody>
+                                        </table>
+                                    </td>
+                                </tr>
+                                <tr>
+                                    <td class="content-block">
+                                        <div class="alert-body alert-body-default">
+                                        $alert["alertBody"]
+                                        </div>
+                                    </td>
+                                </tr>
+                                <tr>
+                                    <td class="content-block invoice">
+                                        <table class="invoice-items" cellpadding="0" cellspacing="0">
+                                            <tbody>
+                                                <tr>
+                                                    <td>Cluster</td>
+                                                    <td>$alert["siteId"]</td>
+                                                </tr>
+                                                <tr>
+                                                    <td>Policy</td>
+                                                    <td><a href="$alert["policyDetailUrl"]">$alert["policyId"]</a></td>
+                                                </tr>
+                                            </tbody>
+                                        </table>
+                                    </td>
+                                </tr>
+                                <tr>
+                                    <td class="content-block">
+                                        <a href="$alert["alertDetailUrl"]" class="btn-primary">View Alert Details</a>
+                                    </td>
+                                </tr>
+                                <tr>
+                                    <td class="content-block">
+                                        <i>Note: The alert was automatically detected by <a href="#">Eagle</a>.</i>
+                                    </td>
+                                </tr>
+                            </table>
+                        </td>
+                    </tr>
+                </table>
+                <div class="footer">
+                    <table width="100%">
+                        <tr>
+                            <td class="aligncenter content-block">Powered by <a href="http://eagle.incubator.apache.org">Apache Eagle</a> (version: $alert["version"])</td>
+                        </tr>
+                    </table>
+                </div>
+            </div>
+        </td>
+        <td></td>
+    </tr>
+</table>
+</body>
+</html>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertPublisherTestHelper.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertPublisherTestHelper.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertPublisherTestHelper.java
index ee865b3..bd06c9b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertPublisherTestHelper.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/AlertPublisherTestHelper.java
@@ -16,21 +16,20 @@
  */
 package org.apache.eagle.alert.engine.publisher;
 
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamColumn;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.*;
+import org.apache.eagle.alert.engine.model.AlertPublishEvent;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.dedup.DedupCache;
 import org.junit.Assert;
 
 import java.util.Arrays;
+import java.util.HashMap;
 
 public class AlertPublisherTestHelper {
 
     public static AlertStreamEvent mockEvent(String policyId){
         StreamDefinition stream = createStream();
-        PolicyDefinition policy = createPolicy(stream.getStreamId(), policyId);
+        PolicyDefinition policy = createPolicyGroupByStreamId(stream.getStreamId(), policyId);
         return  createEvent(stream, policy,
             new Object[] {System.currentTimeMillis(), "host1", "testPolicy-host1-01", "open", 0, 0});
     }
@@ -42,8 +41,15 @@ public class AlertPublisherTestHelper {
         event.setStreamId(stream.getStreamId());
         event.setTimestamp(System.currentTimeMillis());
         event.setCreatedTime(System.currentTimeMillis());
+        event.setSubject("Namenode Disk Used 98%");
+        event.setBody("Disk Usage of Test cluster's name node (<a href=\"#\">namenode.hostname.domain</a>) is <strong style=\"color: red\">98%</strong> at <strong>2016-11-30 12:30:45</strong>, exceeding alert threshold <strong>90</strong>%");
         event.setData(data);
         event.ensureAlertId();
+        event.setSeverity(AlertSeverity.CRITICAL);
+        event.setCategory("HDFS");
+        HashMap<String, Object> alertContext = new HashMap<>(); // plain map instead of an anonymous double-brace HashMap subclass
+        alertContext.put(AlertPublishEvent.SITE_ID_KEY, "TestCluster");
+        event.setContext(alertContext);
         Assert.assertNotNull(event.getAlertId());
         return event;
     }
@@ -84,7 +90,7 @@ public class AlertPublisherTestHelper {
         return sd;
     }
 
-    public static  PolicyDefinition createPolicy(String streamName, String policyName) {
+    public static  PolicyDefinition createPolicyGroupByStreamId(String streamName, String policyName) {
         PolicyDefinition pd = new PolicyDefinition();
         PolicyDefinition.Definition def = new PolicyDefinition.Definition();
         // expression, something like "PT5S,dynamic,1,host"
@@ -103,4 +109,5 @@ public class AlertPublisherTestHelper {
         pd.addPartition(sp);
         return pd;
     }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
index 6e59fa2..c48df9a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDedupWithoutStateTest.java
@@ -46,7 +46,7 @@ public class DefaultDedupWithoutStateTest {
             "PT10S", Arrays.asList(new String[] {"alertKey"}), null, null, dedupCache);
 
         StreamDefinition stream = createStream();
-        PolicyDefinition policy = createPolicy(stream.getStreamId(), "testPolicy");
+        PolicyDefinition policy = createPolicyGroupByStreamId(stream.getStreamId(), "testPolicy");
 
         int[] hostIndex = new int[] {1, 2, 3};
         String[] states = new String[] {"OPEN", "WARN", "CLOSE"};

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
index 700e3ee..297b790 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/dedup/DefaultDeduplicatorTest.java
@@ -45,7 +45,7 @@ public class DefaultDeduplicatorTest {
             "PT1M", Arrays.asList(new String[] {"alertKey"}), "state", "close", dedupCache);
 
         StreamDefinition stream = createStream();
-        PolicyDefinition policy = createPolicy(stream.getStreamId(), "testPolicy");
+        PolicyDefinition policy = createPolicyGroupByStreamId(stream.getStreamId(), "testPolicy");
 
         AlertStreamEvent e1 = createEvent(stream, policy, new Object[] {
             System.currentTimeMillis(), "host1", "testPolicy-host1-01", "OPEN", 0, 0

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngineTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngineTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngineTest.java
new file mode 100644
index 0000000..b893213
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngineTest.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.template;
+
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.engine.coordinator.AlertDefinition;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.List;
+
+/**
+ * Unit tests for {@link VelocityAlertTemplateEngine}: filters a mock alert event for a policy with and without an alert template.
+ */
+public class VelocityAlertTemplateEngineTest {
+    @Test
+    public void testVelocityAlertTemplate () {
+        AlertTemplateEngine templateEngine = new VelocityAlertTemplateEngine();
+        templateEngine.init(ConfigFactory.load());
+        templateEngine.register(mockPolicy("testPolicy"));
+        AlertStreamEvent event = templateEngine.filter(mockAlertEvent("testPolicy"));
+        Assert.assertEquals("Alert (2016-11-30 07:31:15): cpu usage on hadoop of cluster test_cluster at localhost is 0.98, " +
+            "exceeding thread hold: 90%. (policy: testPolicy, description: Policy for monitoring cpu usage > 90%), " +
+            "definition: from HADOOP_JMX_METRIC_STREAM[site == \"test_cluster\" and metric == \"cpu.usage\" and value > 0.9] " +
+            "select site, metric, host, role, value insert into capacityUsageAlert", event.getBody());
+        Assert.assertEquals("Name Node Usage Exceed 90%, reach 98.0% now", event.getSubject());
+    }
+
+    @Test
+    public void testVelocityAlertTemplateWithoutTemplate () {
+        AlertTemplateEngine templateEngine = new VelocityAlertTemplateEngine();
+        templateEngine.init(ConfigFactory.load());
+        templateEngine.register(mockPolicyWithoutTemplate("testPolicyName"));
+        AlertStreamEvent event = templateEngine.filter(mockAlertEvent("testPolicyName"));
+        Assert.assertEquals("Message: Alert {stream=ALERT_STREAM,timestamp=2016-11-30 07:31:15,923," +
+            "data={site=test_cluster, role=hadoop, metric=cpu.usage, host=localhost, value=0.98}, " +
+            "policyId=testPolicyName, createdBy=junit, metaVersion=SAMPLE_META_VERSION} " +
+            "(Auto-generated alert message as template not defined in policy testPolicyName)", event.getBody());
+        Assert.assertEquals("testPolicyName", event.getSubject());
+    }
+
+    /**
+     * Builds the sample policy and attaches an alert definition (subject and body templates) to it.
+     * @param policyId name given to the policy
+     * @return the policy from {@link #mockPolicyWithoutTemplate(String)} with an alert definition set
+     */
+    private static PolicyDefinition mockPolicy (String policyId) {
+        PolicyDefinition pd = mockPolicyWithoutTemplate(policyId);
+        AlertDefinition alertDefinition = new AlertDefinition();
+        alertDefinition.setSubject("Name Node Usage Exceed 90%, reach #set($usage_per = $value * 100)$usage_per% now");
+        alertDefinition.setBody("Alert ($CREATED_TIME): cpu usage on $role of cluster $site at $host is $value, exceeding thread hold: 90%. "
+                + "(policy: $POLICY_ID, description: $POLICY_DESC), definition: $POLICY_DEFINITION");
+        pd.setAlertDefinition(alertDefinition);
+        return pd;
+    }
+
+    /**
+     * Builds the sample "siddhi" policy without any alert definition.
+     * @param policyId name given to the policy
+     * @return a policy with input stream HADOOP_JMX_METRIC_STREAM and output stream capacityUsageAlert
+     */
+    private static PolicyDefinition mockPolicyWithoutTemplate (String policyId) {
+        PolicyDefinition pd = new PolicyDefinition();
+        PolicyDefinition.Definition def = new PolicyDefinition.Definition();
+        def.setValue("from HADOOP_JMX_METRIC_STREAM[site == \"test_cluster\" and metric == \"cpu.usage\" and value > 0.9] " +
+            "select site, metric, host, role, value insert into capacityUsageAlert");
+        def.setType("siddhi");
+        pd.setDefinition(def);
+        pd.setInputStreams(Collections.singletonList("HADOOP_JMX_METRIC_STREAM"));
+        pd.setOutputStreams(Collections.singletonList("capacityUsageAlert"));
+        pd.setName(policyId);
+        pd.setDescription("Policy for monitoring cpu usage > 90%");
+        return pd;
+    }
+
+    /**
+     * Builds an alert event for the given policy with a fixed created time/timestamp and one row of sample data.
+     * @param policyId policy id stored on the event
+     * @return the populated event, after {@code ensureAlertId()} has been called on it
+     */
+    private static AlertStreamEvent mockAlertEvent (String policyId) {
+        AlertStreamEvent event = new AlertStreamEvent();
+        event.setCreatedBy("junit");
+        event.setCreatedTime(1480491075923L);
+        event.setPolicyId(policyId);
+        event.setStreamId("ALERT_STREAM");
+        event.setSchema(mockAlertStreamDefinition("ALERT_STREAM"));
+        event.setMetaVersion("SAMPLE_META_VERSION");
+        event.setTimestamp(1480491075923L);
+        event.setData(new Object[]{"test_cluster", "cpu.usage", "localhost", "hadoop", 0.98});
+        event.ensureAlertId();
+        return event;
+    }
+
+    /**
+     * Builds the event schema: STRING columns site, metric, host, role and value, in that order.
+     * @param streamId id set on the stream definition
+     * @return the stream definition, with site id "test_cluster"
+     */
+    private static StreamDefinition mockAlertStreamDefinition(String streamId){
+        StreamDefinition streamDefinition = new StreamDefinition();
+        streamDefinition.setStreamId(streamId);
+        streamDefinition.setSiteId("test_cluster");
+        List<StreamColumn> columns = new ArrayList<>();
+        // NOTE(review): "value" is declared STRING although the sample event carries the double 0.98 - confirm intent.
+        for (String name : new String[] {"site", "metric", "host", "role", "value"}) {
+            StreamColumn column = new StreamColumn();
+            column.setName(name);
+            column.setType(StreamColumn.Type.STRING);
+            columns.add(column);
+        }
+        streamDefinition.setColumns(columns);
+        return streamDefinition;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateParserTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateParserTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateParserTest.java
new file mode 100644
index 0000000..269cb71
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateParserTest.java
@@ -0,0 +1,65 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.publisher.template;
+
+import org.apache.velocity.exception.MethodInvocationException;
+import org.apache.velocity.exception.ParseErrorException;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/** Unit tests for {@link VelocityTemplateParser}: reference-name extraction and context validation. */
+public class VelocityTemplateParserTest {
+    private static final String TEMPLATE = "This alert ($category) was generated because $reason and $REASON from $source at $created_time";
+
+    @Test
+    public void testParseVelocityTemplate() {
+        VelocityTemplateParser parser = new VelocityTemplateParser(TEMPLATE);
+        Assert.assertEquals(5, parser.getReferenceNames().size());
+        Assert.assertArrayEquals(new String[]{"category", "reason", "REASON", "source", "created_time"}, parser.getReferenceNames().toArray());
+    }
+
+    @Test(expected = ParseErrorException.class)
+    public void testParseInvalidVelocityTemplate() {
+        // The trailing "#if() #fi" is what the test expects to be rejected. No value assertions follow:
+        // the test passes only if ParseErrorException is thrown while parsing or while reading the references.
+        VelocityTemplateParser parser = new VelocityTemplateParser(TEMPLATE + " #if() #fi");
+        parser.getReferenceNames();
+    }
+
+    @Test
+    public void testValidateVelocityContext() {
+        VelocityTemplateParser parser = new VelocityTemplateParser(TEMPLATE);
+        Map<String,Object> context = new HashMap<>();
+        context.put("category", "UNKNOWN");
+        context.put("reason", "timeout");
+        context.put("REASON", "IO error");
+        context.put("source","localhost");
+        context.put("created_time", "2016-11-30 05:52:47,053");
+        parser.validateContext(context);
+    }
+
+    @Test(expected = MethodInvocationException.class)
+    public void testValidateInvalidVelocityContext() {
+        // An empty context supplies none of the five references used by the template.
+        VelocityTemplateParser parser = new VelocityTemplateParser(TEMPLATE);
+        parser.validateContext(new HashMap<>());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateTest.java
new file mode 100644
index 0000000..b67f394
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateTest.java
@@ -0,0 +1,95 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.template;
+
+import org.apache.velocity.Template;
+import org.apache.velocity.VelocityContext;
+import org.apache.velocity.app.Velocity;
+import org.apache.velocity.app.VelocityEngine;
+import org.apache.velocity.runtime.RuntimeConstants;
+import org.apache.velocity.runtime.parser.node.ASTReference;
+import org.apache.velocity.runtime.parser.node.ASTprocess;
+import org.apache.velocity.runtime.resource.loader.StringResourceLoader;
+import org.apache.velocity.runtime.resource.util.StringResourceRepository;
+import org.apache.velocity.runtime.visitor.NodeViewMode;
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.stream.Collectors;
+
+/** Exercises Velocity directly: loads a template from a string repository, counts its references and merges it with a context. */
+public class VelocityTemplateTest {
+    private static final Logger LOG = LoggerFactory.getLogger(VelocityTemplateTest.class);
+
+    @Test
+    public void testVelocityTemplate() {
+        String templateString = "This alert ($category) was generated because $reason and $REASON from $source at $created_time";
+        String resultString = "This alert ($category) was generated because timeout and IO error from localhost at 2016-11-30 05:52:47,053";
+        VelocityEngine engine = new VelocityEngine();
+        engine.setProperty(RuntimeConstants.RUNTIME_LOG_LOGSYSTEM_CLASS, "org.apache.velocity.runtime.log.Log4JLogChute");
+        engine.setProperty("runtime.log.logsystem.log4j.logger", LOG.getName());
+        engine.setProperty(Velocity.RESOURCE_LOADER, "string");
+        engine.addProperty("string.resource.loader.class", StringResourceLoader.class.getName());
+        engine.addProperty("string.resource.loader.repository.static", "false");
+        // "runtime.references.strict" is not enabled: $category is absent from the context and resultString expects it rendered verbatim.
+        engine.init();
+
+        StringResourceRepository repo = (StringResourceRepository) engine.getApplicationAttribute(StringResourceLoader.REPOSITORY_NAME_DEFAULT);
+        repo.putStringResource("alert_template", ""); // overwritten on the next line; the assertion checks the latest body is kept
+        repo.putStringResource("alert_template", templateString);
+        Assert.assertEquals(templateString, repo.getStringResource("alert_template").getBody());
+
+        VelocityContext context = new VelocityContext();
+        context.put("reason", "timeout");
+        context.put("REASON", "IO error");
+        context.put("source","localhost");
+        context.put("created_time", "2016-11-30 05:52:47,053");
+
+        Template velocityTemplate = engine.getTemplate("alert_template");
+        ASTprocess data = (ASTprocess) velocityTemplate.getData();
+        ReferenceContext referenceContext = new ReferenceContext();
+        data.jjtAccept(referenceContext,null);
+        Assert.assertEquals(5, referenceContext.getReferences().size());
+        StringWriter writer = new StringWriter();
+        velocityTemplate.merge(context, writer);
+        velocityTemplate.process();
+        Assert.assertEquals(resultString, writer.toString());
+    }
+
+    /** AST visitor that records every {@link ASTReference} node it visits; static because it needs no enclosing test instance. */
+    private static class ReferenceContext extends NodeViewMode {
+        private final List<ASTReference> references = new ArrayList<>();
+        @Override
+        public Object visit(ASTReference node, Object data) {
+            references.add(node);
+            return super.visit(node, data);
+        }
+
+        public List<ASTReference> getReferences() {
+            return this.references;
+        }
+
+        public List<String> getReferenceNames() {
+            return this.references.stream().map(ASTReference::getRootString).collect(Collectors.toList());
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java
index 13080a1..eaa9ea0 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamProvider.java
@@ -19,6 +19,7 @@ package org.apache.eagle.app.messaging;
 
 import backtype.storm.spout.Scheme;
 import com.typesafe.config.Config;
+import org.apache.commons.lang3.StringUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -59,19 +60,19 @@ public class KafkaStreamProvider implements StreamProvider<KafkaStreamSink, Kafk
         KafkaStreamSinkConfig sinkConfig = new KafkaStreamSinkConfig();
         sinkConfig.setTopicId(getSinkTopicName(streamId,config));
         sinkConfig.setBrokerList(config.getString("dataSinkConfig.brokerList"));
-        sinkConfig.setSerializerClass(config.hasPath("dataSinkConfig.serializerClass")
+        sinkConfig.setSerializerClass(hasNonBlankConfigPath(config, "dataSinkConfig.serializerClass")
             ? config.getString("dataSinkConfig.serializerClass") : "kafka.serializer.StringEncoder");
-        sinkConfig.setKeySerializerClass(config.hasPath("dataSinkConfig.keySerializerClass")
+        sinkConfig.setKeySerializerClass(hasNonBlankConfigPath(config, "dataSinkConfig.keySerializerClass")
             ? config.getString("dataSinkConfig.keySerializerClass") : "kafka.serializer.StringEncoder");
 
         // new added properties for async producer
-        sinkConfig.setNumBatchMessages(config.hasPath("dataSinkConfig.numBatchMessages")
+        sinkConfig.setNumBatchMessages(hasNonBlankConfigPath(config, "dataSinkConfig.numBatchMessages")
             ? config.getString("dataSinkConfig.numBatchMessages") : "1024");
-        sinkConfig.setProducerType(config.hasPath("dataSinkConfig.producerType")
+        sinkConfig.setProducerType(hasNonBlankConfigPath(config, "dataSinkConfig.producerType")
             ? config.getString("dataSinkConfig.producerType") : "async");
-        sinkConfig.setMaxQueueBufferMs(config.hasPath("dataSinkConfig.maxQueueBufferMs")
+        sinkConfig.setMaxQueueBufferMs(hasNonBlankConfigPath(config, "dataSinkConfig.maxQueueBufferMs")
             ? config.getString("dataSinkConfig.maxQueueBufferMs") : "3000");
-        sinkConfig.setRequestRequiredAcks(config.hasPath("dataSinkConfig.requestRequiredAcks")
+        sinkConfig.setRequestRequiredAcks(hasNonBlankConfigPath(config, "dataSinkConfig.requestRequiredAcks")
             ? config.getString("dataSinkConfig.requestRequiredAcks") : "1");
 
         return sinkConfig;
@@ -82,6 +83,10 @@ public class KafkaStreamProvider implements StreamProvider<KafkaStreamSink, Kafk
         return new KafkaStreamSink();
     }
 
+    private boolean hasNonBlankConfigPath(Config config, String configName) { // true only if the path exists AND its string value is not blank
+        return config.hasPath(configName) && StringUtils.isNotBlank(config.getString(configName));
+    }
+
     @Override
     public KafkaStreamSourceConfig getSourceConfig(String streamId, Config config) {
         KafkaStreamSourceConfig sourceConfig = new KafkaStreamSourceConfig();
@@ -89,31 +94,31 @@ public class KafkaStreamProvider implements StreamProvider<KafkaStreamSink, Kafk
         sourceConfig.setTopicId(getSourceTopicName(streamId,config));
         sourceConfig.setBrokerZkQuorum(config.getString("dataSourceConfig.zkConnection"));
 
-        if (config.hasPath("dataSourceConfig.fetchSize")) {
+        if (hasNonBlankConfigPath(config, "dataSourceConfig.fetchSize")) {
             sourceConfig.setFetchSize(config.getInt("dataSourceConfig.fetchSize"));
         }
-        if (config.hasPath("dataSourceConfig.transactionZKRoot")) {
+        if (hasNonBlankConfigPath(config, "dataSourceConfig.transactionZKRoot")) {
             sourceConfig.setTransactionZKRoot(config.getString("dataSourceConfig.transactionZKRoot"));
         }
-        if (config.hasPath("dataSourceConfig.consumerGroupId")) {
+        if (hasNonBlankConfigPath(config, "dataSourceConfig.consumerGroupId")) {
             sourceConfig.setConsumerGroupId(config.getString("dataSourceConfig.consumerGroupId"));
         }
-        if (config.hasPath("dataSourceConfig.brokerZkPath")) {
+        if (hasNonBlankConfigPath(config, "dataSourceConfig.brokerZkPath")) {
             sourceConfig.setBrokerZkPath(config.getString("dataSourceConfig.brokerZkPath"));
         }
-        if (config.hasPath("dataSourceConfig.txZkServers")) {
+        if (hasNonBlankConfigPath(config, "dataSourceConfig.txZkServers")) {
             sourceConfig.setTransactionZkServers(config.getString("dataSourceConfig.txZkServers"));
         }
-        if (config.hasPath("dataSourceConfig.transactionStateUpdateMS")) {
+        if (hasNonBlankConfigPath(config, "dataSourceConfig.transactionStateUpdateMS")) {
             sourceConfig.setTransactionStateUpdateMS(config.getLong("dataSourceConfig.transactionStateUpdateMS"));
         }
-        if (config.hasPath("dataSourceConfig.startOffsetTime")) {
+        if (hasNonBlankConfigPath(config, "dataSourceConfig.startOffsetTime")) {
             sourceConfig.setStartOffsetTime(config.getInt("dataSourceConfig.startOffsetTime"));
         }
-        if (config.hasPath("dataSourceConfig.forceFromStart")) {
+        if (hasNonBlankConfigPath(config, "dataSourceConfig.forceFromStart")) {
             sourceConfig.setForceFromStart(config.getBoolean("dataSourceConfig.forceFromStart"));
         }
-        if (config.hasPath("dataSourceConfig.schemeCls")) {
+        if (hasNonBlankConfigPath(config, "dataSourceConfig.schemeCls")) {
             try {
                 sourceConfig.setSchemaClass((Class<? extends Scheme>) Class.forName(config.getString("dataSourceConfig.schemeCls")));
             } catch (ClassNotFoundException e) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSource.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSource.java
index 9edf644..5cc5145 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSource.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSource.java
@@ -22,6 +22,7 @@ import backtype.storm.spout.SpoutOutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import com.google.common.base.Preconditions;
+import org.apache.commons.lang3.StringUtils;
 import org.apache.eagle.alert.engine.spout.SchemeBuilder;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -115,7 +116,7 @@ public class KafkaStreamSource extends StormStreamSource<KafkaStreamSourceConfig
         String brokerZkPath = config.getBrokerZkPath();
 
         BrokerHosts hosts;
-        if (brokerZkPath == null) {
+        if (StringUtils.isBlank(brokerZkPath)) { // no broker path configured: use the single-argument ZkHosts
             hosts = new ZkHosts(zkConnString);
         } else {
             hosts = new ZkHosts(zkConnString, brokerZkPath);
@@ -127,9 +128,9 @@ public class KafkaStreamSource extends StormStreamSource<KafkaStreamSourceConfig
             groupId);
 
         // transaction zkServers to store kafka consumer offset. Default to use storm zookeeper
-        if (config.getTransactionZkServers() != null) {
+        if (StringUtils.isNotBlank(config.getTransactionZkServers())) {
             String[] txZkServers = config.getTransactionZkServers().split(",");
-            spoutConfig.zkServers = Arrays.asList(txZkServers).stream().map(server -> server.split(":")[0]).collect(Collectors.toList());
+            spoutConfig.zkServers = Arrays.stream(txZkServers).map(server -> server.split(":")[0]).collect(Collectors.toList());
             spoutConfig.zkPort = Integer.parseInt(txZkServers[0].split(":")[1]);
             LOG.info("txZkServers:" + spoutConfig.zkServers + ", zkPort:" + spoutConfig.zkPort);
         }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSourceConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSourceConfig.java b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSourceConfig.java
index 5a9c162..e6fdb83 100644
--- a/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSourceConfig.java
+++ b/eagle-core/eagle-app/eagle-app-base/src/main/java/org/apache/eagle/app/messaging/KafkaStreamSourceConfig.java
@@ -19,8 +19,6 @@ package org.apache.eagle.app.messaging;
 import org.apache.eagle.metadata.model.StreamSourceConfig;
 
 public class KafkaStreamSourceConfig implements StreamSourceConfig {
-
-    private static final String DEFAULT_CONFIG_PREFIX = "dataSourceConfig";
     private static final String DEFAULT_CONSUMER_GROUP_ID = "eagleKafkaSource";
     private static final String DEFAULT_TRANSACTION_ZK_ROOT = "/consumers";
     private static final Class<? extends backtype.storm.spout.Scheme> DEFAULT_KAFKA_SCHEMA = JsonSchema.class;
@@ -34,7 +32,7 @@ public class KafkaStreamSourceConfig implements StreamSourceConfig {
     private int fetchSize = 1048576;
     private String transactionZKRoot = DEFAULT_TRANSACTION_ZK_ROOT;
     private String consumerGroupId = DEFAULT_CONSUMER_GROUP_ID;
-    private String brokerZkPath = null;
+    private String brokerZkPath = "/brokers";
     private long transactionStateUpdateMS = 2000;
     private int startOffsetTime = -1;
     private boolean forceFromStart = false;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/AlertEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/AlertEntity.java b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/AlertEntity.java
index fa85496..1a09a2d 100644
--- a/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/AlertEntity.java
+++ b/eagle-core/eagle-metadata/eagle-metadata-base/src/main/java/org/apache/eagle/metadata/model/AlertEntity.java
@@ -42,6 +42,10 @@ public class AlertEntity extends TaggedLogAPIEntity {
     private String policyValue;
     @Column("c")
     private Map<String, Object> alertData;
+    @Column("d")
+    private String alertSubject;
+    @Column("e")
+    private String alertBody;
 
     public List<String> getAppIds() {
         return appIds;
@@ -70,4 +74,21 @@ public class AlertEntity extends TaggedLogAPIEntity {
         valueChanged("alertData");
     }
 
+    public String getAlertBody() { // returns the alertBody field (declared with @Column("e"))
+        return alertBody;
+    }
+
+    public void setAlertBody(String alertBody) { // stores the value, then reports the change under the field name
+        this.alertBody = alertBody;
+        valueChanged("alertBody"); // NOTE(review): presumably flags the field as modified - confirm in TaggedLogAPIEntity
+    }
+
+    public String getAlertSubject() { // returns the alertSubject field (declared with @Column("d"))
+        return alertSubject;
+    }
+
+    public void setAlertSubject(String alertSubject) { // stores the value, then reports the change under the field name
+        this.alertSubject = alertSubject;
+        valueChanged("alertSubject"); // NOTE(review): presumably flags the field as modified - confirm in TaggedLogAPIEntity
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml
----------------------------------------------------------------------
diff --git a/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml b/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml
index 07270a5..87b57d6 100644
--- a/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml
+++ b/eagle-hadoop-metric/src/main/resources/META-INF/providers/org.apache.eagle.metric.HadoopMetricMonitorAppProdiver.xml
@@ -25,7 +25,7 @@
         <property>
             <name>dataSinkConfig.HADOOP_JMX_METRIC_STREAM.topic</name>
             <displayName>JMX Metric Kafka Topic</displayName>
-            <value>hadoop_jmx_metric_{SITE_ID}</value>
+            <value>hadoop_jmx_metric</value>
             <description>Hadoop JMX metric kafka topic name for stream: HADOOP_JMX_METRIC_STREAM</description>
             <required>true</required>
         </property>
@@ -36,6 +36,54 @@
             <description>Sourced Kafka Brokers like broker1,broker2,...</description>
             <required>true</required>
         </property>
+        <property>
+            <name>dataSourceConfig.zkConnection</name>
+            <displayName>Zookeeper Quorum</displayName>
+            <value>localhost:2181</value>
+            <description>Kafka Zookeeper Quorum</description>
+            <required>true</required>
+        </property>
+        <property>
+            <name>dataSourceConfig.fetchSize</name>
+            <displayName>Kafka Fetch Size</displayName>
+            <value>1048576</value>
+            <description>Kafka Fetch Size</description>
+            <required>false</required>
+        </property>
+        <property>
+            <name>dataSourceConfig.transactionZKRoot</name>
+            <displayName>Kafka Transaction Root Path</displayName>
+            <value>/consumers</value>
+            <description>Kafka Transaction Root Path</description>
+            <required>false</required>
+        </property>
+        <property>
+            <name>dataSourceConfig.consumerGroupId</name>
+            <displayName>Kafka Consumer Group ID</displayName>
+            <value>eagleConsumer</value>
+            <description>Kafka Consumer Group ID</description>
+            <required>false</required>
+        </property>
+        <property>
+            <name>dataSourceConfig.brokerZkPath</name>
+            <displayName>Kafka Broker ZkPath</displayName>
+            <description>Kafka Broker ZkPath</description>
+            <required>false</required>
+        </property>
+        <property>
+            <name>dataSourceConfig.txZkServers</name>
+            <displayName>Kafka Transaction Zookeeper Servers</displayName>
+            <description>Kafka Transaction Zookeeper Servers</description>
+            <value>localhost:2181</value>
+            <required>true</required>
+        </property>
+        <property>
+            <name>dataSourceConfig.transactionStateUpdateMS</name>
+            <value>2000</value>
+            <displayName>Kafka Transaction Status Update MS</displayName>
+            <description>Kafka Transaction Status Update MS</description>
+            <required>false</required>
+        </property>
     </configuration>
     <streams>
         <stream>


[3/3] incubator-eagle git commit: [EAGLE-815] Add eagle alert template, severity and category support

Posted by ha...@apache.org.
[EAGLE-815] Add eagle alert template, severity and category support

Support alert template to generate human readable message

# New Features
* Support to define alert template in PolicyDefinition
* Support generating the alert event (subject & body, instead of using AlertPublishEvent, for fewer changes) based on the policy's template, with the alert event as context
* Dynamically load policy metadata in alert publisher
* Integrate VelocityAlertTemplateEngine into AlertPublisherBolt with metadata lifecycle
* Support persisting the alert message in AlertEntity as an immutable field.
* Refactor alert mail template using https://github.com/mailgun/transactional-email-templates
* Refactor Alert Template to become simple and human-readable.
* Add Alert Category
* Add Alert Severity (notification template color will change according to the severity): UNKNOWN (blue), OK (green), WARNING (orange), CRITICAL/FATAL (dark black)
* Add Alert Subject & Body Template Engine, for example:
    * Sample Event:

          {
              "host": "localhost",
              "timestamp": 1480319108000,
              "metric": "hadoop.cpu.usage",
              "component": "namenode",
              "site": "test2",
              "value": 0.96
          }

    * Sample Subject "RESOURCEMANAGER JMX Metric Alert" is defined by:

           $component.toUpperCase() JMX Metric Alert

    * Sample Body is defined as:

            An alert happened on <strong>$component</strong> (<strong>$host</strong>) of cluster <strong>$site</strong> at <strong>$ALERT_TIME</strong> because <span style="color: red">$metric = $value</span>

* Add VelocityTemplateParser

# TODO
* Integrate velocity parser into policy validator, make sure the template only contains `global variable` and `alert stream schema fields`.
* Add alert definition preview

Author: Hao Chen <ha...@apache.org>

Closes #711 from haoch/EAGLE-815.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/1c81c086
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/1c81c086
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/1c81c086

Branch: refs/heads/master
Commit: 1c81c0865c70b9a0a91b86d7813be923e86e1dba
Parents: e5e215e
Author: Hao Chen <ha...@apache.org>
Authored: Mon Dec 5 14:12:49 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Mon Dec 5 14:12:49 2016 +0800

----------------------------------------------------------------------
 .../eagle/alert/app/AlertEagleStorePlugin.java  |  14 +-
 .../engine/coordinator/AlertDefinition.java     |  72 +++
 .../alert/engine/coordinator/AlertSeverity.java |  21 +
 .../engine/coordinator/PolicyDefinition.java    |  18 +
 .../alert/engine/model/AlertPublishEvent.java   |  81 ++-
 .../alert/engine/model/AlertStreamEvent.java    |  60 ++-
 .../engine/model/AlertPublishEventTest.java     |   2 +-
 .../engine/model/AlertStreamEventTest.java      |  17 +-
 .../interpreter/PolicyExecutionPlanner.java     |   3 +
 .../engine/publisher/AlertStreamFilter.java     |  26 +
 .../engine/publisher/PipeStreamFilter.java      |  46 ++
 .../engine/publisher/PublishConstants.java      |   7 +-
 .../publisher/email/AlertEmailGenerator.java    |  56 ++-
 .../publisher/impl/AlertEmailPublisher.java     |   4 +-
 .../publisher/template/AlertContextFields.java  |  43 ++
 .../publisher/template/AlertTemplateEngine.java |  48 ++
 .../template/AlertTemplateProvider.java         |  23 +
 .../template/VelocityAlertTemplateEngine.java   | 170 +++++++
 .../template/VelocityTemplateParser.java        |  95 ++++
 .../alert/engine/runner/AlertPublisherBolt.java | 128 +++--
 .../src/main/resources/ALERT_DEFAULT.vm         | 301 -----------
 .../main/resources/ALERT_DEFAULT_TEMPLATE.vm    | 301 +++++++++++
 .../main/resources/ALERT_INLINED_TEMPLATE.vm    | 259 ++++++++++
 .../src/main/resources/ALERT_LIGHT_TEMPLATE.vm  | 495 +++++++++++++++++++
 .../publisher/AlertPublisherTestHelper.java     |  19 +-
 .../dedup/DefaultDedupWithoutStateTest.java     |   2 +-
 .../dedup/DefaultDeduplicatorTest.java          |   2 +-
 .../VelocityAlertTemplateEngineTest.java        | 135 +++++
 .../template/VelocityTemplateParserTest.java    |  65 +++
 .../template/VelocityTemplateTest.java          |  95 ++++
 .../app/messaging/KafkaStreamProvider.java      |  35 +-
 .../eagle/app/messaging/KafkaStreamSource.java  |   7 +-
 .../app/messaging/KafkaStreamSourceConfig.java  |   4 +-
 .../eagle/metadata/model/AlertEntity.java       |  21 +
 ...le.metric.HadoopMetricMonitorAppProdiver.xml |  50 +-
 .../eagle/metric/SendSampleDataToKafka.java     |  15 +-
 .../resources/hadoop_jmx_metric_sample.json     |   2 +-
 .../test/resources/integrate_test_policy.json   |  37 ++
 .../history/storm/SparkHistoryJobParseBolt.java |   2 +-
 .../partials/alert/policyEdit/advancedMode.html |  25 +-
 .../app/dev/public/js/ctrls/alertEditCtrl.js    |   6 +
 41 files changed, 2386 insertions(+), 426 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertEagleStorePlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertEagleStorePlugin.java b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertEagleStorePlugin.java
index a534012..30d2b78 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertEagleStorePlugin.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert-app/src/main/java/org/apache/eagle/alert/app/AlertEagleStorePlugin.java
@@ -80,14 +80,18 @@ public class AlertEagleStorePlugin extends AbstractPublishPlugin {
         Map<String, String> tags = new HashMap<>();
         tags.put(POLICY_ID_KEY, event.getPolicyId());
         tags.put(ALERT_ID_KEY, event.getAlertId());
-        if (event.getExtraData() != null && !event.getExtraData().isEmpty()) {
-            tags.put(SITE_ID_KEY, event.getExtraData().get(SITE_ID_KEY).toString());
-            alertEvent.setPolicyValue(event.getExtraData().get(POLICY_VALUE_KEY).toString());
-            alertEvent.setAppIds((List<String>) event.getExtraData().get(APP_IDS_KEY));
+        tags.put(ALERT_CATEGORY, event.getCategory());
+        tags.put(ALERT_SEVERITY, event.getSeverity().toString());
+        if (event.getContext() != null && !event.getContext().isEmpty()) {
+            tags.put(SITE_ID_KEY, event.getContext().get(SITE_ID_KEY).toString());
+            alertEvent.setPolicyValue(event.getContext().get(POLICY_VALUE_KEY).toString());
+            alertEvent.setAppIds((List<String>) event.getContext().get(APP_IDS_KEY));
         }
         alertEvent.setTimestamp(event.getCreatedTime());
         alertEvent.setAlertData(event.getDataMap());
+        alertEvent.setAlertSubject(event.getSubject());
+        alertEvent.setAlertBody(event.getBody());
         alertEvent.setTags(tags);
         return alertEvent;
     }
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDefinition.java
new file mode 100644
index 0000000..66579bb
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertDefinition.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.coordinator;
+
+public class AlertDefinition {
+    private TemplateType templateType = TemplateType.TEXT;
+    private String subject;
+    private String body;
+
+    private AlertSeverity severity;
+    private String category;
+
+    public String getBody() {
+        return body;
+    }
+
+    public void setBody(String templateResource) {
+        this.body = templateResource;
+    }
+
+    public TemplateType getTemplateType() {
+        return templateType;
+    }
+
+    public void setTemplateType(TemplateType type) {
+        this.templateType = type;
+    }
+
+    public String getSubject() {
+        return subject;
+    }
+
+    public void setSubject(String subject) {
+        this.subject = subject;
+    }
+
+    public AlertSeverity getSeverity() {
+        return severity;
+    }
+
+    public void setSeverity(AlertSeverity severity) {
+        this.severity = severity;
+    }
+
+    public String getCategory() {
+        return category;
+    }
+
+    public void setCategory(String category) {
+        this.category = category;
+    }
+
+    public enum TemplateType {
+        TEXT,
+        // FILE,
+        // HTTP
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertSeverity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertSeverity.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertSeverity.java
new file mode 100644
index 0000000..0d36231
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/AlertSeverity.java
@@ -0,0 +1,21 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.coordinator;
+
+public enum AlertSeverity {
+    UNKNOWN, OK, WARNING, CRITICAL, FATAL
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
index 94d84f2..3663670 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/PolicyDefinition.java
@@ -39,6 +39,7 @@ public class PolicyDefinition implements Serializable {
     private Definition definition;
     private Definition stateDefinition;
     private PolicyStatus policyStatus = PolicyStatus.ENABLED;
+    private AlertDefinition alertDefinition;
 
     // one stream only have one partition in one policy, since we don't support stream alias
     private List<StreamPartition> partitionSpec = new ArrayList<StreamPartition>();
@@ -170,6 +171,22 @@ public class PolicyDefinition implements Serializable {
         return false;
     }
 
+    public AlertDefinition getAlertDefinition() {
+        return alertDefinition;
+    }
+
+    public void setAlertDefinition(AlertDefinition alertDefinition) {
+        this.alertDefinition = alertDefinition;
+    }
+
+    public AlertSeverity getAlertSeverity() {
+        return alertDefinition == null ? null : alertDefinition.getSeverity();
+    }
+
+    public String getAlertCategory() {
+        return alertDefinition == null ? null : alertDefinition.getCategory();
+    }
+
     @JsonIgnoreProperties(ignoreUnknown = true)
     public static class Definition implements Serializable {
         private static final long serialVersionUID = -622366527887848346L;
@@ -273,6 +290,7 @@ public class PolicyDefinition implements Serializable {
         ENABLED, DISABLED
     }
 
+
     @Override
     public String toString() {
         return String.format("{name=\"%s\",definition=%s}", this.getName(), this.getDefinition() == null ? "null" : this.getDefinition().toString());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java
index aee0ba0..a794e49 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertPublishEvent.java
@@ -24,6 +24,9 @@ import org.apache.eagle.common.DateTimeUtil;
 import java.util.List;
 import java.util.Map;
 
+/**
+ * Use as final rich alert event.
+ */
 public class AlertPublishEvent {
     private String alertId;
     private String siteId;
@@ -32,12 +35,19 @@ public class AlertPublishEvent {
     private String policyValue;
     private long alertTimestamp;
     private Map<String, Object> alertData;
+    private String alertSubject;
+    private String alertBody;
+    private String streamId;
+    private String createdBy;
+    private long createdTime;
 
     public static final String ALERT_ID_KEY = "alertId";
     public static final String SITE_ID_KEY = "siteId";
     public static final String APP_IDS_KEY = "appIds";
     public static final String POLICY_ID_KEY = "policyId";
     public static final String POLICY_VALUE_KEY = "policyValue";
+    public static final String ALERT_CATEGORY = "category";
+    public static final String ALERT_SEVERITY = "severity";
 
     public String getAlertId() {
         return alertId;
@@ -102,10 +112,21 @@ public class AlertPublishEvent {
         alertEvent.setAlertId(event.getAlertId());
         alertEvent.setPolicyId(event.getPolicyId());
         alertEvent.setAlertTimestamp(event.getCreatedTime());
-        if (event.getExtraData() != null && !event.getExtraData().isEmpty()) {
-            alertEvent.setSiteId(event.getExtraData().get(SITE_ID_KEY).toString());
-            alertEvent.setPolicyValue(event.getExtraData().get(POLICY_VALUE_KEY).toString());
-            alertEvent.setAppIds((List<String>) event.getExtraData().get(APP_IDS_KEY));
+        alertEvent.setStreamId(event.getStreamId());
+        alertEvent.setCreatedBy(event.getCreatedBy());
+        alertEvent.setCreatedTime(event.getCreatedTime());
+        alertEvent.setAlertSubject(event.getSubject());
+        alertEvent.setAlertBody(event.getBody());
+        if (event.getContext() != null && !event.getContext().isEmpty()) {
+            if (event.getContext().containsKey(SITE_ID_KEY)) {
+                alertEvent.setSiteId(event.getContext().get(SITE_ID_KEY).toString());
+            }
+            if (event.getContext().containsKey(POLICY_VALUE_KEY)) {
+                alertEvent.setPolicyValue(event.getContext().get(POLICY_VALUE_KEY).toString());
+            }
+            if (event.getContext().containsKey(APP_IDS_KEY)) {
+                alertEvent.setAppIds((List<String>) event.getContext().get(APP_IDS_KEY));
+            }
         }
         alertEvent.setAlertData(event.getDataMap());
         return alertEvent;
@@ -113,11 +134,51 @@ public class AlertPublishEvent {
 
     public String toString() {
         return String.format("%s %s alertId=%s, siteId=%s, policyId=%s, alertData=%s",
-            DateTimeUtil.millisecondsToHumanDateWithSeconds(alertTimestamp),
-            DateTimeUtil.CURRENT_TIME_ZONE.getID(),
-            alertId,
-            siteId,
-            policyId,
-            alertData.toString());
+                DateTimeUtil.millisecondsToHumanDateWithSeconds(alertTimestamp),
+                DateTimeUtil.CURRENT_TIME_ZONE.getID(),
+                alertId,
+                siteId,
+                policyId,
+                alertData.toString());
+    }
+
+    public String getAlertSubject() {
+        return alertSubject;
+    }
+
+    public void setAlertSubject(String alertSubject) {
+        this.alertSubject = alertSubject;
+    }
+
+    public String getAlertBody() {
+        return alertBody;
+    }
+
+    public void setAlertBody(String alertBody) {
+        this.alertBody = alertBody;
+    }
+
+    public String getStreamId() {
+        return streamId;
+    }
+
+    public void setStreamId(String streamId) {
+        this.streamId = streamId;
+    }
+
+    public String getCreatedBy() {
+        return createdBy;
+    }
+
+    public void setCreatedBy(String createdBy) {
+        this.createdBy = createdBy;
+    }
+
+    public long getCreatedTime() {
+        return createdTime;
+    }
+
+    public void setCreatedTime(long createdTime) {
+        this.createdTime = createdTime;
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
index b7f0132..50512b1 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/model/AlertStreamEvent.java
@@ -16,6 +16,7 @@
  */
 package org.apache.eagle.alert.engine.model;
 
+import org.apache.eagle.alert.engine.coordinator.AlertSeverity;
 import org.apache.eagle.common.DateTimeUtil;
 import org.apache.eagle.alert.engine.coordinator.StreamColumn;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
@@ -35,8 +36,18 @@ public class AlertStreamEvent extends StreamEvent {
     private StreamDefinition schema;
     private String createdBy;
     private long createdTime;
-    // app related fields
-    private Map<String, Object> extraData;
+    private String category;
+    private AlertSeverity severity = AlertSeverity.WARNING;
+
+    // ----------------------
+    // Lazy Alert Fields
+    // ----------------------
+
+    // Dynamical context like app related fields
+    private Map<String, Object> context;
+    // Alert content like subject and body
+    private String subject;
+    private String body;
 
     public AlertStreamEvent() {
     }
@@ -72,9 +83,10 @@ public class AlertStreamEvent extends StreamEvent {
                 dataStrings.add(null);
             }
         }
-        return String.format("AlertStreamEvent[stream=%S,timestamp=%s,data=[%s], policyId=%s, createdBy=%s, metaVersion=%s]",
+
+        return String.format("Alert {stream=%S,timestamp=%s,data=%s, policyId=%s, createdBy=%s, metaVersion=%s}",
                 this.getStreamId(), DateTimeUtil.millisecondsToHumanDateWithMilliseconds(this.getTimestamp()),
-                StringUtils.join(dataStrings, ","), this.getPolicyId(), this.getCreatedBy(), this.getMetaVersion());
+                this.getDataMap(), this.getPolicyId(), this.getCreatedBy(), this.getMetaVersion());
     }
 
     public String getCreatedBy() {
@@ -114,12 +126,12 @@ public class AlertStreamEvent extends StreamEvent {
         return event;
     }
 
-    public Map<String, Object> getExtraData() {
-        return extraData;
+    public Map<String, Object> getContext() {
+        return context;
     }
 
-    public void setExtraData(Map<String, Object> extraData) {
-        this.extraData = extraData;
+    public void setContext(Map<String, Object> context) {
+        this.context = context;
     }
 
     public String getAlertId() {
@@ -132,4 +144,36 @@ public class AlertStreamEvent extends StreamEvent {
             this.alertId = UUID.randomUUID().toString();
         }
     }
+
+    public String getSubject() {
+        return subject;
+    }
+
+    public void setSubject(String subject) {
+        this.subject = subject;
+    }
+
+    public String getBody() {
+        return body;
+    }
+
+    public void setBody(String body) {
+        this.body = body;
+    }
+
+    public String getCategory() {
+        return category;
+    }
+
+    public void setCategory(String category) {
+        this.category = category;
+    }
+
+    public AlertSeverity getSeverity() {
+        return severity;
+    }
+
+    public void setSeverity(AlertSeverity severity) {
+        this.severity = severity;
+    }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertPublishEventTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertPublishEventTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertPublishEventTest.java
index 01eaa3c..5903ffd 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertPublishEventTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertPublishEventTest.java
@@ -95,7 +95,7 @@ public class AlertPublishEventTest {
         extraData.put(AlertPublishEvent.SITE_ID_KEY, "SITE_ID_KEY");
         extraData.put(AlertPublishEvent.POLICY_VALUE_KEY, "POLICY_VALUE_KEY");
         extraData.put(AlertPublishEvent.APP_IDS_KEY, Arrays.asList("appId1", "appId2"));
-        alertStreamEvent.setExtraData(extraData);
+        alertStreamEvent.setContext(extraData);
 
         alertPublishEvent = AlertPublishEvent.createAlertPublishEvent(alertStreamEvent);
         Assert.assertEquals("SITE_ID_KEY", alertPublishEvent.getSiteId());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertStreamEventTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertStreamEventTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertStreamEventTest.java
index eec3675..beaa8fa 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertStreamEventTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/test/java/org/apache/eagle/alert/engine/model/AlertStreamEventTest.java
@@ -47,14 +47,21 @@ public class AlertStreamEventTest {
         AlertStreamEvent alertStreamEvent = new AlertStreamEvent();
         alertStreamEvent.setSchema(streamDefinition);
         alertStreamEvent.setData(new Object[]{"namevalue", "hostvalue", "1", 10, 0.1, -0.2, "{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}", 1});
-        Assert.assertEquals("AlertStreamEvent[stream=NULL,timestamp=1970-01-01 00:00:00,000,data=[namevalue,hostvalue,1,10,0.1,-0.2,{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"},1], policyId=null, createdBy=null, metaVersion=null]", alertStreamEvent.toString());
-        Assert.assertEquals("{flag=1, data=0.1, name=namevalue, host=hostvalue, salary=-0.2, value=10, int=1, object={\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}}", alertStreamEvent.getDataMap().toString());
+        Assert.assertEquals(
+                "Alert {stream=NULL,timestamp=1970-01-01 00:00:00,000,data={flag=1, data=0.1, name=namevalue, host=hostvalue, salary=-0.2, value=10, int=1, object={\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}}, policyId=null, createdBy=null, metaVersion=null}",
+                alertStreamEvent.toString());
+        Assert.assertEquals(
+                "{flag=1, data=0.1, name=namevalue, host=hostvalue, salary=-0.2, value=10, int=1, object={\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}}",
+                alertStreamEvent.getDataMap().toString());
 
         AlertStreamEvent alertStreamEvent1 = new AlertStreamEvent(alertStreamEvent);
 
-        Assert.assertEquals("AlertStreamEvent[stream=NULL,timestamp=1970-01-01 00:00:00,000,data=[namevalue,hostvalue,1,10,0.1,-0.2,{\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"},1], policyId=null, createdBy=null, metaVersion=null]", alertStreamEvent1.toString());
-        Assert.assertEquals("{flag=1, data=0.1, name=namevalue, host=hostvalue, salary=-0.2, value=10, int=1, object={\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}}", alertStreamEvent1.getDataMap().toString());
-
+        Assert.assertEquals(
+                "Alert {stream=NULL,timestamp=1970-01-01 00:00:00,000,data={flag=1, data=0.1, name=namevalue, host=hostvalue, salary=-0.2, value=10, int=1, object={\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}}, policyId=null, createdBy=null, metaVersion=null}",
+                alertStreamEvent1.toString());
+        Assert.assertEquals(
+                "{flag=1, data=0.1, name=namevalue, host=hostvalue, salary=-0.2, value=10, int=1, object={\"name\":\"heap.COMMITTED\", \"Value\":\"175636480\"}}",
+                alertStreamEvent1.getDataMap().toString());
 
         Assert.assertFalse(alertStreamEvent1 == alertStreamEvent);
         Assert.assertTrue(alertStreamEvent1.equals(alertStreamEvent));

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlanner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlanner.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlanner.java
index 9e8f9f1..b8e5e42 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlanner.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/interpreter/PolicyExecutionPlanner.java
@@ -16,6 +16,9 @@
  */
 package org.apache.eagle.alert.engine.interpreter;
 
+/**
+ * Keep PolicyExecutionPlanner as simple and fast as possible (avoid any backend data exchanging).
+ */
 interface PolicyExecutionPlanner {
     /**
      * @return PolicyExecutionPlan.

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertStreamFilter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertStreamFilter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertStreamFilter.java
new file mode 100644
index 0000000..71c2a8e
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/AlertStreamFilter.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher;
+
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+
+public interface AlertStreamFilter {
+    /**
+     * Filters the given alert stream event; an implementation returns null to signal that the event is skipped.
+     */
+    AlertStreamEvent filter(AlertStreamEvent event);
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PipeStreamFilter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PipeStreamFilter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PipeStreamFilter.java
new file mode 100644
index 0000000..a6cc3e5
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PipeStreamFilter.java
@@ -0,0 +1,46 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher;
+
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+
+import java.util.ArrayList;
+import java.util.List;
+
+/** Chains several {@link AlertStreamFilter}s, feeding the result of each one into the next. */
+public class PipeStreamFilter implements AlertStreamFilter {
+    /** Stages in the order they are applied. */
+    private final List<AlertStreamFilter> filters;
+
+    /** Copies the given stages, preserving their order. */
+    public PipeStreamFilter(AlertStreamFilter... filters) {
+        this.filters = new ArrayList<>(filters.length);
+        for (int i = 0; i < filters.length; i++) {
+            this.filters.add(filters[i]);
+        }
+    }
+
+    /** Runs the event through every stage; stops and yields null as soon as the event is skipped. */
+    @Override
+    public AlertStreamEvent filter(AlertStreamEvent event) {
+        AlertStreamEvent result = event;
+        for (int i = 0; i < this.filters.size() && result != null; i++) {
+            result = this.filters.get(i).filter(result);
+        }
+        return result;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
index 46cce29..e716fbe 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/PublishConstants.java
@@ -51,11 +51,14 @@ public class PublishConstants {
     public static final String ALERT_EMAIL_ALERTLIST_PROPERTY = "alertList";
     public static final String ALERT_EMAIL_ORIGIN_PROPERTY = "alertEmailOrigin";
 
-    public static final String ALERT_EMAIL_MESSAGE = "alertMessage";
+    public static final String ALERT_EMAIL_SUBJECT = "alertSubject";
+    public static final String ALERT_EMAIL_BODY = "alertBody";
     public static final String ALERT_EMAIL_STREAM_ID = "streamId";
-    public static final String ALERT_EMAIL_TIMESTAMP = "alertTime";
+    public static final String ALERT_EMAIL_TIME = "alertTime";
     public static final String ALERT_EMAIL_POLICY_ID = "policyId";
     public static final String ALERT_EMAIL_ALERT_ID = "alertId";
+    public static final String ALERT_EMAIL_ALERT_CATEGORY = "alertCategory";
+    public static final String ALERT_EMAIL_ALERT_SEVERITY = "alertSeverity";
     public static final String ALERT_EMAIL_ALERT_DATA = "alertData";
     public static final String ALERT_EMAIL_ALERT_DATA_DESC = "alertDataDesc";
     public static final String ALERT_EMAIL_CREATOR = "creator";

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
index 809bb09..8aaf310 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/email/AlertEmailGenerator.java
@@ -22,10 +22,9 @@ package org.apache.eagle.alert.engine.publisher.email;
 
 import org.apache.commons.httpclient.URIException;
 import org.apache.commons.httpclient.util.URIUtil;
-import org.apache.eagle.common.DateTimeUtil;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.PublishConstants;
-
+import org.apache.eagle.common.DateTimeUtil;
 import org.apache.eagle.common.Version;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -65,7 +64,13 @@ public class AlertEmailGenerator {
         Map<String, String> alertContext = buildAlertContext(event);
         email.setAlertContext(alertContext);
         email.setVelocityTplFile(tplFile);
-        email.setSubject(subject);
+        if (event.getCategory() != null) {
+            email.setSubject(String.format("[Eagle Alert][%s][%s] %s",
+                event.getSeverity(), event.getCategory(), event.getSubject() != null ? event.getSubject() : subject));
+        } else {
+            email.setSubject(String.format("[Eagle Alert][%s] %s",
+                event.getSeverity(), event.getSubject() != null ? event.getSubject() : subject));
+        }
         email.setSender(sender);
         email.setRecipients(recipients);
         email.setCc(cc);
@@ -94,11 +99,12 @@ public class AlertEmailGenerator {
         return status;
     }
 
-    /**
-     * TODO Support template-based alert message.
-     */
-    private String renderAlertMessage(AlertStreamEvent event) {
-        return String.format("Alert policy \"%s\" was triggered: %s",event.getPolicyId(), generateAlertDataDesc(event));
+    private String getAlertBody(AlertStreamEvent event) {
+        if (event.getBody() == null) {
+            return String.format("Alert policy \"%s\" was triggered: %s", event.getPolicyId(), generateAlertDataDesc(event));
+        } else {
+            return event.getBody();
+        }
     }
 
     private String generateAlertDataDesc(AlertStreamEvent event) {
@@ -106,7 +112,7 @@ public class AlertEmailGenerator {
             return "N/A";
         }
         StringBuilder sb = new StringBuilder();
-        for (Map.Entry<String,Object> entry : event.getDataMap().entrySet()) {
+        for (Map.Entry<String, Object> entry : event.getDataMap().entrySet()) {
             sb.append(entry.getKey()).append("=").append(entry.getValue()).append(" ");
         }
         return sb.toString();
@@ -114,31 +120,45 @@ public class AlertEmailGenerator {
 
     private Map<String, String> buildAlertContext(AlertStreamEvent event) {
         Map<String, String> alertContext = new HashMap<>();
-        alertContext.put(PublishConstants.ALERT_EMAIL_MESSAGE, renderAlertMessage(event));
+
+        if (event.getContext() != null) {
+            for (Map.Entry<String, Object> entry : event.getContext().entrySet()) {
+                if (entry.getValue() == null) {
+                    alertContext.put(entry.getKey(), "N/A");
+                } else {
+                    alertContext.put(entry.getKey(), entry.getValue().toString());
+                }
+            }
+        }
+
+        alertContext.put(PublishConstants.ALERT_EMAIL_SUBJECT, event.getSubject());
+        alertContext.put(PublishConstants.ALERT_EMAIL_BODY, getAlertBody(event));
         alertContext.put(PublishConstants.ALERT_EMAIL_POLICY_ID, event.getPolicyId());
         alertContext.put(PublishConstants.ALERT_EMAIL_ALERT_ID, event.getAlertId());
         alertContext.put(PublishConstants.ALERT_EMAIL_ALERT_DATA, event.getDataMap().toString());
         alertContext.put(PublishConstants.ALERT_EMAIL_ALERT_DATA_DESC, generateAlertDataDesc(event));
-        alertContext.put(PublishConstants.ALERT_EMAIL_TIMESTAMP, DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getCreatedTime()));
+        alertContext.put(PublishConstants.ALERT_EMAIL_ALERT_CATEGORY, event.getCategory());
+        alertContext.put(PublishConstants.ALERT_EMAIL_ALERT_SEVERITY, event.getSeverity().toString());
+        alertContext.put(PublishConstants.ALERT_EMAIL_TIME, DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getCreatedTime()));
         alertContext.put(PublishConstants.ALERT_EMAIL_STREAM_ID, event.getStreamId());
         alertContext.put(PublishConstants.ALERT_EMAIL_CREATOR, event.getCreatedBy());
         alertContext.put(PublishConstants.ALERT_EMAIL_VERSION, Version.version);
 
-        String rootUrl = this.getServerPort() == 80 ? String.format("http://%s",this.getServerHost())
-            : String.format("http://%s:%s",this.getServerHost(), this.getServerPort());
+        String rootUrl = this.getServerPort() == 80 ? String.format("http://%s", this.getServerHost())
+            : String.format("http://%s:%s", this.getServerHost(), this.getServerPort());
         try {
             alertContext.put(PublishConstants.ALERT_EMAIL_ALERT_DETAIL_URL,
-                String.format("%s/#/alert/detail/%s", rootUrl, URIUtil.encodeQuery(event.getAlertId(),"UTF-8")));
+                String.format("%s/#/alert/detail/%s", rootUrl, URIUtil.encodeQuery(event.getAlertId(), "UTF-8")));
             alertContext.put(PublishConstants.ALERT_EMAIL_POLICY_DETAIL_URL,
-                String.format("%s/#/policy/detail/%s",rootUrl, URIUtil.encodeQuery(event.getPolicyId(),"UTF-8")));
+                String.format("%s/#/policy/detail/%s", rootUrl, URIUtil.encodeQuery(event.getPolicyId(), "UTF-8")));
         } catch (URIException e) {
-            LOG.warn(e.getMessage(),e);
+            LOG.warn(e.getMessage(), e);
             alertContext.put(PublishConstants.ALERT_EMAIL_ALERT_DETAIL_URL,
                 String.format("%s/#/alert/detail/%s", rootUrl, event.getAlertId()));
             alertContext.put(PublishConstants.ALERT_EMAIL_POLICY_DETAIL_URL,
-                String.format("%s/#/policy/detail/%s",rootUrl, event.getPolicyId()));
+                String.format("%s/#/policy/detail/%s", rootUrl, event.getPolicyId()));
         }
-        alertContext.put(PublishConstants.ALERT_EMAIL_HOME_URL,rootUrl);
+        alertContext.put(PublishConstants.ALERT_EMAIL_HOME_URL, rootUrl);
         return alertContext;
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
index 7431d35..d81ec2a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertEmailPublisher.java
@@ -158,7 +158,9 @@ public class AlertEmailPublisher extends AbstractPublishPlugin {
     private AlertEmailGenerator createEmailGenerator(Map<String, Object> notificationConfig) {
         String tplFileName = (String) notificationConfig.get(PublishConstants.TEMPLATE);
         if (tplFileName == null || tplFileName.equals("")) {
-            tplFileName = "ALERT_DEFAULT.vm";
+            // tplFileName = "ALERT_DEFAULT_TEMPLATE.vm";
+            // tplFileName = "ALERT_LIGHT_TEMPLATE.vm";
+            tplFileName = "ALERT_INLINED_TEMPLATE.vm";
         }
         String subject = (String) notificationConfig.get(PublishConstants.SUBJECT);
         if (subject == null) {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertContextFields.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertContextFields.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertContextFields.java
new file mode 100644
index 0000000..9f85952
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertContextFields.java
@@ -0,0 +1,43 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.template;
+
+import java.util.Arrays;
+import java.util.List;
+
+public class AlertContextFields { // names of the built-in variables placed into the alert template context
+    public static final String STREAM_ID = "STREAM_ID";
+    public static final String ALERT_ID = "ALERT_ID";
+    public static final String CREATED_BY = "CREATED_BY";
+    public static final String POLICY_ID = "POLICY_ID";
+    public static final String CREATED_TIMESTAMP = "CREATED_TIMESTAMP"; // raw event.getCreatedTime() value
+    public static final String CREATED_TIME = "CREATED_TIME"; // created time formatted via DateTimeUtil.millisecondsToHumanDateWithSeconds
+    public static final String ALERT_TIMESTAMP = "ALERT_TIMESTAMP"; // raw event.getTimestamp() value
+    public static final String ALERT_TIME = "ALERT_TIME"; // event timestamp formatted via DateTimeUtil.millisecondsToHumanDateWithSeconds
+    public static final String ALERT_SCHEMA = "ALERT_SCHEMA";
+    public static final String ALERT_EVENT = "ALERT_EVENT"; // the AlertStreamEvent object itself
+    public static final String POLICY_DESC = "POLICY_DESC";
+    public static final String POLICY_TYPE = "POLICY_TYPE";
+    public static final String POLICY_DEFINITION = "POLICY_DEFINITION";
+    public static final String POLICY_HANDLER = "POLICY_HANDLER";
+
+    public static List<String> getAllContextFields() { // NOTE(review): ALERT_EVENT is absent from this list - confirm that is intended
+        return Arrays.asList(
+            STREAM_ID, ALERT_ID, CREATED_BY, POLICY_ID, CREATED_TIMESTAMP, CREATED_TIME, ALERT_TIMESTAMP, ALERT_TIME, ALERT_SCHEMA, POLICY_DESC, POLICY_TYPE, POLICY_DEFINITION, POLICY_HANDLER
+        );
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertTemplateEngine.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertTemplateEngine.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertTemplateEngine.java
new file mode 100644
index 0000000..760ec7c
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertTemplateEngine.java
@@ -0,0 +1,48 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.template;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.alert.engine.publisher.AlertStreamFilter;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+
+import java.util.Collection;
+
+/**
+ * Alert template engine: an {@link AlertStreamFilter} that additionally keeps a set of registered policies.
+ */
+public interface AlertTemplateEngine extends AlertStreamFilter {
+    /**
+     * Initializes the engine with the given Config.
+     */
+    void init(Config config);
+
+    /**
+     * Registers a policy by its definition.
+     */
+    void register(PolicyDefinition policyDefinition);
+
+    /**
+     * Unregisters the policy identified by policyId.
+     */
+    void unregister(String policyId);
+
+    /**
+     * @return the registered policy definitions.
+     */
+    Collection<PolicyDefinition> getPolicies();
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertTemplateProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertTemplateProvider.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertTemplateProvider.java
new file mode 100644
index 0000000..875facb
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/AlertTemplateProvider.java
@@ -0,0 +1,23 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.template;
+
+public class AlertTemplateProvider { // static factory for AlertTemplateEngine instances
+    public static AlertTemplateEngine createAlertTemplateEngine() {
+        return new VelocityAlertTemplateEngine(); // the returned engine has not had init(Config) called on it
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngine.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngine.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngine.java
new file mode 100644
index 0000000..a019ca6
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityAlertTemplateEngine.java
@@ -0,0 +1,170 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.publisher.template;
+
+import com.google.common.base.Preconditions;
+import com.typesafe.config.Config;
+import org.apache.eagle.alert.engine.coordinator.AlertDefinition;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.model.AlertStreamEvent;
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.velocity.Template;
+import org.apache.velocity.VelocityContext;
+import org.apache.velocity.app.Velocity;
+import org.apache.velocity.app.VelocityEngine;
+import org.apache.velocity.runtime.RuntimeConstants;
+import org.apache.velocity.runtime.resource.loader.StringResourceLoader;
+import org.apache.velocity.runtime.resource.util.StringResourceRepository;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+import java.io.StringWriter;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+
+/** Velocity-backed {@link AlertTemplateEngine}: renders per-policy subject/body templates held in an in-memory string repository. */
+public class VelocityAlertTemplateEngine implements AlertTemplateEngine {
+    private static final String ALERT_BODY_TPL_PREFIX = "AlertBodyTemplate";
+    private static final String ALERT_SUBJECT_TPL_PREFIX = "AlertSubjectTemplate";
+    private static final Logger LOG = LoggerFactory.getLogger(VelocityAlertTemplateEngine.class);
+    private StringResourceRepository stringResourceRepository;
+    private Map<String, PolicyDefinition> policyDefinitionRepository;
+    private VelocityEngine engine;
+
+    @Override
+    public void init(Config config) {
+        engine = new VelocityEngine();
+        engine.setProperty(RuntimeConstants.RUNTIME_LOG_LOGSYSTEM_CLASS, "org.apache.velocity.runtime.log.Log4JLogChute");
+        engine.setProperty("runtime.log.logsystem.log4j.logger", LOG.getName());
+        engine.setProperty(Velocity.RESOURCE_LOADER, "string");
+        engine.addProperty("string.resource.loader.class", StringResourceLoader.class.getName());
+        engine.addProperty("string.resource.loader.repository.static", "false");
+        engine.init();
+        // The repository is configured as non-static above, so it is fetched from this engine's application attributes.
+        stringResourceRepository = (StringResourceRepository) engine.getApplicationAttribute(StringResourceLoader.REPOSITORY_NAME_DEFAULT);
+        policyDefinitionRepository = new HashMap<>();
+    }
+
+    private String getAlertBodyTemplateName(String policyId) {
+        return String.format("%s:%s", ALERT_BODY_TPL_PREFIX, policyId);
+    }
+
+    private String getAlertSubjectTemplateName(String policyId) {
+        return String.format("%s:%s", ALERT_SUBJECT_TPL_PREFIX, policyId);
+    }
+
+    @Override
+    public synchronized void register(PolicyDefinition policyDefinition) {
+        String policyId = Preconditions.checkNotNull(policyDefinition.getName(), "policyId is null");
+        LOG.info("Registering {}", policyId);
+        AlertDefinition alertDefinition = policyDefinition.getAlertDefinition();
+        if (alertDefinition == null) {
+            LOG.warn("Subject template of policy {} is null, using policy name by default", policyId);
+            stringResourceRepository.putStringResource(getAlertSubjectTemplateName(policyId), policyId);
+            // No alert definition at all: fall back to an auto-generated body that prints the whole event.
+            LOG.warn("Body template of policy {} is null, using $ALERT_EVENT by default", policyId);
+            String defaultAlertBodyTmpl = String.format("Message: $%s (Auto-generated alert message as template not defined in policy %s)",
+                AlertContextFields.ALERT_EVENT, policyId);
+            stringResourceRepository.putStringResource(getAlertBodyTemplateName(policyId), defaultAlertBodyTmpl);
+        } else if (alertDefinition.getTemplateType().equals(AlertDefinition.TemplateType.TEXT)) {
+            if (alertDefinition.getSubject() != null) {
+                stringResourceRepository.putStringResource(getAlertSubjectTemplateName(policyId), alertDefinition.getSubject());
+            } else {
+                LOG.warn("Subject template of policy {} is null, using policy name by default", policyId);
+                stringResourceRepository.putStringResource(getAlertSubjectTemplateName(policyId), policyId);
+            }
+            if (alertDefinition.getBody() != null) {
+                stringResourceRepository.putStringResource(getAlertBodyTemplateName(policyId), alertDefinition.getBody());
+            } else {
+                LOG.warn("Body template of policy {} is null, using ALERT_EVENT by default", policyId);
+                stringResourceRepository.putStringResource(getAlertBodyTemplateName(policyId), "$" + AlertContextFields.ALERT_EVENT);
+            }
+        } else {
+            throw new IllegalArgumentException("Unsupported alert template type " + alertDefinition.getTemplateType());
+        }
+        policyDefinitionRepository.put(policyId, policyDefinition);
+    }
+
+    @Override
+    public synchronized void unregister(String policyId) {
+        LOG.info("Unregistering {}", policyId);
+        stringResourceRepository.removeStringResource(getAlertBodyTemplateName(policyId));
+        stringResourceRepository.removeStringResource(getAlertSubjectTemplateName(policyId));
+        policyDefinitionRepository.remove(policyId);
+    }
+
+    @Override
+    public synchronized AlertStreamEvent filter(AlertStreamEvent event) {
+        Preconditions.checkArgument(this.policyDefinitionRepository.containsKey(event.getPolicyId()), "Unknown policyId " + event.getPolicyId());
+        PolicyDefinition policyDefinition = this.policyDefinitionRepository.get(event.getPolicyId());
+        StringWriter bodyWriter = new StringWriter();
+        StringWriter subjectWriter = new StringWriter();
+        try {
+            VelocityContext alertContext = buildAlertContext(policyDefinition, event);
+            Template template = engine.getTemplate(getAlertBodyTemplateName(event.getPolicyId()));
+            template.merge(alertContext, bodyWriter);
+            event.setBody(bodyWriter.toString());
+            // The subject is rendered from the same context as the body.
+            template = engine.getTemplate(getAlertSubjectTemplateName(event.getPolicyId()));
+            template.merge(alertContext, subjectWriter);
+            event.setSubject(subjectWriter.toString());
+        } finally {
+            try {
+                bodyWriter.close();
+            } catch (IOException e) {
+                LOG.warn(e.getMessage(), e);
+            }
+            try {
+                subjectWriter.close();
+            } catch (IOException e) {
+                LOG.warn(e.getMessage(), e);
+            }
+        }
+        return event; // the input event is updated in place and handed back
+    }
+
+    @Override
+    public synchronized Collection<PolicyDefinition> getPolicies() {
+        return new java.util.ArrayList<>(policyDefinitionRepository.values()); // snapshot: the live view would be read outside the lock
+    }
+
+    private static VelocityContext buildAlertContext(PolicyDefinition policyDefinition, AlertStreamEvent event) {
+        VelocityContext context = new VelocityContext();
+        context.put(AlertContextFields.STREAM_ID, event.getStreamId());
+        context.put(AlertContextFields.ALERT_ID, event.getAlertId());
+        context.put(AlertContextFields.CREATED_BY, event.getCreatedBy());
+        context.put(AlertContextFields.CREATED_TIMESTAMP, event.getCreatedTime());
+        context.put(AlertContextFields.CREATED_TIME, DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getCreatedTime()));
+        context.put(AlertContextFields.ALERT_TIMESTAMP, event.getTimestamp());
+        context.put(AlertContextFields.ALERT_TIME, DateTimeUtil.millisecondsToHumanDateWithSeconds(event.getTimestamp()));
+        context.put(AlertContextFields.ALERT_SCHEMA, event.getSchema());
+        context.put(AlertContextFields.ALERT_EVENT, event);
+        // Policy metadata.
+        context.put(AlertContextFields.POLICY_ID, policyDefinition.getName());
+        context.put(AlertContextFields.POLICY_DESC, policyDefinition.getDescription());
+        context.put(AlertContextFields.POLICY_TYPE, policyDefinition.getDefinition().getType());
+        context.put(AlertContextFields.POLICY_DEFINITION, policyDefinition.getDefinition().getValue());
+        context.put(AlertContextFields.POLICY_HANDLER, policyDefinition.getDefinition().getHandlerClass());
+        // Event data goes in last, so a data field named like one of the keys above takes its place.
+        for (Map.Entry<String, Object> entry : event.getDataMap().entrySet()) {
+            context.put(entry.getKey(), entry.getValue());
+        }
+        return context;
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateParser.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateParser.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateParser.java
new file mode 100644
index 0000000..a824a0d
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/template/VelocityTemplateParser.java
@@ -0,0 +1,127 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.engine.publisher.template;
+
+import org.apache.velocity.Template;
+import org.apache.velocity.VelocityContext;
+import org.apache.velocity.app.Velocity;
+import org.apache.velocity.app.VelocityEngine;
+import org.apache.velocity.exception.MethodInvocationException;
+import org.apache.velocity.exception.ParseErrorException;
+import org.apache.velocity.runtime.RuntimeConstants;
+import org.apache.velocity.runtime.parser.node.ASTReference;
+import org.apache.velocity.runtime.parser.node.ASTprocess;
+import org.apache.velocity.runtime.resource.loader.StringResourceLoader;
+import org.apache.velocity.runtime.resource.util.StringResourceRepository;
+import org.apache.velocity.runtime.visitor.NodeViewMode;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.StringWriter;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Parses a Velocity template supplied as a string and records the root name of
+ * every reference that appears in it.
+ *
+ * <p>Each instance builds its own {@link VelocityEngine}, backed by a non-static string
+ * resource repository and configured with {@code runtime.references.strict=true}.
+ */
+public class VelocityTemplateParser {
+    private static final Logger LOG = LoggerFactory.getLogger(VelocityTemplateParser.class);
+    private static final String TEMPLATE_NAME = "template";
+    private final Template template;
+    private final ParserNodeVisitor visitor;
+
+    /**
+     * Compiles the template and walks its syntax tree to collect reference names.
+     *
+     * @param templateString Velocity template source.
+     * @throws ParseErrorException declared for template text that cannot be parsed.
+     */
+    public VelocityTemplateParser(String templateString) throws ParseErrorException {
+        VelocityEngine engine = new VelocityEngine();
+        engine.setProperty(RuntimeConstants.RUNTIME_LOG_LOGSYSTEM_CLASS, "org.apache.velocity.runtime.log.Log4JLogChute");
+        engine.setProperty("runtime.log.logsystem.log4j.logger", LOG.getName());
+        engine.setProperty(Velocity.RESOURCE_LOADER, "string");
+        engine.addProperty("string.resource.loader.class", StringResourceLoader.class.getName());
+        engine.addProperty("string.resource.loader.repository.static", "false");
+        engine.addProperty("runtime.references.strict", "true");
+        engine.init();
+        // Fetch the loader's repository from the engine and store the template text under a fixed name.
+        StringResourceRepository resourceRepository = (StringResourceRepository) engine.getApplicationAttribute(StringResourceLoader.REPOSITORY_NAME_DEFAULT);
+        resourceRepository.putStringResource(TEMPLATE_NAME, templateString);
+        template = engine.getTemplate(TEMPLATE_NAME);
+        // Walk the parsed tree once; the visitor accumulates the reference names.
+        ASTprocess data = (ASTprocess) template.getData();
+        visitor = new ParserNodeVisitor();
+        data.jjtAccept(visitor, null);
+    }
+
+    /**
+     * Returns the reference names collected while parsing, one entry per visited reference.
+     *
+     * @return the visitor's own list (not a copy).
+     */
+    public List<String> getReferenceNames() {
+        return this.visitor.getReferenceNames();
+    }
+
+    /**
+     * Returns the parsed template.
+     *
+     * @return the template obtained from the engine in the constructor.
+     */
+    public Template getTemplate() {
+        return template;
+    }
+
+    /**
+     * Merges the template against a copy of {@code context} and discards the output.
+     *
+     * @param context variable names and values to expose to the template.
+     * @throws MethodInvocationException if required variable is missing in context.
+     */
+    public void validateContext(Map<String, Object> context) throws MethodInvocationException {
+        VelocityContext velocityContext = new VelocityContext();
+        for (Map.Entry<String, Object> entry : context.entrySet()) {
+            velocityContext.put(entry.getKey(), entry.getValue());
+        }
+        template.merge(velocityContext, new StringWriter());
+    }
+
+    /**
+     * Syntax-tree visitor that records the root string of each reference node it visits.
+     * Static because it reads nothing from the enclosing parser instance.
+     */
+    private static final class ParserNodeVisitor extends NodeViewMode {
+        private final List<String> referenceNames = new ArrayList<>();
+
+        @Override
+        public Object visit(ASTReference node, Object data) {
+            referenceNames.add(node.getRootString());
+            return super.visit(node, data);
+        }
+
+        public List<String> getReferenceNames() {
+            return this.referenceNames;
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
index 72eafe4..2b57e96 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertPublisherBolt.java
@@ -16,7 +16,13 @@
  */
 package org.apache.eagle.alert.engine.runner;
 
-import org.apache.commons.collections.map.HashedMap;
+import backtype.storm.metric.api.MultiCountMetric;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+import com.typesafe.config.Config;
 import org.apache.eagle.alert.coordination.model.PublishSpec;
 import org.apache.eagle.alert.engine.StreamContextImpl;
 import org.apache.eagle.alert.engine.coordinator.*;
@@ -24,16 +30,12 @@ import org.apache.eagle.alert.engine.model.AlertPublishEvent;
 import org.apache.eagle.alert.engine.model.AlertStreamEvent;
 import org.apache.eagle.alert.engine.publisher.AlertPublishSpecListener;
 import org.apache.eagle.alert.engine.publisher.AlertPublisher;
+import org.apache.eagle.alert.engine.publisher.AlertStreamFilter;
+import org.apache.eagle.alert.engine.publisher.PipeStreamFilter;
 import org.apache.eagle.alert.engine.publisher.impl.AlertPublisherImpl;
+import org.apache.eagle.alert.engine.publisher.template.AlertTemplateEngine;
+import org.apache.eagle.alert.engine.publisher.template.AlertTemplateProvider;
 import org.apache.eagle.alert.utils.AlertConstants;
-import backtype.storm.metric.api.MultiCountMetric;
-import backtype.storm.task.OutputCollector;
-import backtype.storm.task.TopologyContext;
-import backtype.storm.topology.OutputFieldsDeclarer;
-import backtype.storm.tuple.Fields;
-import backtype.storm.tuple.Tuple;
-import com.typesafe.config.Config;
-
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
@@ -41,22 +43,24 @@ import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.List;
 import java.util.Map;
+import java.util.stream.Collectors;
 
-@SuppressWarnings("serial")
 public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPublishSpecListener {
     private static final Logger LOG = LoggerFactory.getLogger(AlertPublisherBolt.class);
     private final AlertPublisher alertPublisher;
     private volatile Map<String, Publishment> cachedPublishments = new HashMap<>();
     private volatile Map<String, PolicyDefinition> policyDefinitionMap;
     private volatile Map<String, StreamDefinition> streamDefinitionMap;
+    private AlertTemplateEngine alertTemplateEngine;
 
     private boolean logEventEnabled;
     private TopologyContext context;
-    
+    private AlertStreamFilter alertFilter;
+
     public AlertPublisherBolt(String alertPublisherName, Config config, IMetadataChangeNotifyService coordinatorService) {
         super(alertPublisherName, coordinatorService, config);
         this.alertPublisher = new AlertPublisherImpl(alertPublisherName);
-        
+
         if (config != null && config.hasPath("topology.logEventEnabled")) {
             logEventEnabled = config.getBoolean("topology.logEventEnabled");
         }
@@ -69,23 +73,28 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli
         this.alertPublisher.init(config, stormConf);
         streamContext = new StreamContextImpl(config, context.registerMetric("eagle.publisher", new MultiCountMetric(), 60), context);
         this.context = context;
+        this.alertTemplateEngine = AlertTemplateProvider.createAlertTemplateEngine();
+        this.alertTemplateEngine.init(config);
+        this.alertFilter = new PipeStreamFilter(new AlertContextEnrichFilter(this), new AlertTemplateFilter(alertTemplateEngine));
     }
 
     @Override
     public void execute(Tuple input) {
         try {
-            streamContext.counter().scope("receive_count");
+            streamContext.counter().incr("receive_count");
             PublishPartition partition = (PublishPartition) input.getValueByField(AlertConstants.FIELD_0);
             AlertStreamEvent event = (AlertStreamEvent) input.getValueByField(AlertConstants.FIELD_1);
             if (logEventEnabled) {
                 LOG.info("Alert publish bolt {}/{} with partition {} received event: {}", this.getBoltId(), this.context.getThisTaskId(), partition, event);
             }
-            wrapAlertPublishEvent(event);
-            alertPublisher.nextEvent(partition, event);
+            AlertStreamEvent filteredEvent = alertFilter.filter(event);
+            if (filteredEvent != null) {
+                alertPublisher.nextEvent(partition, filteredEvent);
+            }
             this.collector.ack(input);
-            streamContext.counter().scope("ack_count");
-        } catch (Exception ex) {
-            streamContext.counter().scope("fail_count");
+            streamContext.counter().incr("ack_count");
+        } catch (Throwable ex) {
+            streamContext.counter().incr("fail_count");
             LOG.error(ex.getMessage(), ex);
             collector.reportError(ex);
         }
@@ -131,30 +140,79 @@ public class AlertPublisherBolt extends AbstractStreamBolt implements AlertPubli
 
     @Override
     public void onAlertPolicyChange(Map<String, PolicyDefinition> pds, Map<String, StreamDefinition> sds) {
+        List<String> policyToRemove = new ArrayList<>();
+        if (this.policyDefinitionMap != null) {
+            policyToRemove.addAll(this.policyDefinitionMap.keySet().stream().filter(policyId -> !pds.containsKey(policyId)).collect(Collectors.toList()));
+        }
+
         this.policyDefinitionMap = pds;
         this.streamDefinitionMap = sds;
+
+        for (Map.Entry<String, PolicyDefinition> entry : pds.entrySet()) {
+            try {
+                this.alertTemplateEngine.register(entry.getValue());
+            } catch (Throwable throwable) {
+                LOG.error("Failed to register policy {} in template engine", entry.getKey(), throwable);
+            }
+        }
+
+        for (String policyId : policyToRemove) {
+            try {
+                this.alertTemplateEngine.unregister(policyId);
+            } catch (Throwable throwable) {
+                LOG.error("Failed to unregister policy {} from template engine", policyId, throwable);
+            }
+        }
     }
 
-    @SuppressWarnings("unchecked")
-    private void wrapAlertPublishEvent(AlertStreamEvent event) {
-        Map<String, Object> extraData = new HashedMap();
-        List<String> appIds = new ArrayList<>();
-        if (policyDefinitionMap == null || streamDefinitionMap == null) {
-            LOG.warn("policyDefinitions or streamDefinitions in publisher bolt have not been initialized");
-            return;
+    private class AlertContextEnrichFilter implements AlertStreamFilter {
+        private final AlertPublisherBolt alertPublisherBolt;
+
+        private AlertContextEnrichFilter(AlertPublisherBolt alertPublisherBolt) {
+            this.alertPublisherBolt = alertPublisherBolt;
         }
-        PolicyDefinition policyDefinition = policyDefinitionMap.get(event.getPolicyId());
-        if (this.policyDefinitionMap != null && policyDefinition != null) {
-            for (String inputStreamId : policyDefinition.getInputStreams()) {
-                StreamDefinition sd = this.streamDefinitionMap.get(inputStreamId);
-                if (sd != null) {
-                    extraData.put(AlertPublishEvent.SITE_ID_KEY, sd.getSiteId());
-                    appIds.add(sd.getDataSource());
+
+        /**
+         * TODO: Refactor wrapAlertPublishEvent into alertTemplateEngine and remove extraData from AlertStreamEvent.
+         */
+        @Override
+        public AlertStreamEvent filter(AlertStreamEvent event) {
+            event.ensureAlertId();
+            Map<String, Object> extraData = new HashMap<>();
+            List<String> appIds = new ArrayList<>();
+            if (alertPublisherBolt.policyDefinitionMap == null || alertPublisherBolt.streamDefinitionMap == null) {
+                LOG.warn("policyDefinitions or streamDefinitions in publisher bolt have not been initialized");
+            } else {
+                PolicyDefinition policyDefinition = alertPublisherBolt.policyDefinitionMap.get(event.getPolicyId());
+                if (alertPublisherBolt.policyDefinitionMap != null && policyDefinition != null) {
+                    for (String inputStreamId : policyDefinition.getInputStreams()) {
+                        StreamDefinition sd = alertPublisherBolt.streamDefinitionMap.get(inputStreamId);
+                        if (sd != null) {
+                            extraData.put(AlertPublishEvent.SITE_ID_KEY, sd.getSiteId());
+                            appIds.add(sd.getDataSource());
+                        }
+                    }
+                    extraData.put(AlertPublishEvent.APP_IDS_KEY, appIds);
+                    extraData.put(AlertPublishEvent.POLICY_VALUE_KEY, policyDefinition.getDefinition().getValue());
+                    event.setSeverity(policyDefinition.getAlertSeverity());
+                    event.setCategory(policyDefinition.getAlertCategory());
                 }
+                event.setContext(extraData);
             }
-            extraData.put(AlertPublishEvent.APP_IDS_KEY, appIds);
-            extraData.put(AlertPublishEvent.POLICY_VALUE_KEY, policyDefinition.getDefinition().getValue());
+            return event;
+        }
+    }
+
+    private class AlertTemplateFilter implements AlertStreamFilter {
+        private final AlertTemplateEngine alertTemplateEngine;
+
+        private AlertTemplateFilter(AlertTemplateEngine alertTemplateEngine) {
+            this.alertTemplateEngine = alertTemplateEngine;
+        }
+
+        @Override
+        public AlertStreamEvent filter(AlertStreamEvent event) {
+            return this.alertTemplateEngine.filter(event);
         }
-        event.setExtraData(extraData);
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/1c81c086/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_DEFAULT.vm
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_DEFAULT.vm b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_DEFAULT.vm
deleted file mode 100644
index fad3aa9..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/ALERT_DEFAULT.vm
+++ /dev/null
@@ -1,301 +0,0 @@
-<!--
-  ~ Licensed to the Apache Software Foundation (ASF) under one or more
-  ~ contributor license agreements.  See the NOTICE file distributed with
-  ~ this work for additional information regarding copyright ownership.
-  ~ The ASF licenses this file to You under the Apache License, Version 2.0
-  ~ (the "License"); you may not use this file except in compliance with
-  ~ the License.  You may obtain a copy of the License at
-  ~
-  ~    http://www.apache.org/licenses/LICENSE-2.0
-  ~
-  ~ Unless required by applicable law or agreed to in writing, software
-  ~ distributed under the License is distributed on an "AS IS" BASIS,
-  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  ~ See the License for the specific language governing permissions and
-  ~ limitations under the License.
-  -->
-<!DOCTYPE html PUBLIC "-//W3C//DTD XHTML 1.0 Strict//EN" "http://www.w3.org/TR/xhtml1/DTD/xhtml1-strict.dtd">
-<html xmlns="http://www.w3.org/1999/xhtml">
-<head>
-    <meta http-equiv="Content-Type" content="text/html; charset=utf-8"/>
-    <meta name="viewport" content="width=device-width"/>
-    <style>
-        body {
-            width: 100% !important;
-            min-width: 100%;
-            -webkit-text-size-adjust: 100%;
-            -ms-text-size-adjust: 100%;
-            margin: 0;
-            padding: 0;
-        }
-
-        table {
-            border-spacing: 0;
-            border-collapse: collapse;
-        }
-
-        table th,
-        table td {
-            padding: 3px 0 3px 0;
-        }
-
-        .body {
-            width: 100%;
-        }
-
-        p, a, h1, h2, h3, ul, ol, li {
-            font-family: Helvetica, Arial, sans-serif;
-            font-weight: normal;
-            margin: 0;
-            padding: 0;
-        }
-
-        p {
-            font-size: 14px;
-            line-height: 19px;
-        }
-
-        a {
-            color: #3294b1;
-        }
-
-        h1 {
-            font-size: 36px;
-            margin: 15px 0 5px 0;
-        }
-
-        h2 {
-            font-size: 32px;
-        }
-
-        h3 {
-            font-size: 28px;
-        }
-
-        ul, ol {
-            margin: 0 0 0 25px;
-            padding: 0;
-        }
-
-        .btn {
-            background: #2ba6cb !important;
-            border: 1px solid #2284a1;
-            padding: 10px 20px 10px 20px;
-            text-align: center;
-        }
-
-        .btn:hover {
-            background: #2795b6 !important;
-        }
-
-        .btn a {
-            color: #FFFFFF;
-            text-decoration: none;
-            font-weight: bold;
-            padding: 10px 20px 10px 20px;
-        }
-
-        .tableBordered {
-            border-top: 1px solid #b9e5ff;
-        }
-
-        .tableBordered th {
-            background: #ECF8FF;
-        }
-
-        .tableBordered th p {
-            font-weight: bold;
-            color: #3294b1;
-        }
-
-        .tableBordered th,
-        .tableBordered td {
-            color: #333333;
-            border-bottom: 1px solid #b9e5ff;
-            text-align: center;
-            padding-bottom: 5px;
-        }
-
-        .panel {
-            height: 100px;
-        }
-    </style>
-</head>
-<body>
-    #set ( $elem = $alertList[0] )
-<table class="body">
-    <tr>
-        <td align="center" valign="top" style="background: #999999; padding: 0 0 0 0;">
-            <!-- Header -->
-            <table width="580">
-                <tr>
-                    <td style="padding: 0 0 0 0;" align="left">
-                        <p style="color:#FFFFFF;font-weight: bold; font-size: 22px">Eagle Alert Notification</p>
-                    </td>
-                </tr>
-            </table>
-        </td>
-    </tr>
-
-    <tr>
-        <td align="center" valign="top">
-            <!-- Eagle Body -->
-            <table width="580">
-                <tr>
-                    <!-- Title -->
-                    <td align="center">
-                        <h2>[Alert] $elem["policyId"]</h2>
-                    </td>
-                </tr>
-                <tr>
-                    <!-- Time -->
-                    <td>
-                        <table width="580">
-                            <tr>
-                                <td>
-                                    <p><b>Detected Time: $elem["alertTime"]</b></p>
-                                </td>
-                                #set ( $severity = $elem["severity"] )
-                                #if (!$severity || ("$severity" == ""))
-                                    #set ( $elem["severity"] = "WARNING")
-                                #end
-                                <td align="right">
-                                    <p><b>
-                                        Severity:
-                                        #if ($elem["severity"] == "WARNING")
-                                            <span>$elem["severity"]</span>
-                                        #else
-                                            <span style="color: #FF0000;">$elem["severity"]</span>
-                                        #end
-                                    </b></p>
-                                </td>
-                            </tr>
-                        </table>
-                    </td>
-                </tr>
-
-                <tr>
-                    <!-- Basic Information -->
-                    <td style="padding: 20px 0 10px 0;">
-                        <p><b>Alert Message </b></p>
-                    </td>
-                </tr>
-                <tr>
-                    <!-- Description -->
-                    <td valign="top"
-                        style="background: #ECF8FF; border: 1px solid #b9e5ff; padding: 10px 10px 12px 20px;">
-                        <p>$elem["alertMessage"]</p>
-                    </td>
-                </tr>
-                <tr>
-                    <!-- Basic Information -->
-                    <td style="padding: 20px 0 10px 0;">
-                        <p><b>Alert Detail</b></p>
-                    </td>
-                </tr>
-                <tr>
-                    <!-- Basic Information Content -->
-                    <td>
-                        <table class="tableBordered" width="580">
-                            <tr>
-                                <th>
-                                    <p>Policy Name</p>
-                                </th>
-                                <td>
-                                    <p><a href="$elem["policyDetailUrl"]">$elem["policyId"]</a></p>
-                                </td>
-                            </tr>
-                            <tr>
-                                <th>
-                                    <p>Severity Level</p>
-                                </th>
-                                <td>
-                                    <p>$elem["severity"]</p>
-                                </td>
-                            </tr>
-                            <tr>
-                                <th>
-                                    <p>Alert Stream</p>
-                                </th>
-                                <td>
-                                    <p>$elem["streamId"]</p>
-                                </td>
-                            </tr>
-                            <tr>
-                                <th>
-                                    <p>Created Time</p>
-                                </th>
-                                <td>
-                                    <p>$elem["alertTime"]</p>
-                                </td>
-                            </tr>
-                            <tr>
-                                <th>
-                                    <p>Created By</p>
-                                </th>
-                                <td>
-                                    <p>$elem["creator"]</p>
-                                </td>
-                            </tr>
-                        </table>
-                    </td>
-                </tr>
-##                <tr>
-##                    <!-- View Detail -->
-##                    <td align="center" style="padding: 10px 0 0 0;">
-##                        <table width="580">
-##                            <tr>
-##                                <td class="btn">
-##                                    <a href="$elem["policyDetailUrl"]">View Policy Details</a>
-##                                </td>
-##                            </tr>
-##                        </table>
-##                    </td>
-##                </tr>
-
-                <tr>
-                    <!-- View Detail -->
-                    <td align="center" style="padding: 10px 0 0 0;">
-                        <table width="580">
-                            <tr>
-                                <td class="btn">
-                                    <a href="$elem["alertDetailUrl"]">View Alert on Eagle</a>
-                                </td>
-                            </tr>
-                        </table>
-                    </td>
-                </tr>
-                <tr>
-                    <!-- Actions Required -->
-                    <td style="padding: 20px 0 10px 0;">
-                        <p><b>Actions Required</b></p>
-                    </td>
-                </tr>
-                <tr>
-                    <!-- Possible Root Causes Content -->
-                    <td class="panel" valign="top"
-                        style="background: #F4F4F4; border: 1px solid #AAAAAA; padding: 10px 10px 12px 10px;">
-                        <p>
-                            The alert notification was automatically detected and sent by Eagle according to policy: $elem["policyId"].
-                            To follow-up on this, please verify the alert and diagnose the root cause with Eagle:
-                        </p>
-                        <p></p>
-                        <ul>
-                            <li><p><a href="$elem["alertDetailUrl"]">View alert detail</a></p></li>
-                            <li><p><a href="$elem["policyDetailUrl"]">View policy detail</a></p></li>
-                            <li><p><a href="$elem["homeUrl"]">View eagle home</a></p></li>
-                        </ul>
-                    </td>
-                </tr>
-                <tr>
-                    <!-- Copyright -->
-                    <td align="center">
-                        <p><i>Powered by <a href="http://eagle.incubator.apache.org">Apache Eagle</a> (version: $elem["version"])</i></p>
-                    </td>
-                </tr>
-            </table>
-        </td>
-    </tr>
-</table>
-</body>
-</html>
\ No newline at end of file