You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2015/11/19 11:47:51 UTC

[44/55] [abbrv] [partial] incubator-eagle git commit: [EAGLE-46] Rename package name as "org.apache.eagle"

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/config/TestAlertDedup.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/config/TestAlertDedup.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/config/TestAlertDedup.java
deleted file mode 100644
index baf0612..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/config/TestAlertDedup.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.config;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import eagle.dataproc.core.JsonSerDeserUtils;
-
-public class TestAlertDedup {
-
-	@Test
-	public void test() throws Exception{
-		String alertDef = "{\"alertDedupIntervalMin\":\"720\",\"emailDedupIntervalMin\":\"1440\"}";
-		DeduplicatorConfig dedupConfig = JsonSerDeserUtils.deserialize(alertDef, DeduplicatorConfig.class);
-		Assert.assertEquals(dedupConfig.getAlertDedupIntervalMin(), 720);
-		Assert.assertEquals(dedupConfig.getEmailDedupIntervalMin(), 1440);
-		
-		alertDef = "null";
-		dedupConfig = JsonSerDeserUtils.deserialize(alertDef, DeduplicatorConfig.class);
-		Assert.assertEquals(dedupConfig, null);
-	}
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/dao/TestAlertDefinitionDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/dao/TestAlertDefinitionDAOImpl.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/dao/TestAlertDefinitionDAOImpl.java
deleted file mode 100644
index aae3afb..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/dao/TestAlertDefinitionDAOImpl.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.dao;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import eagle.alert.entity.AlertDefinitionAPIEntity;
-import eagle.common.config.EagleConfigConstants;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class TestAlertDefinitionDAOImpl {
-
-	public AlertDefinitionAPIEntity buildTestAlertDefEntity(String site, String programId, String alertExecutorId, String policyId, String policyType) {
-		AlertDefinitionAPIEntity entity = new AlertDefinitionAPIEntity();
-		entity.setEnabled(true);
-		Map<String, String> tags = new HashMap<String, String>();
-		tags.put("site", site);
-		tags.put("programId", programId);
-		tags.put("alertExecutorId", alertExecutorId);
-		tags.put("policyId", policyId);
-		tags.put("policyType", policyType);
-		entity.setTags(tags);
-		return entity;
-	}
-	
-	@Test
-	public void test() throws Exception{
-		Config config = ConfigFactory.load();
-		String eagleServiceHost = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
-		int eagleServicePort = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
-
-		String site = "sandbox";
-		String dataSource = "UnitTest";
-		AlertDefinitionDAO dao = new AlertDefinitionDAOImpl(eagleServiceHost, eagleServicePort) {
-			@Override
-			public List<AlertDefinitionAPIEntity> findActiveAlertDefs(String site, String dataSource) throws Exception {
-				List<AlertDefinitionAPIEntity> list = new ArrayList<AlertDefinitionAPIEntity>();
-				list.add(buildTestAlertDefEntity(site, dataSource, "TestExecutor1", "TestPolicyIDA", "TestPolicyTypeA"));
-				list.add(buildTestAlertDefEntity(site, dataSource, "TestExecutor1", "TestPolicyIDB", "TestPolicyTypeB"));
-				list.add(buildTestAlertDefEntity(site, dataSource, "TestExecutor2", "TestPolicyIDC", "TestPolicyTypeC"));
-				list.add(buildTestAlertDefEntity(site, dataSource, "TestExecutor2", "TestPolicyIDD", "TestPolicyTypeD"));
-				return list;
-			}
-		};
-
-		Map<String, Map<String, AlertDefinitionAPIEntity>> retMap = dao.findActiveAlertDefsGroupbyAlertExecutorId(site, dataSource);
-		
-		Assert.assertEquals(2, retMap.size());
-		Assert.assertEquals(2, retMap.get("TestExecutor1").size());
-		Assert.assertEquals(2, retMap.get("TestExecutor2").size());
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java
deleted file mode 100644
index 936313a..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.dao;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import junit.framework.Assert;
-
-import eagle.alert.entity.AlertStreamSchemaEntity;
-import eagle.alert.siddhi.SiddhiStreamMetadataUtils;
-import eagle.alert.siddhi.StreamMetadataManager;
-import org.junit.Test;
-
-public class TestSiddhiStreamMetadataUtils {
-	@Test
-	public void test() throws Exception{
-        Config config = ConfigFactory.load();
-		StreamMetadataManager.getInstance().init(config, new AlertStreamSchemaDAO(){
-			@Override
-			public List<AlertStreamSchemaEntity> findAlertStreamSchemaByDataSource(
-                    String dataSource) {
-				return Arrays.asList(generateStreamMetadataAPIEntity("attrName1", "STRING"),
-						generateStreamMetadataAPIEntity("attrName2", "LONG")
-						);
-			}
-		});
-		String siddhiStreamDef = SiddhiStreamMetadataUtils.convertToStreamDef("testStreamName");
-		Assert.assertEquals("define stream " + "testStreamName" + "(eagleAlertContext object,attrName1 string,attrName2 long);", siddhiStreamDef);
-		StreamMetadataManager.getInstance().reset();
-	}
-	
-	private AlertStreamSchemaEntity generateStreamMetadataAPIEntity(final String attrName, String attrType){
-		AlertStreamSchemaEntity entity = new AlertStreamSchemaEntity();
-		entity.setTags(new HashMap<String, String>(){{
-			put("programId", "testProgramId");
-			put("streamName", "testStreamName");
-			put("attrName", attrName);
-		}});
-		entity.setAttrType(attrType);
-		return entity;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/dao/TestStreamDefinitionDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/dao/TestStreamDefinitionDAOImpl.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/dao/TestStreamDefinitionDAOImpl.java
deleted file mode 100644
index 2ff978d..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/dao/TestStreamDefinitionDAOImpl.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.dao;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import eagle.alert.entity.AlertStreamSchemaEntity;
-import eagle.alert.siddhi.StreamMetadataManager;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class TestStreamDefinitionDAOImpl {
-	
-	public AlertStreamSchemaEntity buildTestStreamDefEntity(String programId, String streamName, String attrName) {
-		AlertStreamSchemaEntity entity = new AlertStreamSchemaEntity();
-		entity.setAttrType("String");
-		entity.setAttrValueResolver("DefaultAttrValueResolver");
-		entity.setCategory("SimpleType");
-		Map<String, String> tags = new HashMap<String, String>();
-		tags.put("programId", programId);
-		tags.put("streamName", streamName);
-		tags.put("attrName", attrName);
-		entity.setTags(tags);
-		return entity;
-	}
-	
-	@Test
-	public void test() throws Exception{
-        Config config = ConfigFactory.load();
-		AlertStreamSchemaDAO dao = new AlertStreamSchemaDAOImpl(null, null) {
-			public List<AlertStreamSchemaEntity> findAlertStreamSchemaByDataSource(String dataSource) throws Exception {
-				List<AlertStreamSchemaEntity> list = new ArrayList<AlertStreamSchemaEntity>();
-				String programId = "UnitTest";
-				list.add(buildTestStreamDefEntity(programId, "TestStream", "Attr1"));
-				list.add(buildTestStreamDefEntity(programId, "TestStream", "Attr2"));
-				list.add(buildTestStreamDefEntity(programId, "TestStream", "Attr3"));
-				list.add(buildTestStreamDefEntity(programId, "TestStream", "Attr4"));
-				return list;
-			}
-		};
-
-		StreamMetadataManager.getInstance().init(config, dao);
-		Map<String, List<AlertStreamSchemaEntity>> retMap = StreamMetadataManager.getInstance().getMetadataEntitiesForAllStreams();
-		Assert.assertTrue(retMap.get("TestStream").size() == 4);
-		StreamMetadataManager.getInstance().reset();
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/policy/TestDynamicPolicyLoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/policy/TestDynamicPolicyLoader.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/policy/TestDynamicPolicyLoader.java
deleted file mode 100644
index fede802..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/policy/TestDynamicPolicyLoader.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.policy;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import eagle.alert.dao.AlertDefinitionDAO;
-import eagle.alert.entity.AlertDefinitionAPIEntity;
-import junit.framework.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TestDynamicPolicyLoader {
-	private final static Logger LOG = LoggerFactory.getLogger(TestDynamicPolicyLoader.class);
-
-	@Test
-	public void test() throws Exception{
-		System.setProperty("config.resource", "/unittest.conf");
-		Config config = ConfigFactory.load();
-		Map<String, PolicyLifecycleMethods> policyChangeListeners = new HashMap<String, PolicyLifecycleMethods>();
-		policyChangeListeners.put("testAlertExecutorId", new PolicyLifecycleMethods() {
-			@Override
-			public void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> deleted) {
-				LOG.info("deleted : " + deleted);
-			}
-			
-			@Override
-			public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> added) {
-				Assert.assertTrue(added.size() == 1);
-				LOG.info("added : " + added);
-			}
-			
-			@Override
-			public void onPolicyChanged(Map<String, AlertDefinitionAPIEntity> changed) {
-				Assert.assertTrue(changed.size() == 1);
-				LOG.info("changed :" + changed);
-			}
-		});
-		
-		Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs = new HashMap<String, Map<String, AlertDefinitionAPIEntity>>();
-		initialAlertDefs.put("testAlertExecutorId", new HashMap<String, AlertDefinitionAPIEntity>());
-		Map<String, AlertDefinitionAPIEntity> map = initialAlertDefs.get("testAlertExecutorId");
-		map.put("policyId_1", buildTestAlertDefEntity("testProgramId", "testAlertExecutorId", "policyId_1", "siddhi", "policyDef_1"));
-		map.put("policyId_3", buildTestAlertDefEntity("testProgramId", "testAlertExecutorId", "policyId_3", "siddhi", "policyDef_3"));
-		
-		AlertDefinitionDAO dao = new AlertDefinitionDAO() {
-			@Override
-			public Map<String, Map<String, AlertDefinitionAPIEntity>> findActiveAlertDefsGroupbyAlertExecutorId(
-					String site, String dataSource) {
-				Map<String, Map<String, AlertDefinitionAPIEntity>> currentAlertDefs = new HashMap<String, Map<String, AlertDefinitionAPIEntity>>();
-				currentAlertDefs.put("testAlertExecutorId", new HashMap<String, AlertDefinitionAPIEntity>());
-				Map<String, AlertDefinitionAPIEntity> map = currentAlertDefs.get("testAlertExecutorId");
-				map.put("policyId_1", buildTestAlertDefEntity("testProgramId", "testAlertExecutorId", "policyId_1", "siddhi", "policyDef_1_1"));
-				map.put("policyId_2", buildTestAlertDefEntity("testProgramId", "testAlertExecutorId", "policyId_2", "siddhi", "policyDef_2"));
-				return currentAlertDefs;
-			}
-			
-			@Override
-			public List<AlertDefinitionAPIEntity> findActiveAlertDefs(String site, String dataSource) {
-				return null;
-			}
-		};
-		
-		DynamicPolicyLoader loader = DynamicPolicyLoader.getInstance();
-		loader.init(initialAlertDefs, dao, config);
-		
-		try{
-			Thread.sleep(5000);
-		}catch(Exception ex){
-			
-		}
-	}
-	
-	public AlertDefinitionAPIEntity buildTestAlertDefEntity(String programId, String alertExecutorId, String policyId, String policyType, String policyDef) {
-		AlertDefinitionAPIEntity entity = new AlertDefinitionAPIEntity();
-		entity.setEnabled(true);
-		Map<String, String> tags = new HashMap<String, String>();
-		tags.put("programId", programId);
-		tags.put("alertExecutorId", alertExecutorId);
-		tags.put("policyId", policyId);
-		tags.put("policyType", policyType);
-		entity.setTags(tags);
-		entity.setPolicyDef(policyDef);
-		return entity;
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/policy/TestPolicyDistribution.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/policy/TestPolicyDistribution.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/policy/TestPolicyDistribution.java
deleted file mode 100644
index 96ec8f4..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/policy/TestPolicyDistribution.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.policy;
-
-import org.junit.Test;
-
-public class TestPolicyDistribution {
-    @Test
-    public void test(){
-        DefaultPolicyPartitioner p = new DefaultPolicyPartitioner();
-        System.out.println(p.partition(2, "siddhiCEPEngine", "readtmpPII"));
-        System.out.println(p.partition(2, "siddhiCEPEngine", "readtmpPII2"));
-        System.out.println(p.partition(2, "siddhiCEPEngine", "tmpPIIpii"));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/siddhi/TestSiddhiEngine.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/siddhi/TestSiddhiEngine.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/siddhi/TestSiddhiEngine.java
deleted file mode 100644
index 4eb0c61..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/siddhi/TestSiddhiEngine.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.siddhi;
-
-import java.lang.reflect.Field;
-import java.util.List;
-
-import eagle.executor.AlertExecutor;
-import junit.framework.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.util.EventPrinter;
-import org.wso2.siddhi.query.api.execution.query.Query;
-import org.wso2.siddhi.query.api.execution.query.selection.OutputAttribute;
-import org.wso2.siddhi.query.api.expression.Expression;
-import org.wso2.siddhi.query.api.expression.Variable;
-
-public class TestSiddhiEngine {
-    static final Logger log = LoggerFactory.getLogger(TestSiddhiEngine.class);
-    int alertCount = 0;
-
-    @Test
-    public void TestStrContains() throws Exception {
-        alertCount = 0;
-        SiddhiManager siddhiManager = new SiddhiManager();
-
-        String cseEventStream = "" +
-                "@config(async = 'true') " +
-                "define stream typeStream (cmd string, src string, dst string) ;";
-        String queryString = "" +
-                "@info(name = 'query1') " +
-                "from typeStream[(cmd == 'rename') and (src == '/tmp/pii') and (str:contains(dst,'/user/hdfs/.Trash/Current/tmp/pii')==true)] " +
-                "select cmd, src, dst " +
-                "insert into outputStream ;";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
-
-        QueryCallback callback = new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-                alertCount++;
-            }
-        };
-        executionPlanRuntime.addCallback("query1", callback);
-
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
-        executionPlanRuntime.start();
-        inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/hdfs/.Trash/Current/tmp/pii"});
-        Thread.sleep(100);
-        Assert.assertTrue(alertCount == 1);
-        executionPlanRuntime.shutdown();
-    }
-
-    @Test
-    public void TestRegexp() throws Exception {
-        alertCount = 0;
-        SiddhiManager siddhiManager = new SiddhiManager();
-
-        String cseEventStream = "" +
-                "@config(async = 'true') " +
-                "define stream typeStream (str string, other string, num double) ;";
-        String queryString = "" +
-                "@info(name = 'query1') " +
-                "from typeStream " +
-                "select str as str1, other as other1 , num as num1, count(num) as number " +
-                "having str:regexp(str1, '/usr/data/[0-9]+/[0-9]+/[0-9]+') == true " + 
-                "insert into outputStream ;";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
-
-        QueryCallback callback = new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                alertCount++;
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        };
-        
-        executionPlanRuntime.addCallback("query1", callback);
-
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
-        executionPlanRuntime.start();
-        inputHandler.send(new Object[]{"/usr/data/000/001/002", "other", 1.0});
-        Thread.sleep(100);
-        Assert.assertTrue(alertCount == 1);
-        executionPlanRuntime.shutdown();
-    }
-
-    @Test
-    public void TestStrEqualsIgnoreCase() throws Exception {
-        alertCount = 0;
-        SiddhiManager siddhiManager = new SiddhiManager();
-
-        String cseEventStream = "define stream typeStream (cmd string, src string, dst string) ;";
-        String queryString = "" +
-                "@info(name = 'query1') " +
-                "from typeStream[(cmd == 'rename') and (src == '/tmp/pii') and (str:equalsIgnoreCase(dst,'/user/hdfs/.TRAsh/current/TMP/PII')==true)] " +
-                "select cmd, src, dst " +
-                "insert into outputStream ;";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
-
-        QueryCallback callback = new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                alertCount++;
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        };
-
-        executionPlanRuntime.addCallback("query1", callback);
-
-        Field field = QueryCallback.class.getDeclaredField("query");
-        field.setAccessible(true);
-
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
-        executionPlanRuntime.start();
-        inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/HDFS/.Trash/Current/TMP/pii"}); // match case
-        inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/HDFS/.Trash///Current/TMP/pii"}); //non-match case
-        Thread.sleep(100);
-        Assert.assertTrue(alertCount == 1);
-        executionPlanRuntime.shutdown();
-    }
-
-    @Test
-    public void TestStrContainsIgnoreCase() throws Exception {
-        alertCount = 0;
-        SiddhiManager siddhiManager = new SiddhiManager();
-
-        String cseEventStream = "define stream typeStream (cmd string, src string, dst string) ;";
-        String queryString = "" +
-                "@info(name = 'query1') " +
-                "from typeStream[(cmd == 'rename') and (src == '/tmp/pii') and (str:containsIgnoreCase(dst,'.TRASH/CURRENT/tMp/pII')==true)] " +
-                "select cmd, src, dst " +
-                "insert into outputStream ;";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
-
-        QueryCallback callback = new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-                alertCount++;
-            }
-        };
-
-        executionPlanRuntime.addCallback("query1", callback);
-
-        Field field = QueryCallback.class.getDeclaredField("query");
-        field.setAccessible(true);
-
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
-        executionPlanRuntime.start();
-        inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/hdfs/.Trash/Current/TMP/pii"}); // match case
-        inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/hdfs/.Trash///Current/TMP/pii"}); //non-match case
-        Thread.sleep(100);
-        Assert.assertTrue(alertCount == 1);
-        executionPlanRuntime.shutdown();
-    }
-
-    @Test
-    public void TestRegexpIgnoreCase() throws Exception {
-        alertCount = 0;
-        SiddhiManager siddhiManager = new SiddhiManager();
-
-        String cseEventStream = "define stream typeStream (str string, other string, num double) ;";
-        String queryString = "" +
-                "@info(name = 'query1') " +
-                "from typeStream " +
-                "select str as str1, other as other1 , num as num1, count(num) as number " +
-                "having str:regexpIgnoreCase(str1, '/usr/DATA/[0-9]+/[0-9]+/[0-9]+') == true " +
-                "insert into outputStream ;";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
-
-        QueryCallback callback = new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                alertCount++;
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        };
-        executionPlanRuntime.addCallback("query1", callback);
-
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
-        executionPlanRuntime.start();
-        inputHandler.send(new Object[]{"/USR/data/000/001/002", "other", 1.0});
-        Thread.sleep(100);
-        Assert.assertTrue(alertCount == 1);
-        executionPlanRuntime.shutdown();
-    }
-    
-    @Test
-    public void TestDataObject() throws Exception {
-        alertCount = 0;
-        SiddhiManager siddhiManager = new SiddhiManager();
-        
-        String cseEventStream = "" +
-                "@config(async = 'true') " +
-                "define stream typeStream (dataobj object, str string, first string) ;";
-        String queryString = "" +
-                "@info(name = 'query1') " +
-                "from typeStream " +
-                "select * " +
-                "having str:regexp(str, '/usr/data/[0-9]+/[0-9]+/[0-9]+') == true " + 
-                "insert into outputStream ;";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
-
-        QueryCallback callback = new QueryCallback() {
-            @Override
-           public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                alertCount++;
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        };
-
-        executionPlanRuntime.addCallback("query1", callback);
-
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
-        executionPlanRuntime.start();
-        inputHandler.send(new Object[]{new AlertExecutor(queryString, null, 0, 1, null, null), "/usr/data/000/001/002", "second"});
-        Thread.sleep(100);
-        Assert.assertTrue(alertCount == 1);
-        executionPlanRuntime.shutdown();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/siddhi/TestSiddhiSlideWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/siddhi/TestSiddhiSlideWindow.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/siddhi/TestSiddhiSlideWindow.java
deleted file mode 100644
index 7bac0e4..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/siddhi/TestSiddhiSlideWindow.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.siddhi;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.util.EventPrinter;
-
-import eagle.common.DateTimeUtil;
-
-public class TestSiddhiSlideWindow {
-
-    int alertCount = 0;
-
-    @Test
-    public void testSlideWindow1() throws Exception{
-        alertCount = 0;
-        SiddhiManager siddhiManager = new SiddhiManager();
-
-        String cseEventStream = "define stream eventStream (user string, path string, cmd string);";
-//        String query = "@info(name = 'query1') from eventStream[cmd == 'open' AND str:regexp(path, '/usr/data/[0-9]+/[0-9]+/[0-9]+')]#window.time(1 sec)"
-//        		     + " select user, path, cmd, count(path) as cnt" 
-//        			 + " group by user"
-//        			 + " having cnt > 3 insert all events into outputStream;";
-
-//        String query = "@info(name = 'query1') from eventStream[cmd == 'open' AND str:regexp(path, '/usr/data/[0-9]+/[0-9]+/[0-9]+')]#window.length(10)"
-//        		+ " select user, path, cmd, count(path) as cnt" 
-//        		+ " group by user"
-//        		+ " having cnt > 3 insert all events into outputStream;";
-
-//      String query = "@info(name = 'query1') from eventStream[cmd == 'open' AND str:regexp(path, '/usr/data/[0-9]+/[0-9]+/[0-9]+')]#window.timeBatch(1 sec)"
-//				+ " select user, path, cmd, count(path) as cnt" 
-//				+ " group by user"
-//				+ " having cnt > 3 insert all events into outputStream;";
-
-        String query = "@info(name = 'query1') from eventStream[cmd == 'open' AND str:regexp(path, '/usr/data/[0-9]+/[0-9]+/[0-9]+')]#window.lengthBatch(10)"
-                + " select user, path, cmd, count(path) as cnt"
-                + " group by user"
-                + " having cnt > 3 insert all events into outputStream;";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + query);
-
-        executionPlanRuntime.addCallback("query1", new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-                alertCount++;
-            }
-        });
-
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("eventStream");
-        executionPlanRuntime.start();
-        inputHandler.send(new Object[]{"user", "/usr/data/0000/0000/0000", "open"});
-        inputHandler.send(new Object[]{"user", "/usr/data/0000/0000/1111", "open"});
-        inputHandler.send(new Object[]{"user", "/usr/data/0000/0000/2222", "open"});
-        inputHandler.send(new Object[]{"user", "/usr/data/0000/0000/3333", "open"});
-
-        inputHandler.send(new Object[]{"user", "/usr/data/1111/0000/0000", "open"});
-        inputHandler.send(new Object[]{"user", "/usr/data/1111/0000/1111", "open"});
-
-        inputHandler.send(new Object[]{"user", "/usr/data/2222/0000/0000", "open"});
-        inputHandler.send(new Object[]{"user", "/usr/data/2222/0000/1111", "open"});
-        inputHandler.send(new Object[]{"user", "/usr/data/2222/0000/2222", "open"});
-        Thread.sleep(100);
-        Assert.assertTrue(alertCount == 0);
-        inputHandler.send(new Object[]{"user", "/usr/data/2222/0000/3333", "open"});
-        Thread.sleep(100);
-        Assert.assertTrue(alertCount == 1);
-        executionPlanRuntime.shutdown();
-    }
-
-    @Test
-    public void testSlideWindow2() throws Exception{
-        alertCount = 0;
-        SiddhiManager siddhiManager = new SiddhiManager();
-
-        String cseEventStream = "define stream eventStream (timeStamp long, user string, path string, cmd string);";
-        String query = "@info(name = 'query1') from eventStream[cmd == 'open' AND str:regexp(path, '/usr/data/[0-9]+/[0-9]+/[0-9]+')]#window.externalTime(timeStamp,1 sec)"
-                + " select user, path, cmd, count(path) as cnt"
-                + " group by user"
-                + " having cnt > 3 insert all events into outputStream;";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + query);
-
-        executionPlanRuntime.addCallback("query1", new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                alertCount++;
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        });
-
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("eventStream");
-        executionPlanRuntime.start();
-        long curTime = DateTimeUtil.humanDateToMilliseconds("2015-09-17 00:00:00,000");
-        inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/0000", "open"});
-        Thread.sleep(1100);
-        inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/1111", "open"});
-        Thread.sleep(100);
-        inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/2222", "open"});
-        Thread.sleep(100);
-        inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/3333", "open"});
-        Thread.sleep(100);
-        Assert.assertTrue(alertCount == 1);
-        inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/5555", "open"});
-        Thread.sleep(100);
-        Assert.assertTrue(alertCount == 2);
-        executionPlanRuntime.shutdown();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/siddhi/TestSiddhiStream.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/siddhi/TestSiddhiStream.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/siddhi/TestSiddhiStream.java
deleted file mode 100644
index 9ad1654..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/siddhi/TestSiddhiStream.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.siddhi;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-
-public class TestSiddhiStream {
-	
-	@Test
-	public void test() {
-		String rule = "from hiveAccessLogStream[sensitivityType=='PHONE_NUMBER'] select user,timestamp,resource,sensitivityType insert into outputStream;";
-		Assert.assertFalse(SiddhiPolicyEvaluator.addContextFieldIfNotExist(rule).equals(rule));
-		
-		rule = "from hiveAccessLogStream[sensitivityType=='PHONE_NUMBER'] select    * insert into outputStream;";
-		Assert.assertTrue(SiddhiPolicyEvaluator.addContextFieldIfNotExist(rule).equals(rule));		
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
new file mode 100644
index 0000000..76c43ce
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.cep;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.dao.AlertDefinitionDAO;
+import org.apache.eagle.alert.dao.AlertDefinitionDAOImpl;
+import org.apache.eagle.alert.dao.AlertStreamSchemaDAO;
+import org.apache.eagle.alert.dao.AlertStreamSchemaDAOImpl;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
+import org.apache.eagle.alert.siddhi.EagleAlertContext;
+import org.apache.eagle.alert.siddhi.SiddhiPolicyDefinition;
+import org.apache.eagle.alert.siddhi.SiddhiPolicyEvaluator;
+import org.apache.eagle.alert.siddhi.StreamMetadataManager;
+import org.apache.eagle.dataproc.core.ValuesArray;
+import org.apache.eagle.datastream.Collector;
+import org.apache.eagle.datastream.Tuple2;
+import org.apache.eagle.executor.AlertExecutor;
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.util.*;
+
+public class TestSiddhiEvaluator {
+
+	int alertCount = 0;
+
+	public AlertStreamSchemaEntity createStreamMetaEntity(String attrName, String type) {
+		AlertStreamSchemaEntity entity = new AlertStreamSchemaEntity();
+		Map<String, String> tags = new HashMap<String, String>();
+		tags.put("dataSource", "hdfsAuditLog");
+		tags.put("streamName", "hdfsAuditLogEventStream");
+		tags.put("attrName", attrName);
+		entity.setTags(tags);
+		entity.setAttrType(type);
+		return entity;
+	}
+
+	@Test
+	public void test() throws Exception{
+        Config config = ConfigFactory.load("unittest.conf");
+		AlertStreamSchemaDAO streamDao = new AlertStreamSchemaDAOImpl(null, null) {
+			@Override
+			public List<AlertStreamSchemaEntity> findAlertStreamSchemaByDataSource(String dataSource) throws Exception {
+				List<AlertStreamSchemaEntity> list = new ArrayList<AlertStreamSchemaEntity>();
+				list.add(createStreamMetaEntity("cmd", "string"));
+				list.add(createStreamMetaEntity("dst", "string"));
+				list.add(createStreamMetaEntity("src", "string"));
+				list.add(createStreamMetaEntity("host", "string"));
+				list.add(createStreamMetaEntity("user", "string"));
+				list.add(createStreamMetaEntity("timestamp", "long"));
+				list.add(createStreamMetaEntity("securityZone", "string"));
+				list.add(createStreamMetaEntity("sensitivityType", "string"));
+				list.add(createStreamMetaEntity("allowed", "string"));
+				return list;
+			}
+		};
+		StreamMetadataManager.getInstance().init(config, streamDao);
+
+		Map<String, Object> data1 =  new TreeMap<String, Object>(){{
+			put("cmd", "open");
+			put("dst", "");
+			put("src", "");
+			put("host", "");
+			put("user", "");
+			put("timestamp", String.valueOf(System.currentTimeMillis()));
+			put("securityZone", "");
+			put("sensitivityType", "");
+			put("allowed", "true");
+		}};
+        final SiddhiPolicyDefinition policyDef = new SiddhiPolicyDefinition();
+        policyDef.setType("SiddhiCEPEngine");
+        String expression = "from hdfsAuditLogEventStream[cmd=='open'] " +
+							"select * " +
+							"insert into outputStream ;";
+        policyDef.setExpression(expression);
+        SiddhiPolicyEvaluator evaluator = new SiddhiPolicyEvaluator(config, "testPolicy", policyDef, new String[]{"hdfsAuditLogEventStream"});
+		EagleAlertContext context = new EagleAlertContext();
+
+		AlertDefinitionDAO alertDao = new AlertDefinitionDAOImpl(null, null) {
+			@Override
+			public Map<String, Map<String, AlertDefinitionAPIEntity>> findActiveAlertDefsGroupbyAlertExecutorId(String site, String dataSource) throws Exception {
+				return null;
+			}
+		};
+
+		context.alertExecutor = new AlertExecutor("alertExecutorId", null, 3, 1, alertDao, new String[]{"hdfsAuditLogEventStream"}) {
+			@Override
+			public Map<String, String> getDimensions(String policyId) {
+				return new HashMap<String, String>();
+			}
+
+			@Override
+			public void runMetricReporter() {}
+		};
+		context.alertExecutor.prepareConfig(config);
+		context.alertExecutor.init();
+		context.evaluator = evaluator;
+		context.policyId = "testPolicy";
+		context.outputCollector = new Collector<Tuple2<String, AlertAPIEntity>> () {
+			@Override
+			public void collect(Tuple2<String, AlertAPIEntity> stringAlertAPIEntityTuple2) {
+				alertCount++;
+			}
+		};
+		evaluator.evaluate(new ValuesArray(context, "hdfsAuditLogEventStream", data1));
+		Thread.sleep(2 * 1000);
+		Assert.assertEquals(alertCount, 1);
+		StreamMetadataManager.getInstance().reset();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/config/TestAlertDedup.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/config/TestAlertDedup.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/config/TestAlertDedup.java
new file mode 100644
index 0000000..4295bdc
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/config/TestAlertDedup.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.config;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
+
+public class TestAlertDedup {
+
+	@Test
+	public void test() throws Exception{
+		String alertDef = "{\"alertDedupIntervalMin\":\"720\",\"emailDedupIntervalMin\":\"1440\"}";
+		DeduplicatorConfig dedupConfig = JsonSerDeserUtils.deserialize(alertDef, DeduplicatorConfig.class);
+		Assert.assertEquals(dedupConfig.getAlertDedupIntervalMin(), 720);
+		Assert.assertEquals(dedupConfig.getEmailDedupIntervalMin(), 1440);
+		
+		alertDef = "null";
+		dedupConfig = JsonSerDeserUtils.deserialize(alertDef, DeduplicatorConfig.class);
+		Assert.assertEquals(dedupConfig, null);
+	}
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java
new file mode 100644
index 0000000..231c57d
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.dao;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestAlertDefinitionDAOImpl {
+
+	public AlertDefinitionAPIEntity buildTestAlertDefEntity(String site, String programId, String alertExecutorId, String policyId, String policyType) {
+		AlertDefinitionAPIEntity entity = new AlertDefinitionAPIEntity();
+		entity.setEnabled(true);
+		Map<String, String> tags = new HashMap<String, String>();
+		tags.put("site", site);
+		tags.put("programId", programId);
+		tags.put("alertExecutorId", alertExecutorId);
+		tags.put("policyId", policyId);
+		tags.put("policyType", policyType);
+		entity.setTags(tags);
+		return entity;
+	}
+	
+	@Test
+	public void test() throws Exception{
+		Config config = ConfigFactory.load();
+		String eagleServiceHost = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
+		int eagleServicePort = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
+
+		String site = "sandbox";
+		String dataSource = "UnitTest";
+		AlertDefinitionDAO dao = new AlertDefinitionDAOImpl(eagleServiceHost, eagleServicePort) {
+			@Override
+			public List<AlertDefinitionAPIEntity> findActiveAlertDefs(String site, String dataSource) throws Exception {
+				List<AlertDefinitionAPIEntity> list = new ArrayList<AlertDefinitionAPIEntity>();
+				list.add(buildTestAlertDefEntity(site, dataSource, "TestExecutor1", "TestPolicyIDA", "TestPolicyTypeA"));
+				list.add(buildTestAlertDefEntity(site, dataSource, "TestExecutor1", "TestPolicyIDB", "TestPolicyTypeB"));
+				list.add(buildTestAlertDefEntity(site, dataSource, "TestExecutor2", "TestPolicyIDC", "TestPolicyTypeC"));
+				list.add(buildTestAlertDefEntity(site, dataSource, "TestExecutor2", "TestPolicyIDD", "TestPolicyTypeD"));
+				return list;
+			}
+		};
+
+		Map<String, Map<String, AlertDefinitionAPIEntity>> retMap = dao.findActiveAlertDefsGroupbyAlertExecutorId(site, dataSource);
+		
+		Assert.assertEquals(2, retMap.size());
+		Assert.assertEquals(2, retMap.get("TestExecutor1").size());
+		Assert.assertEquals(2, retMap.get("TestExecutor2").size());
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java
new file mode 100644
index 0000000..b7f40fa
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.dao;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import junit.framework.Assert;
+
+import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
+import org.apache.eagle.alert.siddhi.SiddhiStreamMetadataUtils;
+import org.apache.eagle.alert.siddhi.StreamMetadataManager;
+import org.junit.Test;
+
+public class TestSiddhiStreamMetadataUtils {
+	@Test
+	public void test() throws Exception{
+        Config config = ConfigFactory.load();
+		StreamMetadataManager.getInstance().init(config, new AlertStreamSchemaDAO(){
+			@Override
+			public List<AlertStreamSchemaEntity> findAlertStreamSchemaByDataSource(
+                    String dataSource) {
+				return Arrays.asList(generateStreamMetadataAPIEntity("attrName1", "STRING"),
+						generateStreamMetadataAPIEntity("attrName2", "LONG")
+						);
+			}
+		});
+		String siddhiStreamDef = SiddhiStreamMetadataUtils.convertToStreamDef("testStreamName");
+		Assert.assertEquals("define stream " + "testStreamName" + "(eagleAlertContext object,attrName1 string,attrName2 long);", siddhiStreamDef);
+		StreamMetadataManager.getInstance().reset();
+	}
+	
+	private AlertStreamSchemaEntity generateStreamMetadataAPIEntity(final String attrName, String attrType){
+		AlertStreamSchemaEntity entity = new AlertStreamSchemaEntity();
+		entity.setTags(new HashMap<String, String>(){{
+			put("programId", "testProgramId");
+			put("streamName", "testStreamName");
+			put("attrName", attrName);
+		}});
+		entity.setAttrType(attrType);
+		return entity;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestStreamDefinitionDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestStreamDefinitionDAOImpl.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestStreamDefinitionDAOImpl.java
new file mode 100644
index 0000000..80aacd6
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestStreamDefinitionDAOImpl.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.dao;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
+import org.apache.eagle.alert.siddhi.StreamMetadataManager;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestStreamDefinitionDAOImpl {
+	
+	public AlertStreamSchemaEntity buildTestStreamDefEntity(String programId, String streamName, String attrName) {
+		AlertStreamSchemaEntity entity = new AlertStreamSchemaEntity();
+		entity.setAttrType("String");
+		entity.setAttrValueResolver("DefaultAttrValueResolver");
+		entity.setCategory("SimpleType");
+		Map<String, String> tags = new HashMap<String, String>();
+		tags.put("programId", programId);
+		tags.put("streamName", streamName);
+		tags.put("attrName", attrName);
+		entity.setTags(tags);
+		return entity;
+	}
+	
+	@Test
+	public void test() throws Exception{
+        Config config = ConfigFactory.load();
+		AlertStreamSchemaDAO dao = new AlertStreamSchemaDAOImpl(null, null) {
+			public List<AlertStreamSchemaEntity> findAlertStreamSchemaByDataSource(String dataSource) throws Exception {
+				List<AlertStreamSchemaEntity> list = new ArrayList<AlertStreamSchemaEntity>();
+				String programId = "UnitTest";
+				list.add(buildTestStreamDefEntity(programId, "TestStream", "Attr1"));
+				list.add(buildTestStreamDefEntity(programId, "TestStream", "Attr2"));
+				list.add(buildTestStreamDefEntity(programId, "TestStream", "Attr3"));
+				list.add(buildTestStreamDefEntity(programId, "TestStream", "Attr4"));
+				return list;
+			}
+		};
+
+		StreamMetadataManager.getInstance().init(config, dao);
+		Map<String, List<AlertStreamSchemaEntity>> retMap = StreamMetadataManager.getInstance().getMetadataEntitiesForAllStreams();
+		Assert.assertTrue(retMap.get("TestStream").size() == 4);
+		StreamMetadataManager.getInstance().reset();
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestDynamicPolicyLoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestDynamicPolicyLoader.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestDynamicPolicyLoader.java
new file mode 100644
index 0000000..091a6a6
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestDynamicPolicyLoader.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.policy;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.dao.AlertDefinitionDAO;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import junit.framework.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestDynamicPolicyLoader {
+	private final static Logger LOG = LoggerFactory.getLogger(TestDynamicPolicyLoader.class);
+
+	@Test
+	public void test() throws Exception{
+		System.setProperty("config.resource", "/unittest.conf");
+		Config config = ConfigFactory.load();
+		Map<String, PolicyLifecycleMethods> policyChangeListeners = new HashMap<String, PolicyLifecycleMethods>();
+		policyChangeListeners.put("testAlertExecutorId", new PolicyLifecycleMethods() {
+			@Override
+			public void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> deleted) {
+				LOG.info("deleted : " + deleted);
+			}
+			
+			@Override
+			public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> added) {
+				Assert.assertTrue(added.size() == 1);
+				LOG.info("added : " + added);
+			}
+			
+			@Override
+			public void onPolicyChanged(Map<String, AlertDefinitionAPIEntity> changed) {
+				Assert.assertTrue(changed.size() == 1);
+				LOG.info("changed :" + changed);
+			}
+		});
+		
+		Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs = new HashMap<String, Map<String, AlertDefinitionAPIEntity>>();
+		initialAlertDefs.put("testAlertExecutorId", new HashMap<String, AlertDefinitionAPIEntity>());
+		Map<String, AlertDefinitionAPIEntity> map = initialAlertDefs.get("testAlertExecutorId");
+		map.put("policyId_1", buildTestAlertDefEntity("testProgramId", "testAlertExecutorId", "policyId_1", "siddhi", "policyDef_1"));
+		map.put("policyId_3", buildTestAlertDefEntity("testProgramId", "testAlertExecutorId", "policyId_3", "siddhi", "policyDef_3"));
+		
+		AlertDefinitionDAO dao = new AlertDefinitionDAO() {
+			@Override
+			public Map<String, Map<String, AlertDefinitionAPIEntity>> findActiveAlertDefsGroupbyAlertExecutorId(
+					String site, String dataSource) {
+				Map<String, Map<String, AlertDefinitionAPIEntity>> currentAlertDefs = new HashMap<String, Map<String, AlertDefinitionAPIEntity>>();
+				currentAlertDefs.put("testAlertExecutorId", new HashMap<String, AlertDefinitionAPIEntity>());
+				Map<String, AlertDefinitionAPIEntity> map = currentAlertDefs.get("testAlertExecutorId");
+				map.put("policyId_1", buildTestAlertDefEntity("testProgramId", "testAlertExecutorId", "policyId_1", "siddhi", "policyDef_1_1"));
+				map.put("policyId_2", buildTestAlertDefEntity("testProgramId", "testAlertExecutorId", "policyId_2", "siddhi", "policyDef_2"));
+				return currentAlertDefs;
+			}
+			
+			@Override
+			public List<AlertDefinitionAPIEntity> findActiveAlertDefs(String site, String dataSource) {
+				return null;
+			}
+		};
+		
+		DynamicPolicyLoader loader = DynamicPolicyLoader.getInstance();
+		loader.init(initialAlertDefs, dao, config);
+		
+		try{
+			Thread.sleep(5000);
+		}catch(Exception ex){
+			
+		}
+	}
+	
+	public AlertDefinitionAPIEntity buildTestAlertDefEntity(String programId, String alertExecutorId, String policyId, String policyType, String policyDef) {
+		AlertDefinitionAPIEntity entity = new AlertDefinitionAPIEntity();
+		entity.setEnabled(true);
+		Map<String, String> tags = new HashMap<String, String>();
+		tags.put("programId", programId);
+		tags.put("alertExecutorId", alertExecutorId);
+		tags.put("policyId", policyId);
+		tags.put("policyType", policyType);
+		entity.setTags(tags);
+		entity.setPolicyDef(policyDef);
+		return entity;
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistribution.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistribution.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistribution.java
new file mode 100644
index 0000000..367187d
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistribution.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.policy;
+
+import org.junit.Test;
+
+public class TestPolicyDistribution {
+    @Test
+    public void test(){
+        DefaultPolicyPartitioner p = new DefaultPolicyPartitioner();
+        System.out.println(p.partition(2, "siddhiCEPEngine", "readtmpPII"));
+        System.out.println(p.partition(2, "siddhiCEPEngine", "readtmpPII2"));
+        System.out.println(p.partition(2, "siddhiCEPEngine", "tmpPIIpii"));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiEngine.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiEngine.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiEngine.java
new file mode 100644
index 0000000..8dcc20b
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiEngine.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.siddhi;
+
+import java.lang.reflect.Field;
+
+import org.apache.eagle.executor.AlertExecutor;
+import junit.framework.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.query.output.callback.QueryCallback;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+import org.wso2.siddhi.core.util.EventPrinter;
+
+public class TestSiddhiEngine {
+    static final Logger log = LoggerFactory.getLogger(TestSiddhiEngine.class);
+    int alertCount = 0;
+
+    @Test
+    public void TestStrContains() throws Exception {
+        alertCount = 0;
+        SiddhiManager siddhiManager = new SiddhiManager();
+
+        String cseEventStream = "" +
+                "@config(async = 'true') " +
+                "define stream typeStream (cmd string, src string, dst string) ;";
+        String queryString = "" +
+                "@info(name = 'query1') " +
+                "from typeStream[(cmd == 'rename') and (src == '/tmp/pii') and (str:contains(dst,'/user/hdfs/.Trash/Current/tmp/pii')==true)] " +
+                "select cmd, src, dst " +
+                "insert into outputStream ;";
+
+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
+
+        QueryCallback callback = new QueryCallback() {
+            @Override
+            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                EventPrinter.print(timeStamp, inEvents, removeEvents);
+                alertCount++;
+            }
+        };
+        executionPlanRuntime.addCallback("query1", callback);
+
+        InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
+        executionPlanRuntime.start();
+        inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/hdfs/.Trash/Current/tmp/pii"});
+        Thread.sleep(100);
+        Assert.assertTrue(alertCount == 1);
+        executionPlanRuntime.shutdown();
+    }
+
+    @Test
+    public void TestRegexp() throws Exception {
+        alertCount = 0;
+        SiddhiManager siddhiManager = new SiddhiManager();
+
+        String cseEventStream = "" +
+                "@config(async = 'true') " +
+                "define stream typeStream (str string, other string, num double) ;";
+        String queryString = "" +
+                "@info(name = 'query1') " +
+                "from typeStream " +
+                "select str as str1, other as other1 , num as num1, count(num) as number " +
+                "having str:regexp(str1, '/usr/data/[0-9]+/[0-9]+/[0-9]+') == true " + 
+                "insert into outputStream ;";
+
+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
+
+        QueryCallback callback = new QueryCallback() {
+            @Override
+            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                alertCount++;
+                EventPrinter.print(timeStamp, inEvents, removeEvents);
+            }
+        };
+        
+        executionPlanRuntime.addCallback("query1", callback);
+
+        InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
+        executionPlanRuntime.start();
+        inputHandler.send(new Object[]{"/usr/data/000/001/002", "other", 1.0});
+        Thread.sleep(100);
+        Assert.assertTrue(alertCount == 1);
+        executionPlanRuntime.shutdown();
+    }
+
+    @Test
+    public void TestStrEqualsIgnoreCase() throws Exception {
+        alertCount = 0;
+        SiddhiManager siddhiManager = new SiddhiManager();
+
+        String cseEventStream = "define stream typeStream (cmd string, src string, dst string) ;";
+        String queryString = "" +
+                "@info(name = 'query1') " +
+                "from typeStream[(cmd == 'rename') and (src == '/tmp/pii') and (str:equalsIgnoreCase(dst,'/user/hdfs/.TRAsh/current/TMP/PII')==true)] " +
+                "select cmd, src, dst " +
+                "insert into outputStream ;";
+
+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
+
+        QueryCallback callback = new QueryCallback() {
+            @Override
+            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                alertCount++;
+                EventPrinter.print(timeStamp, inEvents, removeEvents);
+            }
+        };
+
+        executionPlanRuntime.addCallback("query1", callback);
+
+        Field field = QueryCallback.class.getDeclaredField("query");
+        field.setAccessible(true);
+
+        InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
+        executionPlanRuntime.start();
+        inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/HDFS/.Trash/Current/TMP/pii"}); // match case
+        inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/HDFS/.Trash///Current/TMP/pii"}); //non-match case
+        Thread.sleep(100);
+        Assert.assertTrue(alertCount == 1);
+        executionPlanRuntime.shutdown();
+    }
+
+    @Test
+    public void TestStrContainsIgnoreCase() throws Exception {
+        alertCount = 0;
+        SiddhiManager siddhiManager = new SiddhiManager();
+
+        String cseEventStream = "define stream typeStream (cmd string, src string, dst string) ;";
+        String queryString = "" +
+                "@info(name = 'query1') " +
+                "from typeStream[(cmd == 'rename') and (src == '/tmp/pii') and (str:containsIgnoreCase(dst,'.TRASH/CURRENT/tMp/pII')==true)] " +
+                "select cmd, src, dst " +
+                "insert into outputStream ;";
+
+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
+
+        QueryCallback callback = new QueryCallback() {
+            @Override
+            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                EventPrinter.print(timeStamp, inEvents, removeEvents);
+                alertCount++;
+            }
+        };
+
+        executionPlanRuntime.addCallback("query1", callback);
+
+        Field field = QueryCallback.class.getDeclaredField("query");
+        field.setAccessible(true);
+
+        InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
+        executionPlanRuntime.start();
+        inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/hdfs/.Trash/Current/TMP/pii"}); // match case
+        inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/hdfs/.Trash///Current/TMP/pii"}); //non-match case
+        Thread.sleep(100);
+        Assert.assertTrue(alertCount == 1);
+        executionPlanRuntime.shutdown();
+    }
+
+    @Test
+    public void TestRegexpIgnoreCase() throws Exception {
+        alertCount = 0;
+        SiddhiManager siddhiManager = new SiddhiManager();
+
+        String cseEventStream = "define stream typeStream (str string, other string, num double) ;";
+        String queryString = "" +
+                "@info(name = 'query1') " +
+                "from typeStream " +
+                "select str as str1, other as other1 , num as num1, count(num) as number " +
+                "having str:regexpIgnoreCase(str1, '/usr/DATA/[0-9]+/[0-9]+/[0-9]+') == true " +
+                "insert into outputStream ;";
+
+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
+
+        QueryCallback callback = new QueryCallback() {
+            @Override
+            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                alertCount++;
+                EventPrinter.print(timeStamp, inEvents, removeEvents);
+            }
+        };
+        executionPlanRuntime.addCallback("query1", callback);
+
+        InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
+        executionPlanRuntime.start();
+        inputHandler.send(new Object[]{"/USR/data/000/001/002", "other", 1.0});
+        Thread.sleep(100);
+        Assert.assertTrue(alertCount == 1);
+        executionPlanRuntime.shutdown();
+    }
+    
+    @Test
+    public void TestDataObject() throws Exception {
+        alertCount = 0;
+        SiddhiManager siddhiManager = new SiddhiManager();
+        
+        String cseEventStream = "" +
+                "@config(async = 'true') " +
+                "define stream typeStream (dataobj object, str string, first string) ;";
+        String queryString = "" +
+                "@info(name = 'query1') " +
+                "from typeStream " +
+                "select * " +
+                "having str:regexp(str, '/usr/data/[0-9]+/[0-9]+/[0-9]+') == true " + 
+                "insert into outputStream ;";
+
+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
+
+        QueryCallback callback = new QueryCallback() {
+            @Override
+           public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                alertCount++;
+                EventPrinter.print(timeStamp, inEvents, removeEvents);
+            }
+        };
+
+        executionPlanRuntime.addCallback("query1", callback);
+
+        InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
+        executionPlanRuntime.start();
+        inputHandler.send(new Object[]{new AlertExecutor(queryString, null, 0, 1, null, null), "/usr/data/000/001/002", "second"});
+        Thread.sleep(100);
+        Assert.assertTrue(alertCount == 1);
+        executionPlanRuntime.shutdown();
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiSlideWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiSlideWindow.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiSlideWindow.java
new file mode 100644
index 0000000..521317c
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiSlideWindow.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.siddhi;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.query.output.callback.QueryCallback;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+import org.wso2.siddhi.core.util.EventPrinter;
+
+import org.apache.eagle.common.DateTimeUtil;
+
+public class TestSiddhiSlideWindow {
+
+    int alertCount = 0;
+
+    @Test
+    public void testSlideWindow1() throws Exception{
+        alertCount = 0;
+        SiddhiManager siddhiManager = new SiddhiManager();
+
+        String cseEventStream = "define stream eventStream (user string, path string, cmd string);";
+//        String query = "@info(name = 'query1') from eventStream[cmd == 'open' AND str:regexp(path, '/usr/data/[0-9]+/[0-9]+/[0-9]+')]#window.time(1 sec)"
+//        		     + " select user, path, cmd, count(path) as cnt" 
+//        			 + " group by user"
+//        			 + " having cnt > 3 insert all events into outputStream;";
+
+//        String query = "@info(name = 'query1') from eventStream[cmd == 'open' AND str:regexp(path, '/usr/data/[0-9]+/[0-9]+/[0-9]+')]#window.length(10)"
+//        		+ " select user, path, cmd, count(path) as cnt" 
+//        		+ " group by user"
+//        		+ " having cnt > 3 insert all events into outputStream;";
+
+//      String query = "@info(name = 'query1') from eventStream[cmd == 'open' AND str:regexp(path, '/usr/data/[0-9]+/[0-9]+/[0-9]+')]#window.timeBatch(1 sec)"
+//				+ " select user, path, cmd, count(path) as cnt" 
+//				+ " group by user"
+//				+ " having cnt > 3 insert all events into outputStream;";
+
+        String query = "@info(name = 'query1') from eventStream[cmd == 'open' AND str:regexp(path, '/usr/data/[0-9]+/[0-9]+/[0-9]+')]#window.lengthBatch(10)"
+                + " select user, path, cmd, count(path) as cnt"
+                + " group by user"
+                + " having cnt > 3 insert all events into outputStream;";
+
+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + query);
+
+        executionPlanRuntime.addCallback("query1", new QueryCallback() {
+            @Override
+            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                EventPrinter.print(timeStamp, inEvents, removeEvents);
+                alertCount++;
+            }
+        });
+
+        InputHandler inputHandler = executionPlanRuntime.getInputHandler("eventStream");
+        executionPlanRuntime.start();
+        inputHandler.send(new Object[]{"user", "/usr/data/0000/0000/0000", "open"});
+        inputHandler.send(new Object[]{"user", "/usr/data/0000/0000/1111", "open"});
+        inputHandler.send(new Object[]{"user", "/usr/data/0000/0000/2222", "open"});
+        inputHandler.send(new Object[]{"user", "/usr/data/0000/0000/3333", "open"});
+
+        inputHandler.send(new Object[]{"user", "/usr/data/1111/0000/0000", "open"});
+        inputHandler.send(new Object[]{"user", "/usr/data/1111/0000/1111", "open"});
+
+        inputHandler.send(new Object[]{"user", "/usr/data/2222/0000/0000", "open"});
+        inputHandler.send(new Object[]{"user", "/usr/data/2222/0000/1111", "open"});
+        inputHandler.send(new Object[]{"user", "/usr/data/2222/0000/2222", "open"});
+        Thread.sleep(100);
+        Assert.assertTrue(alertCount == 0);
+        inputHandler.send(new Object[]{"user", "/usr/data/2222/0000/3333", "open"});
+        Thread.sleep(100);
+        Assert.assertTrue(alertCount == 1);
+        executionPlanRuntime.shutdown();
+    }
+
+    @Test
+    public void testSlideWindow2() throws Exception{
+        alertCount = 0;
+        SiddhiManager siddhiManager = new SiddhiManager();
+
+        String cseEventStream = "define stream eventStream (timeStamp long, user string, path string, cmd string);";
+        String query = "@info(name = 'query1') from eventStream[cmd == 'open' AND str:regexp(path, '/usr/data/[0-9]+/[0-9]+/[0-9]+')]#window.externalTime(timeStamp,1 sec)"
+                + " select user, path, cmd, count(path) as cnt"
+                + " group by user"
+                + " having cnt > 3 insert all events into outputStream;";
+
+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + query);
+
+        executionPlanRuntime.addCallback("query1", new QueryCallback() {
+            @Override
+            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                alertCount++;
+                EventPrinter.print(timeStamp, inEvents, removeEvents);
+            }
+        });
+
+        InputHandler inputHandler = executionPlanRuntime.getInputHandler("eventStream");
+        executionPlanRuntime.start();
+        long curTime = DateTimeUtil.humanDateToMilliseconds("2015-09-17 00:00:00,000");
+        inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/0000", "open"});
+        Thread.sleep(1100);
+        inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/1111", "open"});
+        Thread.sleep(100);
+        inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/2222", "open"});
+        Thread.sleep(100);
+        inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/3333", "open"});
+        Thread.sleep(100);
+        Assert.assertTrue(alertCount == 1);
+        inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/5555", "open"});
+        Thread.sleep(100);
+        Assert.assertTrue(alertCount == 2);
+        executionPlanRuntime.shutdown();
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiStream.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiStream.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiStream.java
new file mode 100644
index 0000000..beb790a
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiStream.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.siddhi;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestSiddhiStream {
+	
+	@Test
+	public void test() {
+		String rule = "from hiveAccessLogStream[sensitivityType=='PHONE_NUMBER'] select user,timestamp,resource,sensitivityType insert into outputStream;";
+		Assert.assertFalse(SiddhiPolicyEvaluator.addContextFieldIfNotExist(rule).equals(rule));
+		
+		rule = "from hiveAccessLogStream[sensitivityType=='PHONE_NUMBER'] select    * insert into outputStream;";
+		Assert.assertTrue(SiddhiPolicyEvaluator.addContextFieldIfNotExist(rule).equals(rule));		
+	}
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/resources/str.siddhiext
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/resources/str.siddhiext b/eagle-core/eagle-alert/eagle-alert-process/src/test/resources/str.siddhiext
index 7ad0018..576001b 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/resources/str.siddhiext
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/resources/str.siddhiext
@@ -30,6 +30,6 @@ trim=org.wso2.siddhi.extension.string.TrimFunctionExtension
 upper=org.wso2.siddhi.extension.string.UpperFunctionExtension
 hex=org.wso2.siddhi.extension.string.HexFunctionExtension
 unhex=org.wso2.siddhi.extension.string.UnhexFunctionExtension
-equalsIgnoreCase=eagle.alert.siddhi.extension.EqualsIgnoreCaseExtension
-containsIgnoreCase=eagle.alert.siddhi.extension.ContainsIgnoreCaseExtension
-regexpIgnoreCase=eagle.alert.siddhi.extension.RegexpIgnoreCaseFunctionExtension
\ No newline at end of file
+equalsIgnoreCase=org.apache.eagle.alert.siddhi.extension.EqualsIgnoreCaseExtension
+containsIgnoreCase=org.apache.eagle.alert.siddhi.extension.ContainsIgnoreCaseExtension
+regexpIgnoreCase=org.apache.eagle.alert.siddhi.extension.RegexpIgnoreCaseFunctionExtension
\ No newline at end of file