You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/01/12 08:47:56 UTC

[3/8] incubator-eagle git commit: EAGLE-79 Provide aggregation and persistence DSL support

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/DynamicPolicyLoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/DynamicPolicyLoader.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/DynamicPolicyLoader.java
new file mode 100644
index 0000000..7cb8e1f
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/DynamicPolicyLoader.java
@@ -0,0 +1,268 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy;
+
+import com.netflix.config.*;
+import com.sun.jersey.client.impl.CopyOnWriteHashMap;
+import com.typesafe.config.Config;
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.*;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ *
+ * @param <T>
+ */
+public class DynamicPolicyLoader<T extends AbstractPolicyDefinitionEntity> {
+	private static final Logger LOG = LoggerFactory.getLogger(DynamicPolicyLoader.class);
+	
+	private final int defaultInitialDelayMillis = 30*1000;
+	private final int defaultDelayMillis = 60*1000;
+	private final boolean defaultIgnoreDeleteFromSource = true;
+    /**
+     * one alertExecutor may have multiple instances, that is why there is a list of PolicyLifecycleMethods
+     */
+	private volatile CopyOnWriteHashMap<String, List<PolicyLifecycleMethods<T>>> policyChangeListeners = new CopyOnWriteHashMap<>();
+    private volatile CopyOnWriteHashMap<String, List<PolicyDistributionReportMethods>> policyDistributionUpdaters = new CopyOnWriteHashMap<>();
+	private static DynamicPolicyLoader instance = new DynamicPolicyLoader();
+	private volatile boolean initialized = false;
+	
+	public void addPolicyChangeListener(String alertExecutorId, PolicyLifecycleMethods<T> alertExecutor){
+		synchronized(policyChangeListeners) {
+			if (policyChangeListeners.get(alertExecutorId) == null) {
+				policyChangeListeners.put(alertExecutorId, new ArrayList<PolicyLifecycleMethods<T>>());
+			}
+			policyChangeListeners.get(alertExecutorId).add(alertExecutor);
+		}
+	}
+
+	private static ConcurrentHashMap<Class, DynamicPolicyLoader> maps = new ConcurrentHashMap<Class, DynamicPolicyLoader>();
+    public void addPolicyDistributionReporter(String alertExecutorId, PolicyDistributionReportMethods policyDistUpdater){
+        synchronized(policyDistributionUpdaters) {
+            if(policyDistributionUpdaters.get(alertExecutorId) == null) {
+                policyDistributionUpdaters.put(alertExecutorId, new ArrayList<PolicyDistributionReportMethods>());
+            }
+            policyDistributionUpdaters.get(alertExecutorId).add(policyDistUpdater);
+        }
+    }
+	
+	@SuppressWarnings("unchecked")
+	public static <K extends AbstractPolicyDefinitionEntity> DynamicPolicyLoader<K> getInstanceOf(Class<K> clz) {
+		if (maps.containsKey(clz)) {
+			return maps.get(clz);
+		} else {
+			DynamicPolicyLoader<K> loader = new DynamicPolicyLoader<K>();
+			maps.putIfAbsent(clz, loader);
+			return maps.get(clz);
+		}
+	}
+	
+	/**
+	 * singleton with init would be good for unit test as well, and it ensures that
+	 * initialization happens only once before you use it.  
+	 * @param config
+	 * @param dao
+	 */
+	public void init(Map<String, Map<String, T>> initialAlertDefs, 
+			PolicyDefinitionDAO<T> dao, Config config){
+		if(!initialized){
+			synchronized(this){
+				if(!initialized){
+					internalInit(initialAlertDefs, dao, config);
+					initialized = true;
+				}
+			}
+		}
+	}
+	
+	/**
+	 * map from alertExecutorId+partitionId to AlertExecutor which implements PolicyLifecycleMethods
+	 * @param initialAlertDefs
+	 * @param dao
+	 * @param config
+	 */
+	private void internalInit(Map<String, Map<String, T>> initialAlertDefs,
+			PolicyDefinitionDAO<T> dao, Config config){
+		if(!config.getBoolean("dynamicConfigSource.enabled")) {
+            return;
+        }
+		AbstractPollingScheduler scheduler = new FixedDelayPollingScheduler(
+                config.getInt("dynamicConfigSource.initDelayMillis"),
+                config.getInt("dynamicConfigSource.delayMillis"),
+                false
+        );
+
+		scheduler.addPollListener(new PollListener(){
+			@SuppressWarnings("unchecked")
+			@Override
+			public void handleEvent(EventType eventType, PollResult lastResult,
+					Throwable exception) {
+				if (lastResult == null) {
+					LOG.error("The lastResult is null, something must be wrong, probably the eagle service is dead!");
+					throw new RuntimeException("The lastResult is null, probably the eagle service is dead! ", exception);
+				}
+				Map<String, Object> added = lastResult.getAdded();
+				Map<String, Object> changed = lastResult.getChanged();
+				Map<String, Object> deleted = lastResult.getDeleted();
+				for(Map.Entry<String, List<PolicyLifecycleMethods<T>>> entry : policyChangeListeners.entrySet()){
+					String alertExecutorId = entry.getKey();
+					for (PolicyLifecycleMethods<T> policyLifecycleMethod : entry.getValue()) {
+						Map<String, T> addedPolicies = (Map<String, T>)added.get(trimPartitionNum(alertExecutorId));
+						if(addedPolicies != null && addedPolicies.size() > 0){
+							policyLifecycleMethod.onPolicyCreated(addedPolicies);
+						}
+						Map<String, T> changedPolicies = (Map<String, T>)changed.get(trimPartitionNum(alertExecutorId));
+						if(changedPolicies != null && changedPolicies.size() > 0){
+							policyLifecycleMethod.onPolicyChanged(changedPolicies);
+						}
+						Map<String, T> deletedPolicies = (Map<String, T>)deleted.get(trimPartitionNum(alertExecutorId));
+						if(deletedPolicies != null && deletedPolicies.size() > 0){
+							policyLifecycleMethod.onPolicyDeleted(deletedPolicies);
+						}
+					}
+				}
+
+                // notify policyDistributionUpdaters
+                for(Map.Entry<String, List<PolicyDistributionReportMethods>> entry : policyDistributionUpdaters.entrySet()){
+                    for(PolicyDistributionReportMethods policyDistributionUpdateMethod : entry.getValue()){
+                        policyDistributionUpdateMethod.report();
+                    }
+                }
+			}
+			private String trimPartitionNum(String alertExecutorId){
+				int i = alertExecutorId.lastIndexOf('_');
+				if(i != -1){
+					return alertExecutorId.substring(0, i);
+				}
+				return alertExecutorId;
+			}
+		});
+		
+		ConcurrentCompositeConfiguration finalConfig = new ConcurrentCompositeConfiguration();
+		      
+		PolledConfigurationSource source = new DynamicPolicySource<T>(initialAlertDefs, dao, config);
+
+		try{
+			DynamicConfiguration dbSourcedConfiguration = new DynamicConfiguration(source, scheduler);
+			finalConfig.addConfiguration(dbSourcedConfiguration);
+		}catch(Exception ex){
+			LOG.warn("Fail loading from DB, continue without DB sourced configuration", ex);
+		}
+	}
+	
+	public static class DynamicPolicySource<M extends AbstractPolicyDefinitionEntity> implements PolledConfigurationSource {
+		private static Logger LOG = LoggerFactory.getLogger(DynamicPolicySource.class);
+		private Config config;
+		private PolicyDefinitionDAO<M> dao;
+		/**
+		 * mapping from alertExecutorId to list of policies 
+		 */
+		private Map<String, Map<String, M>> cachedAlertDefs;
+		
+		public DynamicPolicySource(Map<String, Map<String, M>> initialAlertDefs, PolicyDefinitionDAO<M> dao, Config config){
+			this.cachedAlertDefs = initialAlertDefs;
+			this.dao = dao;
+			this.config = config;
+		}
+
+		public PollResult poll(boolean initial, Object checkPoint) throws Exception {
+			LOG.info("Poll policy from eagle service " +  config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST) +
+					":" + config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT) );
+			Map<String, Map<String, M>> newAlertDefs = 
+					dao.findActivePoliciesGroupbyExecutorId(config.getString("eagleProps.site"),
+                            config.getString("eagleProps.dataSource"));
+			
+			// compare runtime alertDefs with cachedAlertDefs and figure out what are added/deleted/updated
+			Map<String, Object> added = new HashMap<String, Object>();
+			Map<String, Object> changed = new HashMap<String, Object>();
+			Map<String, Object> deleted = new HashMap<String, Object>();
+			
+			Set<String> newAlertExecutorIds = newAlertDefs.keySet();
+			Set<String> cachedAlertExecutorIds = cachedAlertDefs.keySet();
+			
+			// dynamically adding new alert executor is not supported, because alert executor is pre-built while program starts up
+			Collection<String> addedAlertExecutorIds = CollectionUtils.subtract(newAlertExecutorIds, cachedAlertExecutorIds);
+			if(addedAlertExecutorIds != null && addedAlertExecutorIds.size() > 0){
+				LOG.warn("New alertExecutorIds are found : " + addedAlertExecutorIds);
+			}
+			
+			// if one alert executor is missing, it means all policy under that alert executor should be removed
+			Collection<String> deletedAlertExecutorIds = CollectionUtils.subtract(cachedAlertExecutorIds, newAlertExecutorIds);
+			if(deletedAlertExecutorIds != null && deletedAlertExecutorIds.size() > 0){
+				LOG.warn("Some alertExecutorIds are deleted : " + deletedAlertExecutorIds);
+				for(String deletedAlertExecutorId : deletedAlertExecutorIds){
+					deleted.put(deletedAlertExecutorId, cachedAlertDefs.get(deletedAlertExecutorId));
+				}
+			}
+			
+			// we need calculate added/updated/deleted policy for all executors which are not deleted
+//			Collection<String> updatedAlertExecutorIds = CollectionUtils.intersection(newAlertExecutorIds, cachedAlertExecutorIds);
+            Collection<String> updatedAlertExecutorIds = newAlertExecutorIds;
+			for(String updatedAlertExecutorId : updatedAlertExecutorIds){
+				Map<String, M> newPolicies = newAlertDefs.get(updatedAlertExecutorId);
+				Map<String, M> cachedPolicies = cachedAlertDefs.get(updatedAlertExecutorId);
+				PolicyComparator.compare(updatedAlertExecutorId, newPolicies, cachedPolicies, added, changed, deleted);
+			}
+			
+			cachedAlertDefs = newAlertDefs;
+
+			return PollResult.createIncremental(added, changed, deleted, new Date().getTime());
+		}
+	}
+	
+	public static class PolicyComparator {
+		
+		public static <M extends AbstractPolicyDefinitionEntity> void compare(String alertExecutorId, Map<String, M> newPolicies, Map<String, M> cachedPolicies, 
+				Map<String, Object> added, Map<String, Object> changed, Map<String, Object> deleted){
+			Set<String> newPolicyIds = newPolicies.keySet();
+            Set<String> cachedPolicyIds = cachedPolicies != null ? cachedPolicies.keySet() : new HashSet<String>();
+			Collection<String> addedPolicyIds = CollectionUtils.subtract(newPolicyIds, cachedPolicyIds);
+			Collection<String> deletedPolicyIds = CollectionUtils.subtract(cachedPolicyIds, newPolicyIds);
+			Collection<String> changedPolicyIds = CollectionUtils.intersection(cachedPolicyIds, newPolicyIds);
+			if(addedPolicyIds != null && addedPolicyIds.size() > 0){
+				Map<String, M> tmp = new HashMap<String, M>();
+				for(String addedPolicyId : addedPolicyIds){
+					tmp.put(addedPolicyId, newPolicies.get(addedPolicyId));
+				}
+				added.put(alertExecutorId, tmp);
+			}
+			if(deletedPolicyIds != null && deletedPolicyIds.size() > 0){
+				Map<String, M> tmp = new HashMap<String, M>();
+				for(String deletedPolicyId : deletedPolicyIds){
+					tmp.put(deletedPolicyId, cachedPolicies.get(deletedPolicyId));
+				}
+				deleted.put(alertExecutorId, tmp);
+			}
+			if(changedPolicyIds != null && changedPolicyIds.size() > 0){
+				Map<String, M> tmp = new HashMap<String, M>();
+				for(String changedPolicyId : changedPolicyIds){
+					// check if policy is really changed
+					if(!newPolicies.get(changedPolicyId).equals(cachedPolicies.get(changedPolicyId))){
+						tmp.put(changedPolicyId, newPolicies.get(changedPolicyId));
+					}
+				}
+				changed.put(alertExecutorId, tmp);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PartitionUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PartitionUtils.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PartitionUtils.java
new file mode 100644
index 0000000..bc9a13f
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PartitionUtils.java
@@ -0,0 +1,31 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy;
+
+
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+
+public class PartitionUtils {
+	
+	public static boolean accept(AlertDefinitionAPIEntity alertDef, PolicyPartitioner partitioner, int numPartitions, int partitionSeq){
+		int targetPartitionSeq = partitioner.partition(numPartitions, alertDef.getTags().get(Constants.POLICY_TYPE), alertDef.getTags().get(Constants.POLICY_ID));
+		if(targetPartitionSeq == partitionSeq)
+			return true;
+		return false;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistStatsDAOLogReporter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistStatsDAOLogReporter.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistStatsDAOLogReporter.java
new file mode 100644
index 0000000..30868f3
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistStatsDAOLogReporter.java
@@ -0,0 +1,47 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.policy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+
+/**
+ * just append log
+ */
+public class PolicyDistStatsDAOLogReporter implements PolicyDistributionStatsDAO{
+    private static Logger LOG = LoggerFactory.getLogger(PolicyDistStatsDAOLogReporter.class);
+
+    @Override
+    public void reportPolicyMembership(String policyGroupId, Set<String> policyIds) {
+        if(policyIds != null){
+            StringBuilder sb = new StringBuilder();
+            sb.append("policyDistirbutionStats for " + policyGroupId + "[" + "total: " + policyIds.size() + ", ");
+            for(String policyId : policyIds){
+                sb.append(policyId + ",");
+            }
+            sb.append("]");
+            LOG.info(sb.toString());
+        }else{
+            LOG.warn("No policies are assigned to " + policyGroupId);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistributionReportMethods.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistributionReportMethods.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistributionReportMethods.java
new file mode 100644
index 0000000..7181857
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistributionReportMethods.java
@@ -0,0 +1,27 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.policy;
+
+/**
+ * framework will call report method, it is AlertExecutor's responsibility to report policy distribution information
+ */
+public interface PolicyDistributionReportMethods {
+    void report();
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistributionStats.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistributionStats.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistributionStats.java
new file mode 100644
index 0000000..7a70c95
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistributionStats.java
@@ -0,0 +1,74 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.policy;
+
+/**
+ * fields for a policy distribution statistics
+ */
+public class PolicyDistributionStats {
+    private String policyGroupId;   // normally groupId is alertExecutorId
+    private String policyId;
+    private boolean markDown;       // true if this policy is marked down, false otherwise
+    private double weight;          // comprehensive factors for policy overhead
+
+    public String getPolicyId() {
+        return policyId;
+    }
+
+    public void setPolicyId(String policyId) {
+        this.policyId = policyId;
+    }
+
+    public boolean isMarkDown() {
+        return markDown;
+    }
+
+    public void setMarkDown(boolean markDown) {
+        this.markDown = markDown;
+    }
+
+    public double getWeight() {
+        return weight;
+    }
+
+    public void setWeight(double weight) {
+        this.weight = weight;
+    }
+
+    public String getPolicyGroupId() {
+        return policyGroupId;
+    }
+
+    public void setPolicyGroupId(String policyGroupId) {
+        this.policyGroupId = policyGroupId;
+    }
+
+    public String toString(){
+        StringBuilder sb = new StringBuilder();
+        sb.append("policyId:");
+        sb.append(policyId);
+        sb.append(", markDown:");
+        sb.append(markDown);
+        sb.append(", weight:");
+        sb.append(weight);
+
+        return sb.toString();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistributionStatsDAO.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistributionStatsDAO.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistributionStatsDAO.java
new file mode 100644
index 0000000..12b2b83
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistributionStatsDAO.java
@@ -0,0 +1,26 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.policy;
+
+import java.util.Set;
+
+public interface PolicyDistributionStatsDAO {
+    public void reportPolicyMembership(String policyGroupId, Set<String> policyIds);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistroStatsLogReporter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistroStatsLogReporter.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistroStatsLogReporter.java
new file mode 100644
index 0000000..c4033f0
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyDistroStatsLogReporter.java
@@ -0,0 +1,50 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.policy;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Set;
+
+/**
+ * just append log
+ */
+public class PolicyDistroStatsLogReporter implements PolicyDistributionStatsDAO{
+    private static Logger LOG = LoggerFactory.getLogger(PolicyDistroStatsLogReporter.class);
+
+    @Override
+    public void reportPolicyMembership(String policyGroupId, Set<String> policyIds) {
+        if(policyIds != null){
+            StringBuilder sb = new StringBuilder();
+            sb.append("policyDistributionStats for " + policyGroupId +", total: " + policyIds.size() + ", [");
+            for(String policyId : policyIds){
+                sb.append(policyId + ",");
+            }
+            if(policyIds.size() > 0){
+                sb.deleteCharAt(sb.length()-1);
+            }
+            sb.append("]");
+            LOG.info(sb.toString());
+        }else{
+            LOG.warn("No policies are assigned to " + policyGroupId);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluationContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluationContext.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluationContext.java
new file mode 100644
index 0000000..7dad895
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluationContext.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy;
+
+import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
+import org.apache.eagle.datastream.Collector;
+import org.apache.eagle.policy.executor.IPolicyExecutor;
+
+public class PolicyEvaluationContext<T extends AbstractPolicyDefinitionEntity, K> {
+	
+	public IPolicyExecutor<T, K> alertExecutor;
+	
+	public String policyId;
+	
+	public PolicyEvaluator<T> evaluator;
+	
+	public Collector outputCollector;
+	
+	public ResultRender<T, K> resultRender;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluator.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluator.java
new file mode 100644
index 0000000..8dce80b
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluator.java
@@ -0,0 +1,51 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy;
+
+import java.util.Map;
+
+import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
+import org.apache.eagle.dataproc.core.ValuesArray;
+
+/***
+ * 
+ * @param <T> - The policy definition entity
+ */
+public interface PolicyEvaluator<T extends AbstractPolicyDefinitionEntity> {
+	/**
+	 * take input and evaluate expression
+	 * input has 3 fields, first is siddhiAlertContext, second one is streamName, the third is map of attribute name/value
+	 * @param input
+	 * @throws Exception
+	 */
+	public void evaluate(ValuesArray input) throws Exception;
+	
+	/**
+	 * notify policy evaluator that policy is updated
+	 */
+	public void onPolicyUpdate(T newAlertDef);
+	
+	/**
+	 * notify policy evaluator that policy is deleted, here is cleanup work for this policy evaluator
+	 */
+	public void onPolicyDelete();
+	
+	/**
+	 * get additional context
+	 */	
+	public Map<String, String> getAdditionalContext();
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluatorServiceProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluatorServiceProvider.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluatorServiceProvider.java
new file mode 100644
index 0000000..f862883
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyEvaluatorServiceProvider.java
@@ -0,0 +1,49 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy;
+
+import java.util.List;
+
+import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
+
+import com.fasterxml.jackson.databind.Module;
+
+/**
+ * to provide extensibility, we need a clear differentiation between framework job and provider logic
+ * policy evaluator framework:
+ * - connect to eagle data source
+ * - read all policy definitions
+ * - compare with cached policy definitions
+ * - figure out if policy is created, deleted or updated
+ *   - if policy is created, then invoke onPolicyCreated
+ *   - if policy is deleted, then invoke onPolicyDeleted
+ *   - if policy is updated, then invoke onPolicyUpdated
+ * - for policy report, replace old evaluator engine with new evaluator engine which is created by policy evaluator provider
+ * - specify # of executors for this alert executor id
+ * - dynamically balance # of policies evaluated by each alert executor
+ *   - use zookeeper to balance. eaglePolicies/${alertExecutorId}/${alertExecutorInstanceId} => list of policies
+ * 
+ * policy evaluator business features:
+ * - register mapping between policy type and PolicyEvaluator
+ * - create evaluator engine runtime when configuration is changed
+ *
+ */
+public interface PolicyEvaluatorServiceProvider<T extends AbstractPolicyDefinitionEntity> {
+	String getPolicyType();
+	Class<? extends PolicyEvaluator<T>> getPolicyEvaluator();
+	List<Module> getBindingModules();
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyLifecycleMethods.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyLifecycleMethods.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyLifecycleMethods.java
new file mode 100644
index 0000000..ad5d5c9
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyLifecycleMethods.java
@@ -0,0 +1,27 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy;
+
+import java.util.Map;
+
+import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
+
+public interface PolicyLifecycleMethods<T extends AbstractPolicyDefinitionEntity> {
+	void onPolicyCreated(Map<String, T> added);
+	void onPolicyChanged(Map<String, T> changed);
+	void onPolicyDeleted(Map<String, T> deleted);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyManager.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyManager.java
new file mode 100644
index 0000000..3f3581d
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyManager.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy;
+
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.ServiceLoader;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.fasterxml.jackson.databind.Module;
+
+public class PolicyManager {
+	private final static Logger LOG = LoggerFactory.getLogger(PolicyManager.class);
+	private static PolicyManager instance = new PolicyManager();
+
+	private ServiceLoader<PolicyEvaluatorServiceProvider> loader;
+	
+	private Map<String, Class<? extends PolicyEvaluator>> policyEvaluators = new HashMap<String, Class<? extends PolicyEvaluator>>();
+	private Map<String, List<Module>> policyModules = new HashMap<String, List<Module>>();
+	
+	private PolicyManager(){
+		loader = ServiceLoader.load(PolicyEvaluatorServiceProvider.class);
+		Iterator<PolicyEvaluatorServiceProvider> iter = loader.iterator();
+		while(iter.hasNext()){
+			PolicyEvaluatorServiceProvider factory = iter.next();
+			LOG.info("Supported policy type : " + factory.getPolicyType());
+			policyEvaluators.put(factory.getPolicyType(), factory.getPolicyEvaluator());
+			policyModules.put(factory.getPolicyType(), factory.getBindingModules());
+		}
+	}
+	
+	public static PolicyManager getInstance(){
+		return instance;
+	}
+	
+	public Class<? extends PolicyEvaluator> getPolicyEvaluator(String policyType){
+		return policyEvaluators.get(policyType);
+	}
+	
+	public List<Module> getPolicyModules(String policyType){
+		return policyModules.get(policyType);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyPartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyPartitioner.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyPartitioner.java
new file mode 100644
index 0000000..fa9620c
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/PolicyPartitioner.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy;
+
+import java.io.Serializable;
+
+/**
+ * partition policies so that policies can be distributed into different alert evaluators
+ */
+public interface PolicyPartitioner extends Serializable {
+	int partition(int numTotalPartitions, String policyType, String policyId);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/ResultRender.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/ResultRender.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/ResultRender.java
new file mode 100644
index 0000000..cc59880
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/ResultRender.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
+
+import java.util.List;
+
+/**
+ * @since Dec 17, 2015
+ *
+ */
+public interface ResultRender<T extends AbstractPolicyDefinitionEntity, K> {
+
+	K render(Config config, List<Object> rets, PolicyEvaluationContext<T, K> siddhiAlertContext, long timestamp);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java
new file mode 100644
index 0000000..0499431
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/Constants.java
@@ -0,0 +1,70 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.common;
+
+import org.apache.eagle.policy.dao.PolicyDefinitionEntityDAOImpl;
+
+public class Constants {
+	public final static String ALERT_SERVICE_ENDPOINT_NAME = "AlertService";
+	public final static String ALERT_DEFINITION_SERVICE_ENDPOINT_NAME = "AlertDefinitionService";
+	public final static String ALERT_STREAM_SCHEMA_SERVICE_ENDPOINT_NAME = "AlertStreamSchemaService";
+	public final static String ALERT_DATA_SOURCE_SERVICE_ENDPOINT_NAME = "AlertDataSourceService";
+	public final static String ALERT_EXECUTOR_SERVICE_ENDPOINT_NAME = "AlertExecutorService";
+	public final static String ALERT_STREAM_SERVICE_ENDPOINT_NAME = "AlertStreamService";
+	public static final String ALERT_EMAIL_ORIGIN_PROPERTY = "alertEmailOrigin";
+	public static final String ALERT_TIMESTAMP_PROPERTY = "alertTimestamp";
+	
+	public final static String AGGREGATE_DEFINITION_SERVICE_ENDPOINT_NAME = "AggregateDefinitionService";
+
+	public static final String ALERT_EMAIL_TIME_PROPERTY = "timestamp";
+	public static final String ALERT_EMAIL_COUNT_PROPERTY = "count";
+	public static final String ALERT_EMAIL_ALERTLIST_PROPERTY = "alertList";
+
+	public static final String URL = "url";
+	public static final String ALERT_SOURCE = "alertSource";
+	public static final String ALERT_MESSAGE = "alertMessage";
+	public static final String SUBJECT = "subject";
+	public static final String ALERT_EXECUTOR_ID = PolicyDefinitionEntityDAOImpl.ALERT_EXECUTOR_ID;// "alertExecutorId";
+	public static final String POLICY_NAME = "policyName";
+	public static final String POLICY_ID = "policyId";
+    public static final String SOURCE_STREAMS = "sourceStreams";
+    public static final String ALERT_EVENT = "alertEvent";
+	public static final String POLICY_DETAIL_URL = "policyDetailUrl";
+	public static final String ALERT_DETAIL_URL = "alertDetailUrl";
+
+	public static final String POLICY_DEFINITION = "policyDefinition";
+	public static final String POLICY_TYPE = "policyType";
+	public static final String STREAM_NAME = "streamName";
+	public static final String ATTR_NAME = "attrName";
+
+	public static final String ALERT_EXECUTOR_CONFIGS = "alertExecutorConfigs";
+	public static final String PARALLELISM = "parallelism";
+	public static final String PARTITIONER = "partitioner";
+	public static final String SOURCE = "source";
+	public static final String PARTITIONSEQ = "partitionSeq";
+	// policy definition status
+	public static final String EAGLE_DEFAULT_POLICY_NAME = "eagleQuery";
+
+	public enum policyType {
+		siddhiCEPEngine,
+		MachineLearning;
+
+		policyType() {
+		}
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/UrlBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/UrlBuilder.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/UrlBuilder.java
new file mode 100644
index 0000000..b10c6c8
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/common/UrlBuilder.java
@@ -0,0 +1,63 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.policy.common;
+
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.common.EagleBase64Wrapper;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.log.entity.HBaseInternalLogHelper;
+import org.apache.eagle.log.entity.InternalLog;
+import org.apache.eagle.log.entity.RowkeyBuilder;
+import org.apache.eagle.log.entity.meta.EntityDefinitionManager;
+import org.mortbay.util.UrlEncoded;
+import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class UrlBuilder {
+
+    private static final Logger logger = LoggerFactory.getLogger(UrlBuilder.class);
+
+    public static String getEncodedRowkey(AlertAPIEntity entity) throws Exception {
+        InternalLog log = HBaseInternalLogHelper.convertToInternalLog(entity, EntityDefinitionManager.getEntityDefinitionByEntityClass(entity.getClass()));
+        return EagleBase64Wrapper.encodeByteArray2URLSafeString(RowkeyBuilder.buildRowkey(log));
+    }
+
+    public static String buildAlertDetailUrl(String host, int port, AlertAPIEntity entity) {
+        String baseUrl = "http://" + host + ":" + String.valueOf(port) + "/eagle-service/#/dam/alertDetail/";
+        try {
+            return baseUrl + UrlEncoded.encodeString(getEncodedRowkey(entity));
+        }
+        catch (Exception ex) {
+            logger.error("Fail to populate encodedRowkey for alert Entity" + entity.toString());
+            return "N/A";
+        }
+    }
+
+    public static String buiildPolicyDetailUrl(String host, int port, Map<String, String> tags) {
+        String baseUrl = "http://" + host + ":" + String.valueOf(port) + "/eagle-service/#/dam/policyDetail?";
+        String format = "policy=%s&site=%s&executor=%s";
+        String policy = tags.get(Constants.POLICY_ID);
+        String site = tags.get(EagleConfigConstants.SITE);
+        String alertExecutorID = tags.get(Constants.ALERT_EXECUTOR_ID);
+        if (policy != null && site != null && alertExecutorID != null) {
+            return baseUrl + String.format(format, policy, site, alertExecutorID);
+        }
+        return "N/A";
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/config/AbstractPolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/config/AbstractPolicyDefinition.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/config/AbstractPolicyDefinition.java
new file mode 100644
index 0000000..a0999e6
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/config/AbstractPolicyDefinition.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.config;
+
+import com.fasterxml.jackson.annotation.JsonIgnoreProperties;
+import com.fasterxml.jackson.annotation.JsonTypeInfo;
+
+/**
+ * base fields for all policy definition
+ */
+@JsonTypeInfo(use = JsonTypeInfo.Id.NAME, property = "type", visible=true)
+@JsonIgnoreProperties(ignoreUnknown = true)
+public class AbstractPolicyDefinition {
+	private String type;
+    /**
+     * @return type in string
+     */
+	public String getType() {
+		return type;
+	}
+
+    /**
+     * @param type set type value
+     */
+	public void setType(String type) {
+		this.type = type;
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDataSourceDAO.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDataSourceDAO.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDataSourceDAO.java
new file mode 100644
index 0000000..650defc
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDataSourceDAO.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.dao;
+
+import org.apache.eagle.alert.entity.AlertDataSourceEntity;
+
+import java.util.List;
+
+public interface AlertDataSourceDAO {
+    List<AlertDataSourceEntity> findAlertDataSourceBySite(String site) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDataSourceDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDataSourceDAOImpl.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDataSourceDAOImpl.java
new file mode 100644
index 0000000..ca037a3
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDataSourceDAOImpl.java
@@ -0,0 +1,61 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.dao;
+
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.alert.entity.AlertDataSourceEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.client.EagleServiceConnector;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.apache.commons.lang.time.DateUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class AlertDataSourceDAOImpl implements AlertDataSourceDAO{
+    private final Logger LOG = LoggerFactory.getLogger(AlertDataSourceDAOImpl.class);
+    private final EagleServiceConnector connector;
+
+    public AlertDataSourceDAOImpl(EagleServiceConnector connector){
+        this.connector = connector;
+    }
+
+    @Override
+    public List<AlertDataSourceEntity> findAlertDataSourceBySite(String site) throws Exception{
+        try {
+            IEagleServiceClient client = new EagleServiceClientImpl(connector);
+            String query = Constants.ALERT_STREAM_SERVICE_ENDPOINT_NAME + "[@site=\"" + site + "\"]{*}";
+            GenericServiceAPIResponseEntity<AlertDataSourceEntity> response =  client.search()
+                    .startTime(0)
+                    .endTime(10 * DateUtils.MILLIS_PER_DAY)
+                    .pageSize(Integer.MAX_VALUE)
+                    .query(query)
+                    .send();
+            client.close();
+            if (response.getException() != null) {
+                throw new Exception("Got an exception when query eagle service: " + response.getException());
+            }
+            return response.getObj();
+        }
+        catch (Exception ex) {
+            LOG.error("Got an exception when query stream metadata service ", ex);
+            throw new IllegalStateException(ex);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDefinitionDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDefinitionDAOImpl.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDefinitionDAOImpl.java
new file mode 100644
index 0000000..39c5284
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertDefinitionDAOImpl.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.dao;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.client.EagleServiceConnector;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Utility methods to load alert definitions for a program
+ */
+public class AlertDefinitionDAOImpl implements PolicyDefinitionDAO<AlertDefinitionAPIEntity> {
+
+	private static final long serialVersionUID = 7717408104714443056L;
+	private static final Logger LOG = LoggerFactory.getLogger(AlertDefinitionDAOImpl.class);
+	private final EagleServiceConnector connector;
+
+	public AlertDefinitionDAOImpl(EagleServiceConnector connector){
+		this.connector = connector;
+	}
+
+    @Override
+	public List<AlertDefinitionAPIEntity> findActivePolicies(String site, String dataSource) throws Exception {
+		try {
+			IEagleServiceClient client = new EagleServiceClientImpl(connector);
+			String query = Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME + "[@site=\"" + site + "\" AND @dataSource=\"" + dataSource + "\"]{*}";
+			GenericServiceAPIResponseEntity<AlertDefinitionAPIEntity> response =  client.search()
+																		                .pageSize(Integer.MAX_VALUE)
+																		                .query(query)
+																	                    .send();
+			client.close();
+			if (response.getException() != null) {
+				throw new Exception("Got an exception when query eagle service: " + response.getException()); 
+			}
+			List<AlertDefinitionAPIEntity> list = response.getObj();
+			List<AlertDefinitionAPIEntity> enabledList = new ArrayList<AlertDefinitionAPIEntity>();
+			for (AlertDefinitionAPIEntity entity : list) {
+				if (entity.isEnabled()) enabledList.add(entity);
+			}
+			return enabledList;
+		}
+		catch (Exception ex) {
+			LOG.error("Got an exception when query alert Def service", ex);
+			throw new IllegalStateException(ex);
+		}					   
+	}
+
+    @Override
+	public Map<String, Map<String, AlertDefinitionAPIEntity>> findActivePoliciesGroupbyExecutorId(String site, String dataSource) throws Exception {
+		List<AlertDefinitionAPIEntity> list = findActivePolicies(site, dataSource);
+		Map<String, Map<String, AlertDefinitionAPIEntity>> map = new HashMap<String, Map<String, AlertDefinitionAPIEntity>>();
+			for (AlertDefinitionAPIEntity entity : list) {
+				String executorID = entity.getTags().get(Constants.ALERT_EXECUTOR_ID);
+				if (map.get(executorID) == null) {
+					map.put(executorID, new HashMap<String, AlertDefinitionAPIEntity>());
+				}
+				map.get(executorID).put(entity.getTags().get("policyId"), entity);
+			}
+		return map;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertExecutorDAO.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertExecutorDAO.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertExecutorDAO.java
new file mode 100644
index 0000000..09c6266
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertExecutorDAO.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.dao;
+
+import org.apache.eagle.alert.entity.AlertExecutorEntity;
+
+import java.util.List;
+
+public interface AlertExecutorDAO {
+    List<AlertExecutorEntity> findAlertExecutorByDataSource(String dataSource) throws Exception;
+    List<AlertExecutorEntity> findAlertExecutor(String dataSource, String alertExecutorId) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertExecutorDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertExecutorDAOImpl.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertExecutorDAOImpl.java
new file mode 100644
index 0000000..0b7b9e3
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertExecutorDAOImpl.java
@@ -0,0 +1,88 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.dao;
+
+import java.util.List;
+
+import org.apache.commons.lang.time.DateUtils;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.alert.entity.AlertExecutorEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.client.EagleServiceConnector;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AlertExecutorDAOImpl implements AlertExecutorDAO{
+	
+    private static final Logger LOG = LoggerFactory.getLogger(AlertExecutorDAOImpl.class);
+    
+    private final EagleServiceConnector connector;
+
+    public AlertExecutorDAOImpl(EagleServiceConnector connector){
+        this.connector = connector;
+    }
+
+    @Override
+    public List<AlertExecutorEntity> findAlertExecutorByDataSource(String dataSource) throws Exception{
+        try {
+            IEagleServiceClient client = new EagleServiceClientImpl(connector);
+            String query = Constants.ALERT_EXECUTOR_SERVICE_ENDPOINT_NAME + "[@dataSource=\"" + dataSource + "\"]{*}";
+            GenericServiceAPIResponseEntity<AlertExecutorEntity> response =  client.search()
+                    .startTime(0)
+                    .endTime(10 * DateUtils.MILLIS_PER_DAY)
+                    .pageSize(Integer.MAX_VALUE)
+                    .query(query)
+                    .send();
+            client.close();
+            if (response.getException() != null) {
+                throw new Exception("Got an exception when query eagle service: " + response.getException());
+            }
+            return response.getObj();
+        }
+        catch (Exception ex) {
+            LOG.error("Got an exception when query stream metadata service ", ex);
+            throw new IllegalStateException(ex);
+        }
+    }
+
+    @Override
+    public List<AlertExecutorEntity> findAlertExecutor(String dataSource, String alertExecutorId) throws Exception{
+        try {
+            IEagleServiceClient client = new EagleServiceClientImpl(connector);
+            String query = Constants.ALERT_EXECUTOR_SERVICE_ENDPOINT_NAME + "[@dataSource=\"" + dataSource + "\""
+                    + " AND @alertExecutorId=\"" + alertExecutorId + "\""
+                    + "]{*}";
+            GenericServiceAPIResponseEntity<AlertExecutorEntity> response =  client.search()
+                    .startTime(0)
+                    .endTime(10 * DateUtils.MILLIS_PER_DAY)
+                    .pageSize(Integer.MAX_VALUE)
+                    .query(query)
+                    .send();
+            client.close();
+            if (response.getException() != null) {
+                throw new Exception("Got an exception when query eagle service: " + response.getException());
+            }
+            return response.getObj();
+        }
+        catch (Exception ex) {
+            LOG.error("Got an exception when query stream metadata service ", ex);
+            throw new IllegalStateException(ex);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertStreamDAO.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertStreamDAO.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertStreamDAO.java
new file mode 100644
index 0000000..fd3dbb9
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertStreamDAO.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.dao;
+
+import org.apache.eagle.alert.entity.AlertStreamEntity;
+
+import java.util.List;
+
+public interface AlertStreamDAO {
+    List<AlertStreamEntity> findAlertStreamByDataSource(String dataSource) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertStreamDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertStreamDAOImpl.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertStreamDAOImpl.java
new file mode 100644
index 0000000..537a62f
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertStreamDAOImpl.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.dao;
+
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.alert.entity.AlertStreamEntity;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.client.EagleServiceConnector;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.apache.commons.lang.time.DateUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.List;
+
+public class AlertStreamDAOImpl implements AlertStreamDAO{
+    private final Logger LOG = LoggerFactory.getLogger(AlertStreamDAOImpl.class);
+    private final EagleServiceConnector connector;
+
+    public AlertStreamDAOImpl(EagleServiceConnector connector){
+        this.connector = connector;
+    }
+
+    public List<AlertStreamEntity> findAlertStreamByDataSource(String dataSource) throws Exception{
+        try {
+            IEagleServiceClient client = new EagleServiceClientImpl(connector);
+            String query = Constants.ALERT_STREAM_SERVICE_ENDPOINT_NAME + "[@dataSource=\"" + dataSource + "\"]{*}";
+            GenericServiceAPIResponseEntity<AlertStreamEntity> response =  client.search()
+                    .startTime(0)
+                    .endTime(10 * DateUtils.MILLIS_PER_DAY)
+                    .pageSize(Integer.MAX_VALUE)
+                    .query(query)
+                    .send();
+            client.close();
+            if (response.getException() != null) {
+                throw new Exception("Got an exception when query eagle service: " + response.getException());
+            }
+            return response.getObj();
+        }
+        catch (Exception ex) {
+            LOG.error("Got an exception when query stream metadata service ", ex);
+            throw new IllegalStateException(ex);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertStreamSchemaDAO.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertStreamSchemaDAO.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertStreamSchemaDAO.java
new file mode 100644
index 0000000..95a2186
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertStreamSchemaDAO.java
@@ -0,0 +1,25 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.dao;
+
+import java.util.List;
+
+import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
+
+public interface AlertStreamSchemaDAO {
+	List<AlertStreamSchemaEntity> findAlertStreamSchemaByDataSource(String dataSource) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertStreamSchemaDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertStreamSchemaDAOImpl.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertStreamSchemaDAOImpl.java
new file mode 100644
index 0000000..af1fb3a
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/AlertStreamSchemaDAOImpl.java
@@ -0,0 +1,84 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.dao;
+
+import java.util.List;
+
+import org.apache.commons.lang.time.DateUtils;
+import org.apache.eagle.policy.common.Constants;
+import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+public class AlertStreamSchemaDAOImpl implements AlertStreamSchemaDAO {
+	private final Logger LOG = LoggerFactory.getLogger(AlertStreamSchemaDAOImpl.class);
+	
+	private final String eagleServiceHost;
+	private final Integer eagleServicePort;
+	private String username;
+	private String password;
+
+	public AlertStreamSchemaDAOImpl(String eagleServiceHost, Integer eagleServicePort) {
+		this(eagleServiceHost, eagleServicePort, null, null);
+	}
+
+	public AlertStreamSchemaDAOImpl(String eagleServiceHost, Integer eagleServicePort, String username, String password) {
+		this.eagleServiceHost = eagleServiceHost;
+		this.eagleServicePort = eagleServicePort;
+		this.username = username;
+		this.password = password;
+	}
+
+	public AlertStreamSchemaDAOImpl(Config config) {
+		this.eagleServiceHost = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
+		this.eagleServicePort = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
+		if (config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME) &&
+			config.hasPath(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD)) {
+			this.username = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.USERNAME);
+			this.password = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PASSWORD);
+		}
+	}
+	
+	@Override
+	public List<AlertStreamSchemaEntity> findAlertStreamSchemaByDataSource(String dataSource) throws Exception {
+		try {
+			IEagleServiceClient client = new EagleServiceClientImpl(eagleServiceHost, eagleServicePort, username, password);
+			String query = Constants.ALERT_STREAM_SCHEMA_SERVICE_ENDPOINT_NAME + "[@dataSource=\"" + dataSource + "\"]{*}";
+			GenericServiceAPIResponseEntity<AlertStreamSchemaEntity> response =  client.search()
+																		                .startTime(0)
+																		                .endTime(10 * DateUtils.MILLIS_PER_DAY)
+																		                .pageSize(Integer.MAX_VALUE)
+																		                .query(query)
+																	                    .send();
+			client.close();
+			if (response.getException() != null) {
+				throw new Exception("Got an exception when query eagle service: " + response.getException()); 
+			}			
+			return response.getObj();
+		}
+		catch (Exception ex) {
+			LOG.error("Got an exception when query stream metadata service ", ex);
+			throw new IllegalStateException(ex);
+		}					   
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/PolicyDefinitionDAO.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/PolicyDefinitionDAO.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/PolicyDefinitionDAO.java
new file mode 100644
index 0000000..74c167b
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/PolicyDefinitionDAO.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.dao;
+
+import java.io.Serializable;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
+
+/**
+ * @param <T> - Policy definition type
+ */
+public interface PolicyDefinitionDAO<T extends AbstractPolicyDefinitionEntity> extends Serializable{
+	/**
+	 * find list of active alert definitions for one specific site and dataSource
+	 * @return
+	 */
+	List<T> findActivePolicies(String site, String dataSource) throws Exception;
+	
+	/**
+	 * find map from alertExecutorId to map from policy Id to alert definition for one specific site and dataSource
+	 * Map from alertExecutorId to map from policyId to policy definition
+       (site,dataSource) => Map[alertExecutorId,Map[policyId,alertDefinition]]
+	 * @return
+	 */
+	Map<String, Map<String, T>> findActivePoliciesGroupbyExecutorId(String site, String dataSource) throws Exception;
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/PolicyDefinitionEntityDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/PolicyDefinitionEntityDAOImpl.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/PolicyDefinitionEntityDAOImpl.java
new file mode 100644
index 0000000..c1bd24e
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/dao/PolicyDefinitionEntityDAOImpl.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.dao;
+
+import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
+import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
+import org.apache.eagle.service.client.EagleServiceConnector;
+import org.apache.eagle.service.client.IEagleServiceClient;
+import org.apache.eagle.service.client.impl.EagleServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * @since Dec 17, 2015
+ *
+ */
+public class PolicyDefinitionEntityDAOImpl<T extends AbstractPolicyDefinitionEntity> implements PolicyDefinitionDAO<T> {
+
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(PolicyDefinitionEntityDAOImpl.class);
+	public static final String ALERT_EXECUTOR_ID = "alertExecutorId";
+	private final EagleServiceConnector connector;
+	private final String servicePointName;
+
+	public PolicyDefinitionEntityDAOImpl(EagleServiceConnector connector, String serviceName){
+		this.connector = connector;
+		this.servicePointName = serviceName;
+	}
+
+    @Override
+	public List<T> findActivePolicies(String site, String dataSource) throws Exception {
+		try {
+			IEagleServiceClient client = new EagleServiceClientImpl(connector);
+			String query = servicePointName + "[@site=\"" + site + "\" AND @dataSource=\"" + dataSource + "\"]{*}";
+			GenericServiceAPIResponseEntity<T> response = client.search()
+												                .pageSize(Integer.MAX_VALUE)
+												                .query(query)
+											                    .send();
+			client.close();
+			if (response.getException() != null) {
+				throw new Exception("Got an exception when query eagle service: " + response.getException()); 
+			}
+			List<T> list = response.getObj();
+			List<T> enabledList = new ArrayList<T>();
+			for (T entity : list) {
+				if (entity.isEnabled()) enabledList.add(entity);
+			}
+			return enabledList;
+		}
+		catch (Exception ex) {
+			LOG.error("Got an exception when query alert Def service", ex);
+			throw new IllegalStateException(ex);
+		}					   
+	}
+    
+    
+
+    @Override
+	public Map<String, Map<String, T>> findActivePoliciesGroupbyExecutorId(String site, String dataSource)
+			throws Exception {
+		List<T> list = findActivePolicies(site, dataSource);
+		Map<String, Map<String, T>> map = new HashMap<String, Map<String, T>>();
+		for (T entity : list) {
+			// support both executorId and legacy alertExecutorId
+			String executorID = entity.getTags().containsKey("executorId") ? entity.getTags().get("executorId")
+					: entity.getTags().get(ALERT_EXECUTOR_ID);
+
+			if (map.get(executorID) == null) {
+				map.put(executorID, new HashMap<String, T>());
+			}
+			map.get(executorID).put(entity.getTags().get("policyId"), entity);
+		}
+		return map;
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/da8f419c/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/IPolicyExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/IPolicyExecutor.java b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/IPolicyExecutor.java
new file mode 100644
index 0000000..c9d28a2
--- /dev/null
+++ b/eagle-core/eagle-policy/eagle-policy-base/src/main/java/org/apache/eagle/policy/executor/IPolicyExecutor.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.policy.executor;
+
+import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
+import org.apache.eagle.policy.siddhi.SiddhiEvaluationHandler;
+
+/**
+ * Created on 1/10/16.
+ */
+public interface IPolicyExecutor<T extends AbstractPolicyDefinitionEntity, K> extends SiddhiEvaluationHandler<T, K> {
+    String getExecutorId();
+
+    int getPartitionSeq();
+}