You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2015/11/19 11:47:52 UTC

[45/55] [abbrv] [partial] incubator-eagle git commit: [EAGLE-46] Rename package name as "org.apache.eagle"

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/EagleAlertContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/EagleAlertContext.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/EagleAlertContext.java
new file mode 100644
index 0000000..c245a24
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/EagleAlertContext.java
@@ -0,0 +1,32 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.siddhi;
+
+import org.apache.eagle.executor.AlertExecutor;
+import org.apache.eagle.alert.policy.PolicyEvaluator;
+import org.apache.eagle.datastream.Collector;
+
+/**
+ * Context object handed to the Siddhi engine together with an event.
+ * SiddhiPolicyEvaluator.evaluate puts it into the first field of the event it sends, and
+ * SiddhiQueryCallbackImpl.receive reads it back from the first field of the query output.
+ */
+public class EagleAlertContext {
+	
+	// executor whose id is written to the alert tags and whose onAlerts(...) receives the rendered alerts
+	public AlertExecutor alertExecutor;
+	
+	// NOTE(review): presumably the id of the policy the event is evaluated against - confirm with the code that fills this context
+	public String policyId;
+	
+	// evaluator of the policy; SiddhiAlertAPIEntityRendner.render casts it to SiddhiPolicyEvaluator
+	public PolicyEvaluator evaluator;
+	
+	// NOTE(review): presumably the collector used to emit alerts downstream - confirm with the code that fills this context
+	public Collector outputCollector;
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertAPIEntityRendner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertAPIEntityRendner.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertAPIEntityRendner.java
new file mode 100644
index 0000000..4f38f11
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertAPIEntityRendner.java
@@ -0,0 +1,100 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.siddhi;
+
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Map.Entry;
+
+import org.apache.eagle.alert.common.AlertConstants;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
+import org.apache.eagle.alert.notification.UrlBuilder;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import org.apache.eagle.common.DateTimeUtil;
+import org.apache.eagle.common.metric.AlertContext;
+import com.typesafe.config.Config;
+
+/**
+ * Turns the output of a Siddhi query into an {@link AlertAPIEntity}: the output values become
+ * alert context properties, while site, data source, policy and executor information become tags.
+ */
+public class SiddhiAlertAPIEntityRendner {
+
+	public static final Logger LOG = LoggerFactory.getLogger(SiddhiAlertAPIEntityRendner.class);
+	/** name of the runtime MXBean of this JVM, stored in the ALERT_SOURCE tag of every rendered alert */
+	public static final String source = ManagementFactory.getRuntimeMXBean().getName();
+
+	/**
+	 * Builds one alert entity from the output values of a Siddhi query.
+	 *
+	 * @param config configuration which provides site, data source and the eagle service host/port
+	 *               below {@code EagleConfigConstants.EAGLE_PROPS}
+	 * @param rets output values of the query WITHOUT the leading alert context field, i.e.
+	 *             {@code rets.get(i)} is the value of the output attribute at position {@code i + 1}
+	 *             of the evaluator's output attribute list
+	 * @param siddhiAlertContext context of the event; its evaluator must be a {@link SiddhiPolicyEvaluator}
+	 * @param timestamp timestamp which is set on the returned entity
+	 * @return the alert entity with tags and alert context populated
+	 */
+	public static AlertAPIEntity render(Config config, List<String> rets, EagleAlertContext siddhiAlertContext, long timestamp) {
+		SiddhiPolicyEvaluator evaluator = (SiddhiPolicyEvaluator)siddhiAlertContext.evaluator;
+		String alertExecutorId = siddhiAlertContext.alertExecutor.getAlertExecutorId();
+		AlertAPIEntity entity = new AlertAPIEntity();
+		AlertContext context = new AlertContext();
+		Map<String, String> additionalContext = evaluator.getAdditionalContext();
+		String[] sourceStreamsArr = additionalContext.get(AlertConstants.SOURCE_STREAMS).split(",");
+		List<String> attrRenameList = evaluator.getOutputStreamAttrNameList();
+		Map<String, String> tags = new HashMap<String, String>();
+		for (String sourceStream : sourceStreamsArr) {
+			List<AlertStreamSchemaEntity> list = StreamMetadataManager.getInstance().getMetadataEntitiesForStream(sourceStream.trim());
+			for (AlertStreamSchemaEntity alertStream : list) {
+				if (!Boolean.TRUE.equals(alertStream.getUsedAsTag())) {
+					continue;
+				}
+				String attrName = alertStream.getTags().get(AlertConstants.ATTR_NAME);
+				// attrRenameList still starts with "eagleAlertContext" while rets does not,
+				// so the value of an attribute sits one position earlier in rets
+				int valueIndex = attrRenameList.indexOf(attrName) - 1;
+				if (valueIndex < 0 || valueIndex >= rets.size()) {
+					LOG.warn("Attribute {} of stream {} is marked as tag but is not part of the policy output, no tag is added for it", attrName, sourceStream);
+					continue;
+				}
+				tags.put(attrName, rets.get(valueIndex));
+			}
+		}
+
+		for (int index = 0; index < rets.size(); index++) {
+			//attrRenameList.get(0) -> "eagleAlertContext". We need to skip "eagleAlertContext", index is from 1 for attRenameList.
+			context.addProperty(attrRenameList.get(index + 1), rets.get(index));
+		}
+
+		// the alert event text only lists the query output, so it is built before any other property is added
+		StringBuilder sb = new StringBuilder();
+		for (Entry<String, String> entry : context.getProperties().entrySet()) {
+			sb.append(entry.getKey()).append("=\"").append(entry.getValue()).append("\" ");
+		}
+		String alertEvent = sb.toString();
+		context.addAll(additionalContext);
+		String policyId = context.getProperty(AlertConstants.POLICY_ID);
+		String alertMessage = "The Policy \"" + policyId + "\" has been detected with the below information: " + alertEvent;
+		String site = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.SITE);
+		String dataSource = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.DATA_SOURCE);
+		String host = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
+		Integer port = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
+
+		context.addProperty(AlertConstants.ALERT_EVENT, alertEvent);
+		context.addProperty(AlertConstants.ALERT_MESSAGE, alertMessage);
+		context.addProperty(AlertConstants.ALERT_TIMESTAMP_PROPERTY, DateTimeUtil.millisecondsToHumanDateWithSeconds(System.currentTimeMillis()));
+		context.addProperty(EagleConfigConstants.DATA_SOURCE, dataSource);
+		context.addProperty(EagleConfigConstants.SITE, site);
+		entity.setTimestamp(timestamp);
+		/** If we need to add a severity tag, we should add a severity field in AbstractPolicyDefinition, and pass it down **/
+		tags.put(EagleConfigConstants.SITE, site);
+		tags.put(EagleConfigConstants.DATA_SOURCE, dataSource);
+		tags.put(AlertConstants.SOURCE_STREAMS, context.getProperty(AlertConstants.SOURCE_STREAMS));
+		tags.put(AlertConstants.POLICY_ID, policyId);
+		tags.put(AlertConstants.ALERT_SOURCE, source);
+		tags.put(AlertConstants.ALERT_EXECUTOR_ID, alertExecutorId);
+		entity.setTags(tags);
+
+		// the detail urls are derived from the tags, so they can only be added once the tags are complete
+		context.addProperty(AlertConstants.POLICY_DETAIL_URL, UrlBuilder.buiildPolicyDetailUrl(host, port, tags));
+		context.addProperty(AlertConstants.ALERT_DETAIL_URL, UrlBuilder.buildAlertDetailUrl(host, port, entity));
+		entity.setAlertContext(context);
+		return entity;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertHandler.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertHandler.java
new file mode 100644
index 0000000..f89e506
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiAlertHandler.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.siddhi;
+
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+
+import java.util.List;
+
+/**
+ * Receiver of alerts which were rendered from the output of a Siddhi query.
+ */
+public interface SiddhiAlertHandler {
+
+	/**
+	 * Called with the alerts produced for one context.
+	 *
+	 * @param context context of the event(s) the alerts were created for
+	 * @param alerts the rendered alerts
+	 */
+	void onAlerts(EagleAlertContext context, List<AlertAPIEntity> alerts);
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyDefinition.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyDefinition.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyDefinition.java
new file mode 100644
index 0000000..39eb0c9
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyDefinition.java
@@ -0,0 +1,42 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.siddhi;
+
+import org.apache.eagle.alert.config.AbstractPolicyDefinition;
+
+/**
+ * Policy definition for the siddhi CEP engine. Its only own property is the siddhi query expression.
+ * A siddhi policy definition has the following format
+ * {
+        "type":"SiddhiCEPEngine",
+		"expression" : "from every b1=HeapUsage[metric == 'eagle.metric.gc'] -> a1=FullGCEvent[eventName == 'full gc'] -> b2=HeapUsage[metric == b1.metric and host == b1.host and value >= b1.value * 0.8] within 100 sec select a1.eventName, b1.metric, b2.timestamp, 60 as timerange insert into GCMonitor; "
+	}
+ */
+public class SiddhiPolicyDefinition extends AbstractPolicyDefinition {
+	/** the siddhi query of this policy */
+	private String expression;
+
+	/**
+	 * @param expression the siddhi query of this policy
+	 */
+	public void setExpression(String expression) {
+		this.expression = expression;
+	}
+
+	/**
+	 * @return the siddhi query of this policy
+	 */
+	public String getExpression() {
+		return this.expression;
+	}
+
+	/**
+	 * @return the siddhi query itself, nothing else is part of the textual form
+	 */
+	@Override
+	public String toString(){
+		return this.expression;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyEvaluator.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyEvaluator.java
new file mode 100644
index 0000000..6ba2826
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyEvaluator.java
@@ -0,0 +1,264 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.siddhi;
+
+import java.lang.reflect.Field;
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.concurrent.ArrayBlockingQueue;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.eagle.alert.config.AbstractPolicyDefinition;
+import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
+import org.apache.eagle.alert.common.AlertConstants;
+import org.apache.eagle.alert.policy.PolicyManager;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.query.output.callback.QueryCallback;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+import org.wso2.siddhi.query.api.execution.query.Query;
+import org.wso2.siddhi.query.api.execution.query.selection.OutputAttribute;
+
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.alert.policy.PolicyEvaluator;
+import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
+import org.apache.eagle.dataproc.core.ValuesArray;
+import com.typesafe.config.Config;
+
+/**
+ * {@link PolicyEvaluator} which runs one policy as a siddhi execution plan.
+ * <p>
+ * when policy is updated or deleted, the execution plan of the old runtime is shut down to release resources.
+ * during this time, synchronization is important: an event is only sent while the monitor of the runtime it
+ * is sent to is held, and a runtime is only shut down while that same monitor is held.
+ */
+public class SiddhiPolicyEvaluator implements PolicyEvaluator{
+	private final static Logger LOG = LoggerFactory.getLogger(SiddhiPolicyEvaluator.class);
+	public static final int DEFAULT_QUEUE_SIZE = 1000;
+	private final static String EXECUTION_PLAN_NAME = "query";
+	// name of the QueryCallback field which createSiddhiRuntime reads through reflection
+	private final static String CALLBACK_QUERY_FIELD = "query";
+	private final static String SELECT_PREFIX = "select ";
+	// replaced as a whole on policy update, hence volatile
+	private volatile SiddhiRuntime siddhiRuntime;
+	private String[] sourceStreams;
+	private boolean needValidation;
+	private String policyId;
+	private Config config;
+	
+	/**
+	 * everything dependent on policyDef should be together and switched in runtime
+	 */
+	public static class SiddhiRuntime{
+		QueryCallback callback;
+		Map<String, InputHandler> siddhiInputHandlers;
+		SiddhiManager siddhiManager;
+		SiddhiPolicyDefinition policyDef;
+		List<String> outputFields;
+		String executionPlanName;
+	}
+	
+	/**
+	 * Creates an evaluator which does not validate incoming events against the stream metadata.
+	 */
+	public SiddhiPolicyEvaluator(Config config, String policyName, AbstractPolicyDefinition policyDef, String[] sourceStreams){
+		this(config, policyName, policyDef, sourceStreams, false);
+	}
+	
+	/**
+	 * @param config configuration handed to the query callback
+	 * @param policyId id of the policy, exposed through {@link #getAdditionalContext()}
+	 * @param policyDef policy definition, must be a {@link SiddhiPolicyDefinition}
+	 * @param sourceStreams names of the streams the policy reads from
+	 * @param needValidation whether incoming events are checked against the stream metadata
+	 */
+	public SiddhiPolicyEvaluator(Config config, String policyId, AbstractPolicyDefinition policyDef, String[] sourceStreams, boolean needValidation){
+		this.config = config;
+		this.policyId = policyId;
+		this.needValidation = needValidation;
+		this.sourceStreams = sourceStreams; 
+		init(policyDef);
+	}
+	
+	/**
+	 * Creates the siddhi runtime for the given definition and makes it the current one.
+	 */
+	public void init(AbstractPolicyDefinition policyDef){			
+		siddhiRuntime = createSiddhiRuntime((SiddhiPolicyDefinition)policyDef);
+	}
+
+	/**
+	 * Makes the alert context field the first selected attribute of the expression:
+	 * select fieldA, fieldB --> select eagleAlertContext,fieldA, fieldB
+	 * <p>
+	 * The expression is returned unchanged when it has no "select " clause, when it selects "*",
+	 * or when the alert context field already is the first selected attribute.
+	 *
+	 * @param expression siddhi query expression
+	 * @return the expression with the alert context field selected first
+	 */
+	public static String addContextFieldIfNotExist(String expression) {		
+		int selectPos = expression.indexOf(SELECT_PREFIX);
+		if (selectPos < 0) {
+			// nothing to extend; inserting at a made-up position would only corrupt the query
+			return expression;
+		}
+		int pos = selectPos + SELECT_PREFIX.length();
+		int firstAttr = pos;
+		while (firstAttr < expression.length() && expression.charAt(firstAttr) == ' ') {
+			firstAttr++;
+		}
+		// "select *" (or nothing behind select) is left as it is
+		if (firstAttr >= expression.length() || expression.charAt(firstAttr) == '*') {
+			return expression;
+		}
+		String contextField = SiddhiStreamMetadataUtils.EAGLE_ALERT_CONTEXT_FIELD;
+		if (expression.startsWith(contextField, firstAttr)) {
+			int end = firstAttr + contextField.length();
+			// only a complete identifier counts, not a longer attribute name which merely starts with it
+			if (end >= expression.length() || !Character.isJavaIdentifierPart(expression.charAt(end))) {
+				return expression;
+			}
+		}
+		StringBuilder sb = new StringBuilder(expression.length() + contextField.length() + 1);
+		sb.append(expression, 0, pos);
+		sb.append(contextField).append(',');
+		sb.append(expression, pos, expression.length());
+		return sb.toString();
+	}
+
+	/**
+	 * Builds and starts a siddhi execution plan for the policy definition: stream definitions of all
+	 * source streams followed by the policy expression, with a {@link SiddhiQueryCallbackImpl} attached.
+	 */
+	private SiddhiRuntime createSiddhiRuntime(SiddhiPolicyDefinition policyDef){
+		// the plan text is assembled first, so a broken definition fails before a SiddhiManager is created
+		StringBuilder sb = new StringBuilder();		
+		for(String sourceStream : sourceStreams){
+			String streamDef = SiddhiStreamMetadataUtils.convertToStreamDef(sourceStream);
+			LOG.info("Siddhi stream definition : {}", streamDef);
+			sb.append(streamDef);
+		}
+		String expression = addContextFieldIfNotExist(policyDef.getExpression());
+		String executionPlan = sb.toString() + " @info(name = '" + EXECUTION_PLAN_NAME + "') " +  expression;
+
+		SiddhiManager siddhiManager = new SiddhiManager();
+		ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
+		
+		Map<String, InputHandler> siddhiInputHandlers = new HashMap<String, InputHandler>();
+		for(String sourceStream : sourceStreams){			
+			siddhiInputHandlers.put(sourceStream, executionPlanRuntime.getInputHandler(sourceStream));
+		}
+		executionPlanRuntime.start();
+
+		QueryCallback callback = new SiddhiQueryCallbackImpl(config, this);		
+
+		LOG.info("Siddhi query: {}", expression);
+		executionPlanRuntime.addCallback(EXECUTION_PLAN_NAME, callback);
+
+		// the names of the selected attributes are taken from the Query object held by the callback;
+		// it is read after the callback was registered, as before (NOTE(review): presumably addCallback sets it - confirm)
+		List<String> outputFields = new ArrayList<String>();
+		try {
+			Field field = QueryCallback.class.getDeclaredField(CALLBACK_QUERY_FIELD);
+			field.setAccessible(true);
+			Query query = (Query)field.get(callback);
+			for (OutputAttribute output : query.getSelector().getSelectionList()) {
+				outputFields.add(output.getRename());
+			}
+		}
+		catch (Exception ex) {
+			LOG.error("Got an Exception when initial outputFields ", ex);
+		}
+		SiddhiRuntime runtime = new SiddhiRuntime();
+		runtime.siddhiInputHandlers = siddhiInputHandlers;
+		runtime.siddhiManager = siddhiManager;
+		runtime.callback = callback;
+		runtime.policyDef = policyDef;
+		runtime.outputFields = outputFields;
+		runtime.executionPlanName = executionPlanRuntime.getName();
+		return runtime;
+	}
+	
+	/**
+	 * 1. input has 3 fields, first is siddhi context, second is streamName, the last one is map of attribute name/value
+	 * 2. runtime check for input data (This is very expensive, so it is only done when needValidation is set)
+	 *     the attribute names should be equal to attribute names which stream metadata defines
+	 * 3. the event is sent to the current runtime while its monitor is held, so that it can not be shut down
+	 *    by a policy update or delete in the middle of the send
+	 *
+	 * @throws IllegalStateException if the stream is not one of the source streams of this policy,
+	 *         or if validation is enabled and the attribute names do not match the stream metadata
+	 */
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	@Override
+	public void evaluate(ValuesArray data) throws Exception {
+		LOG.debug("Siddhi policy evaluator consumers data :{}", data);
+		Object siddhiAlertContext = data.get(0);
+		String streamName = (String)data.get(1);
+		SortedMap map = (SortedMap)data.get(2);
+		validateEventInRuntime(streamName, map);
+		//insert siddhiAlertContext into the first field
+		List<Object> input = new ArrayList<>();
+		input.add(siddhiAlertContext);
+		putAttrsIntoInputStream(input, streamName, map);
+		Object[] event = input.toArray(new Object[0]);
+		while (true) {
+			SiddhiRuntime runtime = siddhiRuntime;
+			synchronized(runtime){
+				if (runtime != siddhiRuntime) {
+					// the policy was updated while waiting for the monitor, use the new runtime instead
+					continue;
+				}
+				InputHandler handler = runtime.siddhiInputHandlers.get(streamName);
+				if (handler == null) {
+					throw new IllegalStateException("stream " + streamName + " is not a source stream of policy " + policyId);
+				}
+				handler.send(event);
+				return;
+			}
+		}
+	}
+
+	/**
+	 * this is a heavy operation, we should avoid to use
+	 * @param sourceStream name of the stream the event belongs to
+	 * @param data attribute name/value map of the event
+	 * @throws IllegalStateException if the attribute names differ from the ones in the stream metadata
+	 */
+	private void validateEventInRuntime(String sourceStream, SortedMap data){
+		if(!needValidation)
+			return;
+		SortedMap<String, AlertStreamSchemaEntity> map = StreamMetadataManager.getInstance().getMetadataEntityMapForStream(sourceStream);
+		if(!map.keySet().equals(data.keySet()))
+			throw new IllegalStateException("incoming data schema is different from supported data schema, incoming data: " + data.keySet() + ", schema: " + map.keySet());
+	}
+
+	/**
+	 * Appends the attribute values of the event to the input in the order of the map. With validation
+	 * enabled, a null value is replaced by the default value of the attribute.
+	 */
+	@SuppressWarnings({ "rawtypes", "unchecked" })
+	private void putAttrsIntoInputStream(List<Object> input, String streamName, SortedMap map) {
+		if(!needValidation) {
+			input.addAll(map.values());
+			return;
+		}
+		for (Object entryObject : map.entrySet()) {
+			Map.Entry entry = (Map.Entry)entryObject;
+			Object value = entry.getValue();
+			if (value == null) {
+				input.add(SiddhiStreamMetadataUtils.getAttrDefaultValue(streamName, (String)entry.getKey()));
+			}
+			else input.add(value);
+		}
+	}
+
+	/**
+	 * Replaces the running execution plan by one for the new definition and shuts the old one down.
+	 * If the new definition can not be deserialized, or is not a siddhi policy definition, the error
+	 * is logged and the current execution plan keeps running.
+	 */
+	@Override
+	public void onPolicyUpdate(AlertDefinitionAPIEntity newAlertDef) {
+		AbstractPolicyDefinition policyDef;
+		try {
+			policyDef = JsonSerDeserUtils.deserialize(newAlertDef.getPolicyDef(), 
+					AbstractPolicyDefinition.class, PolicyManager.getInstance().getPolicyModules(newAlertDef.getTags().get(AlertConstants.POLICY_TYPE)));
+		}
+		catch (Exception ex) {
+			LOG.error("Initial policy def error, ", ex);
+			return;
+		}
+		if (!(policyDef instanceof SiddhiPolicyDefinition)) {
+			LOG.error("Policy {} is not updated, its new definition is not a siddhi policy definition: {}", policyId, policyDef);
+			return;
+		}
+		// the new runtime is created completely before the switch, a failure leaves the old one in place
+		SiddhiRuntime replacement = createSiddhiRuntime((SiddhiPolicyDefinition)policyDef);
+		SiddhiRuntime previous = siddhiRuntime;
+		siddhiRuntime = replacement;
+		synchronized(previous){
+			previous.siddhiManager.getExecutionPlanRuntime(previous.executionPlanName).shutdown();
+		}
+	}
+	
+	/**
+	 * Shuts down the execution plan of the current runtime.
+	 */
+	@Override
+	public void onPolicyDelete(){
+		SiddhiRuntime runtime = siddhiRuntime;
+		synchronized(runtime){
+			LOG.info("Going to shutdown siddhi execution plan, planName: {}", runtime.executionPlanName);
+			runtime.siddhiManager.getExecutionPlanRuntime(runtime.executionPlanName).shutdown();
+			LOG.info("Siddhi execution plan {} is successfully shutdown ", runtime.executionPlanName);
+		}
+	}
+	
+	@Override
+	public String toString(){
+		// show the policyDef
+		return siddhiRuntime.policyDef.toString();
+	}
+	
+	/**
+	 * @return names of the streams this policy reads from (the array handed to the constructor, not a copy)
+	 */
+	public String[] getStreamNames() {
+		return sourceStreams;
+	}
+
+	/**
+	 * @return a new map with the comma separated source stream names (AlertConstants.SOURCE_STREAMS)
+	 *         and the policy id (AlertConstants.POLICY_ID)
+	 */
+	public Map<String, String> getAdditionalContext() {
+		StringBuilder joined = new StringBuilder();
+		for (String streamName : getStreamNames()) {
+			if (joined.length() > 0) {
+				joined.append(',');
+			}
+			joined.append(streamName);
+		}
+		Map<String, String> context = new HashMap<String, String>();
+		context.put(AlertConstants.SOURCE_STREAMS, joined.toString());
+		context.put(AlertConstants.POLICY_ID, policyId);
+		return context;
+	}
+
+	/**
+	 * @return names of the attributes selected by the query of the current runtime, in selection order
+	 */
+	public List<String> getOutputStreamAttrNameList() {
+		return siddhiRuntime.outputFields;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java
new file mode 100644
index 0000000..168b04f
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiPolicyEvaluatorServiceProviderImpl.java
@@ -0,0 +1,45 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.siddhi;
+
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.eagle.alert.common.AlertConstants;
+import org.apache.eagle.alert.policy.PolicyEvaluator;
+import org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider;
+import com.fasterxml.jackson.databind.Module;
+import com.fasterxml.jackson.databind.jsontype.NamedType;
+import com.fasterxml.jackson.databind.module.SimpleModule;
+
+/**
+ * Service provider which makes the siddhi CEP engine available as a policy type: it names the
+ * policy type, the evaluator class and the jackson module needed to read siddhi policy definitions.
+ */
+public class SiddhiPolicyEvaluatorServiceProviderImpl implements PolicyEvaluatorServiceProvider {
+	/**
+	 * @return name of the siddhiCEPEngine policy type constant
+	 */
+	@Override
+	public String getPolicyType() {
+		return AlertConstants.policyType.siddhiCEPEngine.name();
+	}
+
+	/**
+	 * @return the evaluator class for this policy type, {@link SiddhiPolicyEvaluator}
+	 */
+	@Override
+	public Class<? extends PolicyEvaluator> getPolicyEvaluator() {
+		return SiddhiPolicyEvaluator.class;
+	}
+
+	/**
+	 * @return a single module which registers {@link SiddhiPolicyDefinition} as subtype named after the policy type
+	 */
+	@Override
+	public List<Module> getBindingModules() {
+		SimpleModule policyDefinitionModule = new SimpleModule(AlertConstants.POLICY_DEFINITION);
+		NamedType siddhiDefinitionType = new NamedType(SiddhiPolicyDefinition.class, getPolicyType());
+		// explicit element type, as the fluent call returns SimpleModule rather than Module
+		return Arrays.<Module>asList(policyDefinitionModule.registerSubtypes(siddhiDefinitionType));
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiQueryCallbackImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiQueryCallbackImpl.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiQueryCallbackImpl.java
new file mode 100644
index 0000000..44b9c77
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiQueryCallbackImpl.java
@@ -0,0 +1,85 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.siddhi;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.List;
+
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.executor.AlertExecutor;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.query.output.callback.QueryCallback;
+
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.typesafe.config.Config;
+
+/**
+ * Query callback which renders every event emitted by the siddhi query into an alert and hands it
+ * to the alert executor found in the alert context of that event.
+ */
+public class SiddhiQueryCallbackImpl extends QueryCallback{
+
+	private SiddhiPolicyEvaluator evaluator;
+	public static final Logger LOG = LoggerFactory.getLogger(SiddhiQueryCallbackImpl.class);
+	public static final ObjectMapper mapper = new ObjectMapper();	
+	public Config config;
+	
+	/**
+	 * @param config configuration which is passed on to the alert rendering
+	 * @param evaluator evaluator this callback is created for
+	 */
+	public SiddhiQueryCallbackImpl(Config config, SiddhiPolicyEvaluator evaluator) {
+		this.config = config;		
+		this.evaluator = evaluator;
+	}
+	
+	/**
+	 * Converts the output values of an event to strings.
+	 *
+	 * @param event event emitted by the query
+	 * @return string form of every value behind the first one (the first field is the alert context);
+	 *         a null value stays null, an event without data gives an empty list
+	 */
+	public List<String> getOutputMessage(Event event) {
+		Object[] data = event.getData();
+		List<String> rets = new ArrayList<String>();
+		if (data == null) {
+			return rets;
+		}
+		// The first field is siddhiAlertContext, skip it
+		for (int i = 1; i < data.length; i++) {
+			Object object = data[i];
+			// any value type is rendered, not only Double/Integer/Long/String/Boolean
+			rets.add(object == null ? null : String.valueOf(object));
+		}
+		return rets;
+	}
+	
+	/**
+	 * Creates one alert for each current event and passes it to the executor of the event's alert context.
+	 * Expired events are ignored. An event whose first field is not an {@link EagleAlertContext} is
+	 * logged and skipped.
+	 *
+	 * @param timeStamp timestamp given by siddhi, used as timestamp of the alerts
+	 * @param inEvents current events, null when there are none
+	 * @param removeEvents expired events, not used
+	 */
+	@Override
+	public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+		if (inEvents == null) {
+			// nothing but expired events in this call
+			return;
+		}
+		for (Event event : inEvents) {
+			Object[] data = event.getData();
+			if (data == null || data.length == 0 || !(data[0] instanceof EagleAlertContext)) {
+				LOG.warn("Ignore siddhi output event without alert context in its first field: {}", event);
+				continue;
+			}
+			EagleAlertContext siddhiAlertContext = (EagleAlertContext)data[0];
+			List<String> rets = getOutputMessage(event);
+			AlertAPIEntity alert = SiddhiAlertAPIEntityRendner.render(config, rets, siddhiAlertContext, timeStamp);
+			AlertExecutor alertExecutor = siddhiAlertContext.alertExecutor;
+			alertExecutor.onAlerts(siddhiAlertContext, Arrays.asList(alert));
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiStreamMetadataUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiStreamMetadataUtils.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiStreamMetadataUtils.java
new file mode 100644
index 0000000..ba495dd
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/SiddhiStreamMetadataUtils.java
@@ -0,0 +1,103 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.siddhi;
+
+import java.util.Map;
+import java.util.SortedMap;
+
+import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Converts the metadata entities of a stream into a stream definition for the Siddhi CEP engine, e.g.
+ * define stream HeapUsage (metric string, host string, value double, timestamp long)
+ */
+public class SiddhiStreamMetadataUtils {
+	private final static Logger LOG = LoggerFactory.getLogger(SiddhiStreamMetadataUtils.class);
+
+	/** Name of the leading "object" attribute that every generated stream definition starts with. */
+	public final static String EAGLE_ALERT_CONTEXT_FIELD = "eagleAlertContext";
+
+	/**
+	 * Looks up the attribute-name to schema-entity map of a stream in StreamMetadataManager.
+	 *
+	 * @param streamName stream whose schema is requested
+	 * @return the non-empty, name-sorted attribute map of the stream
+	 * @throws IllegalStateException if no schema (or an empty one) is registered for the stream
+	 */
+	public static SortedMap<String, AlertStreamSchemaEntity> getAttrMap(String streamName) {
+		SortedMap<String, AlertStreamSchemaEntity> map = StreamMetadataManager.getInstance().getMetadataEntityMapForStream(streamName);
+		if(map == null || map.isEmpty()){
+			throw new IllegalStateException("alert stream schema should never be empty");
+		}
+		return map;
+	}
+
+	/**
+	 * Builds the "define stream" statement for a stream from its registered schema.
+	 * The definition always starts with the alert context attribute, followed by the
+	 * schema attributes in the sorted order of the attribute map. An attribute whose type
+	 * is not recognized is logged and left out entirely, so the statement stays parseable.
+	 *
+	 * @see org.wso2.siddhi.query.api.definition.Attribute.Type
+	 * make sure StreamMetadataManager.init is invoked before this method
+	 * @param streamName stream to generate the definition for
+	 * @return the Siddhi stream definition statement
+	 */
+	public static String convertToStreamDef(String streamName){
+		SortedMap<String, AlertStreamSchemaEntity> map = getAttrMap(streamName);
+		StringBuilder sb = new StringBuilder();
+		sb.append(EAGLE_ALERT_CONTEXT_FIELD).append(" object");
+		for(Map.Entry<String, AlertStreamSchemaEntity> entry : map.entrySet()){
+			String attrType = entry.getValue().getAttrType();
+			String siddhiType = toSiddhiType(attrType);
+			if(siddhiType == null){
+				// skip the whole attribute: a name without a type would break the definition
+				LOG.warn("AttrType is not recognized, ignore : " + attrType);
+				continue;
+			}
+			sb.append(",").append(entry.getKey()).append(" ").append(siddhiType);
+		}
+		// plain concatenation, so a '%' in the stream name cannot be misread as a format specifier
+		return "define stream " + streamName + "(" + sb + ");";
+	}
+
+	/** Maps a schema attribute type name (case-insensitive) to the Siddhi type keyword, or null if unknown. */
+	private static String toSiddhiType(String attrType){
+		if(AttributeType.STRING.name().equalsIgnoreCase(attrType)){
+			return "string";
+		}else if(AttributeType.INTEGER.name().equalsIgnoreCase(attrType)){
+			return "int";
+		}else if(AttributeType.LONG.name().equalsIgnoreCase(attrType)){
+			return "long";
+		}else if(AttributeType.BOOL.name().equalsIgnoreCase(attrType)){
+			return "bool";
+		}else if(AttributeType.FLOAT.name().equalsIgnoreCase(attrType)){
+			return "float";
+		}else if(AttributeType.DOUBLE.name().equalsIgnoreCase(attrType)){
+			return "double";
+		}
+		return null;
+	}
+
+	/**
+	 * Returns the default value of a stream attribute: the value configured on the schema
+	 * entity if there is one, otherwise a built-in fallback chosen by attribute type
+	 * ("NA" for string, -1 for integer and long, true for bool, "N/A" for anything else).
+	 *
+	 * @param streamName stream that owns the attribute
+	 * @param attrName   attribute to look up
+	 * @return the configured or built-in default value
+	 * @throws IllegalStateException    if the stream has no schema
+	 * @throws IllegalArgumentException if the stream schema has no such attribute
+	 */
+	public static Object getAttrDefaultValue(String streamName, String attrName){
+		SortedMap<String, AlertStreamSchemaEntity> map = getAttrMap(streamName);
+		AlertStreamSchemaEntity entity = map.get(attrName);
+		if (entity == null) {
+			throw new IllegalArgumentException("attribute " + attrName + " is not defined in the schema of stream " + streamName);
+		}
+		if (entity.getDefaultValue() != null) {
+			return entity.getDefaultValue();
+		}
+		String attrType = entity.getAttrType();
+		if (AttributeType.STRING.name().equalsIgnoreCase(attrType)) {
+			return "NA";
+		} else if (AttributeType.INTEGER.name().equalsIgnoreCase(attrType) || AttributeType.LONG.name().equalsIgnoreCase(attrType)) {
+			return -1;
+		} else if (AttributeType.BOOL.name().equalsIgnoreCase(attrType)) {
+			return true;
+		} else {
+			// NOTE(review): float and double also end up here and get a string default - confirm this is intended
+			LOG.warn("AttrType is not recognized: " + attrType + ", treat it as string");
+			return "N/A";
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/StreamMetadataManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/StreamMetadataManager.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/StreamMetadataManager.java
new file mode 100644
index 0000000..618d245
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/StreamMetadataManager.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.siddhi;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.SortedMap;
+import java.util.TreeMap;
+
+import org.apache.eagle.alert.common.AlertConstants;
+import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
+import com.typesafe.config.Config;
+import org.apache.eagle.alert.dao.AlertStreamSchemaDAO;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.commons.collections.map.UnmodifiableMap;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * centralized memory where all stream metadata sit on, it is not mutable data
+ */
+public class StreamMetadataManager {
+	private static final Logger LOG = LoggerFactory.getLogger(StreamMetadataManager.class);
+	
+	private static StreamMetadataManager instance = new StreamMetadataManager();
+	private Map<String, List<AlertStreamSchemaEntity>> map = new HashMap<String, List<AlertStreamSchemaEntity>>();
+	private Map<String, SortedMap<String, AlertStreamSchemaEntity>> map2 = new HashMap<String, SortedMap<String, AlertStreamSchemaEntity>>();
+	private volatile boolean initialized = false;
+	
+	private StreamMetadataManager(){
+	}
+	
+	public static StreamMetadataManager getInstance(){
+		return instance;
+	}
+	
+	private void internalInit(Config config, AlertStreamSchemaDAO dao){
+		try{
+			String dataSource = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.DATA_SOURCE);
+			List<AlertStreamSchemaEntity> list = dao.findAlertStreamSchemaByDataSource(dataSource);
+			if(list == null)
+				return;
+			for (AlertStreamSchemaEntity entity : list) {
+				String streamName = entity.getTags().get(AlertConstants.STREAM_NAME);
+				if (map.get(streamName) == null) {
+					map.put(streamName, new ArrayList<AlertStreamSchemaEntity>());
+					map2.put(streamName, new TreeMap<String, AlertStreamSchemaEntity>());
+				}
+				map.get(streamName).add(entity);
+				map2.get(streamName).put(entity.getTags().get(AlertConstants.ATTR_NAME), entity);
+			}
+		}catch(Exception ex){
+			LOG.error("Fail building metadata manger", ex);
+			throw new IllegalStateException(ex);
+		}
+	}
+	
+	/**
+	 * singleton with init would be good for unit test as well, and it ensures that
+	 * initialization happens only once before you use it.  
+	 * @param config
+	 * @param dao
+	 */
+	public void init(Config config, AlertStreamSchemaDAO dao){
+		if(!initialized){
+			synchronized(this){
+				if(!initialized){
+                    if(LOG.isDebugEnabled()) LOG.debug("Initializing ...");
+					internalInit(config, dao);
+					initialized = true;
+                    LOG.info("Successfully initialized");
+				}
+			}
+		}else{
+            LOG.info("Already initialized, skip");
+        }
+	}
+
+	// Only for unit test purpose
+	public void reset() {
+		synchronized (this) {
+			initialized = false;
+			map.clear();
+			map2.clear();
+		}
+	}
+
+	private void ensureInitialized(){
+		if(!initialized)
+			throw new IllegalStateException("StreamMetadataManager should be initialized before using it");
+	}
+	
+	public List<AlertStreamSchemaEntity> getMetadataEntitiesForStream(String streamName){
+		ensureInitialized();
+		return getMetadataEntitiesForAllStreams().get(streamName);
+	}
+	
+	public Map<String, List<AlertStreamSchemaEntity>> getMetadataEntitiesForAllStreams(){
+		ensureInitialized();
+		return UnmodifiableMap.decorate(map);
+	}
+	
+	public SortedMap<String, AlertStreamSchemaEntity> getMetadataEntityMapForStream(String streamName){
+		ensureInitialized();
+		return getMetadataEntityMapForAllStreams().get(streamName);
+	}
+	
+	public Map<String, SortedMap<String, AlertStreamSchemaEntity>> getMetadataEntityMapForAllStreams(){
+		ensureInitialized();
+		return UnmodifiableMap.decorate(map2);
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/ContainsIgnoreCaseExtension.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/ContainsIgnoreCaseExtension.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/ContainsIgnoreCaseExtension.java
new file mode 100644
index 0000000..cf76134
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/ContainsIgnoreCaseExtension.java
@@ -0,0 +1,87 @@
+/*
+ * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.siddhi.extension;
+
+import org.wso2.siddhi.core.config.ExecutionPlanContext;
+import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
+import org.wso2.siddhi.core.executor.ExpressionExecutor;
+import org.wso2.siddhi.core.executor.function.FunctionExecutor;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
+
+public class ContainsIgnoreCaseExtension extends FunctionExecutor {
+
+    Attribute.Type returnType = Attribute.Type.BOOL;
+
+    @Override
+    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
+        if (attributeExpressionExecutors.length != 2) {
+            throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:containsIgnoreCase() function, required 2, " +
+                    "but found " + attributeExpressionExecutors.length);
+        }
+        if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) {
+            throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:containsIgnoreCase() function, " +
+                    "required "+ Attribute.Type.STRING+", but found "+attributeExpressionExecutors[0].getReturnType().toString());
+        }
+        if (attributeExpressionExecutors[1].getReturnType() != Attribute.Type.STRING) {
+            throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:containsIgnoreCase() function, " +
+                    "required "+ Attribute.Type.STRING+", but found "+attributeExpressionExecutors[1].getReturnType().toString());
+        }
+    }
+
+    @Override
+    protected Object execute(Object[] data) {
+        if (data[0] == null) {
+            throw new ExecutionPlanRuntimeException("Invalid input given to str:containsIgnoreCase() function. First argument cannot be null");
+        }
+        if (data[1] == null) {
+            throw new ExecutionPlanRuntimeException("Invalid input given to str:containsIgnoreCase() function. Second argument cannot be null");
+        }
+        String str1 = (String)data[0];
+        String str2 = (String)data[1];
+        return str1.toUpperCase().contains(str2.toUpperCase());
+    }
+
+    @Override
+    protected Object execute(Object data) {
+        return null; //Since the containsIgnoreCase function takes in 2 parameters, this method does not get called. Hence, not implemented.
+    }
+
+    @Override
+    public void start() {
+        //Nothing to start
+    }
+
+    @Override
+    public void stop() {
+        //Nothing to stop
+    }
+
+    @Override
+    public Attribute.Type getReturnType() {
+        return returnType;
+    }
+
+    @Override
+    public Object[] currentState() {
+        return new Object[]{};
+    }
+
+    @Override
+    public void restoreState(Object[] state) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/EqualsIgnoreCaseExtension.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/EqualsIgnoreCaseExtension.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/EqualsIgnoreCaseExtension.java
new file mode 100644
index 0000000..0b6e7ec
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/EqualsIgnoreCaseExtension.java
@@ -0,0 +1,91 @@
+/*
+ * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.siddhi.extension;
+
+import org.wso2.siddhi.core.config.ExecutionPlanContext;
+import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
+import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
+import org.wso2.siddhi.core.executor.ExpressionExecutor;
+import org.wso2.siddhi.core.executor.function.FunctionExecutor;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+public class EqualsIgnoreCaseExtension extends FunctionExecutor {
+
+    Attribute.Type returnType = Attribute.Type.BOOL;
+
+    @Override
+    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
+        if (attributeExpressionExecutors.length != 2) {
+            throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:equalsIgnoreCase() function, required 2, " +
+                    "but found " + attributeExpressionExecutors.length);
+        }
+        if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) {
+            throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:equalsIgnoreCase() function, " +
+                    "required "+ Attribute.Type.STRING+", but found "+attributeExpressionExecutors[0].getReturnType().toString());
+        }
+        if (attributeExpressionExecutors[1].getReturnType() != Attribute.Type.STRING) {
+            throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:equalsIgnoreCase() function, " +
+                    "required "+ Attribute.Type.STRING+", but found "+attributeExpressionExecutors[1].getReturnType().toString());
+        }
+    }
+
+    @Override
+    protected Object execute(Object[] data) {
+        if (data[0] == null) {
+            throw new ExecutionPlanRuntimeException("Invalid input given to str:equalsIgnoreCase() function. First argument cannot be null");
+        }
+        if (data[1] == null) {
+            throw new ExecutionPlanRuntimeException("Invalid input given to str:equalsIgnoreCase() function. Second argument cannot be null");
+        }
+        String str1 = (String)data[0];
+        String str2 = (String)data[1];
+        return str1.equalsIgnoreCase(str2);
+    }
+
+    @Override
+    protected Object execute(Object data) {
+        return null; //Since the equalsIgnoreCase function takes in 2 parameters, this method does not get called. Hence, not implemented.
+    }
+
+    @Override
+    public void start() {
+        //Nothing to start
+    }
+
+    @Override
+    public void stop() {
+        //Nothing to stop
+    }
+
+    @Override
+    public Attribute.Type getReturnType() {
+        return returnType;
+    }
+
+    @Override
+    public Object[] currentState() {
+        return new Object[]{};
+    }
+
+    @Override
+    public void restoreState(Object[] state) {
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java
new file mode 100644
index 0000000..0bf80de
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/alert/siddhi/extension/RegexpIgnoreCaseFunctionExtension.java
@@ -0,0 +1,89 @@
+/*
+ * Copyright (c) 2015, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.eagle.alert.siddhi.extension;
+
+import org.wso2.siddhi.core.config.ExecutionPlanContext;
+import org.wso2.siddhi.core.exception.ExecutionPlanRuntimeException;
+import org.wso2.siddhi.core.executor.ConstantExpressionExecutor;
+import org.wso2.siddhi.core.executor.ExpressionExecutor;
+import org.wso2.siddhi.extension.string.RegexpFunctionExtension;
+import org.wso2.siddhi.query.api.definition.Attribute;
+import org.wso2.siddhi.query.api.exception.ExecutionPlanValidationException;
+
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+/**
+ * regexpIgnoreCase(string, regex)
+ * Tells whether or not this 'string' matches the given regular expression 'regex'.
+ * Accept Type(s): (STRING,STRING)
+ * Return Type(s): BOOLEAN
+ */
+public class RegexpIgnoreCaseFunctionExtension extends RegexpFunctionExtension {
+
+    //state-variables
+    boolean isRegexConstant = false;
+    String regexConstant;
+    Pattern patternConstant;
+
+    @Override
+    protected void init(ExpressionExecutor[] attributeExpressionExecutors, ExecutionPlanContext executionPlanContext) {
+        if (attributeExpressionExecutors.length != 2) {
+            throw new ExecutionPlanValidationException("Invalid no of arguments passed to str:regexpIgnoreCase() function, required 2, " +
+                    "but found " + attributeExpressionExecutors.length);
+        }
+        if (attributeExpressionExecutors[0].getReturnType() != Attribute.Type.STRING) {
+            throw new ExecutionPlanValidationException("Invalid parameter type found for the first argument of str:regexpIgnoreCase() function, " +
+                    "required "+ Attribute.Type.STRING+", but found "+attributeExpressionExecutors[0].getReturnType().toString());
+        }
+        if (attributeExpressionExecutors[1].getReturnType() != Attribute.Type.STRING) {
+            throw new ExecutionPlanValidationException("Invalid parameter type found for the second argument of str:regexpIgnoreCase() function, " +
+                    "required "+ Attribute.Type.STRING+", but found "+attributeExpressionExecutors[1].getReturnType().toString());
+        }
+        if(attributeExpressionExecutors[1] instanceof ConstantExpressionExecutor){
+            isRegexConstant = true;
+            regexConstant = (String) ((ConstantExpressionExecutor) attributeExpressionExecutors[1]).getValue();
+            patternConstant = Pattern.compile(regexConstant, Pattern.CASE_INSENSITIVE);
+        }
+    }
+
+    @Override
+    protected Object execute(Object[] data) {
+        String regex;
+        Pattern pattern;
+        Matcher matcher;
+
+        if (data[0] == null) {
+            throw new ExecutionPlanRuntimeException("Invalid input given to str:regexpIgnoreCase() function. First argument cannot be null");
+        }
+        if (data[1] == null) {
+            throw new ExecutionPlanRuntimeException("Invalid input given to str:regexpIgnoreCase() function. Second argument cannot be null");
+        }
+        String source = (String) data[0];
+
+        if(!isRegexConstant){
+            regex = (String) data[1];
+            pattern = Pattern.compile(regex, Pattern.CASE_INSENSITIVE);
+            matcher = pattern.matcher(source);
+            return matcher.matches();
+
+        } else {
+            matcher = patternConstant.matcher(source);
+            return matcher.matches();
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
new file mode 100644
index 0000000..4f0f4b3
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutor.java
@@ -0,0 +1,412 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.executor;
+
+import org.apache.eagle.alert.common.AlertConstants;
+import org.apache.eagle.alert.config.AbstractPolicyDefinition;
+import org.apache.eagle.alert.dao.AlertDefinitionDAO;
+import org.apache.eagle.alert.dao.AlertStreamSchemaDAOImpl;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.apache.eagle.datastream.Collector;
+import org.apache.eagle.datastream.JavaStormStreamExecutor2;
+import org.apache.eagle.datastream.Tuple2;
+import org.apache.eagle.metric.CountingMetric;
+import org.apache.eagle.metric.Metric;
+import org.apache.eagle.metric.report.EagleSerivceMetricReport;
+import com.sun.jersey.client.impl.CopyOnWriteHashMap;
+import com.typesafe.config.Config;
+import org.apache.eagle.alert.policy.*;
+import org.apache.eagle.alert.siddhi.EagleAlertContext;
+import org.apache.eagle.alert.siddhi.SiddhiAlertHandler;
+import org.apache.eagle.alert.siddhi.StreamMetadataManager;
+import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
+import org.apache.eagle.dataproc.core.ValuesArray;
+import org.apache.commons.lang3.time.DateUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.management.ManagementFactory;
+import java.util.HashMap;
+import java.util.List;
+import java.util.ArrayList;
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.ConcurrentHashMap;
+
+public class AlertExecutor extends JavaStormStreamExecutor2<String, AlertAPIEntity> implements PolicyLifecycleMethods, SiddhiAlertHandler {
+	private static final long serialVersionUID = 1L;
+
+	private static final Logger LOG = LoggerFactory.getLogger(AlertExecutor.class);
+
+	private String alertExecutorId;
+	private volatile CopyOnWriteHashMap<String, PolicyEvaluator> policyEvaluators;
+	private PolicyPartitioner partitioner;
+	private int numPartitions;
+	private int partitionSeq;
+	private Config config;
+	private Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs;
+	private AlertDefinitionDAO alertDefinitionDao;
+	private String[] sourceStreams;
+	private static String EAGLE_EVENT_COUNT = "eagle.event.count";
+	private static String EAGLE_POLICY_EVAL_COUNT = "eagle.policy.eval.count";
+	private static String EAGLE_POLICY_EVAL_FAIL_COUNT = "eagle.policy.eval.fail.count";
+	private static String EAGLE_ALERT_COUNT = "eagle.alert.count";
+	private static String EAGLE_ALERT_FAIL_COUNT = "eagle.alert.fail.count";
+	private	 static long MERITE_GRANULARITY = DateUtils.MILLIS_PER_MINUTE;
+	private Map<String, Metric> metricMap; // metricMap's key = metricName[#policyId]
+	private Map<String, Map<String, String>> dimensionsMap; // cache it for performance
+	private Map<String, String> baseDimensions;
+	private Thread metricReportThread;
+	private EagleSerivceMetricReport metricReport;
+
+	/**
+	 * Creates an executor for one partition; all arguments are stored as given.
+	 *
+	 * @param alertExecutorId    id of this alert executor
+	 * @param partitioner        partitioner deciding which policies belong to which partition
+	 * @param numPartitions      total number of partitions
+	 * @param partitionSeq       sequence number of the partition served by this instance
+	 * @param alertDefinitionDao DAO used to load alert definitions
+	 * @param sourceStreams      names of the source streams
+	 */
+	public AlertExecutor(String alertExecutorId, PolicyPartitioner partitioner, int numPartitions, int partitionSeq,
+                         AlertDefinitionDAO alertDefinitionDao, String[] sourceStreams){
+		// identity of this executor instance
+		this.alertExecutorId = alertExecutorId;
+		this.sourceStreams = sourceStreams;
+		// partitioning
+		this.partitioner = partitioner;
+		this.numPartitions = numPartitions;
+		this.partitionSeq = partitionSeq;
+		// persistence
+		this.alertDefinitionDao = alertDefinitionDao;
+	}
+	
+	/** @return the id this executor was constructed with */
+	public String getAlertExecutorId(){
+		return alertExecutorId;
+	}
+
+	/** @return the total number of partitions */
+	public int getNumPartitions() {
+		return numPartitions;
+	}
+
+	/** @return the sequence number of the partition served by this executor */
+	public int getPartitionSeq(){
+		return partitionSeq;
+	}
+
+	/** @return the partitioner this executor was constructed with */
+	public PolicyPartitioner getPolicyPartitioner() {
+		return partitioner;
+	}
+
+	/** @return the alert definitions loaded by init(), keyed by alert executor id; null before init() */
+	public Map<String, Map<String, AlertDefinitionAPIEntity>> getInitialAlertDefs() {
+		return initialAlertDefs;
+	}
+
+	/** @return the DAO this executor was constructed with */
+	public AlertDefinitionDAO getAlertDefinitionDao() {
+		return this.alertDefinitionDao;
+	}
+
+	/** Stores the configuration that init() and initMetricReportor() read from. */
+	@Override
+	public void prepareConfig(Config config) {
+		this.config = config;
+	}
+	
+	/**
+	 * Sets up metric reporting: creates the metric report client from the eagle service
+	 * settings (host and port are required, username and password are optional), prepares
+	 * the metric and dimension maps, and starts a daemon thread running runMetricReporter().
+	 */
+	public void initMetricReportor() {
+		final String servicePrefix = EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + ".";
+		String eagleServiceHost = config.getString(servicePrefix + EagleConfigConstants.HOST);
+		int eagleServicePort = config.getInt(servicePrefix + EagleConfigConstants.PORT);
+
+		// credentials are optional and stay null when not configured
+		String usernamePath = servicePrefix + EagleConfigConstants.USERNAME;
+		String username = null;
+		if (config.hasPath(usernamePath)) {
+			username = config.getString(usernamePath);
+		}
+		String passwordPath = servicePrefix + EagleConfigConstants.PASSWORD;
+		String password = null;
+		if (config.hasPath(passwordPath)) {
+			password = config.getString(passwordPath);
+		}
+		this.metricReport = new EagleSerivceMetricReport(eagleServiceHost, eagleServicePort, username, password);
+
+		metricMap = new ConcurrentHashMap<String, Metric>();
+
+		// dimensions shared by every metric of this executor instance
+		Map<String, String> dimensions = new HashMap<String, String>();
+		baseDimensions = dimensions;
+		dimensions.put(AlertConstants.ALERT_EXECUTOR_ID, alertExecutorId);
+		dimensions.put(AlertConstants.PARTITIONSEQ, String.valueOf(partitionSeq));
+		dimensions.put(AlertConstants.SOURCE, ManagementFactory.getRuntimeMXBean().getName());
+		dimensions.put(EagleConfigConstants.DATA_SOURCE, config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.DATA_SOURCE));
+		dimensions.put(EagleConfigConstants.SITE, config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.SITE));
+
+		dimensionsMap = new HashMap<String, Map<String, String>>();
+
+		Thread reporter = new Thread() {
+			@Override
+			public void run() {
+				runMetricReporter();
+			}
+		};
+		this.metricReportThread = reporter;
+		reporter.setDaemon(true);
+		reporter.start();
+	}
+
+	/**
+	 * Initializes the executor: initializes StreamMetadataManager, loads the active alert
+	 * definitions for the configured site and data source, creates a PolicyEvaluator for
+	 * every definition of this executor id that the partitioner assigns to this partition,
+	 * registers this executor with the DynamicPolicyLoader and starts metric reporting.
+	 *
+	 * @throws IllegalStateException if the alert definitions cannot be loaded
+	 */
+	@Override
+	public void init() {
+		// initialize StreamMetadataManager before it is used
+		StreamMetadataManager.getInstance().init(config, new AlertStreamSchemaDAOImpl(config));
+
+		String site = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.SITE);
+		String dataSource = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.DATA_SOURCE);
+		try {
+			initialAlertDefs = alertDefinitionDao.findActiveAlertDefsGroupbyAlertExecutorId(site, dataSource);
+		}
+		catch (Exception ex) {
+			LOG.error("fail to initialize initialAlertDefs: ", ex);
+			throw new IllegalStateException("fail to initialize initialAlertDefs: ", ex);
+		}
+
+		// one PolicyEvaluator per alert definition that falls into this partition
+		Map<String, PolicyEvaluator> ownedEvaluators = new HashMap<String, PolicyEvaluator>();
+		if (initialAlertDefs == null || initialAlertDefs.isEmpty()) {
+			LOG.warn("No alert definitions was found for site: " + site + ", dataSource: " + dataSource);
+		}
+		else {
+			Map<String, AlertDefinitionAPIEntity> defsOfThisExecutor = initialAlertDefs.get(alertExecutorId);
+			if (defsOfThisExecutor != null) {
+				for (AlertDefinitionAPIEntity alertDef : defsOfThisExecutor.values()) {
+					String policyType = alertDef.getTags().get(AlertConstants.POLICY_TYPE);
+					String policyId = alertDef.getTags().get(AlertConstants.POLICY_ID);
+					if (partitioner.partition(numPartitions, policyType, policyId) != partitionSeq) {
+						continue;
+					}
+					ownedEvaluators.put(policyId, createPolicyEvaluator(alertDef));
+				}
+			}
+		}
+
+		policyEvaluators = new CopyOnWriteHashMap<>();
+		// load all evaluators with a single putAll rather than one put per evaluator (original note: for efficiency)
+		policyEvaluators.putAll(ownedEvaluators);
+
+		DynamicPolicyLoader policyLoader = DynamicPolicyLoader.getInstance();
+		policyLoader.init(initialAlertDefs, alertDefinitionDao, config);
+		policyLoader.addPolicyChangeListener(alertExecutorId + "_" + partitionSeq, this);
+		LOG.info("Alert Executor created, partitionSeq: " + partitionSeq + " , numPartitions: " + numPartitions);
+		LOG.info("All policy evaluators: " + policyEvaluators);
+
+		initMetricReportor();
+	}
+
+    /**
+     * Create PolicyEvaluator instance according to policyType-mapped policy evaluator class
+     *
+     * @param alertDef alert definition
+     * @return PolicyEvaluator instance
+     */
+	private PolicyEvaluator createPolicyEvaluator(AlertDefinitionAPIEntity alertDef){
+		String policyType = alertDef.getTags().get(AlertConstants.POLICY_TYPE);
+		Class<? extends PolicyEvaluator> evalCls = PolicyManager.getInstance().getPolicyEvaluator(policyType);
+		if(evalCls == null){
+			String msg = "No policy evaluator defined for policy type : " + policyType;
+			LOG.error(msg);
+			throw new IllegalStateException(msg);
+		}
+		
+		// check out whether strong incoming data validation is necessary
+        String needValidationConfigKey= AlertConstants.ALERT_EXECUTOR_CONFIGS + "." + alertExecutorId + ".needValidation";
+
+        // Default: true
+        boolean needValidation = !config.hasPath(needValidationConfigKey) || config.getBoolean(needValidationConfigKey);
+
+		AbstractPolicyDefinition policyDef = null;
+		try {
+			policyDef = JsonSerDeserUtils.deserialize(alertDef.getPolicyDef(), AbstractPolicyDefinition.class, PolicyManager.getInstance().getPolicyModules(policyType));
+		} catch (Exception ex) {
+			LOG.error("Fail initial alert policy def: "+alertDef.getPolicyDef(), ex);
+		}
+		PolicyEvaluator pe;
+		try{
+            // Create evaluator instances
+			pe = evalCls.getConstructor(Config.class, String.class, AbstractPolicyDefinition.class, String[].class, boolean.class).newInstance(config, alertDef.getTags().get("policyId"), policyDef, sourceStreams, needValidation);
+		}catch(Exception ex){
+			LOG.error("Fail creating new policyEvaluator", ex);
+			LOG.warn("Broken policy definition and stop running : " + alertDef.getPolicyDef());
+			throw new IllegalStateException(ex);
+		}
+		return pe;
+	}
+
+    /**
+     * Decides whether an alert definition is handled by this executor instance: its
+     * "alertExecutorId" tag must equal this executor's id and the partitioner must map its
+     * policy type/id to this partition.
+     *
+     * @param alertDef alert definition
+     *
+     * @return whether accept the alert definition; a definition without an "alertExecutorId"
+     *         tag is rejected
+     */
+	private boolean accept(AlertDefinitionAPIEntity alertDef){
+		// a missing tag means the definition cannot belong to this executor; reject it instead of failing with an NPE
+		String ownerExecutorId = alertDef.getTags().get("alertExecutorId");
+		if (ownerExecutorId == null || !ownerExecutorId.equals(alertExecutorId)) {
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("alertDef does not belong to this alertExecutorId : " + alertExecutorId + ", alertDef : " + alertDef);
+			}
+			return false;
+		}
+		int targetPartitionSeq = partitioner.partition(numPartitions, alertDef.getTags().get(AlertConstants.POLICY_TYPE), alertDef.getTags().get(AlertConstants.POLICY_ID));
+		return targetPartitionSeq == partitionSeq;
+	}
+	
+	public long trim(long value, long granularity) {
+		return value / granularity * granularity;
+	}
+
+	public void runMetricReporter() {
+		while(true) {
+			try {
+				long current = System.currentTimeMillis();
+				List<Metric> metricList = new ArrayList<Metric>();
+				synchronized (this.metricMap) {
+					for (Entry<String, Metric> entry : metricMap.entrySet()) {
+						String name = entry.getKey();
+						Metric metric = entry.getValue();
+						long previous = metric.getTimestamp();
+						if (current > previous + MERITE_GRANULARITY) {
+							metricList.add(metric);
+							metricMap.put(name, new CountingMetric(trim(current, MERITE_GRANULARITY), metric.getDemensions(), metric.getMetricName()));
+						}
+					}
+				}
+				if (metricList.size() > 0) {
+					LOG.info("Going to persist alert metrics, size: " + metricList.size());
+					metricReport.emit(metricList);
+				}
+				try {
+					Thread.sleep(MERITE_GRANULARITY / 2);
+					} catch (InterruptedException ex) { /* Do nothing */ }
+				}
+			catch (Throwable t) {
+				LOG.error("Got a throwable in metricReporter " , t);
+			}
+		}
+	}
+
+	/**
+	 * Adds a value to the metric registered under the given key, creating a CountingMetric for
+	 * the key first when none exists yet. Access to metricMap is guarded by its own monitor.
+	 *
+	 * @param name metric key; the part before the first '#' is used as the metric name
+	 * @param dimensions dimensions given to a newly created metric
+	 * @param value value passed to the metric's update
+	 */
+	public void updateCounter(String name, Map<String, String> dimensions, double value) {
+		final long now = System.currentTimeMillis();
+		synchronized (metricMap) {
+			Metric target = metricMap.get(name);
+			if (target == null) {
+				String metricName = name.split("#")[0];
+				target = new CountingMetric(trim(now, MERITE_GRANULARITY), dimensions, metricName);
+				metricMap.put(name, target);
+			}
+			target.update(value);
+		}
+	}
+	
+	/**
+	 * Increments the metric registered under the given key by one.
+	 *
+	 * @param name metric key
+	 * @param dimensions dimensions given to the metric if it has to be created
+	 */
+	public void updateCounter(String name, Map<String, String> dimensions) {
+		final double singleOccurrence = 1.0;
+		updateCounter(name, dimensions, singleOccurrence);
+	}
+	
+	/**
+	 * Returns the metric dimensions of a policy: a copy of the base dimensions plus the policy
+	 * id. The map is built on first request and cached in dimensionsMap afterwards.
+	 *
+	 * @param policyId policy identifier
+	 * @return the cached dimension map of the policy
+	 */
+	public Map<String, String> getDimensions(String policyId) {
+		Map<String, String> policyDimensions = dimensionsMap.get(policyId);
+		if (policyDimensions == null) {
+			policyDimensions = new HashMap<String, String>(baseDimensions);
+			policyDimensions.put(AlertConstants.POLICY_ID, policyId);
+			dimensionsMap.put(policyId, policyDimensions);
+		}
+		return policyDimensions;
+	}
+	
+	public String getMetricKey(String metricName, String policy) {
+		return metricName + "#" + policy;
+	}
+
+    /**
+     * within this single executor, execute all PolicyEvaluator sequentially
+     * the contract for input:
+     * 1. total # of fields for input is 3, which is fixed
+     * 2. the first field is key
+     * 3. the second field is stream name
+     * 4. the third field is value which is java SortedMap
+     */
+    @Override
+    public void flatMap(java.util.List<Object> input, Collector<Tuple2<String, AlertAPIEntity>> outputCollector){
+        if(input.size() != 3)
+            throw new IllegalStateException("AlertExecutor always consumes exactly 3 fields: key, stream name and value(SortedMap)");
+        if(LOG.isDebugEnabled()) LOG.debug("Msg is coming " + input.get(2));
+        if(LOG.isDebugEnabled()) LOG.debug("Current policyEvaluators: " + policyEvaluators.keySet().toString());
+
+        updateCounter(EAGLE_EVENT_COUNT, baseDimensions);
+        try{
+            synchronized(this.policyEvaluators) {
+                for(Entry<String, PolicyEvaluator> entry : policyEvaluators.entrySet()){
+                    String policyId = entry.getKey();
+                    PolicyEvaluator evaluator = entry.getValue();
+                    updateCounter(getMetricKey(EAGLE_POLICY_EVAL_COUNT, policyId), getDimensions(policyId));
+                    try {
+                        EagleAlertContext siddhiAlertContext = new EagleAlertContext();
+                        siddhiAlertContext.alertExecutor = this;
+                        siddhiAlertContext.policyId = policyId;
+                        siddhiAlertContext.evaluator = evaluator;
+                        siddhiAlertContext.outputCollector = outputCollector;
+                        evaluator.evaluate(new ValuesArray(siddhiAlertContext, input.get(1), input.get(2)));
+                    }
+                    catch (Exception ex) {
+                        LOG.error("Got an exception, but continue to run " + input.get(2).toString(), ex);
+                        updateCounter(getMetricKey(EAGLE_POLICY_EVAL_FAIL_COUNT, policyId), getDimensions(policyId));
+                    }
+                }
+            }
+        } catch(Exception ex){
+            LOG.error(alertExecutorId + ", partition " + partitionSeq + ", error fetching alerts, but continue to run", ex);
+            updateCounter(EAGLE_ALERT_FAIL_COUNT, baseDimensions);
+        }
+    }
+
+	/**
+	 * Policy change callback: creates and registers a PolicyEvaluator for every added alert
+	 * definition that this executor instance accepts.
+	 *
+	 * @param added newly created alert definitions
+	 */
+	@Override
+	public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> added) {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug(alertExecutorId + ", partition " + partitionSeq + " policy added : " + added + " policyEvaluators " + policyEvaluators);
+		}
+		for (AlertDefinitionAPIEntity alertDef : added.values()) {
+			if (accept(alertDef)) {
+				LOG.info(alertExecutorId + ", partition " + partitionSeq + " policy really added " + alertDef);
+				PolicyEvaluator created = createPolicyEvaluator(alertDef);
+				if (created == null) {
+					continue;
+				}
+				synchronized (this.policyEvaluators) {
+					policyEvaluators.put(alertDef.getTags().get(AlertConstants.POLICY_ID), created);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Policy change callback: forwards every changed alert definition that this executor
+	 * instance accepts to the PolicyEvaluator registered for its policy id. A change for a
+	 * policy without a registered evaluator is logged and ignored.
+	 *
+	 * @param changed changed alert definitions
+	 */
+	@Override
+	public void onPolicyChanged(Map<String, AlertDefinitionAPIEntity> changed) {
+		if(LOG.isDebugEnabled()) LOG.debug(alertExecutorId + ", partition " + partitionSeq + " policy changed : " + changed);
+		for(AlertDefinitionAPIEntity alertDef : changed.values()){
+			if(!accept(alertDef))
+				continue;
+			LOG.info(alertExecutorId + ", partition " + partitionSeq + " policy really changed " + alertDef);
+			String policyId = alertDef.getTags().get(AlertConstants.POLICY_ID);
+			synchronized(this.policyEvaluators) {
+				PolicyEvaluator pe = policyEvaluators.get(policyId);
+				if (pe == null) {
+					// no evaluator is registered for this policy; skip instead of failing with an NPE
+					LOG.warn(alertExecutorId + ", partition " + partitionSeq + " no policy evaluator found for changed policy " + policyId + ", ignore the change");
+				} else {
+					pe.onPolicyUpdate(alertDef);
+				}
+			}
+		}
+	}
+
+	/**
+	 * Policy change callback: for every deleted alert definition that this executor instance
+	 * accepts, unregisters the PolicyEvaluator of its policy id, if one is registered, and
+	 * notifies that evaluator of the deletion.
+	 *
+	 * @param deleted deleted alert definitions
+	 */
+	@Override
+	public void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> deleted) {
+		if (LOG.isDebugEnabled()) {
+			LOG.debug(alertExecutorId + ", partition " + partitionSeq + " policy deleted : " + deleted);
+		}
+		for (AlertDefinitionAPIEntity alertDef : deleted.values()) {
+			if (accept(alertDef)) {
+				LOG.info(alertExecutorId + ", partition " + partitionSeq + " policy really deleted " + alertDef);
+				final String policyId = alertDef.getTags().get(AlertConstants.POLICY_ID);
+				synchronized (this.policyEvaluators) {
+					if (!policyEvaluators.containsKey(policyId)) {
+						continue;
+					}
+					PolicyEvaluator removed = policyEvaluators.remove(policyId);
+					removed.onPolicyDelete();
+				}
+			}
+		}
+	}
+
+	/**
+	 * Alert callback: counts the detected alerts for the policy named in the context and emits
+	 * each of them as a (policyId, alert) tuple to the context's output collector. Emission is
+	 * guarded by this executor's monitor. Null or empty alert lists are ignored.
+	 *
+	 * @param context alert context carrying the policy id, evaluator and output collector
+	 * @param alerts alerts detected for the policy
+	 */
+	@Override
+	public void onAlerts(EagleAlertContext context, List<AlertAPIEntity> alerts) {
+		if (alerts == null || alerts.isEmpty()) {
+			return;
+		}
+		final String policyId = context.policyId;
+		LOG.info(String.format("Detected %s alerts for policy %s",alerts.size(),policyId));
+		final Collector collector = context.outputCollector;
+		final PolicyEvaluator sourceEvaluator = context.evaluator;
+		updateCounter(getMetricKey(EAGLE_ALERT_COUNT, policyId), getDimensions(policyId), alerts.size());
+		for (AlertAPIEntity alert : alerts) {
+			synchronized (this) {
+				collector.collect(new Tuple2(policyId, alert));
+			}
+			if (LOG.isDebugEnabled()) {
+				LOG.debug("A new alert is triggered: "+alertExecutorId + ", partition " + partitionSeq + ", Got an alert with output context: " + alert.getAlertContext() + ", for policy " + sourceEvaluator);
+			}
+		}
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutorCreationUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutorCreationUtils.java b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutorCreationUtils.java
new file mode 100644
index 0000000..c073939
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/java/org/apache/eagle/executor/AlertExecutorCreationUtils.java
@@ -0,0 +1,133 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.executor;
+
+import org.apache.eagle.alert.common.AlertConstants;
+import org.apache.eagle.alert.dao.AlertDefinitionDAO;
+import org.apache.eagle.alert.dao.AlertDefinitionDAOImpl;
+import org.apache.eagle.alert.dao.AlertExecutorDAOImpl;
+import org.apache.eagle.alert.entity.AlertExecutorEntity;
+import org.apache.eagle.alert.policy.DefaultPolicyPartitioner;
+import org.apache.eagle.alert.policy.PolicyPartitioner;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigValue;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+
+/**
+ * Create alert executors and provide callback for programmer to link alert executor to immediate parent executors
+ *
+ * <br/><br/>
+ * Explanations for programId, alertExecutorId and policy<br/><br/>
+ * - programId - distributed or single-process program for example one storm topology<br/>
+ * - alertExecutorId - one process/thread which executes multiple policies<br/>
+ * - policy - some rules to be evaluated<br/>
+ *
+ * <br/>
+ *
+ * Normally the mapping is like following:
+ * <pre>
+ * programId (1:N) alertExecutorId
+ * alertExecutorId (1:N) policy
+ * </pre>
+ */
+public class AlertExecutorCreationUtils {
+    private final static Logger LOG = LoggerFactory.getLogger(AlertExecutorCreationUtils.class);
+
+    /**
+     * Creates the alert executors of one alertExecutorId. The source stream names are looked up
+     * through AlertExecutorDAOImpl for the configured dataSource, alert definitions are accessed
+     * through a new AlertDefinitionDAOImpl.
+     *
+     * @param config program configuration, must provide eagleProps.dataSource
+     * @param alertExecutorId id of the alert executor to create
+     * @return one AlertExecutor per partition
+     * @throws IllegalStateException if no stream is registered for the alert executor
+     */
+    public static AlertExecutor[] createAlertExecutors(Config config, String alertExecutorId) throws Exception{
+        // Read dataSource from configuration.
+        String dataSource = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.DATA_SOURCE);
+        LOG.info("Loading alerting definitions for dataSource: " + dataSource);
+
+        // Collect the stream names registered for this alertExecutorId under the dataSource
+        List<String> streamNames = new ArrayList<String>();
+        AlertExecutorDAOImpl alertExecutorDAO = new AlertExecutorDAOImpl(config);
+        List<AlertExecutorEntity> alertExecutorEntities = alertExecutorDAO.findAlertExecutor(dataSource, alertExecutorId);
+        for(AlertExecutorEntity entity : alertExecutorEntities){
+            streamNames.add(entity.getTags().get(AlertConstants.STREAM_NAME));
+        }
+
+        if(streamNames.isEmpty()){
+            throw new IllegalStateException("upstream names should not be empty for alert " + alertExecutorId);
+        }
+        return createAlertExecutors(config, new AlertDefinitionDAOImpl(config),
+                streamNames, alertExecutorId);
+    }
+
+    /**
+     * Build DAG Tasks based on persisted alert definition and schemas from eagle store.
+     *
+     * <h3>Require configuration:</h3>
+     *
+     * <ul>
+     * <li>eagleProps.site: program site id.</li>
+     * <li>eagleProps.dataSource: program data source.</li>
+     * <li>alertExecutorConfigs: optional per-executor settings; "parallelism" sets the number of
+     * partitions (default 1) and "partitioner" the PolicyPartitioner class
+     * (default DefaultPolicyPartitioner).</li>
+     * </ul>
+     *
+     * <h3>Steps:</h3>
+     *
+     * <ol>
+     * <li>(upstreamTasks) => Map[streamName:String,upstreamTask:Task]</li>
+     * <li>(dataSource) => Map[alertExecutorId:String,streamName:List[String]]</li>
+     * <li>(site,dataSource) => Map[alertExecutorId,Map[policyId,alertDefinition]]</li>
+     * <li>(config["alertExecutorConfigs"]) => AlertExecutor(alertExecutorID, partitioner, numPartitions, partitionSeq, alertDefs, alertDefDAO, sourceStreams)[]</li>
+     * </ol>
+     *
+     * @param config program configuration
+     * @param alertDefDAO DAO handed to every created executor
+     * @param streamNames source stream names of the executor
+     * @param alertExecutorId id of the alert executor to create
+     * @return one AlertExecutor per partition
+     */
+    public static AlertExecutor[] createAlertExecutors(Config config, AlertDefinitionDAO alertDefDAO,
+            List<String> streamNames, String alertExecutorId) throws Exception{
+        // Read `alertExecutorConfigs` from configuration and get config for this alertExecutorId
+        int numPartitions = 1;
+        String partitionerCls = DefaultPolicyPartitioner.class.getCanonicalName();
+        String alertExecutorConfigsKey = "alertExecutorConfigs";
+        if(config.hasPath(alertExecutorConfigsKey)) {
+            Map<String, ConfigValue> alertExecutorConfigs = config.getObject(alertExecutorConfigsKey);
+            // the lookup key is the executor id; asking the map for itself never matched,
+            // so the configured parallelism/partitioner used to be ignored
+            if(alertExecutorConfigs != null && alertExecutorConfigs.containsKey(alertExecutorId)) {
+                @SuppressWarnings("unchecked")
+                Map<String, Object> alertExecutorConfig = (Map<String, Object>) alertExecutorConfigs.get(alertExecutorId).unwrapped();
+                // unwrapped numbers may be Integer or Long, so convert through Number
+                Object parallelism = alertExecutorConfig.get("parallelism");
+                int parts = parallelism == null ? 0 : ((Number) parallelism).intValue();
+                numPartitions = parts == 0 ? 1 : parts;
+                Object partitioner = alertExecutorConfig.get("partitioner");
+                if(partitioner != null) partitionerCls = (String) partitioner;
+            }
+        }
+
+        return createAlertExecutors(alertDefDAO, streamNames, alertExecutorId, numPartitions, partitionerCls);
+    }
+
+    /**
+     * Build alert executors and assign alert definitions between these executors by partitioner (alertExecutorConfigs["${alertExecutorId}"]["partitioner"])
+     *
+     * @param alertDefDAO DAO handed to every created executor
+     * @param sourceStreams source stream names of the executor
+     * @param alertExecutorID id shared by all created executors
+     * @param numPartitions number of executors to create; executor i gets partition sequence i
+     * @param partitionerCls class name of the PolicyPartitioner, instantiated through its no-arg constructor
+     * @return array of numPartitions executors sharing one partitioner instance
+     */
+    public static AlertExecutor[] createAlertExecutors(AlertDefinitionDAO alertDefDAO, List<String> sourceStreams,
+                                                          String alertExecutorID, int numPartitions, String partitionerCls) throws Exception{
+        LOG.info("Creating alert executors with alertExecutorID: " + alertExecutorID + ", numPartitions: " + numPartitions + ", Partition class is: "+ partitionerCls);
+
+        PolicyPartitioner partitioner = (PolicyPartitioner)Class.forName(partitionerCls).newInstance();
+        AlertExecutor[] alertExecutors = new AlertExecutor[numPartitions];
+        String[] streamArray = sourceStreams.toArray(new String[0]);
+
+        for(int i = 0; i < numPartitions; i++){
+            alertExecutors[i] = new AlertExecutor(alertExecutorID, partitioner, numPartitions, i, alertDefDAO, streamArray);
+        }
+        return alertExecutors;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/META-INF/services/eagle.alert.policy.PolicyEvaluatorServiceProvider
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/META-INF/services/eagle.alert.policy.PolicyEvaluatorServiceProvider b/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/META-INF/services/eagle.alert.policy.PolicyEvaluatorServiceProvider
deleted file mode 100644
index 887047a..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/META-INF/services/eagle.alert.policy.PolicyEvaluatorServiceProvider
+++ /dev/null
@@ -1 +0,0 @@
-eagle.alert.siddhi.SiddhiPolicyEvaluatorServiceProviderImpl
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider b/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider
new file mode 100644
index 0000000..c683983
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/META-INF/services/org.apache.eagle.alert.policy.PolicyEvaluatorServiceProvider
@@ -0,0 +1 @@
+org.apache.eagle.alert.siddhi.SiddhiPolicyEvaluatorServiceProviderImpl
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/str.siddhiext
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/str.siddhiext b/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/str.siddhiext
index 1c19f50..4d5e237 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/str.siddhiext
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/main/resources/str.siddhiext
@@ -30,6 +30,6 @@ trim=org.wso2.siddhi.extension.string.TrimFunctionExtension
 upper=org.wso2.siddhi.extension.string.UpperFunctionExtension
 hex=org.wso2.siddhi.extension.string.HexFunctionExtension
 unhex=org.wso2.siddhi.extension.string.UnhexFunctionExtension
-equalsIgnoreCase=eagle.alert.siddhi.extension.EqualsIgnoreCaseExtension
-containsIgnoreCase=eagle.alert.siddhi.extension.ContainsIgnoreCaseExtension
-regexpIgnoreCase=eagle.alert.siddhi.extension.RegexpIgnoreCaseFunctionExtension
+equalsIgnoreCase=org.apache.eagle.alert.siddhi.extension.EqualsIgnoreCaseExtension
+containsIgnoreCase=org.apache.eagle.alert.siddhi.extension.ContainsIgnoreCaseExtension
+regexpIgnoreCase=org.apache.eagle.alert.siddhi.extension.RegexpIgnoreCaseFunctionExtension

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/cep/TestSiddhiEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/cep/TestSiddhiEvaluator.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/cep/TestSiddhiEvaluator.java
deleted file mode 100644
index e6cfd9f..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/cep/TestSiddhiEvaluator.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.cep;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import eagle.alert.dao.AlertDefinitionDAO;
-import eagle.alert.dao.AlertDefinitionDAOImpl;
-import eagle.alert.dao.AlertStreamSchemaDAO;
-import eagle.alert.dao.AlertStreamSchemaDAOImpl;
-import eagle.alert.entity.AlertAPIEntity;
-import eagle.alert.entity.AlertDefinitionAPIEntity;
-import eagle.alert.entity.AlertStreamSchemaEntity;
-import eagle.alert.siddhi.EagleAlertContext;
-import eagle.alert.siddhi.SiddhiPolicyDefinition;
-import eagle.alert.siddhi.SiddhiPolicyEvaluator;
-import eagle.alert.siddhi.StreamMetadataManager;
-import eagle.dataproc.core.ValuesArray;
-import eagle.datastream.Collector;
-import eagle.datastream.Tuple2;
-import eagle.executor.AlertExecutor;
-import junit.framework.Assert;
-import org.junit.Test;
-
-import java.util.*;
-
-public class TestSiddhiEvaluator {
-
-	int alertCount = 0;
-
-	public AlertStreamSchemaEntity createStreamMetaEntity(String attrName, String type) {
-		AlertStreamSchemaEntity entity = new AlertStreamSchemaEntity();
-		Map<String, String> tags = new HashMap<String, String>();
-		tags.put("dataSource", "hdfsAuditLog");
-		tags.put("streamName", "hdfsAuditLogEventStream");
-		tags.put("attrName", attrName);
-		entity.setTags(tags);
-		entity.setAttrType(type);
-		return entity;
-	}
-
-	@Test
-	public void test() throws Exception{
-        Config config = ConfigFactory.load("unittest.conf");
-		AlertStreamSchemaDAO streamDao = new AlertStreamSchemaDAOImpl(null, null) {
-			@Override
-			public List<AlertStreamSchemaEntity> findAlertStreamSchemaByDataSource(String dataSource) throws Exception {
-				List<AlertStreamSchemaEntity> list = new ArrayList<AlertStreamSchemaEntity>();
-				list.add(createStreamMetaEntity("cmd", "string"));
-				list.add(createStreamMetaEntity("dst", "string"));
-				list.add(createStreamMetaEntity("src", "string"));
-				list.add(createStreamMetaEntity("host", "string"));
-				list.add(createStreamMetaEntity("user", "string"));
-				list.add(createStreamMetaEntity("timestamp", "long"));
-				list.add(createStreamMetaEntity("securityZone", "string"));
-				list.add(createStreamMetaEntity("sensitivityType", "string"));
-				list.add(createStreamMetaEntity("allowed", "string"));
-				return list;
-			}
-		};
-		StreamMetadataManager.getInstance().init(config, streamDao);
-
-		Map<String, Object> data1 =  new TreeMap<String, Object>(){{
-			put("cmd", "open");
-			put("dst", "");
-			put("src", "");
-			put("host", "");
-			put("user", "");
-			put("timestamp", String.valueOf(System.currentTimeMillis()));
-			put("securityZone", "");
-			put("sensitivityType", "");
-			put("allowed", "true");
-		}};
-        final SiddhiPolicyDefinition policyDef = new SiddhiPolicyDefinition();
-        policyDef.setType("SiddhiCEPEngine");
-        String expression = "from hdfsAuditLogEventStream[cmd=='open'] " +
-							"select * " +
-							"insert into outputStream ;";
-        policyDef.setExpression(expression);
-        SiddhiPolicyEvaluator evaluator = new SiddhiPolicyEvaluator(config, "testPolicy", policyDef, new String[]{"hdfsAuditLogEventStream"});
-		EagleAlertContext context = new EagleAlertContext();
-
-		AlertDefinitionDAO alertDao = new AlertDefinitionDAOImpl(null, null) {
-			@Override
-			public Map<String, Map<String, AlertDefinitionAPIEntity>> findActiveAlertDefsGroupbyAlertExecutorId(String site, String dataSource) throws Exception {
-				return null;
-			}
-		};
-
-		context.alertExecutor = new AlertExecutor("alertExecutorId", null, 3, 1, alertDao, new String[]{"hdfsAuditLogEventStream"}) {
-			@Override
-			public Map<String, String> getDimensions(String policyId) {
-				return new HashMap<String, String>();
-			}
-
-			@Override
-			public void runMetricReporter() {}
-		};
-		context.alertExecutor.prepareConfig(config);
-		context.alertExecutor.init();
-		context.evaluator = evaluator;
-		context.policyId = "testPolicy";
-		context.outputCollector = new Collector<Tuple2<String, AlertAPIEntity>> () {
-			@Override
-			public void collect(Tuple2<String, AlertAPIEntity> stringAlertAPIEntityTuple2) {
-				alertCount++;
-			}
-		};
-		evaluator.evaluate(new ValuesArray(context, "hdfsAuditLogEventStream", data1));
-		Thread.sleep(2 * 1000);
-		Assert.assertEquals(alertCount, 1);
-		StreamMetadataManager.getInstance().reset();
-	}
-}