You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/01/12 08:48:00 UTC

[7/8] incubator-eagle git commit: EAGLE-79 Provide aggregation and persistence DSL support

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertDeduplicationExecutorBase.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertDeduplicationExecutorBase.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertDeduplicationExecutorBase.java
index b846418..27be5d6 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertDeduplicationExecutorBase.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertDeduplicationExecutorBase.java
@@ -16,29 +16,30 @@
  */
 package org.apache.eagle.alert.dedup;
 
-import org.apache.eagle.alert.common.AlertConstants;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.policy.common.Constants;
 import org.apache.eagle.alert.config.DeduplicatorConfig;
-import org.apache.eagle.alert.dao.AlertDefinitionDAO;
+import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
 import org.apache.eagle.alert.entity.AlertAPIEntity;
 import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.alert.policy.DynamicPolicyLoader;
-import org.apache.eagle.alert.policy.PolicyLifecycleMethods;
+import org.apache.eagle.policy.DynamicPolicyLoader;
+import org.apache.eagle.policy.PolicyLifecycleMethods;
 import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
 import org.apache.eagle.datastream.Collector;
 import org.apache.eagle.datastream.JavaStormStreamExecutor2;
 import org.apache.eagle.datastream.Tuple2;
-import com.sun.jersey.client.impl.CopyOnWriteHashMap;
-import com.typesafe.config.Config;
-import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import com.sun.jersey.client.impl.CopyOnWriteHashMap;
+import com.typesafe.config.Config;
 
-public abstract class AlertDeduplicationExecutorBase extends JavaStormStreamExecutor2<String, AlertAPIEntity> implements PolicyLifecycleMethods {
+public abstract class AlertDeduplicationExecutorBase extends JavaStormStreamExecutor2<String, AlertAPIEntity> implements PolicyLifecycleMethods<AlertDefinitionAPIEntity> {
 	private static final long serialVersionUID = 1L;
 	private static final Logger LOG = LoggerFactory.getLogger(AlertDeduplicationExecutorBase.class);
 	protected Config config;
@@ -46,14 +47,14 @@ public abstract class AlertDeduplicationExecutorBase extends JavaStormStreamExec
 
 	private List<String> alertExecutorIdList;
 	private volatile CopyOnWriteHashMap<String, DefaultDeduplicator<AlertAPIEntity>> alertDedups;
-	private AlertDefinitionDAO dao;
+	private PolicyDefinitionDAO<AlertDefinitionAPIEntity> dao;
 
 	public enum DEDUP_TYPE {
 		ENTITY,
 		EMAIL
 	}
 
-	public AlertDeduplicationExecutorBase(List<String> alertExecutorIdList, DEDUP_TYPE dedupType, AlertDefinitionDAO dao){
+	public AlertDeduplicationExecutorBase(List<String> alertExecutorIdList, DEDUP_TYPE dedupType, PolicyDefinitionDAO<AlertDefinitionAPIEntity> dao){
 		this.alertExecutorIdList = alertExecutorIdList;
 		this.dedupType = dedupType;
 		this.dao = dao;
@@ -90,7 +91,7 @@ public abstract class AlertDeduplicationExecutorBase extends JavaStormStreamExec
         String dataSource = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.DATA_SOURCE);
 	    Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs;	    	    
 	    try {
-	 		initialAlertDefs = dao.findActiveAlertDefsGroupbyAlertExecutorId(site, dataSource);
+	 		initialAlertDefs = dao.findActivePoliciesGroupbyExecutorId(site, dataSource);
 	    }
 	    catch (Exception ex) {
  			LOG.error("fail to initialize initialAlertDefs: ", ex);
@@ -106,7 +107,7 @@ public abstract class AlertDeduplicationExecutorBase extends JavaStormStreamExec
                        try {
                           DefaultDeduplicator<AlertAPIEntity> deduplicator = createAlertDedup(alertDef);
                           if (deduplicator != null)
-                              tmpDeduplicators.put(alertDef.getTags().get(AlertConstants.POLICY_ID), deduplicator);
+                              tmpDeduplicators.put(alertDef.getTags().get(Constants.POLICY_ID), deduplicator);
                           else LOG.warn("The dedup interval is not set, alertDef: " + alertDef);
                         }
                         catch (Throwable t) {
@@ -121,7 +122,7 @@ public abstract class AlertDeduplicationExecutorBase extends JavaStormStreamExec
 
 		alertDedups = new CopyOnWriteHashMap<>();
 		alertDedups.putAll(tmpDeduplicators);
-		DynamicPolicyLoader policyLoader = DynamicPolicyLoader.getInstance();
+		DynamicPolicyLoader<AlertDefinitionAPIEntity> policyLoader = DynamicPolicyLoader.getInstanceOf(AlertDefinitionAPIEntity.class);
 		policyLoader.init(initialAlertDefs, dao, config);
 		for (String alertExecutorId : alertExecutorIdList) {
 		 	policyLoader.addPolicyChangeListener(alertExecutorId, this);
@@ -159,7 +160,7 @@ public abstract class AlertDeduplicationExecutorBase extends JavaStormStreamExec
 			DefaultDeduplicator<AlertAPIEntity> dedup = createAlertDedup(alertDef);
 			if (dedup != null) {
 				synchronized(alertDedups) {		
-					alertDedups.put(alertDef.getTags().get(AlertConstants.POLICY_ID), dedup);
+					alertDedups.put(alertDef.getTags().get(Constants.POLICY_ID), dedup);
 				}
 			}
 		}
@@ -172,7 +173,7 @@ public abstract class AlertDeduplicationExecutorBase extends JavaStormStreamExec
 			DefaultDeduplicator<AlertAPIEntity> dedup = createAlertDedup(alertDef);
 			if (dedup != null) {
 				synchronized(alertDedups) {
-					alertDedups.put(alertDef.getTags().get(AlertConstants.POLICY_ID), dedup);
+					alertDedups.put(alertDef.getTags().get(Constants.POLICY_ID), dedup);
 				}
 			}
 		}
@@ -184,7 +185,7 @@ public abstract class AlertDeduplicationExecutorBase extends JavaStormStreamExec
 			LOG.info("alert dedup config deleted " + alertDef);
 			// no cleanup to do, just remove it
 			synchronized(alertDedups) {		
-				alertDedups.remove(alertDef.getTags().get(AlertConstants.POLICY_ID));
+				alertDedups.remove(alertDef.getTags().get(Constants.POLICY_ID));
 			}
 		}
 	}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java
index 81cd78f..8947d2c 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEmailDeduplicationExecutor.java
@@ -16,7 +16,7 @@
  */
 package org.apache.eagle.alert.dedup;
 
-import org.apache.eagle.alert.dao.AlertDefinitionDAO;
+import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
 
 import java.util.List;
 
@@ -24,7 +24,7 @@ public class AlertEmailDeduplicationExecutor extends AlertDeduplicationExecutorB
 
 	private static final long serialVersionUID = 1L;
 
-	public AlertEmailDeduplicationExecutor(List<String> alertExecutorIdList, AlertDefinitionDAO dao){
+	public AlertEmailDeduplicationExecutor(List<String> alertExecutorIdList, PolicyDefinitionDAO dao){
 		super(alertExecutorIdList, DEDUP_TYPE.EMAIL, dao);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java
index 2974801..b30dbda 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/dedup/AlertEntityDeduplicationExecutor.java
@@ -16,15 +16,16 @@
  */
 package org.apache.eagle.alert.dedup;
 
-import org.apache.eagle.alert.dao.AlertDefinitionDAO;
-
 import java.util.List;
 
+import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+
 public class AlertEntityDeduplicationExecutor extends AlertDeduplicationExecutorBase {
 
 	private static final long serialVersionUID = 1L;
 
-	public AlertEntityDeduplicationExecutor(List<String> alertExecutorIdList, AlertDefinitionDAO dao){
+	public AlertEntityDeduplicationExecutor(List<String> alertExecutorIdList, PolicyDefinitionDAO<AlertDefinitionAPIEntity> dao){
 		super(alertExecutorIdList, DEDUP_TYPE.ENTITY, dao);
 	}
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutor.java
new file mode 100644
index 0000000..2eee6c5
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutor.java
@@ -0,0 +1,41 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.executor;
+
+import org.apache.eagle.policy.ResultRender;
+import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.policy.PolicyPartitioner;
+import org.apache.eagle.policy.executor.PolicyProcessExecutor;
+import org.apache.eagle.alert.siddhi.SiddhiAlertAPIEntityRender;
+
+public class AlertExecutor extends PolicyProcessExecutor<AlertDefinitionAPIEntity, AlertAPIEntity> {
+
+	private final SiddhiAlertAPIEntityRender resultRender = new SiddhiAlertAPIEntityRender();
+
+	public AlertExecutor(String alertExecutorId, PolicyPartitioner partitioner, int numPartitions, int partitionSeq,
+			PolicyDefinitionDAO<AlertDefinitionAPIEntity> alertDefinitionDao, String[] sourceStreams) {
+		super(alertExecutorId, partitioner, numPartitions, partitionSeq, alertDefinitionDao, sourceStreams,
+				AlertDefinitionAPIEntity.class);
+	}
+
+	@Override
+	public ResultRender<AlertDefinitionAPIEntity, AlertAPIEntity> getResultRender() {
+		return resultRender;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutorCreationUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutorCreationUtils.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutorCreationUtils.java
new file mode 100644
index 0000000..75b00a2
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/executor/AlertExecutorCreationUtils.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.executor;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.policy.DefaultPolicyPartitioner;
+import org.apache.eagle.policy.PolicyPartitioner;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValue;
+
+/**
+ * Create alert executors and provide callback for programmer to link alert executor to immediate parent executors
+ *
+ * <br/><br/>
+ * Explanations for programId, alertExecutorId and policy<br/><br/>
+ * - programId - distributed or single-process program for example one storm topology<br/>
+ * - alertExecutorId - one process/thread which executes multiple policies<br/>
+ * - policy - some rules to be evaluated<br/>
+ *
+ * <br/>
+ *
+ * Normally the mapping is like following:
+ * <pre>
+ * programId (1:N) alertExecutorId
+ * alertExecutorId (1:N) policy
+ * </pre>
+ */
+public class AlertExecutorCreationUtils {
+	private final static Logger LOG = LoggerFactory.getLogger(AlertExecutorCreationUtils.class);
+
+
+    /**
+     * Build DAG Tasks based on persisted alert definition and schemas from eagle store.
+     *
+     * <h3>Require configuration:</h3>
+     *
+     * <ul>
+     * <li>eagleProps.site: program site id.</li>
+     * <li>eagleProps.dataSource: program data source.</li>
+     * <li>alertExecutorConfigs: only configured executor will be built into execution tasks.</li>
+     * </ul>
+     *
+     * <h3>Steps:</h3>
+     *
+     * <ol>
+     * <li>(upstreamTasks) => Map[streamName:String,upstreamTask:Task]</li>
+     * <li>(dataSource) => Map[alertExecutorId:String,streamName:List[String]]</li>
+     * <li>(site,dataSource) => Map[alertExecutorId,Map[policyId,alertDefinition]]</li>
+     * <li>(config["alertExecutorConfigs"]) => AlertExecutor(alertExecutorID, partitioner, numPartitions, partitionSeq, alertDefs, alertDefDAO, sourceStreams)[]</li>
+     * </ol>
+     */
+	public static AlertExecutor[] createAlertExecutors(Config config, PolicyDefinitionDAO<AlertDefinitionAPIEntity> alertDefDAO,
+			List<String> streamNames, String alertExecutorId) throws Exception{
+		// Read `alertExecutorConfigs` from configuration and get config for this alertExecutorId
+        int numPartitions =1;
+        String partitionerCls = DefaultPolicyPartitioner.class.getCanonicalName();
+        String alertExecutorConfigsKey = "alertExecutorConfigs";
+        if(config.hasPath(alertExecutorConfigsKey)) {
+            Map<String, ConfigValue> alertExecutorConfigs = config.getObject(alertExecutorConfigsKey);
+            if(alertExecutorConfigs !=null && alertExecutorConfigs.containsKey(alertExecutorId)) {
+                Map<String, Object> alertExecutorConfig = (Map<String, Object>) alertExecutorConfigs.get(alertExecutorId).unwrapped();
+                int parts = 0;
+                if(alertExecutorConfig.containsKey("parallelism")) parts = (int) (alertExecutorConfig.get("parallelism"));
+                numPartitions = parts == 0 ? 1 : parts;
+                if(alertExecutorConfig.containsKey("partitioner")) partitionerCls = (String) alertExecutorConfig.get("partitioner");
+            }
+        }
+
+        return createAlertExecutors(alertDefDAO, streamNames, alertExecutorId, numPartitions, partitionerCls);
+	}
+
+    /**
+     * Build alert executors and assign alert definitions between these executors by partitioner (alertExecutorConfigs["${alertExecutorId}"]["partitioner"])
+     */
+	public static AlertExecutor[] createAlertExecutors(PolicyDefinitionDAO alertDefDAO, List<String> sourceStreams,
+                                                          String alertExecutorID, int numPartitions, String partitionerCls) throws Exception{
+		LOG.info("Creating alert executors with alertExecutorID: " + alertExecutorID + ", numPartitions: " + numPartitions + ", Partition class is: "+ partitionerCls);
+
+		PolicyPartitioner partitioner = (PolicyPartitioner)Class.forName(partitionerCls).newInstance();
+		AlertExecutor[] alertExecutors = new AlertExecutor[numPartitions];
+        String[] _sourceStreams = sourceStreams.toArray(new String[0]);
+
+		for(int i = 0; i < numPartitions; i++){
+			alertExecutors[i] = new AlertExecutor(alertExecutorID, partitioner, numPartitions, i, alertDefDAO,_sourceStreams);
+		}	
+		return alertExecutors;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java
index 2a61d76..1147328 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertEmailGenerator.java
@@ -23,7 +23,7 @@ import java.util.ArrayList;
 import java.util.List;
 import java.util.concurrent.*;
 
-import org.apache.eagle.alert.common.AlertConstants;
+import org.apache.eagle.policy.common.Constants;
 import org.apache.eagle.alert.common.AlertEmailSender;
 import org.apache.eagle.alert.email.AlertEmailComponent;
 import org.apache.eagle.alert.email.AlertEmailContext;
@@ -61,8 +61,8 @@ public class AlertEmailGenerator{
 		List<AlertEmailComponent> components = new ArrayList<AlertEmailComponent>();
 		components.add(component);		
 		email.setComponents(components);
-		if (entity.getAlertContext().getProperty(AlertConstants.SUBJECT) != null) {
-			email.setSubject(entity.getAlertContext().getProperty(AlertConstants.SUBJECT));
+		if (entity.getAlertContext().getProperty(Constants.SUBJECT) != null) {
+			email.setSubject(entity.getAlertContext().getProperty(Constants.SUBJECT));
 		}
 		else email.setSubject(subject);
 		email.setVelocityTplFile(tplFile);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java
index 4723dec..7200865 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/AlertNotificationExecutor.java
@@ -16,17 +16,21 @@
  */
 package org.apache.eagle.alert.notification;
 
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-import com.sun.jersey.client.impl.CopyOnWriteHashMap;
-import com.typesafe.config.Config;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+
 import org.apache.eagle.alert.config.EmailNotificationConfig;
-import org.apache.eagle.alert.dao.AlertDefinitionDAO;
+import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
 import org.apache.eagle.alert.entity.AlertAPIEntity;
 import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.alert.policy.DynamicPolicyLoader;
-import org.apache.eagle.alert.policy.PolicyLifecycleMethods;
+import org.apache.eagle.policy.DynamicPolicyLoader;
+import org.apache.eagle.policy.PolicyLifecycleMethods;
 import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
 import org.apache.eagle.datastream.Collector;
 import org.apache.eagle.datastream.JavaStormStreamExecutor1;
@@ -34,16 +38,17 @@ import org.apache.eagle.datastream.Tuple1;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.util.*;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ThreadPoolExecutor;
-import java.util.concurrent.TimeUnit;
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+import com.sun.jersey.client.impl.CopyOnWriteHashMap;
+import com.typesafe.config.Config;
 
 /**
  * notify alert by email, sms or other means
  * currently we only implements email notification
  */
-public class AlertNotificationExecutor extends JavaStormStreamExecutor1<String> implements PolicyLifecycleMethods {
+public class AlertNotificationExecutor extends JavaStormStreamExecutor1<String> implements PolicyLifecycleMethods<AlertDefinitionAPIEntity> {
 
 	private static final long serialVersionUID = 1690354365435407034L;
 	private static final Logger LOG = LoggerFactory.getLogger(AlertNotificationExecutor.class);
@@ -51,7 +56,7 @@ public class AlertNotificationExecutor extends JavaStormStreamExecutor1<String>
 
 	private List<String> alertExecutorIdList;
 	private volatile CopyOnWriteHashMap<String, List<AlertEmailGenerator>> alertEmailGeneratorsMap;
-	private AlertDefinitionDAO dao;
+	private PolicyDefinitionDAO dao;
 
     private final static int DEFAULT_THREAD_POOL_CORE_SIZE = 4;
     private final static int DEFAULT_THREAD_POOL_MAX_SIZE = 8;
@@ -59,7 +64,7 @@ public class AlertNotificationExecutor extends JavaStormStreamExecutor1<String>
 
     private transient ThreadPoolExecutor executorPool;
 
-    public AlertNotificationExecutor(List<String> alertExecutorIdList, AlertDefinitionDAO dao){
+    public AlertNotificationExecutor(List<String> alertExecutorIdList, PolicyDefinitionDAO dao){
 		this.alertExecutorIdList = alertExecutorIdList;
 		this.dao = dao;
 	}
@@ -109,7 +114,7 @@ public class AlertNotificationExecutor extends JavaStormStreamExecutor1<String>
         String dataSource = config.getString("eagleProps.dataSource");
 	    Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs;
 	    try {
-	 		initialAlertDefs = dao.findActiveAlertDefsGroupbyAlertExecutorId(site, dataSource);
+	 		initialAlertDefs = dao.findActivePoliciesGroupbyExecutorId(site, dataSource);
 	    }
 	    catch (Exception ex) {
  			LOG.error("fail to initialize initialAlertDefs: ", ex);
@@ -134,7 +139,7 @@ public class AlertNotificationExecutor extends JavaStormStreamExecutor1<String>
 		
 		alertEmailGeneratorsMap = new CopyOnWriteHashMap<String, List<AlertEmailGenerator>>();
 		alertEmailGeneratorsMap.putAll(tmpEmailGenerators);				
-		DynamicPolicyLoader policyLoader = DynamicPolicyLoader.getInstance();
+		DynamicPolicyLoader<AlertDefinitionAPIEntity> policyLoader = DynamicPolicyLoader.getInstanceOf(AlertDefinitionAPIEntity.class);
 		policyLoader.init(initialAlertDefs, dao, config);
 		for (String alertExecutorId : alertExecutorIdList) {
 			policyLoader.addPolicyChangeListener(alertExecutorId, this);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/UrlBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/UrlBuilder.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/UrlBuilder.java
deleted file mode 100644
index 3c7de31..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/notification/UrlBuilder.java
+++ /dev/null
@@ -1,64 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.eagle.alert.notification;
-
-import org.apache.eagle.alert.common.AlertConstants;
-import org.apache.eagle.alert.entity.AlertAPIEntity;
-import org.apache.eagle.common.EagleBase64Wrapper;
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.eagle.log.entity.HBaseInternalLogHelper;
-import org.apache.eagle.log.entity.InternalLog;
-import org.apache.eagle.log.entity.RowkeyBuilder;
-import org.apache.eagle.log.entity.meta.EntityDefinitionManager;
-import org.mortbay.util.UrlEncoded;
-import java.util.Map;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class UrlBuilder {
-
-    private static final Logger logger = LoggerFactory.getLogger(UrlBuilder.class);
-
-    public static String getEncodedRowkey(AlertAPIEntity entity) throws Exception {
-        InternalLog log = HBaseInternalLogHelper.convertToInternalLog(entity, EntityDefinitionManager.getEntityDefinitionByEntityClass(entity.getClass()));
-        return EagleBase64Wrapper.encodeByteArray2URLSafeString(RowkeyBuilder.buildRowkey(log));
-    }
-
-    public static String buildAlertDetailUrl(String host, int port, AlertAPIEntity entity) {
-        String baseUrl = "http://" + host + ":" + String.valueOf(port) + "/eagle-service/#/dam/alertDetail/";
-        try {
-            return baseUrl + UrlEncoded.encodeString(getEncodedRowkey(entity));
-        }
-        catch (Exception ex) {
-            logger.error("Fail to populate encodedRowkey for alert Entity" + entity.toString());
-            return "N/A";
-        }
-    }
-
-    public static String buiildPolicyDetailUrl(String host, int port, Map<String, String> tags) {
-        String baseUrl = "http://" + host + ":" + String.valueOf(port) + "/eagle-service/#/dam/policyDetail?";
-        String format = "policy=%s&site=%s&executor=%s";
-        String policy = tags.get(AlertConstants.POLICY_ID);
-        String site = tags.get(EagleConfigConstants.SITE);
-        String alertExecutorID = tags.get(AlertConstants.ALERT_EXECUTOR_ID);
-        if (policy != null && site != null && alertExecutorID != null) {
-            return baseUrl + String.format(format, policy, site, alertExecutorID);
-        }
-        return "N/A";
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DefaultPolicyPartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DefaultPolicyPartitioner.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DefaultPolicyPartitioner.java
deleted file mode 100644
index 3d7b4fb..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DefaultPolicyPartitioner.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.policy;
-
-
-public class DefaultPolicyPartitioner implements PolicyPartitioner{
-	@Override
-	public int partition(int numTotalPartitions, String policyType,
-			String policyId) {
-		final int prime = 31;
-		int result = 1;
-		result = result * prime + policyType.hashCode();
-		result = result < 0 ? result*-1 : result;
-		result = result * prime + policyId.hashCode();
-		result = result < 0 ? result*-1 : result;
-		return result % numTotalPartitions;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DynamicPolicyLoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DynamicPolicyLoader.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DynamicPolicyLoader.java
deleted file mode 100644
index 0e76deb..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/DynamicPolicyLoader.java
+++ /dev/null
@@ -1,270 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.policy;
-
-import java.util.ArrayList;
-import java.util.Collection;
-import java.util.Date;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.apache.commons.collections.CollectionUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.eagle.alert.dao.AlertDefinitionDAO;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import com.netflix.config.AbstractPollingScheduler;
-import com.netflix.config.ConcurrentCompositeConfiguration;
-import com.netflix.config.DynamicConfiguration;
-import com.netflix.config.FixedDelayPollingScheduler;
-import com.netflix.config.PollListener;
-import com.netflix.config.PollResult;
-import com.netflix.config.PolledConfigurationSource;
-import com.sun.jersey.client.impl.CopyOnWriteHashMap;
-import com.typesafe.config.Config;
-
-/**
- * JVM level singleton, so multiple alert executor may share the same policy loader
- */
-public class DynamicPolicyLoader {
-	private static final Logger LOG = LoggerFactory.getLogger(DynamicPolicyLoader.class);
-	
-	private final int defaultInitialDelayMillis = 30*1000;
-	private final int defaultDelayMillis = 60*1000;
-	private final boolean defaultIgnoreDeleteFromSource = true;
-    /**
-     * one alertExecutor may have multiple instances, that is why there is a list of PolicyLifecycleMethods
-     */
-	private volatile CopyOnWriteHashMap<String, List<PolicyLifecycleMethods>> policyChangeListeners = new CopyOnWriteHashMap<String, List<PolicyLifecycleMethods>>();
-    private volatile CopyOnWriteHashMap<String, List<PolicyDistributionReportMethods>> policyDistributionUpdaters = new CopyOnWriteHashMap<String, List<PolicyDistributionReportMethods>>();
-	private static DynamicPolicyLoader instance = new DynamicPolicyLoader();
-	private volatile boolean initialized = false;
-	
-	public void addPolicyChangeListener(String alertExecutorId, PolicyLifecycleMethods alertExecutor){
-		synchronized(policyChangeListeners) {
-			if (policyChangeListeners.get(alertExecutorId) == null) {
-				policyChangeListeners.put(alertExecutorId, new ArrayList<PolicyLifecycleMethods>());
-			}
-			policyChangeListeners.get(alertExecutorId).add(alertExecutor);
-		}
-	}
-
-    public void addPolicyDistributionReporter(String alertExecutorId, PolicyDistributionReportMethods policyDistUpdater){
-        synchronized(policyDistributionUpdaters) {
-            if(policyDistributionUpdaters.get(alertExecutorId) == null) {
-                policyDistributionUpdaters.put(alertExecutorId, new ArrayList<PolicyDistributionReportMethods>());
-            }
-            policyDistributionUpdaters.get(alertExecutorId).add(policyDistUpdater);
-        }
-    }
-	
-	public static DynamicPolicyLoader getInstance(){
-		return instance;
-	}
-	
-	/**
-	 * singleton with init would be good for unit test as well, and it ensures that
-	 * initialization happens only once before you use it.  
-	 * @param config
-	 * @param dao
-	 */
-	public void init(Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs, 
-			AlertDefinitionDAO dao, Config config){
-		if(!initialized){
-			synchronized(this){
-				if(!initialized){
-					internalInit(initialAlertDefs, dao, config);
-					initialized = true;
-				}
-			}
-		}
-	}
-	
-	/**
-	 * map from alertExecutorId+partitionId to AlertExecutor which implements PolicyLifecycleMethods
-	 * @param initialAlertDefs
-	 * @param dao
-	 * @param config
-	 */
-	private void internalInit(Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs,
-			AlertDefinitionDAO dao, Config config){
-		if(!config.getBoolean("dynamicConfigSource.enabled")) {
-            return;
-        }
-		AbstractPollingScheduler scheduler = new FixedDelayPollingScheduler(
-                config.getInt("dynamicConfigSource.initDelayMillis"),
-                config.getInt("dynamicConfigSource.delayMillis"),
-                false
-        );
-
-		scheduler.addPollListener(new PollListener(){
-			@Override
-			public void handleEvent(EventType eventType, PollResult lastResult,
-					Throwable exception) {
-				if (lastResult == null) {
-					LOG.error("The lastResult is null, something must be wrong, probably the eagle service is dead!");
-					throw new RuntimeException("The lastResult is null, probably the eagle service is dead! ", exception);
-				}
-				Map<String, Object> added = lastResult.getAdded();
-				Map<String, Object> changed = lastResult.getChanged();
-				Map<String, Object> deleted = lastResult.getDeleted();
-				for(Map.Entry<String, List<PolicyLifecycleMethods>> entry : policyChangeListeners.entrySet()){
-					String alertExecutorId = entry.getKey();
-					for (PolicyLifecycleMethods policyLifecycleMethod : entry.getValue()) {
-						Map<String, AlertDefinitionAPIEntity> addedPolicies = (Map<String, AlertDefinitionAPIEntity>)added.get(trimPartitionNum(alertExecutorId));
-						if(addedPolicies != null && addedPolicies.size() > 0){
-							policyLifecycleMethod.onPolicyCreated(addedPolicies);
-						}
-						Map<String, AlertDefinitionAPIEntity> changedPolicies = (Map<String, AlertDefinitionAPIEntity>)changed.get(trimPartitionNum(alertExecutorId));
-						if(changedPolicies != null && changedPolicies.size() > 0){
-							policyLifecycleMethod.onPolicyChanged(changedPolicies);
-						}
-						Map<String, AlertDefinitionAPIEntity> deletedPolicies = (Map<String, AlertDefinitionAPIEntity>)deleted.get(trimPartitionNum(alertExecutorId));
-						if(deletedPolicies != null && deletedPolicies.size() > 0){
-							policyLifecycleMethod.onPolicyDeleted(deletedPolicies);
-						}
-					}
-				}
-
-                // notify policyDistributionUpdaters
-                for(Map.Entry<String, List<PolicyDistributionReportMethods>> entry : policyDistributionUpdaters.entrySet()){
-                    for(PolicyDistributionReportMethods policyDistributionUpdateMethod : entry.getValue()){
-                        policyDistributionUpdateMethod.report();
-                    }
-                }
-			}
-			private String trimPartitionNum(String alertExecutorId){
-				int i = alertExecutorId.lastIndexOf('_');
-				if(i != -1){
-					return alertExecutorId.substring(0, i);
-				}
-				return alertExecutorId;
-			}
-		});
-		
-		ConcurrentCompositeConfiguration finalConfig = new ConcurrentCompositeConfiguration();
-		      
-		PolledConfigurationSource source = new DynamicPolicySource(initialAlertDefs, dao, config);
-
-		try{
-			DynamicConfiguration dbSourcedConfiguration = new DynamicConfiguration(source, scheduler);
-			finalConfig.addConfiguration(dbSourcedConfiguration);
-		}catch(Exception ex){
-			LOG.warn("Fail loading from DB, continue without DB sourced configuration", ex);
-		}
-	}
-	
-	public static class DynamicPolicySource implements PolledConfigurationSource{
-		private static Logger LOG = LoggerFactory.getLogger(DynamicPolicySource.class);
-		private Config config;
-		private AlertDefinitionDAO dao;
-		/**
-		 * mapping from alertExecutorId to list of policies 
-		 */
-		private Map<String, Map<String, AlertDefinitionAPIEntity>> cachedAlertDefs;
-		
-		public DynamicPolicySource(Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs, AlertDefinitionDAO dao, Config config){
-			this.cachedAlertDefs = initialAlertDefs;
-			this.dao = dao;
-			this.config = config;
-		}
-
-		public PollResult poll(boolean initial, Object checkPoint) throws Exception {
-			LOG.info("Poll policy from eagle service " +  config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST) +
-					":" + config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT) );
-			Map<String, Map<String, AlertDefinitionAPIEntity>> newAlertDefs = 
-					dao.findActiveAlertDefsGroupbyAlertExecutorId(config.getString("eagleProps.site"),
-                            config.getString("eagleProps.dataSource"));
-			
-			// compare runtime alertDefs with cachedAlertDefs and figure out what are added/deleted/updated
-			Map<String, Object> added = new HashMap<String, Object>();
-			Map<String, Object> changed = new HashMap<String, Object>();
-			Map<String, Object> deleted = new HashMap<String, Object>();
-			
-			Set<String> newAlertExecutorIds = newAlertDefs.keySet();
-			Set<String> cachedAlertExecutorIds = cachedAlertDefs.keySet();
-			
-			// dynamically adding new alert executor is not supported, because alert executor is pre-built while program starts up
-			Collection<String> addedAlertExecutorIds = CollectionUtils.subtract(newAlertExecutorIds, cachedAlertExecutorIds);
-			if(addedAlertExecutorIds != null && addedAlertExecutorIds.size() > 0){
-				LOG.warn("New alertExecutorIds are found : " + addedAlertExecutorIds);
-			}
-			
-			// if one alert executor is missing, it means all policy under that alert executor should be removed
-			Collection<String> deletedAlertExecutorIds = CollectionUtils.subtract(cachedAlertExecutorIds, newAlertExecutorIds);
-			if(deletedAlertExecutorIds != null && deletedAlertExecutorIds.size() > 0){
-				LOG.warn("Some alertExecutorIds are deleted : " + deletedAlertExecutorIds);
-				for(String deletedAlertExecutorId : deletedAlertExecutorIds){
-					deleted.put(deletedAlertExecutorId, cachedAlertDefs.get(deletedAlertExecutorId));
-				}
-			}
-			
-			// we need calculate added/updated/deleted policy for all executors which are not deleted
-//			Collection<String> updatedAlertExecutorIds = CollectionUtils.intersection(newAlertExecutorIds, cachedAlertExecutorIds);
-            Collection<String> updatedAlertExecutorIds = newAlertExecutorIds;
-			for(String updatedAlertExecutorId : updatedAlertExecutorIds){
-				Map<String, AlertDefinitionAPIEntity> newPolicies = newAlertDefs.get(updatedAlertExecutorId);
-				Map<String, AlertDefinitionAPIEntity> cachedPolicies = cachedAlertDefs.get(updatedAlertExecutorId);
-				PolicyComparator.compare(updatedAlertExecutorId, newPolicies, cachedPolicies, added, changed, deleted);
-			}
-			
-			cachedAlertDefs = newAlertDefs;
-
-			return PollResult.createIncremental(added, changed, deleted, new Date().getTime());
-		}
-	}
-	
-	public static class PolicyComparator{
-		public static void compare(String alertExecutorId, Map<String, AlertDefinitionAPIEntity> newPolicies, Map<String, AlertDefinitionAPIEntity> cachedPolicies, 
-				Map<String, Object> added, Map<String, Object> changed, Map<String, Object> deleted){
-			Set<String> newPolicyIds = newPolicies.keySet();
-            Set<String> cachedPolicyIds = cachedPolicies != null ? cachedPolicies.keySet() : new HashSet<String>();
-			Collection<String> addedPolicyIds = CollectionUtils.subtract(newPolicyIds, cachedPolicyIds);
-			Collection<String> deletedPolicyIds = CollectionUtils.subtract(cachedPolicyIds, newPolicyIds);
-			Collection<String> changedPolicyIds = CollectionUtils.intersection(cachedPolicyIds, newPolicyIds);
-			if(addedPolicyIds != null && addedPolicyIds.size() > 0){
-				Map<String, AlertDefinitionAPIEntity> tmp = new HashMap<String, AlertDefinitionAPIEntity>();
-				for(String addedPolicyId : addedPolicyIds){
-					tmp.put(addedPolicyId, newPolicies.get(addedPolicyId));
-				}
-				added.put(alertExecutorId, tmp);
-			}
-			if(deletedPolicyIds != null && deletedPolicyIds.size() > 0){
-				Map<String, AlertDefinitionAPIEntity> tmp = new HashMap<String, AlertDefinitionAPIEntity>();
-				for(String deletedPolicyId : deletedPolicyIds){
-					tmp.put(deletedPolicyId, cachedPolicies.get(deletedPolicyId));
-				}
-				deleted.put(alertExecutorId, tmp);
-			}
-			if(changedPolicyIds != null && changedPolicyIds.size() > 0){
-				Map<String, AlertDefinitionAPIEntity> tmp = new HashMap<String, AlertDefinitionAPIEntity>();
-				for(String changedPolicyId : changedPolicyIds){
-					// check if policy is really changed
-					if(!newPolicies.get(changedPolicyId).equals(cachedPolicies.get(changedPolicyId))){
-						tmp.put(changedPolicyId, newPolicies.get(changedPolicyId));
-					}
-				}
-				changed.put(alertExecutorId, tmp);
-			}
-		}
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PartitionUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PartitionUtils.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PartitionUtils.java
deleted file mode 100644
index 5bf546c..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PartitionUtils.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.policy;
-
-import org.apache.eagle.alert.common.AlertConstants;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-
-public class PartitionUtils {
-	
-	public static boolean accept(AlertDefinitionAPIEntity alertDef, PolicyPartitioner partitioner, int numPartitions, int partitionSeq){
-		int targetPartitionSeq = partitioner.partition(numPartitions, alertDef.getTags().get(AlertConstants.POLICY_TYPE), alertDef.getTags().get(AlertConstants.POLICY_ID));
-		if(targetPartitionSeq == partitionSeq)
-			return true;
-		return false;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistStatsDAOLogReporter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistStatsDAOLogReporter.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistStatsDAOLogReporter.java
deleted file mode 100644
index 47bfea2..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistStatsDAOLogReporter.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.policy;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Set;
-
-/**
- * just append log
- */
-public class PolicyDistStatsDAOLogReporter implements PolicyDistributionStatsDAO{
-    private static Logger LOG = LoggerFactory.getLogger(PolicyDistStatsDAOLogReporter.class);
-
-    @Override
-    public void reportPolicyMembership(String policyGroupId, Set<String> policyIds) {
-        if(policyIds != null){
-            StringBuilder sb = new StringBuilder();
-            sb.append("policyDistirbutionStats for " + policyGroupId + "[" + "total: " + policyIds.size() + ", ");
-            for(String policyId : policyIds){
-                sb.append(policyId + ",");
-            }
-            sb.append("]");
-            LOG.info(sb.toString());
-        }else{
-            LOG.warn("No policies are assigned to " + policyGroupId);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionReportMethods.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionReportMethods.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionReportMethods.java
deleted file mode 100644
index 1cc14cc..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionReportMethods.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.policy;
-
-/**
- * framework will call report method, it is AlertExecutor's responsibility to report policy distribution information
- */
-public interface PolicyDistributionReportMethods {
-    public void report();
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionStats.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionStats.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionStats.java
deleted file mode 100644
index d6bbec9..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionStats.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.policy;
-
-/**
- * fields for a policy distribution statistics
- */
-public class PolicyDistributionStats {
-    private String policyGroupId;   // normally groupId is alertExecutorId
-    private String policyId;
-    private boolean markDown;       // true if this policy is marked down, false otherwise
-    private double weight;          // comprehensive factors for policy overhead
-
-    public String getPolicyId() {
-        return policyId;
-    }
-
-    public void setPolicyId(String policyId) {
-        this.policyId = policyId;
-    }
-
-    public boolean isMarkDown() {
-        return markDown;
-    }
-
-    public void setMarkDown(boolean markDown) {
-        this.markDown = markDown;
-    }
-
-    public double getWeight() {
-        return weight;
-    }
-
-    public void setWeight(double weight) {
-        this.weight = weight;
-    }
-
-    public String getPolicyGroupId() {
-        return policyGroupId;
-    }
-
-    public void setPolicyGroupId(String policyGroupId) {
-        this.policyGroupId = policyGroupId;
-    }
-
-    public String toString(){
-        StringBuilder sb = new StringBuilder();
-        sb.append("policyId:");
-        sb.append(policyId);
-        sb.append(", markDown:");
-        sb.append(markDown);
-        sb.append(", weight:");
-        sb.append(weight);
-
-        return sb.toString();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionStatsDAO.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionStatsDAO.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionStatsDAO.java
deleted file mode 100644
index 5abe303..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistributionStatsDAO.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.policy;
-
-import java.util.Set;
-
-public interface PolicyDistributionStatsDAO {
-    public void reportPolicyMembership(String policyGroupId, Set<String> policyIds);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistroStatsLogReporter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistroStatsLogReporter.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistroStatsLogReporter.java
deleted file mode 100644
index 96a28f1..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyDistroStatsLogReporter.java
+++ /dev/null
@@ -1,50 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.policy;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Set;
-
-/**
- * just append log
- */
-public class PolicyDistroStatsLogReporter implements PolicyDistributionStatsDAO{
-    private static Logger LOG = LoggerFactory.getLogger(PolicyDistroStatsLogReporter.class);
-
-    @Override
-    public void reportPolicyMembership(String policyGroupId, Set<String> policyIds) {
-        if(policyIds != null){
-            StringBuilder sb = new StringBuilder();
-            sb.append("policyDistributionStats for " + policyGroupId +", total: " + policyIds.size() + ", [");
-            for(String policyId : policyIds){
-                sb.append(policyId + ",");
-            }
-            if(policyIds.size() > 0){
-                sb.deleteCharAt(sb.length()-1);
-            }
-            sb.append("]");
-            LOG.info(sb.toString());
-        }else{
-            LOG.warn("No policies are assigned to " + policyGroupId);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyEvaluator.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyEvaluator.java
deleted file mode 100644
index 63588b8..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyEvaluator.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.policy;
-
-import java.util.Map;
-
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.dataproc.core.ValuesArray;
-
-public interface PolicyEvaluator {
-	/**
-	 * take input and evaluate expression
-	 * input has 3 fields, first is siddhiAlertContext, second one is streamName, the third is map of attribute name/value
-	 * @param input
-	 * @throws Exception
-	 */
-	public void evaluate(ValuesArray input) throws Exception;
-	
-	/**
-	 * notify policy evaluator that policy is updated
-	 */
-	public void onPolicyUpdate(AlertDefinitionAPIEntity newAlertDef);
-	
-	/**
-	 * notify policy evaluator that policy is deleted, here is cleanup work for this policy evaluator
-	 */
-	public void onPolicyDelete();
-	
-	/**
-	 * get additional context
-	 */	
-	public Map<String, String> getAdditionalContext();
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyEvaluatorServiceProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyEvaluatorServiceProvider.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyEvaluatorServiceProvider.java
deleted file mode 100644
index 03060b9..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyEvaluatorServiceProvider.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.policy;
-
-import java.util.List;
-
-import com.fasterxml.jackson.databind.Module;
-
-/**
- * to provide extensibility, we need a clear differentiation between framework job and provider logic
- * policy evaluator framework:
- * - connect to eagle data source
- * - read all policy definitions
- * - compare with cached policy definitions
- * - figure out if policy is created, deleted or updated
- *   - if policy is created, then invoke onPolicyCreated
- *   - if policy is deleted, then invoke onPolicyDeleted
- *   - if policy is updated, then invoke onPolicyUpdated
- * - for policy report, replace old evaluator engine with new evaluator engine which is created by policy evaluator provider
- * - specify # of executors for this alert executor id
- * - dynamically balance # of policies evaluated by each alert executor
- *   - use zookeeper to balance. eaglePolicies/${alertExecutorId}/${alertExecutorInstanceId} => list of policies
- * 
- * policy evaluator business features:
- * - register mapping between policy type and PolicyEvaluator
- * - create evaluator engine runtime when configuration is changed
- *
- */
-public interface PolicyEvaluatorServiceProvider {
-	String getPolicyType();
-	Class<? extends PolicyEvaluator> getPolicyEvaluator();
-	List<Module> getBindingModules();
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyLifecycleMethods.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyLifecycleMethods.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyLifecycleMethods.java
deleted file mode 100644
index 66ecd6b..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyLifecycleMethods.java
+++ /dev/null
@@ -1,27 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.policy;
-
-import java.util.Map;
-
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-
-public interface PolicyLifecycleMethods {
-	void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> added);
-	void onPolicyChanged(Map<String, AlertDefinitionAPIEntity> changed);
-	void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> deleted);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyManager.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyManager.java
deleted file mode 100644
index 5e5dc8d..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyManager.java
+++ /dev/null
@@ -1,61 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.policy;
-
-import java.util.HashMap;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-import java.util.ServiceLoader;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.fasterxml.jackson.databind.Module;
-
-public class PolicyManager {
-	private final static Logger LOG = LoggerFactory.getLogger(PolicyManager.class);
-	private static PolicyManager instance = new PolicyManager();
-
-	private ServiceLoader<PolicyEvaluatorServiceProvider> loader;
-	
-	private Map<String, Class<? extends PolicyEvaluator>> policyEvaluators = new HashMap<String, Class<? extends PolicyEvaluator>>();
-	private Map<String, List<Module>> policyModules = new HashMap<String, List<Module>>();
-	
-	private PolicyManager(){
-		loader = ServiceLoader.load(PolicyEvaluatorServiceProvider.class);
-		Iterator<PolicyEvaluatorServiceProvider> iter = loader.iterator();
-		while(iter.hasNext()){
-			PolicyEvaluatorServiceProvider factory = iter.next();
-			LOG.info("Supported policy type : " + factory.getPolicyType());
-			policyEvaluators.put(factory.getPolicyType(), factory.getPolicyEvaluator());
-			policyModules.put(factory.getPolicyType(), factory.getBindingModules());
-		}
-	}
-	
-	public static PolicyManager getInstance(){
-		return instance;
-	}
-	
-	public Class<? extends PolicyEvaluator> getPolicyEvaluator(String policyType){
-		return policyEvaluators.get(policyType);
-	}
-	
-	public List<Module> getPolicyModules(String policyType){
-		return policyModules.get(policyType);
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyPartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyPartitioner.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyPartitioner.java
deleted file mode 100644
index 2047256..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/policy/PolicyPartitioner.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.policy;
-
-import java.io.Serializable;
-
-/**
- * partition policies so that policies can be distributed into different alert evaluators
- */
-public interface PolicyPartitioner extends Serializable {
-	int partition(int numTotalPartitions, String policyType, String policyId);
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/AttributeType.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/AttributeType.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/AttributeType.java
deleted file mode 100644
index 36d8281..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/AttributeType.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.siddhi;
-
-/**
- * @see org.wso2.siddhi.query.api.definition.Attribute.Type
- */
-public enum AttributeType {
-	STRING,
-	LONG,
-	INTEGER,
-	BOOL,
-    FLOAT,
-    DOUBLE
-//    , OBJECT
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/EagleAlertContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/EagleAlertContext.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/EagleAlertContext.java
deleted file mode 100644
index c245a24..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/EagleAlertContext.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.siddhi;
-
-import org.apache.eagle.executor.AlertExecutor;
-import org.apache.eagle.alert.policy.PolicyEvaluator;
-import org.apache.eagle.datastream.Collector;
-
-public class EagleAlertContext {
-	
-	public AlertExecutor alertExecutor;
-	
-	public String policyId;
-	
-	public PolicyEvaluator evaluator;
-	
-	public Collector outputCollector;
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertAPIEntityRender.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertAPIEntityRender.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertAPIEntityRender.java
new file mode 100644
index 0000000..a67bae5
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertAPIEntityRender.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.siddhi;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.common.metric.AlertContext;
+import org.apache.eagle.policy.PolicyEvaluationContext;
+import org.apache.eagle.policy.ResultRender;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.policy.common.UrlBuilder;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
+import org.apache.eagle.policy.siddhi.SiddhiPolicyEvaluator;
+import org.apache.eagle.policy.siddhi.SiddhiQueryCallbackImpl;
+import org.apache.eagle.policy.siddhi.StreamMetadataManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.Serializable;
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+public class SiddhiAlertAPIEntityRender implements ResultRender<AlertDefinitionAPIEntity, AlertAPIEntity>, Serializable {
+
+	public static final Logger LOG = LoggerFactory.getLogger(SiddhiAlertAPIEntityRender.class);
+	public static final String source = ManagementFactory.getRuntimeMXBean().getName();
+
+	@Override
+	@SuppressWarnings("unchecked")
+	public AlertAPIEntity render(Config config, List<Object> results, PolicyEvaluationContext<AlertDefinitionAPIEntity, AlertAPIEntity> siddhiAlertContext, long timestamp) {
+		List<String> rets = SiddhiQueryCallbackImpl.convertToString(results);
+		SiddhiPolicyEvaluator<AlertDefinitionAPIEntity, AlertAPIEntity> evaluator = (SiddhiPolicyEvaluator<AlertDefinitionAPIEntity, AlertAPIEntity>) siddhiAlertContext.evaluator;
+		String alertExecutorId = siddhiAlertContext.alertExecutor.getExecutorId();
+		AlertAPIEntity entity = new AlertAPIEntity();
+		AlertContext context = new AlertContext();
+		String sourceStreams = evaluator.getAdditionalContext().get(Constants.SOURCE_STREAMS);
+		String[] sourceStreamsArr = sourceStreams.split(",");		
+		List<String> attrRenameList = evaluator.getOutputStreamAttrNameList();		
+		Map<String, String> tags = new HashMap<String, String>();
+		for (String sourceStream : sourceStreamsArr) {
+			 List<AlertStreamSchemaEntity> list = StreamMetadataManager.getInstance().getMetadataEntitiesForStream(sourceStream.trim());
+			 for (AlertStreamSchemaEntity alertStream : list) {
+				 if (alertStream.getUsedAsTag() != null && alertStream.getUsedAsTag() == true) {
+					 String attrName = alertStream.getTags().get(Constants.ATTR_NAME);
+					 tags.put(attrName, rets.get(attrRenameList.indexOf(attrName)));
+				 }				 
+			 }			 
+		}
+
+		for (int index = 0; index < rets.size(); index++) {
+			//attrRenameList.get(0) -> "eagleAlertContext". We need to skip "eagleAlertContext", index is from 1 for attRenameList.
+			context.addProperty(attrRenameList.get(index + 1), rets.get(index));
+		}
+
+		StringBuilder sb = new StringBuilder();
+		for (Entry<String, String> entry : context.getProperties().entrySet()) {
+			String key = entry.getKey();
+			String value = entry.getValue();
+			sb.append(key + "=\"" + value + "\" ");			
+		}
+		context.addAll(evaluator.getAdditionalContext());
+		String policyId = context.getProperty(Constants.POLICY_ID);
+		String alertMessage = "The Policy \"" + policyId + "\" has been detected with the below information: " + sb.toString() ;
+		String site = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.SITE);
+		String dataSource = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.DATA_SOURCE);
+		String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
+		Integer port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
+
+		context.addProperty(Constants.ALERT_EVENT, sb.toString());
+		context.addProperty(Constants.ALERT_MESSAGE, alertMessage);
+		context.addProperty(Constants.ALERT_TIMESTAMP_PROPERTY, DateTimeUtil.millisecondsToHumanDateWithSeconds(System.currentTimeMillis()));
+		context.addProperty(EagleConfigConstants.DATA_SOURCE, dataSource);
+		context.addProperty(EagleConfigConstants.SITE, site);
+		entity.setTimestamp(timestamp);
+		/** If we need to add severity tag, we should add severity filed in AbstractpolicyDefinition, and pass it down **/
+		tags.put(EagleConfigConstants.SITE, site);
+		tags.put(EagleConfigConstants.DATA_SOURCE, dataSource);
+		tags.put(Constants.SOURCE_STREAMS, context.getProperty(Constants.SOURCE_STREAMS));
+		tags.put(Constants.POLICY_ID, context.getProperty(Constants.POLICY_ID));
+		tags.put(Constants.ALERT_SOURCE, source);
+		tags.put(Constants.ALERT_EXECUTOR_ID, alertExecutorId);
+		entity.setTags(tags);
+
+		context.addProperty(Constants.POLICY_DETAIL_URL, UrlBuilder.buiildPolicyDetailUrl(host, port, tags));
+		context.addProperty(Constants.ALERT_DETAIL_URL, UrlBuilder.buildAlertDetailUrl(host, port, entity));
+		entity.setAlertContext(context);
+		return entity;
+	}	
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertAPIEntityRendner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertAPIEntityRendner.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertAPIEntityRendner.java
deleted file mode 100644
index 4f38f11..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertAPIEntityRendner.java
+++ /dev/null
@@ -1,100 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.siddhi;
-
-import java.lang.management.ManagementFactory;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-import java.util.Map.Entry;
-
-import org.apache.eagle.alert.common.AlertConstants;
-import org.apache.eagle.alert.entity.AlertAPIEntity;
-import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
-import org.apache.eagle.alert.notification.UrlBuilder;
-import org.apache.eagle.common.config.EagleConfigConstants;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import org.apache.eagle.common.DateTimeUtil;
-import org.apache.eagle.common.metric.AlertContext;
-import com.typesafe.config.Config;
-
-public class SiddhiAlertAPIEntityRendner {
-
-	public static final Logger LOG = LoggerFactory.getLogger(SiddhiAlertAPIEntityRendner.class);
-	public static final String source = ManagementFactory.getRuntimeMXBean().getName();
-	
-	public static AlertAPIEntity render(Config config, List<String> rets, EagleAlertContext siddhiAlertContext, long timestamp) {
-		SiddhiPolicyEvaluator evaluator = (SiddhiPolicyEvaluator)siddhiAlertContext.evaluator;
-		String alertExecutorId = siddhiAlertContext.alertExecutor.getAlertExecutorId();
-		AlertAPIEntity entity = new AlertAPIEntity();
-		AlertContext context = new AlertContext();
-		String sourceStreams = evaluator.getAdditionalContext().get(AlertConstants.SOURCE_STREAMS);
-		String[] sourceStreamsArr = sourceStreams.split(",");		
-		List<String> attrRenameList = evaluator.getOutputStreamAttrNameList();		
-		Map<String, String> tags = new HashMap<String, String>();
-		for (String sourceStream : sourceStreamsArr) {
-			 List<AlertStreamSchemaEntity> list = StreamMetadataManager.getInstance().getMetadataEntitiesForStream(sourceStream.trim());
-			 for (AlertStreamSchemaEntity alertStream : list) {
-				 if (alertStream.getUsedAsTag() != null && alertStream.getUsedAsTag() == true) {
-					 String attrName = alertStream.getTags().get(AlertConstants.ATTR_NAME);
-					 tags.put(attrName, rets.get(attrRenameList.indexOf(attrName)));
-				 }				 
-			 }			 
-		}
-
-		for (int index = 0; index < rets.size(); index++) {
-			//attrRenameList.get(0) -> "eagleAlertContext". We need to skip "eagleAlertContext", index is from 1 for attRenameList.
-			context.addProperty(attrRenameList.get(index + 1), rets.get(index));
-		}
-
-		StringBuilder sb = new StringBuilder();
-		for (Entry<String, String> entry : context.getProperties().entrySet()) {
-			String key = entry.getKey();
-			String value = entry.getValue();
-			sb.append(key + "=\"" + value + "\" ");			
-		}
-		context.addAll(evaluator.getAdditionalContext());
-		String policyId = context.getProperty(AlertConstants.POLICY_ID); 
-		String alertMessage = "The Policy \"" + policyId + "\" has been detected with the below information: " + sb.toString() ;
-		String site = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.SITE);
-		String dataSource = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.DATA_SOURCE);
-		String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
-		Integer port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
-
-		context.addProperty(AlertConstants.ALERT_EVENT, sb.toString());
-		context.addProperty(AlertConstants.ALERT_MESSAGE, alertMessage);
-		context.addProperty(AlertConstants.ALERT_TIMESTAMP_PROPERTY, DateTimeUtil.millisecondsToHumanDateWithSeconds(System.currentTimeMillis()));
-		context.addProperty(EagleConfigConstants.DATA_SOURCE, dataSource);
-		context.addProperty(EagleConfigConstants.SITE, site);
-		entity.setTimestamp(timestamp);
-		/** If we need to add severity tag, we should add severity filed in AbstractpolicyDefinition, and pass it down **/
-		tags.put(EagleConfigConstants.SITE, site);
-		tags.put(EagleConfigConstants.DATA_SOURCE, dataSource);
-		tags.put(AlertConstants.SOURCE_STREAMS, context.getProperty(AlertConstants.SOURCE_STREAMS));		
-		tags.put(AlertConstants.POLICY_ID, context.getProperty(AlertConstants.POLICY_ID));		
-		tags.put(AlertConstants.ALERT_SOURCE, source);
-		tags.put(AlertConstants.ALERT_EXECUTOR_ID, alertExecutorId);
-		entity.setTags(tags);
-
-		context.addProperty(AlertConstants.POLICY_DETAIL_URL, UrlBuilder.buiildPolicyDetailUrl(host, port, tags));
-		context.addProperty(AlertConstants.ALERT_DETAIL_URL, UrlBuilder.buildAlertDetailUrl(host, port, entity));
-		entity.setAlertContext(context);
-		return entity;
-	}	
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertHandler.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertHandler.java
deleted file mode 100644
index f89e506..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertHandler.java
+++ /dev/null
@@ -1,26 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.siddhi;
-
-import org.apache.eagle.alert.entity.AlertAPIEntity;
-
-import java.util.List;
-
-public interface SiddhiAlertHandler {
-
-	void onAlerts(EagleAlertContext context, List<AlertAPIEntity> alerts);
-}