You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/04/13 14:29:06 UTC

incubator-eagle git commit: [EAGLE-223] Notification plugin to enable multiple instance of given alert plugin

Repository: incubator-eagle
Updated Branches:
  refs/heads/master f9b1bc7b6 -> 12e4395fd


[EAGLE-223] Notification plugin to enable multiple instance of given alert plugin

https://issues.apache.org/jira/browse/EAGLE-239
https://issues.apache.org/jira/browse/EAGLE-221
https://issues.apache.org/jira/browse/EAGLE-223

Bug fix
   - properties in AlertContext is missing
   - entities persistence into database duplicates

Improvements
  - Add notificationType select on UI
  - Multiple notification instances support for each notification type at backend
  - Update AlertNotificationEntity and add a field “Fields” to describe the template for each notification type.

Author: Zhao, Qingwen <qi...@ebay.com>
Author: jiljiang <ji...@ebay.com>

Closes #146 from qingwen220/notify.


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/12e4395f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/12e4395f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/12e4395f

Branch: refs/heads/master
Commit: 12e4395fda473e6474d556843b21bb576b79c2fd
Parents: f9b1bc7
Author: Zhao, Qingwen <qi...@ebay.com>
Authored: Wed Apr 13 20:28:48 2016 +0800
Committer: Hao Chen <ha...@apache.org>
Committed: Wed Apr 13 20:28:48 2016 +0800

----------------------------------------------------------------------
 .../src/main/bin/eagle-topology-init.sh         | 18 ++--
 eagle-assembly/src/main/conf/eagle-service.conf |  1 -
 .../eagle/alert/common/AlertEmailSender.java    |  1 +
 .../eagle/alert/email/AlertEmailComponent.java  |  1 +
 .../eagle/alert/email/AlertEmailContext.java    |  1 +
 .../base/NotificationConstants.java             |  4 +
 .../plugin/AlertEagleStorePlugin.java           | 16 ++--
 .../notification/plugin/AlertEmailPlugin.java   | 59 +++++++------
 .../notification/plugin/AlertKafkaPlugin.java   | 72 ++++++++-------
 .../plugin/KafkaProducerSingleton.java          |  9 +-
 .../notification/plugin/NotificationPlugin.java | 10 +--
 .../plugin/NotificationPluginManagerImpl.java   | 21 ++---
 .../utils/NotificationPluginUtils.java          | 15 ++--
 .../testcases/TestAlertEagleStorePlugin.java    |  4 +-
 .../testcases/TestAlertEmailPlugin.java         |  4 +-
 .../testcases/TestAlertKafkaPlugin.java         |  4 +-
 .../TestNotificationPluginManager.java          | 93 ++++++++++++++++++++
 .../testcases/TestNotificationPluginUtils.java  | 38 ++++++++
 .../alert/config/EmailNotificationConfig.java   |  1 +
 .../eagle/alert/config/NotificationConfig.java  |  1 +
 .../alert/notification/AlertEmailGenerator.java |  1 +
 .../SiddhiAlertPolicyValidateProvider.java      |  2 +-
 .../utils/AlertExecutorConsumerUtils.scala      | 18 ++--
 .../eagle/alert/entity/AlertAPIEntity.java      | 10 ++-
 .../alert/entity/AlertNotificationEntity.java   | 13 +++
 .../src/main/resources/log4j.properties         |  2 +-
 .../src/main/webapp/app/public/css/main.css     | 14 +++
 .../app/public/feature/common/controller.js     | 92 +++++++++++++++----
 .../feature/common/page/policyDetail.html       | 19 +++-
 .../public/feature/common/page/policyEdit.html  | 37 ++++----
 .../webapp/app/public/js/components/tabs.js     |  3 +
 31 files changed, 422 insertions(+), 162 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/12e4395f/eagle-assembly/src/main/bin/eagle-topology-init.sh
----------------------------------------------------------------------
diff --git a/eagle-assembly/src/main/bin/eagle-topology-init.sh b/eagle-assembly/src/main/bin/eagle-topology-init.sh
index 28af83b..6fd7ac9 100755
--- a/eagle-assembly/src/main/bin/eagle-topology-init.sh
+++ b/eagle-assembly/src/main/bin/eagle-topology-init.sh
@@ -137,8 +137,13 @@ echo "Importing AlertStreamService for USERPROFILE"
 curl -silent -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H "Content-Type: application/json"  "http://$EAGLE_SERVICE_HOST:$EAGLE_SERVICE_PORT/eagle-service/rest/entities?serviceName=AlertStreamService" \
      -d '[ { "prefix": "alertStream", "tags": { "streamName": "userActivity", "site":"sandbox", "application":"userProfile" }, "alertExecutorIdList": [ "userProfileAnomalyDetectionExecutor" ] } ]'
 
-echo "Importing notification plugin configurations ... "
+#####################################################################
+#     Import notification plugin configuration into Eagle Service   #
+#####################################################################
 
+## AlertNotificationService : schema for notifcation plugin configuration
+echo ""
+echo "Importing notification plugin configurations ... "
 curl -silent -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Content-Type:application/json' \
  "http://${EAGLE_SERVICE_HOST}:${EAGLE_SERVICE_PORT}/eagle-service/rest/entities?serviceName=AlertNotificationService" \
  -d '
@@ -150,7 +155,8 @@ curl -silent -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Conten
        },
        "className": "org.apache.eagle.notification.plugin.AlertEmailPlugin",
        "description": "send alert to email",
-       "enabled":true
+       "enabled":true,
+       "fields": "[{\"name\":\"sender\"},{\"name\":\"recipients\"},{\"name\":\"subject\"}]"
      },
      {
        "prefix": "alertNotifications",
@@ -159,7 +165,8 @@ curl -silent -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Conten
        },
        "className": "org.apache.eagle.notification.plugin.AlertKafkaPlugin",
        "description": "send alert to kafka bus",
-       "enabled":true
+       "enabled":true,
+       "fields": "[{\"name\":\"kafka_broker\",\"value\":\"sandbox.hortonworks.com:6667\"},{\"name\":\"topic\"}]"
      },
      {
        "prefix": "alertNotifications",
@@ -175,11 +182,6 @@ curl -silent -u ${EAGLE_SERVICE_USER}:${EAGLE_SERVICE_PASSWD} -X POST -H 'Conten
 
 ## Finished
 echo ""
-echo "Finished initialization for alert notification plugins"
-
-
-## Finished
-echo ""
 echo "Finished initialization for eagle topology"
 
 exit 0

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/12e4395f/eagle-assembly/src/main/conf/eagle-service.conf
----------------------------------------------------------------------
diff --git a/eagle-assembly/src/main/conf/eagle-service.conf b/eagle-assembly/src/main/conf/eagle-service.conf
index 152b90d..59e970f 100644
--- a/eagle-assembly/src/main/conf/eagle-service.conf
+++ b/eagle-assembly/src/main/conf/eagle-service.conf
@@ -20,7 +20,6 @@ eagle {
 		storage-username="eagle"
 		storage-password=eagle
 		storage-database=eagle
-		# Derby database location: $TOMCAT_HOME/data/eagle
 		storage-connection-url="jdbc:derby:/tmp/eagle-db-local;create=true"
 		storage-connection-props="encoding=UTF-8"
 		storage-driver-class="org.apache.derby.jdbc.EmbeddedDriver"

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/12e4395f/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/common/AlertEmailSender.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/common/AlertEmailSender.java b/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/common/AlertEmailSender.java
index faea5be..9761031 100644
--- a/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/common/AlertEmailSender.java
+++ b/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/common/AlertEmailSender.java
@@ -33,6 +33,7 @@ import org.apache.eagle.common.email.EagleMailClient;
 import com.netflix.config.ConcurrentMapConfiguration;
 import com.typesafe.config.ConfigObject;
 
+@Deprecated
 public class AlertEmailSender implements Runnable {
 	
 	protected final List<Map<String, String>> alertContexts = new ArrayList<Map<String, String>>();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/12e4395f/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/email/AlertEmailComponent.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/email/AlertEmailComponent.java b/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/email/AlertEmailComponent.java
index c4537b7..da1078a 100644
--- a/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/email/AlertEmailComponent.java
+++ b/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/email/AlertEmailComponent.java
@@ -21,6 +21,7 @@ import org.apache.eagle.common.metric.AlertContext;
 /**
  * Alert email component is one part of an email, which could be an individual alert
  */
+@Deprecated
 public class AlertEmailComponent {
 	private AlertContext alertContext;
 	public AlertContext getAlertContext() {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/12e4395f/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/email/AlertEmailContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/email/AlertEmailContext.java b/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/email/AlertEmailContext.java
index 5d0609b..b2fd929 100644
--- a/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/email/AlertEmailContext.java
+++ b/eagle-core/eagle-alert/eagle-alert-base/src/main/java/org/apache/eagle/alert/email/AlertEmailContext.java
@@ -22,6 +22,7 @@ import java.util.List;
  * alert email bean
  * one email consists of a list of email component
  */
+@Deprecated
 public class AlertEmailContext {
 	private List<AlertEmailComponent> components;
 	private String sender;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/12e4395f/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/base/NotificationConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/base/NotificationConstants.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/base/NotificationConstants.java
index 98d6b5c..1e49ac8 100644
--- a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/base/NotificationConstants.java
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/base/NotificationConstants.java
@@ -28,4 +28,8 @@ public class NotificationConstants {
     public static final String SENDER = "sender";
     public static final String RECIPIENTS = "recipients";
     public static final String TPL_FILE_NAME = "tplFileName";
+
+    // kafka specific constants
+    public static final String TOPIC = "topic";
+    public static final String BROKER_LIST = "kafka_broker";
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/12e4395f/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEagleStorePlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEagleStorePlugin.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEagleStorePlugin.java
index cb0df70..6013a80 100644
--- a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEagleStorePlugin.java
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEagleStorePlugin.java
@@ -28,25 +28,25 @@ import org.slf4j.LoggerFactory;
 import java.util.ArrayList;
 import java.util.List;
 import java.util.Map;
+import java.util.Vector;
 
 /**
  * Plugin to persist alerts to Eagle Storage
  */
 public class AlertEagleStorePlugin implements NotificationPlugin {
     private static final Logger LOG = LoggerFactory.getLogger(AlertEagleStorePlugin.class);
-    private NotificationStatus status;
+    private List<NotificationStatus> statusList = new Vector<>();
     private AlertEagleStorePersister persist;
 
     @Override
     public void init(Config config, List<AlertDefinitionAPIEntity> initAlertDefs) throws Exception {
         this.persist = new AlertEagleStorePersister(config);
-        this.status = new NotificationStatus();
         LOG.info("initialized plugin for EagleStorePlugin");
     }
 
     @Override
-    public void update(String policyId, Map<String,String> notificationConf , boolean isPolicyDelete ) throws Exception {
-        if( isPolicyDelete ){
+    public void update(String policyId, List<Map<String,String>> notificationConfigCollection , boolean isPolicyDelete ) throws Exception {
+        if(isPolicyDelete){
             LOG.info("Deleted policy ...");
             return;
         }
@@ -54,8 +54,8 @@ public class AlertEagleStorePlugin implements NotificationPlugin {
     }
 
     @Override
-    public NotificationStatus getStatus() {
-        return this.status;
+    public List<NotificationStatus> getStatusList() {
+        return this.statusList;
     }
 
     /**
@@ -65,10 +65,11 @@ public class AlertEagleStorePlugin implements NotificationPlugin {
     @Override
     public void onAlert(AlertAPIEntity alertEntity) {
         LOG.info("write alert to eagle storage " + alertEntity);
+        NotificationStatus status = new NotificationStatus();
         try{
             List<AlertAPIEntity> list = new ArrayList<AlertAPIEntity>();
             list.add(alertEntity);
-            boolean result = persist.doPersist( list );
+            boolean result = persist.doPersist(list);
             if(result) {
                 status.successful = true;
                 status.errorMessage = "";
@@ -81,6 +82,7 @@ public class AlertEagleStorePlugin implements NotificationPlugin {
             status.errorMessage = ex.getMessage();
             LOG.error("Fail writing alert entity to Eagle Store", ex);
         }
+        this.statusList.add(status);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/12e4395f/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEmailPlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEmailPlugin.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEmailPlugin.java
index f9a7ed3..9acb38e 100644
--- a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEmailPlugin.java
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertEmailPlugin.java
@@ -42,12 +42,12 @@ import java.util.concurrent.TimeUnit;
  */
 public class AlertEmailPlugin implements NotificationPlugin {
 	private static final Logger LOG = LoggerFactory.getLogger(AlertEmailPlugin.class);
-	private Map<String, AlertEmailGenerator> emailGenerators = new ConcurrentHashMap<>();
+	private Map<String, List<AlertEmailGenerator>> emailGenerators = new ConcurrentHashMap<>();
 	private final static int DEFAULT_THREAD_POOL_CORE_SIZE = 4;
 	private final static int DEFAULT_THREAD_POOL_MAX_SIZE = 8;
 	private final static long DEFAULT_THREAD_POOL_SHRINK_TIME = 60000L; // 1 minute
 	private transient ThreadPoolExecutor executorPool;
-	private NotificationStatus status = new NotificationStatus();
+	private Vector<NotificationStatus> statusList = new Vector<>();
 	private Config config;
 
 	@Override
@@ -57,32 +57,33 @@ public class AlertEmailPlugin implements NotificationPlugin {
 		LOG.info(" Creating Email Generator... ");
 		for( AlertDefinitionAPIEntity  entity : initAlertDefs ){
 			List<Map<String,String>>  configMaps = NotificationPluginUtils.deserializeNotificationConfig(entity.getNotificationDef());
-			for( Map<String,String> notificationConfigMap : configMaps ){
-				String notificationType = notificationConfigMap.get(NotificationConstants.NOTIFICATION_TYPE);
-				// for backward compatibility, default notification is email
-				if(notificationType == null || notificationType.equalsIgnoreCase(NotificationConstants.EMAIL_NOTIFICATION)){
-					AlertEmailGenerator generator = createEmailGenerator( notificationConfigMap );
-						this.emailGenerators.put(entity.getTags().get(Constants.POLICY_ID), generator);
-						LOG.info("Successfully initialized email notification for policy " + entity.getTags().get(Constants.POLICY_ID) + ",with " + notificationConfigMap);
-				}
-			}
+			this.update(entity.getTags().get(Constants.POLICY_ID), configMaps, false);
 		}
 	}
 
 	/**
-	 * @param notificationConf
+	 * @param notificationConfigCollection
 	 * @throws Exception
      */
 	@Override
-	public void update(String policyId, Map<String,String> notificationConf  , boolean isPolicyDelete ) throws Exception {
-		if( isPolicyDelete ){
+	public void update(String policyId, List<Map<String,String>> notificationConfigCollection, boolean isPolicyDelete) throws Exception {
+		if(isPolicyDelete){
 			LOG.info(" Policy been deleted.. Removing reference from Notification Plugin ");
 			this.emailGenerators.remove(policyId);
 			return;
 		}
-		AlertEmailGenerator generator = createEmailGenerator(notificationConf);
-		this.emailGenerators.put(policyId , generator );
-		LOG.info("created/updated email generator for updated policy " + policyId);
+		Vector<AlertEmailGenerator> generators = new Vector<>();
+		for(Map<String, String> notificationConf: notificationConfigCollection) {
+			String notificationType = notificationConf.get(NotificationConstants.NOTIFICATION_TYPE);
+			if(notificationType == null || notificationType.equalsIgnoreCase(NotificationConstants.EMAIL_NOTIFICATION)) {
+				AlertEmailGenerator generator = createEmailGenerator(notificationConf);
+				generators.add(generator);
+			}
+		}
+		if(generators.size() != 0) {
+			this.emailGenerators.put(policyId, generators);
+			LOG.info("created/updated email generators for policy " + policyId);
+		}
 	}
 
 	/**
@@ -93,20 +94,24 @@ public class AlertEmailPlugin implements NotificationPlugin {
 	@Override
 	public void onAlert(AlertAPIEntity alertEntity) throws  Exception {
 		String policyId = alertEntity.getTags().get(Constants.POLICY_ID);
-		AlertEmailGenerator generator = this.emailGenerators.get(policyId);
-		boolean isSuccess = generator.sendAlertEmail(alertEntity);
-		if( !isSuccess ) {
-			status.errorMessage = "Failed to send email";
-			status.successful = false;
-		}else {
-			status.errorMessage = "";
-			status.successful = true;
+		List<AlertEmailGenerator> generators = this.emailGenerators.get(policyId);
+		for(AlertEmailGenerator generator: generators) {
+			boolean isSuccess = generator.sendAlertEmail(alertEntity);
+			NotificationStatus status = new NotificationStatus();
+			if( !isSuccess ) {
+				status.errorMessage = "Failed to send email";
+				status.successful = false;
+			}else {
+				status.errorMessage = "";
+				status.successful = true;
+			}
+			this.statusList.add(status);
 		}
 	}
 
 	@Override
-	public NotificationStatus getStatus() {
-		return this.status;
+	public List<NotificationStatus> getStatusList() {
+		return this.statusList;
 	}
 
 	/**

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/12e4395f/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertKafkaPlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertKafkaPlugin.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertKafkaPlugin.java
index cc4f89d..683f2f5 100644
--- a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertKafkaPlugin.java
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/AlertKafkaPlugin.java
@@ -32,6 +32,7 @@ import org.slf4j.LoggerFactory;
 
 import java.util.List;
 import java.util.Map;
+import java.util.Vector;
 import java.util.concurrent.ConcurrentHashMap;
 
 /**
@@ -40,8 +41,8 @@ import java.util.concurrent.ConcurrentHashMap;
 @SuppressWarnings({ "rawtypes", "unchecked" })
 public class AlertKafkaPlugin implements NotificationPlugin {
 	private static final Logger LOG = LoggerFactory.getLogger(AlertKafkaPlugin.class);
-	private NotificationStatus status = new NotificationStatus();
-	private Map<String, Map<String, String>> kafaConfigs = new ConcurrentHashMap<>();
+	private List<NotificationStatus> statusList = new Vector<>();
+	private Map<String, List<Map<String, String>>> kafaConfigs = new ConcurrentHashMap<>();
 	private Config config;
 
 	@Override
@@ -49,36 +50,39 @@ public class AlertKafkaPlugin implements NotificationPlugin {
 		this.config = config;
 		for( AlertDefinitionAPIEntity entity : initAlertDefs ) {
 			List<Map<String,String>>  configMaps = NotificationPluginUtils.deserializeNotificationConfig(entity.getNotificationDef());
-			for( Map<String,String> notificationConfigMap : configMaps ){
-				String notificationType = notificationConfigMap.get(NotificationConstants.NOTIFICATION_TYPE);
-				if(notificationType == null){
-					LOG.error("no notificationType field for this notification, ignoring and continue " + notificationConfigMap);
-					continue;
-				}else {
-					// single policy can have multiple configs , only load Kafka Config's
-					if (notificationType.equalsIgnoreCase(NotificationConstants.KAFKA_STORE)) {
-						kafaConfigs.put(entity.getTags().get(Constants.POLICY_ID), notificationConfigMap);
-						break;
-					}
-				}
-			}
+			this.update(entity.getTags().get(Constants.POLICY_ID), configMaps, false);
 		}
 	}
 
 	/**
 	 * Update API to update policy delete/create/update in Notification Plug-ins
-	 * @param  notificationConf
+	 * @param  notificationConfigCollection
 	 * @param isPolicyDelete
 	 * @throws Exception
      */
 	@Override
-	public void update(String policyId, Map<String,String> notificationConf , boolean isPolicyDelete ) throws Exception {
+	public void update(String policyId, List<Map<String,String>> notificationConfigCollection, boolean isPolicyDelete ) throws Exception {
 		if( isPolicyDelete ){
 			LOG.info(" Policy been deleted.. Removing reference from Notification Plugin ");
 			this.kafaConfigs.remove(policyId);
 			return;
 		}
-		kafaConfigs.put(policyId, notificationConf );
+		Vector<Map<String, String>> kafkaConfigList = new Vector<>();
+		for(Map<String,String> notificationConfigMap : notificationConfigCollection){
+			String notificationType = notificationConfigMap.get(NotificationConstants.NOTIFICATION_TYPE);
+			if(notificationType == null){
+				LOG.error("invalid notificationType for this notification, ignoring and continue " + notificationConfigMap);
+				continue;
+			}else {
+				// single policy can have multiple configs , only load Kafka Config's
+				if (notificationType.equalsIgnoreCase(NotificationConstants.KAFKA_STORE)) {
+					kafkaConfigList.add(notificationConfigMap);
+				}
+			}
+		}
+		if(kafkaConfigList.size() != 0) {
+			kafaConfigs.put(policyId, kafkaConfigList);
+		}
 	}
 
 	/**
@@ -87,15 +91,20 @@ public class AlertKafkaPlugin implements NotificationPlugin {
      */
 	@Override
 	public void onAlert(AlertAPIEntity alertEntity) {
-		try{
-			KafkaProducer producer = KafkaProducerSingleton.INSTANCE.getProducer(config);
-			producer.send(createRecord(alertEntity));
-			status.successful = true;
-			status.errorMessage = "";
-		}catch(Exception ex ){
-			LOG.error("fail writing alert to Kafka bus", ex);
-			status.successful = false;
-			status.errorMessage = ex.getMessage();
+		String policyId = alertEntity.getTags().get(Constants.POLICY_ID);
+		for(Map<String, String> kafkaConfig: this.kafaConfigs.get(policyId)) {
+			NotificationStatus status = new NotificationStatus();
+			try{
+				KafkaProducer producer = KafkaProducerSingleton.INSTANCE.getProducer(kafkaConfig);
+				producer.send(createRecord(alertEntity, kafkaConfig.get(NotificationConstants.TOPIC)));
+				status.successful = true;
+				status.errorMessage = "";
+			}catch(Exception ex ){
+				LOG.error("fail writing alert to Kafka bus", ex);
+				status.successful = false;
+				status.errorMessage = ex.getMessage();
+			}
+			this.statusList.add(status);
 		}
 	}
 
@@ -105,15 +114,14 @@ public class AlertKafkaPlugin implements NotificationPlugin {
 	 * @return
 	 * @throws Exception
 	 */
-	private ProducerRecord  createRecord(AlertAPIEntity entity ) throws Exception {
-		String policyId = entity.getTags().get(Constants.POLICY_ID);
-		ProducerRecord  record  = new ProducerRecord( this.kafaConfigs.get(policyId).get("topic"), NotificationPluginUtils.objectToStr(entity));
+	private ProducerRecord  createRecord(AlertAPIEntity entity, String topic) throws Exception {
+		ProducerRecord  record  = new ProducerRecord(topic, NotificationPluginUtils.objectToStr(entity));
 		return record;
 	}	
 	
 	@Override
-	public NotificationStatus getStatus() {
-		return status;
+	public List<NotificationStatus> getStatusList() {
+		return statusList;
 	}
 
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/12e4395f/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/KafkaProducerSingleton.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/KafkaProducerSingleton.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/KafkaProducerSingleton.java
index 53f3bb0..bc9e3c9 100644
--- a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/KafkaProducerSingleton.java
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/KafkaProducerSingleton.java
@@ -16,9 +16,11 @@
  */
 package org.apache.eagle.notification.plugin;
 
+import java.util.Map;
 import java.util.Properties;
 
 import com.typesafe.config.Config;
+import org.apache.eagle.notification.base.NotificationConstants;
 import org.apache.eagle.notification.utils.NotificationPluginUtils;
 import org.apache.kafka.clients.producer.KafkaProducer;
 
@@ -28,10 +30,11 @@ import org.apache.kafka.clients.producer.KafkaProducer;
 public enum KafkaProducerSingleton {
 	INSTANCE;	
 
-	public KafkaProducer<String, Object>  getProducer(Config config) throws Exception{
+	public KafkaProducer<String, Object>  getProducer(Map<String, String> config) throws Exception{
 		Properties configMap = new Properties();
-		configMap.put("bootstrap.servers", NotificationPluginUtils.getPropValue(config, "kafka_broker"));
-		configMap.put("metadata.broker.list", NotificationPluginUtils.getPropValue(config, "kafka_broker"));
+		String broker_list = config.get(NotificationConstants.BROKER_LIST);
+		configMap.put("bootstrap.servers", broker_list);
+		configMap.put("metadata.broker.list", broker_list);
 		configMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 		configMap.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
 		configMap.put("request.required.acks", "1");	     

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/12e4395f/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPlugin.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPlugin.java
index 5780176..92ee0b5 100644
--- a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPlugin.java
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPlugin.java
@@ -33,15 +33,15 @@ public interface NotificationPlugin {
      * for initialization
      * @throws Exception
      */
-    public void init(Config config, List<AlertDefinitionAPIEntity> initAlertDefs) throws  Exception;
+    void init(Config config, List<AlertDefinitionAPIEntity> initAlertDefs) throws  Exception;
 
     /**
      * Update Plugin if any change in Policy Definition
      * @param policy to be impacted
-     * @param  notificationConf
+     * @param  notificationConfCollection
      * @throws Exception
      */
-    public void update(String policy, Map<String,String> notificationConf , boolean isPolicyDelete ) throws  Exception;
+    void update(String policy, List<Map<String,String>> notificationConfCollection , boolean isPolicyDelete) throws  Exception;
 
     /**
      * Post a notification for the given alertEntity
@@ -49,11 +49,11 @@ public interface NotificationPlugin {
      * @throws Exception
      */
 
-    public void onAlert( AlertAPIEntity alertEntity ) throws  Exception;
+    void onAlert(AlertAPIEntity alertEntity) throws  Exception;
 
     /**
      * Returns Status of Notification Post
      * @return
      */
-    public NotificationStatus getStatus();
+    List<NotificationStatus> getStatusList();
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/12e4395f/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginManagerImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginManagerImpl.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginManagerImpl.java
index 2c77f1f..8e9e3b2 100644
--- a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginManagerImpl.java
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/plugin/NotificationPluginManagerImpl.java
@@ -64,7 +64,7 @@ public class NotificationPluginManagerImpl implements NotificationPluginManager
                 policyNotificationMapping.put(entity.getTags().get(Constants.POLICY_ID) , plugins.values());
             }
         }catch (Exception ex ){
-            LOG.error("Error initializing poliy/notification mapping ", ex);
+            LOG.error("Error initializing policy/notification mapping ", ex);
             throw new IllegalStateException(ex);
         }
     }
@@ -74,7 +74,7 @@ public class NotificationPluginManagerImpl implements NotificationPluginManager
         String policyId = entity.getTags().get(Constants.POLICY_ID);
         Collection<NotificationPlugin> plugins = policyNotificationMapping.get(policyId);
         if(plugins == null || plugins.size() == 0) {
-            LOG.debug("no plugin found for policy " + policyId);
+            LOG.warn("no alert notification plugins found for policy " + policyId);
             return;
         }
         for(NotificationPlugin plugin : plugins){
@@ -115,20 +115,12 @@ public class NotificationPluginManagerImpl implements NotificationPluginManager
 
             // iterate current notifications and update it individually
             List<Map<String,String>> notificationConfigCollection = NotificationPluginUtils.deserializeNotificationConfig(alertDef.getNotificationDef());
-            for( Map<String,String> notificationConf : notificationConfigCollection ) {
-                String notificationType = notificationConf.get(NotificationConstants.NOTIFICATION_TYPE);
-                // for backward compatibility, use email for default notification type
-                if(notificationType == null){
-                    notificationType = NotificationConstants.EMAIL_NOTIFICATION;
-                }
-                NotificationPlugin plugin = plugins.get(notificationType);
-                if(plugin != null){
-                    plugin.update(policyId, notificationConf, false);
-                }
+            for(NotificationPlugin plugin: plugins.values()) {
+                plugin.update(policyId, notificationConfigCollection, false);
             }
 
             policyNotificationMapping.put(policyId, plugins.values());// update policy - notification types map
-            LOG.info("Successfully broadcasted policy updates to all Notification Plugins ...");
+            LOG.info("Successfully broadcast policy updates to all Notification Plugins ...");
         } catch (Exception e) {
             LOG.error("Error broadcasting policy notification changes ", e);
         }
@@ -141,12 +133,13 @@ public class NotificationPluginManagerImpl implements NotificationPluginManager
         // mapping from notificationType to plugin
         Map<String, NotificationPlugin>  notifications = new HashMap<>();
         List<Map<String,String>> notificationConfigCollection = NotificationPluginUtils.deserializeNotificationConfig(policy.getNotificationDef());
-        for( Map<String,String> notificationConf : notificationConfigCollection ){
+        for(Map<String,String> notificationConf : notificationConfigCollection ){
             String notificationType = notificationConf.get(NotificationConstants.NOTIFICATION_TYPE);
             // for backward compatibility, by default notification type is email if notification type is not specified
             if(notificationType == null){
                 LOG.warn("notificationType is null so use default notification type email for this policy  " + policy);
                 notifications.put(NotificationConstants.EMAIL_NOTIFICATION, plugins.get(NotificationConstants.EMAIL_NOTIFICATION));
+                notifications.put(NotificationConstants.EAGLE_STORE, plugins.get(NotificationConstants.EAGLE_STORE));
             }else if(!plugins.containsKey(notificationType)){
                 LOG.warn("No NotificationPlugin supports this notificationType " + notificationType);
             }else {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/12e4395f/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/utils/NotificationPluginUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/utils/NotificationPluginUtils.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/utils/NotificationPluginUtils.java
index 350ecb5..e490be3 100644
--- a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/utils/NotificationPluginUtils.java
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/main/java/org/apache/eagle/notification/utils/NotificationPluginUtils.java
@@ -17,11 +17,11 @@
 
 package org.apache.eagle.notification.utils;
 
-
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.CollectionType;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigObject;
+import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.codehaus.jackson.map.ObjectMapper;
+import org.codehaus.jackson.map.type.CollectionType;
 
 import java.util.List;
 import java.util.Map;
@@ -30,6 +30,7 @@ import java.util.Map;
  * Common methods for Notification Plugin
  */
 public class NotificationPluginUtils {
+	private final static ObjectMapper OBJECT_MAPPER = TaggedLogAPIEntity.buildObjectMapper();
 	/**
 	 * Fetch Notification specific property value
 	 * @param key
@@ -50,9 +51,8 @@ public class NotificationPluginUtils {
 	 * @throws Exception
      */
 	public static List<Map<String,String>> deserializeNotificationConfig( String notificationDef ) throws Exception {
-		ObjectMapper mapper = new ObjectMapper();
-		CollectionType mapCollectionType = mapper.getTypeFactory().constructCollectionType(List.class, Map.class);
-		return mapper.readValue( notificationDef , mapCollectionType);
+		CollectionType mapCollectionType = OBJECT_MAPPER.getTypeFactory().constructCollectionType(List.class, Map.class);
+		return OBJECT_MAPPER.readValue(notificationDef, mapCollectionType);
 	}
 
 	/**
@@ -62,7 +62,6 @@ public class NotificationPluginUtils {
 	 * @throws Exception
      */
 	public static String objectToStr( Object obj ) throws  Exception {
-		ObjectMapper mapper = new ObjectMapper();
-		return mapper.writeValueAsString(obj);
+		return OBJECT_MAPPER.writeValueAsString(obj);
 	}
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/12e4395f/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertEagleStorePlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertEagleStorePlugin.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertEagleStorePlugin.java
index 74c8e40..4a88964 100644
--- a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertEagleStorePlugin.java
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertEagleStorePlugin.java
@@ -21,6 +21,7 @@ import com.typesafe.config.ConfigFactory;
 import junit.framework.Assert;
 import org.apache.eagle.alert.entity.AlertAPIEntity;
 import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.common.metric.AlertContext;
 import org.apache.eagle.notification.plugin.AlertEagleStorePlugin;
 import org.junit.Ignore;
 import org.junit.Test;
@@ -42,7 +43,8 @@ public class TestAlertEagleStorePlugin {
 
         AlertAPIEntity alert = new AlertAPIEntity();
         alert.setDescription("");
+        alert.setAlertContext(new AlertContext().toJsonString());
         plugin.onAlert(alert);
-        Assert.assertTrue(plugin.getStatus().successful);
+        Assert.assertTrue(plugin.getStatusList().get(0).successful);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/12e4395f/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertEmailPlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertEmailPlugin.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertEmailPlugin.java
index 447150d..9abed72 100644
--- a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertEmailPlugin.java
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertEmailPlugin.java
@@ -42,7 +42,7 @@ public class TestAlertEmailPlugin {
         AlertDefinitionAPIEntity def = new AlertDefinitionAPIEntity();
         def.setTags(new HashMap<String, String>());
         def.getTags().put(Constants.POLICY_ID, "testPolicyId");
-        def.setNotificationDef("[{\"notificationType\":\"email\",\"sender\":\"eagle@apache.org\",\"recipients\":\"eagle@apache.org\",\"subject\":\"last check point time lag found.\",\"flavor\":\"email\",\"id\":\"email_1\",\"tplFileName\":\"\"}]");
+        def.setNotificationDef("[{\"notificationType\":\"email\",\"sender\":\"eagle@apache.org\",\"recipients\":\"eagle@apache.org\",\"subject\":\"last check point time lag found.\",\"tplFileName\":\"\"}]");
         plugin.init(config, Arrays.asList(def));
 
         AlertAPIEntity alert = new AlertAPIEntity();
@@ -51,6 +51,6 @@ public class TestAlertEmailPlugin {
         alert.setDescription("");
         alert.setDecodedAlertContext(new AlertContext());
         plugin.onAlert(alert);
-        Assert.assertTrue(plugin.getStatus().successful);
+        Assert.assertTrue(plugin.getStatusList().get(0).successful);
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/12e4395f/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertKafkaPlugin.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertKafkaPlugin.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertKafkaPlugin.java
index 318df22..814956e 100644
--- a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertKafkaPlugin.java
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestAlertKafkaPlugin.java
@@ -42,7 +42,7 @@ public class TestAlertKafkaPlugin {
 		AlertDefinitionAPIEntity def = new AlertDefinitionAPIEntity();
 		def.setTags(new HashMap<String, String>());
 		def.getTags().put(Constants.POLICY_ID, "testPolicyId");
-		def.setNotificationDef("[{\"notificationType\":\"kafka\",\"topic\":\"testTopic\"}]");
+		def.setNotificationDef("[{\"notificationType\":\"kafka\",\"kafka_broker\":\"sandbox.hortonworks.com:6667\",\"topic\":\"sandbox_hdfs_audit_log\"}]");
 		plugin.init(config, Arrays.asList(def));
 
 		AlertAPIEntity alert = new AlertAPIEntity();
@@ -52,6 +52,6 @@ public class TestAlertKafkaPlugin {
 		alert.setAlertContext(new AlertContext().toJsonString());
 		plugin.onAlert(alert);
 		Thread.sleep(1000); // wait for message sent out
-		Assert.assertTrue(plugin.getStatus().successful);
+		Assert.assertTrue(plugin.getStatusList().get(0).successful);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/12e4395f/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestNotificationPluginManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestNotificationPluginManager.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestNotificationPluginManager.java
new file mode 100644
index 0000000..2b6f25d
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestNotificationPluginManager.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.notifications.testcases;
+
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.common.metric.AlertContext;
+import org.apache.eagle.notification.plugin.NotificationPluginManager;
+import org.apache.eagle.notification.plugin.NotificationPluginManagerImpl;
+import org.apache.eagle.policy.common.Constants;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.HashMap;
+
+public class TestNotificationPluginManager {
+    @Ignore
+    @Test
+    public void testUpdateNotificationPlugins() {
+        boolean isDelete = false;
+        AlertDefinitionAPIEntity alertDef = new AlertDefinitionAPIEntity();
+        alertDef.setTags(new HashMap<String, String>());
+        alertDef.getTags().put(Constants.POLICY_ID, "testPlugin");
+        alertDef.setNotificationDef("[]");
+        Config config = ConfigFactory.load();
+        NotificationPluginManager manager = new NotificationPluginManagerImpl(config);
+        manager.updateNotificationPlugins(alertDef, isDelete);
+        Assert.assertTrue(true);
+    }
+    @Ignore
+    @Test
+    public void testUpdateNotificationPlugins2() {
+        boolean isDelete = false;
+        AlertDefinitionAPIEntity alertDef = new AlertDefinitionAPIEntity();
+        alertDef.setTags(new HashMap<String, String>());
+        alertDef.getTags().put(Constants.POLICY_ID, "testEmptyPlugins");
+        alertDef.setNotificationDef("[{\"notificationType\":\"eagleStore\"},{\"notificationType\":\"kafka\",\"kafka_broker\":\"sandbox.hortonworks.com:6667\",\"topic\":\"testTopic\"}]");
+        Config config = ConfigFactory.load();
+        NotificationPluginManager manager = new NotificationPluginManagerImpl(config);
+        manager.updateNotificationPlugins(alertDef, isDelete);
+        Assert.assertTrue(true);
+    }
+
+    @Ignore
+    @Test
+    public void testUpdateNotificationPluginsWithDelete() {
+        boolean isDelete = true;
+        AlertDefinitionAPIEntity alertDef = new AlertDefinitionAPIEntity();
+        alertDef.setTags(new HashMap<String, String>());
+        alertDef.getTags().put(Constants.POLICY_ID, "testEmptyPlugins");
+        alertDef.setNotificationDef("[]");
+        Config config = ConfigFactory.load();
+        NotificationPluginManager manager = new NotificationPluginManagerImpl(config);
+        manager.updateNotificationPlugins(alertDef, isDelete);
+        Assert.assertTrue(true);
+    }
+
+    @Ignore
+    @Test
+    public void testMultipleNotificationInstance() {
+        AlertAPIEntity alert = new AlertAPIEntity();
+        alert.setTags(new HashMap<String, String>());
+        alert.getTags().put(Constants.POLICY_ID, "testPlugin");
+        alert.setDescription("");
+        alert.setAlertContext(new AlertContext().toJsonString());
+
+        Config config = ConfigFactory.load();
+        NotificationPluginManager manager = new NotificationPluginManagerImpl(config);
+        manager.notifyAlert(alert);
+        Assert.assertTrue(true);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/12e4395f/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestNotificationPluginUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestNotificationPluginUtils.java b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestNotificationPluginUtils.java
new file mode 100644
index 0000000..022526c
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-notification-plugin/src/test/java/org/apache/eagle/notifications/testcases/TestNotificationPluginUtils.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.notifications.testcases;
+
+import org.apache.eagle.notification.utils.NotificationPluginUtils;
+import org.junit.Assert;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import java.util.List;
+import java.util.Map;
+
+
+public class TestNotificationPluginUtils {
+    @Ignore
+    @Test
+    public void testDeserializeNotificationConfig() throws Exception {
+        String notificationDef = "[]";
+        List<Map<String,String>> list = NotificationPluginUtils.deserializeNotificationConfig(notificationDef);
+        Assert.assertTrue(list.isEmpty());
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/12e4395f/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/EmailNotificationConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/EmailNotificationConfig.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/EmailNotificationConfig.java
index 4816bc9..f543d63 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/EmailNotificationConfig.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/EmailNotificationConfig.java
@@ -16,6 +16,7 @@
  */
 package org.apache.eagle.alert.config;
 
+@Deprecated
 public class EmailNotificationConfig extends NotificationConfig{
 	private static final long serialVersionUID = 1L;
 	private String sender;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/12e4395f/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/NotificationConfig.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/NotificationConfig.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/NotificationConfig.java
index 76f2d56..60c87dc 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/NotificationConfig.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/config/NotificationConfig.java
@@ -20,6 +20,7 @@ import java.io.Serializable;
 
 import com.fasterxml.jackson.annotation.JsonTypeInfo;
 
+@Deprecated
 @JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "flavor", visible=true)
 public class NotificationConfig implements Serializable{
 	private static final long serialVersionUID = 1L;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/12e4395f/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java
index ef45af3..af42dd3 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java
@@ -33,6 +33,7 @@ import com.typesafe.config.ConfigObject;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+@Deprecated
 public class AlertEmailGenerator{
 	private String tplFile;
 	private String sender;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/12e4395f/eagle-core/eagle-alert/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiddhiAlertPolicyValidateProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiddhiAlertPolicyValidateProvider.java b/eagle-core/eagle-alert/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiddhiAlertPolicyValidateProvider.java
index 43bcfe2..7f5bddd 100644
--- a/eagle-core/eagle-alert/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiddhiAlertPolicyValidateProvider.java
+++ b/eagle-core/eagle-alert/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiddhiAlertPolicyValidateProvider.java
@@ -122,7 +122,7 @@ public class SiddhiAlertPolicyValidateProvider extends AlertPolicyValidateProvid
 	
 	@Override
 	public String PolicyType() {
-		return "siddhiCEPEngine";
+		return Constants.policyType.siddhiCEPEngine.name();
 	}
 	
 	@Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/12e4395f/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/AlertExecutorConsumerUtils.scala
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/AlertExecutorConsumerUtils.scala b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/AlertExecutorConsumerUtils.scala
index 76250e2..752c317 100644
--- a/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/AlertExecutorConsumerUtils.scala
+++ b/eagle-core/eagle-data-process/eagle-stream-process-api/src/main/scala/org/apache/eagle/datastream/utils/AlertExecutorConsumerUtils.scala
@@ -21,11 +21,10 @@ package org.apache.eagle.datastream.utils
 
 import java.util
 
-import org.apache.eagle.alert.dedup.{AlertEmailDeduplicationExecutor, AlertEntityDeduplicationExecutor}
+import org.apache.eagle.alert.dedup.AlertEntityDeduplicationExecutor
 import org.apache.eagle.alert.executor.AlertExecutor
 import org.apache.eagle.alert.notification.AlertNotificationExecutor
-import org.apache.eagle.alert.persist.AlertPersistExecutor
-import org.apache.eagle.datastream.core.{StreamConnector, FlatMapProducer, StreamProducer}
+import org.apache.eagle.datastream.core.{FlatMapProducer, StreamConnector, StreamProducer}
 import org.slf4j.{Logger, LoggerFactory}
 
 import scala.collection.mutable.ListBuffer
@@ -57,20 +56,19 @@ object AlertExecutorConsumerUtils {
       alertExecutorIdList.add(x.asInstanceOf[FlatMapProducer[AnyRef, AnyRef]].mapper.asInstanceOf[AlertExecutor].getExecutorId));
     val alertDefDao = alertStreamProducers.head.asInstanceOf[FlatMapProducer[AnyRef, AnyRef]].mapper.asInstanceOf[AlertExecutor].getPolicyDefinitionDao
     val entityDedupExecutor: AlertEntityDeduplicationExecutor = new AlertEntityDeduplicationExecutor(alertExecutorIdList, alertDefDao)
-    val emailDedupExecutor: AlertEmailDeduplicationExecutor = new AlertEmailDeduplicationExecutor(alertExecutorIdList, alertDefDao)
+    //val emailDedupExecutor: AlertEmailDeduplicationExecutor = new AlertEmailDeduplicationExecutor(alertExecutorIdList, alertDefDao)
     val notificationExecutor: AlertNotificationExecutor = new AlertNotificationExecutor(alertExecutorIdList, alertDefDao)
-    val persistExecutor: AlertPersistExecutor = new AlertPersistExecutor
+    //val persistExecutor: AlertPersistExecutor = new AlertPersistExecutor
 
     val entityDedupStreamProducer = FlatMapProducer(entityDedupExecutor)
-    val persistStreamProducer = FlatMapProducer(persistExecutor)
-    val emailDedupStreamProducer = FlatMapProducer(emailDedupExecutor)
+    //val persistStreamProducer = FlatMapProducer(persistExecutor)
+    //val emailDedupStreamProducer = FlatMapProducer(emailDedupExecutor)
     val notificationStreamProducer = FlatMapProducer(notificationExecutor)
-    toBeAddedEdges += StreamConnector(entityDedupStreamProducer, persistStreamProducer)
-    toBeAddedEdges += StreamConnector(emailDedupStreamProducer, notificationStreamProducer)
+    //toBeAddedEdges += StreamConnector(entityDedupStreamProducer, persistStreamProducer)
+    toBeAddedEdges += StreamConnector(entityDedupStreamProducer, notificationStreamProducer)
 
     alertStreamProducers.foreach(sp => {
       toBeAddedEdges += StreamConnector(sp, entityDedupStreamProducer)
-      toBeAddedEdges += StreamConnector(sp, emailDedupStreamProducer)
     })
   }
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/12e4395f/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertAPIEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertAPIEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertAPIEntity.java
index 0a96f6d..1ff8b27 100644
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertAPIEntity.java
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertAPIEntity.java
@@ -16,11 +16,14 @@
  */
 package org.apache.eagle.alert.entity;
 
+
 import org.apache.eagle.log.entity.meta.*;
 import org.apache.eagle.policy.common.Constants;
 import org.apache.eagle.common.metric.AlertContext;
 import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
+import org.codehaus.jackson.annotate.JsonIgnore;
 import org.codehaus.jackson.annotate.JsonIgnoreProperties;
+import org.codehaus.jackson.annotate.JsonProperty;
 import org.codehaus.jackson.map.annotate.JsonSerialize;
 
 @JsonSerialize(include=JsonSerialize.Inclusion.NON_NULL)
@@ -78,19 +81,22 @@ public class AlertAPIEntity extends TaggedLogAPIEntity{
 		_pcs.firePropertyChange("remediationCallback", null, null);
 	}
 
+    @JsonIgnore
 	public String getAlertContext() {
 		return alertContext;
 	}
 
+    @JsonProperty("alertContext")
 	public AlertContext getWrappedAlertContext() {
 		return AlertContext.fromJsonString(alertContext);
 	}
-	
+
+    @JsonIgnore
 	public void setAlertContext(String alertContext) {
 		this.alertContext = alertContext;
 		_pcs.firePropertyChange("alertContext", null, null);
 	}
-
+    @JsonProperty("alertContext")
 	public void setDecodedAlertContext(AlertContext alertContext) {
 		if(alertContext != null) this.alertContext = alertContext.toJsonString();
 		_pcs.firePropertyChange("alertContext", null, null);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/12e4395f/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertNotificationEntity.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertNotificationEntity.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertNotificationEntity.java
index 6bc6318..290e4ab 100644
--- a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertNotificationEntity.java
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/alert/entity/AlertNotificationEntity.java
@@ -63,4 +63,17 @@ public class AlertNotificationEntity extends TaggedLogAPIEntity {
         this.enabled = enabled;
         valueChanged("enabled");
     }
+
+    @Column("d")
+    private String fields;
+    public String getFields() {
+        return fields;
+    }
+
+    public void setFields(String fields) {
+        this.fields = fields;
+        valueChanged("fields");
+    }
+
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/12e4395f/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/log4j.properties b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/log4j.properties
index 07f8402..4a22987 100644
--- a/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/log4j.properties
+++ b/eagle-security/eagle-security-hdfs-auditlog/src/main/resources/log4j.properties
@@ -13,7 +13,7 @@
 # See the License for the specific language governing permissions and
 # limitations under the License.
 
-log4j.rootLogger=INFO, stdout, DRFA
+log4j.rootLogger=DEBUG, stdout, DRFA
 
 eagle.log.dir=./logs
 eagle.log.file=eagle.log

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/12e4395f/eagle-webservice/src/main/webapp/app/public/css/main.css
----------------------------------------------------------------------
diff --git a/eagle-webservice/src/main/webapp/app/public/css/main.css b/eagle-webservice/src/main/webapp/app/public/css/main.css
index 8bd6b7d..f07c3fb 100644
--- a/eagle-webservice/src/main/webapp/app/public/css/main.css
+++ b/eagle-webservice/src/main/webapp/app/public/css/main.css
@@ -566,6 +566,20 @@ body .modal-body .nav-stacked > li:last-child {
 	border-bottom: none;
 }
 
+body .box-body .nav-tabs-custom {
+	box-shadow: none;
+	margin-bottom: 0;
+}
+body .box-body .nav-tabs-custom > .nav-tabs > li:first-of-type.active > a {
+	border-left-color: #f4f4f4;
+}
+body .box-body .nav-tabs-custom > .nav-tabs > li > a {
+	padding: 8px 15px;
+}
+body .box-body .nav-tabs-custom > .tab-content {
+	padding: 10px 0;
+}
+
 /* Box */
 .box .guideline {
 	margin-top: 0;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/12e4395f/eagle-webservice/src/main/webapp/app/public/feature/common/controller.js
----------------------------------------------------------------------
diff --git a/eagle-webservice/src/main/webapp/app/public/feature/common/controller.js b/eagle-webservice/src/main/webapp/app/public/feature/common/controller.js
index 6217538..6f27956 100644
--- a/eagle-webservice/src/main/webapp/app/public/feature/common/controller.js
+++ b/eagle-webservice/src/main/webapp/app/public/feature/common/controller.js
@@ -59,6 +59,26 @@
 		return Policy;
 	});
 
+	feature.service("Notification", function(Entities) {
+		var Notification = function () {};
+		Notification.map = {};
+
+		Notification.list = Entities.queryEntities("AlertNotificationService");
+		Notification.list._promise.then(function () {
+			$.each(Notification.list, function (i, notification) {
+				// Parse fields
+				notification.fieldList = common.parseJSON(notification.fields, []);
+
+				// Fill map
+				Notification.map[notification.tags.notificationType] = notification;
+			});
+		});
+
+		Notification.promise = Notification.list._promise;
+
+		return Notification;
+	});
+
 	// ==============================================================
 	// =                          Policies                          =
 	// ==============================================================
@@ -77,8 +97,6 @@
 		var _policyList = Entities.queryEntities("AlertDefinitionService", {site: Site.current().tags.site, application: $scope.application.tags.application});
 		_policyList._promise.then(function() {
 			$.each(_policyList, function(i, policy) {
-				policy.__mailStr = common.getValueByPath(common.parseJSON(policy.notificationDef, {}), "0.recipients", "");
-				policy.__mailList = policy.__mailStr.trim() === "" ? [] : policy.__mailStr.split(/[,;]/);
 				policy.__expression = common.parseJSON(policy.policyDef, {}).expression;
 
 				$scope.policyList.push(policy);
@@ -96,7 +114,7 @@
 				var _value = common.getValueByPath(item, path, "").toLowerCase();
 				return _value.indexOf(_key) !== -1;
 			}
-			return _hasKey(item, "tags.policyId") || _hasKey(item, "__expression") || _hasKey(item, "description") || _hasKey(item, "owner") || _hasKey(item, "__mailStr");
+			return _hasKey(item, "tags.policyId") || _hasKey(item, "__expression") || _hasKey(item, "description") || _hasKey(item, "owner");
 		};
 
 		$scope.updatePolicyStatus = Policy.updatePolicyStatus;
@@ -149,8 +167,8 @@
 			} else {
 				policy = $scope.policyList[0];
 
-				policy.__mailStr = common.getValueByPath(common.parseJSON(policy.notificationDef, {}), "0.recipients", "");
-				policy.__mailList = policy.__mailStr.trim() === "" ? [] : policy.__mailStr.split(/[,;]/);
+				policy.__notificationList = common.parseJSON(policy.notificationDef, []);
+
 				policy.__expression = common.parseJSON(policy.policyDef, {}).expression;
 
 				$scope.policy = policy;
@@ -259,7 +277,7 @@
 	});
 
 	// ======================== Policy Edit =========================
-	function policyCtrl(create, PageConfig, Site, Policy, $scope, $wrapState, $q, Entities, Application, Authorization) {
+	function policyCtrl(create, PageConfig, Site, Policy, $scope, $wrapState, $q, UI, Entities, Application, Authorization, Notification) {
 		PageConfig.pageTitle = create ? "Policy Create" : "Policy Edit";
 		PageConfig.pageSubTitle = Site.current().tags.site;
 		PageConfig
@@ -316,6 +334,8 @@
 			]
 		};
 
+		$scope.Notification = Notification;
+
 		$scope.create = create;
 		$scope.encodedRowkey = $wrapState.param.filter;
 
@@ -324,6 +344,50 @@
 		$scope.policy = {};
 
 		// ==========================================
+		// =              Notification              =
+		// ==========================================
+		$scope.notificationTabHolder = null;
+
+		$scope.newNotification = function (notificationType) {
+			var __notification = {
+				notificationType: notificationType
+			};
+
+			$.each(Notification.map[notificationType].fieldList, function (i, field) {
+				__notification[field.name] = field.value || "";
+			});
+
+			$scope.policy.__.notification.push(__notification);
+		};
+
+		Notification.promise.then(function () {
+			$scope.menu = Authorization.isRole('ROLE_ADMIN') ? [
+				{icon: "cog", title: "Configuration", list: [
+					{icon: "trash", title: "Delete", danger: true, func: function () {
+						var notification = $scope.notificationTabHolder.selectedPane.data;
+						UI.deleteConfirm(notification.notificationType).then(null, null, function(holder) {
+							common.array.remove(notification, $scope.policy.__.notification);
+							holder.closeFunc();
+						});
+					}}
+				]},
+				{icon: "plus", title: "New", list: $.map(Notification.list, function(notification) {
+					return {
+						icon: "plus",
+						title: notification.tags.notificationType,
+						func: function () {
+							$scope.newNotification(notification.tags.notificationType);
+							setTimeout(function() {
+								$scope.notificationTabHolder.setSelect(-1);
+								$scope.$apply();
+							}, 0);
+						}
+					};
+				})}
+			] : [];
+		});
+
+		// ==========================================
 		// =            Data Preparation            =
 		// ==========================================
 		// Steam list
@@ -487,8 +551,7 @@
 							conditions: {},
 							notification: [],
 							dedupe: {
-								alertDedupIntervalMin: 10,
-								emailDedupIntervalMin: 10
+								alertDedupIntervalMin: 10
 							},
 							policy: {},
 							window: "externalTime",
@@ -537,7 +600,7 @@
 								title: "OPS",
 								content: "Policy not found!"
 							}, function() {
-								$location.path("/common/policyList");
+								$wrapState.path("/common/policyList");
 								$scope.$apply();
 							});
 							return;
@@ -894,12 +957,7 @@
 
 				// notificationDef
 				$scope.policy.__.notification = $scope.policy.__.notification || [];
-				var _notificationUnit = $scope.policy.__.notification[0];
-				if(_notificationUnit) {
-					_notificationUnit.flavor = "email";
-					_notificationUnit.id = "email_1";
-					_notificationUnit.tplFileName = "";
-				}
+
 				$scope.policy.notificationDef = JSON.stringify($scope.policy.__.notification);
 
 				// policyDef
@@ -990,12 +1048,12 @@
 		});
 	}
 
-	feature.controller('policyCreate', function(PageConfig, Site, Policy, $scope, $wrapState, $q, Entities, Application, Authorization) {
+	feature.controller('policyCreate', function(PageConfig, Site, Policy, $scope, $wrapState, $q, UI, Entities, Application, Authorization, Notification) {
 		var _args = [true];
 		_args.push.apply(_args, arguments);
 		policyCtrl.apply(this, _args);
 	}, "policyEdit");
-	feature.controller('policyEdit', function(PageConfig, Site, Policy, $scope, $wrapState, $q, Entities, Application, Authorization) {
+	feature.controller('policyEdit', function(PageConfig, Site, Policy, $scope, $wrapState, $q, UI, Entities, Application, Authorization, Notification) {
 		PageConfig.lockSite = true;
 		var _args = [false];
 		_args.push.apply(_args, arguments);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/12e4395f/eagle-webservice/src/main/webapp/app/public/feature/common/page/policyDetail.html
----------------------------------------------------------------------
diff --git a/eagle-webservice/src/main/webapp/app/public/feature/common/page/policyDetail.html b/eagle-webservice/src/main/webapp/app/public/feature/common/page/policyDetail.html
index 17f5bf5..cdddc43 100644
--- a/eagle-webservice/src/main/webapp/app/public/feature/common/page/policyDetail.html
+++ b/eagle-webservice/src/main/webapp/app/public/feature/common/page/policyDetail.html
@@ -37,10 +37,27 @@
 				<div class="inline-group">
 					<dl><dt>Description</dt><dd>{{policy.description}}</dd></dl>
 				</div>
-				<div class="inline-group">
+				<!--div class="inline-group">
 					<dl><dt>Alert</dt><dd>
 						<a href="mailto:{{mail}}" ng-repeat="mail in policy.__mailList track by $index" style="margin-right: 10px;">{{mail}}</a>
+						<div tabs>
+							<pane ng-repeat="notification in policy.__notificationList track by $index" data-title="{{notification.notificationType}}">
+							</pane>
+						</div>
 					</dd></dl>
+				</div-->
+				<label>Notification</label>
+				<div tabs>
+					<pane ng-repeat="notification in policy.__notificationList track by $index" data-title="{{notification.notificationType}}">
+						<table class="table table-bordered">
+							<tbody>
+								<tr ng-repeat="(key, value) in notification track by $index">
+									<th width="30%">{{key}}</th>
+									<td>{{value}}</td>
+								</tr>
+							</tbody>
+						</table>
+					</pane>
 				</div>
 			</div>
 			<div class="col-xs-4 text-right" ng-show="Auth.isRole('ROLE_ADMIN')">

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/12e4395f/eagle-webservice/src/main/webapp/app/public/feature/common/page/policyEdit.html
----------------------------------------------------------------------
diff --git a/eagle-webservice/src/main/webapp/app/public/feature/common/page/policyEdit.html b/eagle-webservice/src/main/webapp/app/public/feature/common/page/policyEdit.html
index b3ad0a2..33d4cde 100644
--- a/eagle-webservice/src/main/webapp/app/public/feature/common/page/policyEdit.html
+++ b/eagle-webservice/src/main/webapp/app/public/feature/common/page/policyEdit.html
@@ -261,15 +261,6 @@
 							<input type="number" class="form-control" ng-model="policy.__.dedupe.alertDedupIntervalMin" placeholder="[Minute] Number only. Suggestion: 720">
 						</div>
 					</div>
-					<div class="col-xs-3">
-						<div class="form-group">
-							<label>
-								Email De-Dup Interval(min)
-								<span class="fa fa-question-circle" uib-tooltip="The minimun time interval of email"> </span>
-							</label>
-							<input type="number" class="form-control" ng-model="policy.__.dedupe.emailDedupIntervalMin" placeholder="[Minute] Number only. Suggestion: 1440">
-						</div>
-					</div>
 					<div class="col-xs-2">
 						<div class="form-group">
 							<label>
@@ -302,18 +293,24 @@
 
 				<hr/>
 
-				<div class="form-group">
-					<label>Sender</label>
-					<input type="text" class="form-control" value="noreply-hadoop-eagle@company1.com" placeholder="Enter Sender. e.g. sender@eaxmple.com" ng-model="policy.__.notification[0].sender">
-				</div>
-				<div class="form-group">
-					<label>Recipients</label>
-					<input type="text" class="form-control" placeholder="Enter Recipients. Split with ','. e.g. usera@example.com, userb@example.com" ng-model="policy.__.notification[0].recipients">
-				</div>
-				<div class="form-group">
-					<label>Subject</label>
-					<input type="text" class="form-control" placeholder="Enter Subject" ng-model="policy.__.notification[0].subject">
+				<label>Notification</label>
+				<div tabs menu="menu" holder="notificationTabHolder" ng-show="policy.__.notification.length">
+					<pane ng-repeat="notification in policy.__.notification track by $index" data-data="notification" data-title="{{notification.notificationType}}">
+						<div class="form-group" ng-repeat="field in Notification.map[notification.notificationType].fieldList track by $index">
+							<label>{{field.name}}</label>
+							<input type="text" class="form-control" ng-model="notification[field.name]">
+						</div>
+						<p class="text-muted" ng-if="Notification.map[notification.notificationType].fieldList.length === 0">No configuration required</p>
+					</pane>
 				</div>
+				<ul ng-show="policy.__.notification.length === 0">
+					<li ng-repeat="notification in Notification.list track by $index">
+						<a ng-click="newNotification(notification.tags.notificationType)">+ New {{notification.tags.notificationType}} Notification</a>
+					</li>
+				</ul>
+
+				<hr/>
+
 				<div class="form-group">
 					<label>Description</label>
 					<textarea class="form-control" placeholder="Policy description" ng-model="policy.description"></textarea>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/12e4395f/eagle-webservice/src/main/webapp/app/public/js/components/tabs.js
----------------------------------------------------------------------
diff --git a/eagle-webservice/src/main/webapp/app/public/js/components/tabs.js b/eagle-webservice/src/main/webapp/app/public/js/components/tabs.js
index 57bc8e4..21c4a4a 100644
--- a/eagle-webservice/src/main/webapp/app/public/js/components/tabs.js
+++ b/eagle-webservice/src/main/webapp/app/public/js/components/tabs.js
@@ -56,6 +56,9 @@ eagleComponents.directive('tabs', function() {
 			$scope.setSelect = function(pane) {
 				if(typeof pane === "string") {
 					pane = common.array.find(pane, $scope.paneList, "title");
+				} else if(typeof pane === "number") {
+					pane = (pane + $scope.paneList.length) % $scope.paneList.length;
+					pane = $scope.paneList[pane];
 				}
 
 				$scope.activePane = $scope.selectedPane || pane;