You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/08/14 06:23:10 UTC

[10/13] incubator-eagle git commit: EAGLE-341 clean inner process alert engine code

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestDynamicPolicyLoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestDynamicPolicyLoader.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestDynamicPolicyLoader.java
deleted file mode 100644
index dd36cac..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestDynamicPolicyLoader.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.policy;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-import org.apache.eagle.policy.DynamicPolicyLoader;
-import org.apache.eagle.policy.PolicyLifecycleMethods;
-import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-public class TestDynamicPolicyLoader {
-	private final static Logger LOG = LoggerFactory.getLogger(TestDynamicPolicyLoader.class);
-
-	@Test
-	public void test() throws Exception{
-		System.setProperty("config.resource", "/unittest.conf");
-		Config config = ConfigFactory.load();
-		Map<String, PolicyLifecycleMethods<AlertDefinitionAPIEntity>> policyChangeListeners = new HashMap<String, PolicyLifecycleMethods<AlertDefinitionAPIEntity>>();
-		policyChangeListeners.put("testAlertExecutorId", new PolicyLifecycleMethods<AlertDefinitionAPIEntity>() {
-			@Override
-			public void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> deleted) {
-				LOG.info("deleted : " + deleted);
-			}
-			
-			@Override
-			public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> added) {
-				Assert.assertTrue(added.size() == 1);
-				LOG.info("added : " + added);
-			}
-			
-			@Override
-			public void onPolicyChanged(Map<String, AlertDefinitionAPIEntity> changed) {
-				Assert.assertTrue(changed.size() == 1);
-				LOG.info("changed :" + changed);
-			}
-		});
-		
-		Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs = new HashMap<String, Map<String, AlertDefinitionAPIEntity>>();
-		initialAlertDefs.put("testAlertExecutorId", new HashMap<String, AlertDefinitionAPIEntity>());
-		Map<String, AlertDefinitionAPIEntity> map = initialAlertDefs.get("testAlertExecutorId");
-		map.put("policyId_1", buildTestAlertDefEntity("testProgramId", "testAlertExecutorId", "policyId_1", "siddhi", "policyDef_1"));
-		map.put("policyId_3", buildTestAlertDefEntity("testProgramId", "testAlertExecutorId", "policyId_3", "siddhi", "policyDef_3"));
-		
-		PolicyDefinitionDAO<AlertDefinitionAPIEntity> dao = new PolicyDefinitionDAO<AlertDefinitionAPIEntity>() {
-			@Override
-			public Map<String, Map<String, AlertDefinitionAPIEntity>> findActivePoliciesGroupbyExecutorId(
-					String site, String dataSource) {
-				Map<String, Map<String, AlertDefinitionAPIEntity>> currentAlertDefs = new HashMap<String, Map<String, AlertDefinitionAPIEntity>>();
-				currentAlertDefs.put("testAlertExecutorId", new HashMap<String, AlertDefinitionAPIEntity>());
-				Map<String, AlertDefinitionAPIEntity> map = currentAlertDefs.get("testAlertExecutorId");
-				map.put("policyId_1", buildTestAlertDefEntity("testProgramId", "testAlertExecutorId", "policyId_1", "siddhi", "policyDef_1_1"));
-				map.put("policyId_2", buildTestAlertDefEntity("testProgramId", "testAlertExecutorId", "policyId_2", "siddhi", "policyDef_2"));
-				return currentAlertDefs;
-			}
-			
-			@Override
-			public List<AlertDefinitionAPIEntity> findActivePolicies(String site, String dataSource) {
-				return null;
-			}
-
-            @Override
-            public void updatePolicyDetails(AlertDefinitionAPIEntity entity) { /* do nothing */ }
-		};
-		
-		DynamicPolicyLoader<AlertDefinitionAPIEntity> loader = DynamicPolicyLoader.getInstanceOf(AlertDefinitionAPIEntity.class);
-		loader.init(initialAlertDefs, dao, config);
-		
-		try{
-			Thread.sleep(5000);
-		}catch(Exception ex){
-			
-		}
-	}
-	
-	public AlertDefinitionAPIEntity buildTestAlertDefEntity(String programId, String alertExecutorId, String policyId, String policyType, String policyDef) {
-		AlertDefinitionAPIEntity entity = new AlertDefinitionAPIEntity();
-		entity.setEnabled(true);
-		Map<String, String> tags = new HashMap<String, String>();
-		tags.put("programId", programId);
-		tags.put("alertExecutorId", alertExecutorId);
-		tags.put("policyId", policyId);
-		tags.put("policyType", policyType);
-		entity.setTags(tags);
-		entity.setPolicyDef(policyDef);
-		return entity;
-	}
-	
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistribution.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistribution.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistribution.java
deleted file mode 100644
index 77aaec3..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistribution.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.policy;
-
-import org.apache.eagle.policy.DefaultPolicyPartitioner;
-import org.junit.Test;
-
-public class TestPolicyDistribution {
-    @Test
-    public void test(){
-        DefaultPolicyPartitioner p = new DefaultPolicyPartitioner();
-        System.out.println(p.partition(2, "siddhiCEPEngine", "readtmpPII"));
-        System.out.println(p.partition(2, "siddhiCEPEngine", "readtmpPII2"));
-        System.out.println(p.partition(2, "siddhiCEPEngine", "tmpPIIpii"));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java
deleted file mode 100644
index 5fd374e..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.policy;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.policy.siddhi.StreamMetadataManager;
-import org.junit.Assert;
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
-import org.apache.eagle.alert.executor.AlertExecutor;
-import org.apache.eagle.policy.DefaultPolicyPartitioner;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.policy.dao.AlertStreamSchemaDAO;
-import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
-import org.apache.eagle.policy.dao.PolicyDefinitionEntityDAOImpl;
-import org.apache.eagle.service.client.EagleServiceConnector;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class TestPolicyDistributionUpdater {
-    private static Logger LOG = LoggerFactory.getLogger(TestPolicyDistributionUpdater.class);
-
-    @Test
-    public void testPolicyDistributionReporter() throws Exception{
-        StreamMetadataManager.getInstance().reset();
-        PolicyDefinitionDAO alertDao = new PolicyDefinitionEntityDAOImpl(new EagleServiceConnector(null, 1),
-                Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME) {
-            @Override
-            public Map<String, Map<String, AlertDefinitionAPIEntity>> findActivePoliciesGroupbyExecutorId(String site, String dataSource) throws Exception {
-                final AlertDefinitionAPIEntity entity = new AlertDefinitionAPIEntity();
-                entity.setTags(new HashMap<String, String>() {{
-                    put(Constants.POLICY_TYPE, "siddhiCEPEngine");
-                    put(Constants.POLICY_ID, "policyId_1");
-                }});
-                Map<String, Map<String, AlertDefinitionAPIEntity>> map = new HashMap<String, Map<String, AlertDefinitionAPIEntity>>();
-                map.put("alertExecutorId_1", new HashMap<String, AlertDefinitionAPIEntity>() {{
-                    put("policyId_1", entity);
-                }});
-                entity.setPolicyDef("{\"type\":\"siddhiCEPEngine\",\"expression\":\"from testStream select name insert into outputStream ;\"}");
-                return map;
-            }
-
-            @Override
-            public void updatePolicyDetails(AbstractPolicyDefinitionEntity entity) { /* do nothing */ }
-        };
-
-        AlertExecutor alertExecutor = new AlertExecutor("alertExecutorId_1", new DefaultPolicyPartitioner(), 1, 0, alertDao, new String[]{"testStream"}){
-            public AlertStreamSchemaDAO getAlertStreamSchemaDAO(Config config){
-                return new AlertStreamSchemaDAO(){
-                    @Override
-                    public List<AlertStreamSchemaEntity> findAlertStreamSchemaByApplication(String application) throws Exception {
-                        AlertStreamSchemaEntity entity = new AlertStreamSchemaEntity();
-                        entity.setTags(new HashMap<String, String>(){{
-                            put("application", "UnitTest");
-                            put("streamName", "testStream");
-                            put("attrName", "name");
-                        }});
-                        entity.setAttrType("string");
-                        return Arrays.asList(entity);
-                    }
-                };
-            }
-
-            @Override
-            public void report() {
-                Assert.assertEquals(1, getPolicyEvaluators().size());
-                LOG.info("successuflly reported");
-            }
-        };
-
-        Config config = ConfigFactory.load();
-        alertExecutor.prepareConfig(config);
-        alertExecutor.init();
-        Thread.sleep(100);
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyPartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyPartitioner.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyPartitioner.java
deleted file mode 100644
index c3bc4c9..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyPartitioner.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.policy;
-
-import org.apache.eagle.policy.DefaultPolicyPartitioner;
-import org.junit.Test;
-
-public class TestPolicyPartitioner {
-    @Test
-    public void test(){
-        DefaultPolicyPartitioner p = new DefaultPolicyPartitioner();
-        System.out.println(p.partition(2, "siddhiCEPEngine", "readtmpPII"));
-        System.out.println(p.partition(2, "siddhiCEPEngine", "readtmpPII2"));
-        System.out.println(p.partition(2, "siddhiCEPEngine", "tmpPIIpii"));
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestExternalBatchWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestExternalBatchWindow.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestExternalBatchWindow.java
deleted file mode 100644
index e289793..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestExternalBatchWindow.java
+++ /dev/null
@@ -1,307 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.siddhi;
-
-import java.text.MessageFormat;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
-
-/**
- * @since Dec 23, 2015
- *
- */
-public class TestExternalBatchWindow {
-
-    private static SiddhiManager siddhiManager;
-
-    @BeforeClass
-    public static void beforeClass() {
-        siddhiManager = new SiddhiManager();
-    }
-
-    @AfterClass
-    public static void afterClass() {
-        siddhiManager.shutdown();
-    }
-
-    @Test
-    public void test02NoMsg() throws Exception {
-        ExecutionPlanRuntime runtime = simpleQueryRuntime();
-
-        final AtomicBoolean recieved = new AtomicBoolean();
-        runtime.addCallback("query", new QueryCallback() {
-
-            @Override
-            public void receive(long arg0, Event[] arg1, Event[] arg2) {
-                recieved.set(true);
-                System.out.println(arg1);
-            }
-        });
-
-        InputHandler input = runtime.getInputHandler("jmxMetric");
-
-        runtime.start();
-        // the external timestamps span less than the 10 sec window, so no event should be received in the callback.
-        long now = System.currentTimeMillis();
-        int length = 5;
-        for (int i = 0; i < length; i++) {
-            input.send(new Object[] { 15, now + i * 1000 });
-        }
-
-        Thread.sleep(1000);
-        Assert.assertFalse("Event happens inner external time batch window, should not have event recieved in callback!", recieved.get());
-
-        runtime.shutdown();
-    }
-
-    private ExecutionPlanRuntime simpleQueryRuntime() {
-        String query = "define stream jmxMetric(cpu int, timestamp long); "
-                + "@info(name='query')"
-                + "from jmxMetric#window.externalTimeBatch(timestamp, 10 sec) "
-                + "select avg(cpu) as avgCpu, count(1) as count insert into tmp;";
-
-        return siddhiManager.createExecutionPlanRuntime(query);
-    }
-
-    /**
-     * This case tries to capture the situation where the window gets a chunk of events that exceeds the time batch.
-     * In this case, the next processor should be triggered twice.
-     */
-    @Test
-    public void test03BunchChunkExceedBatch() {
-        // TODO
-    }
-    @Test
-    public void test04MultiThread() {
-        // TODO
-    }
-
-    @Test
-    public void test05ExternalJoin() {
-        // TODO
-    }
-
-    @Test
-    public void test06EdgeCase() throws Exception {
-        // every 10 sec
-        ExecutionPlanRuntime runtime = simpleQueryRuntime();
-
-        final AtomicInteger recCount = new AtomicInteger(0);
-//        final CountDownLatch latch = new CountDownLatch(2);// for debug
-        runtime.addCallback("query", new QueryCallback() {
-            @Override
-            public void receive(long arg0, Event[] arg1, Event[] arg2) {
-//                latch.countDown();
-                Assert.assertEquals(1, arg1.length);
-                recCount.incrementAndGet();
-                int avgCpu = ((Double) arg1[0].getData()[0]).intValue();
-                if (recCount.get() == 1) {
-                    Assert.assertEquals(15, avgCpu);
-                } else if (recCount.get() == 2) {
-                    Assert.assertEquals(85, avgCpu);
-                }
-                int count = ((Long) arg1[0].getData()[1]).intValue();
-                Assert.assertEquals(3, count);
-            }
-        });
-
-        InputHandler input = runtime.getInputHandler("jmxMetric");
-        runtime.start();
-        // first round: three events (cpu = 15) whose timestamps 0, 10 and 20 ms all fall within one 10 sec window.
-        long now = 0;
-        int length = 3;
-        for (int i = 0; i < length; i++) {
-            input.send(new Object[] { 15, now + i * 10 });
-        }
-
-        // second round
-        // if the triggering event were mixed into the previous window, the avgCpu asserted in the callback would not match
-        for (int i = 0; i < length; i++) {
-            input.send(new Object[] { 85, now + 10000 + i * 10 }); // the first entity of the second round
-        }
-        // to trigger second round
-        input.send(new Object[] { 10000, now + 10 * 10000 });
-
-//        latch.await();// for debug
-
-        Thread.sleep(1000);
-
-        Assert.assertEquals(2, recCount.get());
-    }
-
-    @Test
-    public void test07Pull76() throws Exception {
-        String defaultStream = "define stream LoginEvents (myTime long, ip string, phone string,price int);";
-
-        String query = " @info(name='pull76') "
-                + " from LoginEvents#window.externalTimeBatch(myTime, 5 sec)  "
-                + " select myTime, phone, ip, price, count(ip) as cntip , "
-                + " min(myTime) as mintime, max(myTime) as maxtime "
-                + " insert into events ;";
-
-        ExecutionPlanRuntime runtime = siddhiManager.createExecutionPlanRuntime(defaultStream + query);
-
-        InputHandler inputHandler = runtime.getInputHandler("LoginEvents");
-
-        runtime.addCallback("pull76", new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                if (inEvents != null) {
-                    System.out.println("======================== START ===============================");
-                    int i = 0;
-                    System.out.println(" Events Size:" + inEvents.length);
-                    for (i = 0; i < inEvents.length; i++) {
-                        Event e = inEvents[i];
-                        System.out.println("----------------------------");
-                        System.out.println(new Date((Long) e.getData(0)));
-                        System.out.println("IP:" + e.getData(2));
-                        System.out.println("price :" + e.getData(3));
-                        System.out.println("count :" + e.getData(4));
-                        System.out.println("mintime :" + new Date((Long) e.getData(5)) );
-                        System.out.println("maxtime :" + new Date((Long) e.getData(6)) );
-                        System.out.println("----------------------------");
-                    }
-                    System.out.println("======================== END  ===============================");
-
-                }
-            }
-        });
-
-
-        runtime.start();
-
-        long start = System.currentTimeMillis();
-        Calendar c = Calendar.getInstance();
-        c.add(Calendar.HOUR, 1);
-        c.add(Calendar.SECOND, 1);
-        int i = 0;
-        for (i = 0; i <= 10000; i++) {
-            c.add(Calendar.SECOND, 1);
-            inputHandler.send(c.getTime().getTime(),
-                    new Object[] { c.getTime().getTime(), new String("192.10.1.1"), "1", new Random().nextInt(1000) });
-        }
-        long end = System.currentTimeMillis();
-        System.out.printf("End : %d ", end - start);
-
-        Thread.sleep(1000);
-        runtime.shutdown();
-    }
-
-    @Test
-    public void test01DownSampling() throws Exception {
-        String stream = "define stream jmxMetric(cpu int, memory int, bytesIn long, bytesOut long, timestamp long);";
-        String query = "@info(name = 'downSample') "
-                + "from jmxMetric#window.externalTimeBatch(timestamp, 10 sec) "
-                + "select "
-                + "avg(cpu) as avgCpu, max(cpu) as maxCpu, min(cpu) as minCpu, "
-                + " '|' as s, "
-                + " avg(memory) as avgMem, max(memory) as maxMem, min(memory) as minMem, "
-                + " '|' as s1, "
-                + " avg(bytesIn) as avgBytesIn, max(bytesIn) as maxBytesIn, min(bytesIn) as minBytesIn, "
-                + " '|' as s2, "
-                + " avg(bytesOut) as avgBytesOut, max(bytesOut) as maxBytesOut, min(bytesOut) as minBytesOut, "
-                + " '|' as s3, "
-                + " timestamp as timeWindowEnds, "
-                + " '|' as s4, "
-                + " count(1) as metric_count "
-                + " INSERT INTO tmp;";
-
-        SiddhiManager sm = new SiddhiManager();
-        ExecutionPlanRuntime plan = sm.createExecutionPlanRuntime(stream + query);
-
-        InputHandler input = plan.getInputHandler("jmxMetric");
-
-        // counts every event seen by the stream callback, independently of the query-level counter below
-        final AtomicInteger counter = new AtomicInteger();
-        {
-            // stream callback
-            plan.addCallback("jmxMetric", new StreamCallback() {
-                @Override
-                public void receive(Event[] arg0) {
-                    counter.addAndGet(arg0.length);
-                }
-            });
-        }
-        final AtomicInteger queryWideCounter = new AtomicInteger();
-        {
-            plan.addCallback("downSample", new QueryCallback() {
-                @Override
-                public void receive(long arg0, Event[] inevents, Event[] removeevents) {
-                    int currentCount = queryWideCounter.addAndGet(inevents.length);
-                    System.out.println(MessageFormat.format("Round {0} ====", currentCount));
-                    System.out.println(" events count " + inevents.length);
-
-                    for (Event e : inevents) {
-                        Object[] tranformedData = e.getData();
-                        for (Object o : tranformedData) {
-                            System.out.print(o);
-                            System.out.print(' ');
-                        }
-                        System.out.println(" events endendend");
-                    }
-                }
-
-            });
-        }
-
-        plan.start();
-
-        int round = 4;
-        int eventsPerRound= 0;
-        long externalTs = System.currentTimeMillis();
-        for (int i = 0; i < round; i++) {
-            eventsPerRound = sendEvent(input, i, externalTs);
-            Thread.sleep(3000);
-        }
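-        // one extra round after the loop: the assertions below count it in the stream total but expect only `round` query outputs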
-        sendEvent(input, round, externalTs);
-
-        plan.shutdown();
-        Thread.sleep(1000);
-        Assert.assertEquals(round * eventsPerRound + eventsPerRound, counter.get());
-        Assert.assertEquals(round, queryWideCounter.get());
-    }
-
-    // one round of sending events
-    private int sendEvent(InputHandler input, int ite, long externalTs) throws Exception {
-        int len = 3;
-        Event[] events = new Event[len];
-        for (int i = 0; i < len; i++) {
-            // cpu int, memory int, bytesIn long, bytesOut long, timestamp long
-            events[i] = new Event(externalTs,
-                    new Object[] { 15 + 10 * i * ite, 1500 + 10 * i * ite, 1000L, 2000L, externalTs + ite * 10000 + i * 50 });
-        }
-
-        input.send(events);
-        return len;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiEngine.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiEngine.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiEngine.java
deleted file mode 100644
index 924ba5c..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiEngine.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.siddhi;
-
-import java.lang.reflect.Field;
-
-import org.apache.eagle.alert.executor.AlertExecutor;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.util.EventPrinter;
-
-public class TestSiddhiEngine {
-    static final Logger log = LoggerFactory.getLogger(TestSiddhiEngine.class);
-    int alertCount = 0;
-
-    @Test
-    public void TestStrContains() throws Exception {
-        alertCount = 0;
-        SiddhiManager siddhiManager = new SiddhiManager();
-
-        String cseEventStream = "" +
-                "@config(async = 'true') " +
-                "define stream typeStream (cmd string, src string, dst string) ;";
-        String queryString = "" +
-                "@info(name = 'query1') " +
-                "from typeStream[(cmd == 'rename') and (src == '/tmp/pii') and (str:contains(dst,'/user/hdfs/.Trash/Current/tmp/pii')==true)] " +
-                "select cmd, src, dst " +
-                "insert into outputStream ;";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
-
-        QueryCallback callback = new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-                alertCount++;
-            }
-        };
-        executionPlanRuntime.addCallback("query1", callback);
-
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
-        executionPlanRuntime.start();
-        inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/hdfs/.Trash/Current/tmp/pii"});
-        Thread.sleep(100);
-        Assert.assertTrue(alertCount == 1);
-        executionPlanRuntime.shutdown();
-    }
-
-    @Test
-    public void TestRegexp() throws Exception {
-        alertCount = 0;
-        SiddhiManager siddhiManager = new SiddhiManager();
-
-        String cseEventStream = "" +
-                "@config(async = 'true') " +
-                "define stream typeStream (str string, other string, num double) ;";
-        String queryString = "" +
-                "@info(name = 'query1') " +
-                "from typeStream " +
-                "select str as str1, other as other1 , num as num1, count(num) as number " +
-                "having str:regexp(str1, '/usr/data/[0-9]+/[0-9]+/[0-9]+') == true " + 
-                "insert into outputStream ;";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
-
-        QueryCallback callback = new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                alertCount++;
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        };
-        
-        executionPlanRuntime.addCallback("query1", callback);
-
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
-        executionPlanRuntime.start();
-        inputHandler.send(new Object[]{"/usr/data/000/001/002", "other", 1.0});
-        Thread.sleep(100);
-        Assert.assertTrue(alertCount == 1);
-        executionPlanRuntime.shutdown();
-    }
-
-    @Test
-    public void TestStrEqualsIgnoreCase() throws Exception {
-        alertCount = 0;
-        SiddhiManager siddhiManager = new SiddhiManager();
-
-        String cseEventStream = "define stream typeStream (cmd string, src string, dst string) ;";
-        String queryString = "" +
-                "@info(name = 'query1') " +
-                "from typeStream[(cmd == 'rename') and (src == '/tmp/pii') and (str:equalsIgnoreCase(dst,'/user/hdfs/.TRAsh/current/TMP/PII')==true)] " +
-                "select cmd, src, dst " +
-                "insert into outputStream ;";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
-
-        QueryCallback callback = new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                alertCount++;
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        };
-
-        executionPlanRuntime.addCallback("query1", callback);
-
-        Field field = QueryCallback.class.getDeclaredField("query");
-        field.setAccessible(true);
-
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
-        executionPlanRuntime.start();
-        inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/HDFS/.Trash/Current/TMP/pii"}); // match case
-        inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/HDFS/.Trash///Current/TMP/pii"}); //non-match case
-        Thread.sleep(100);
-        Assert.assertTrue(alertCount == 1);
-        executionPlanRuntime.shutdown();
-    }
-
-    @Test
-    public void TestStrContainsIgnoreCase() throws Exception {
-        alertCount = 0;
-        SiddhiManager siddhiManager = new SiddhiManager();
-
-        String cseEventStream = "define stream typeStream (cmd string, src string, dst string) ;";
-        String queryString = "" +
-                "@info(name = 'query1') " +
-                "from typeStream[(cmd == 'rename') and (src == '/tmp/pii') and (str:containsIgnoreCase(dst,'.TRASH/CURRENT/tMp/pII')==true)] " +
-                "select cmd, src, dst " +
-                "insert into outputStream ;";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
-
-        QueryCallback callback = new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-                alertCount++;
-            }
-        };
-
-        executionPlanRuntime.addCallback("query1", callback);
-
-        Field field = QueryCallback.class.getDeclaredField("query");
-        field.setAccessible(true);
-
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
-        executionPlanRuntime.start();
-        inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/hdfs/.Trash/Current/TMP/pii"}); // match case
-        inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/hdfs/.Trash///Current/TMP/pii"}); //non-match case
-        Thread.sleep(100);
-        Assert.assertTrue(alertCount == 1);
-        executionPlanRuntime.shutdown();
-    }
-
-    @Test
-    public void TestRegexpIgnoreCase() throws Exception {
-        alertCount = 0;
-        SiddhiManager siddhiManager = new SiddhiManager();
-
-        String cseEventStream = "define stream typeStream (str string, other string, num double) ;";
-        String queryString = "" +
-                "@info(name = 'query1') " +
-                "from typeStream " +
-                "select str as str1, other as other1 , num as num1, count(num) as number " +
-                "having str:regexpIgnoreCase(str1, '/usr/DATA/[0-9]+/[0-9]+/[0-9]+') == true " +
-                "insert into outputStream ;";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
-
-        QueryCallback callback = new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                alertCount++;
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        };
-        executionPlanRuntime.addCallback("query1", callback);
-
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
-        executionPlanRuntime.start();
-        inputHandler.send(new Object[]{"/USR/data/000/001/002", "other", 1.0});
-        Thread.sleep(100);
-        Assert.assertTrue(alertCount == 1);
-        executionPlanRuntime.shutdown();
-    }
-    
-    @Test
-    public void TestDataObject() throws Exception {
-        alertCount = 0;
-        SiddhiManager siddhiManager = new SiddhiManager();
-        
-        String cseEventStream = "" +
-                "@config(async = 'true') " +
-                "define stream typeStream (dataobj object, str string, first string) ;";
-        String queryString = "" +
-                "@info(name = 'query1') " +
-                "from typeStream " +
-                "select * " +
-                "having str:regexp(str, '/usr/data/[0-9]+/[0-9]+/[0-9]+') == true " + 
-                "insert into outputStream ;";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
-
-        QueryCallback callback = new QueryCallback() {
-            @Override
-           public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                alertCount++;
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        };
-
-        executionPlanRuntime.addCallback("query1", callback);
-
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
-        executionPlanRuntime.start();
-        inputHandler.send(new Object[]{new AlertExecutor(queryString, null, 0, 1, null, null), "/usr/data/000/001/002", "second"});
-        Thread.sleep(100);
-        Assert.assertTrue(alertCount == 1);
-        executionPlanRuntime.shutdown();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiSlideWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiSlideWindow.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiSlideWindow.java
deleted file mode 100644
index 521317c..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiSlideWindow.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.siddhi;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.util.EventPrinter;
-
-import org.apache.eagle.common.DateTimeUtil;
-
-public class TestSiddhiSlideWindow {
-
-    int alertCount = 0;
-
-    @Test
-    public void testSlideWindow1() throws Exception{
-        alertCount = 0;
-        SiddhiManager siddhiManager = new SiddhiManager();
-
-        String cseEventStream = "define stream eventStream (user string, path string, cmd string);";
-//        String query = "@info(name = 'query1') from eventStream[cmd == 'open' AND str:regexp(path, '/usr/data/[0-9]+/[0-9]+/[0-9]+')]#window.time(1 sec)"
-//        		     + " select user, path, cmd, count(path) as cnt" 
-//        			 + " group by user"
-//        			 + " having cnt > 3 insert all events into outputStream;";
-
-//        String query = "@info(name = 'query1') from eventStream[cmd == 'open' AND str:regexp(path, '/usr/data/[0-9]+/[0-9]+/[0-9]+')]#window.length(10)"
-//        		+ " select user, path, cmd, count(path) as cnt" 
-//        		+ " group by user"
-//        		+ " having cnt > 3 insert all events into outputStream;";
-
-//      String query = "@info(name = 'query1') from eventStream[cmd == 'open' AND str:regexp(path, '/usr/data/[0-9]+/[0-9]+/[0-9]+')]#window.timeBatch(1 sec)"
-//				+ " select user, path, cmd, count(path) as cnt" 
-//				+ " group by user"
-//				+ " having cnt > 3 insert all events into outputStream;";
-
-        String query = "@info(name = 'query1') from eventStream[cmd == 'open' AND str:regexp(path, '/usr/data/[0-9]+/[0-9]+/[0-9]+')]#window.lengthBatch(10)"
-                + " select user, path, cmd, count(path) as cnt"
-                + " group by user"
-                + " having cnt > 3 insert all events into outputStream;";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + query);
-
-        executionPlanRuntime.addCallback("query1", new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-                alertCount++;
-            }
-        });
-
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("eventStream");
-        executionPlanRuntime.start();
-        inputHandler.send(new Object[]{"user", "/usr/data/0000/0000/0000", "open"});
-        inputHandler.send(new Object[]{"user", "/usr/data/0000/0000/1111", "open"});
-        inputHandler.send(new Object[]{"user", "/usr/data/0000/0000/2222", "open"});
-        inputHandler.send(new Object[]{"user", "/usr/data/0000/0000/3333", "open"});
-
-        inputHandler.send(new Object[]{"user", "/usr/data/1111/0000/0000", "open"});
-        inputHandler.send(new Object[]{"user", "/usr/data/1111/0000/1111", "open"});
-
-        inputHandler.send(new Object[]{"user", "/usr/data/2222/0000/0000", "open"});
-        inputHandler.send(new Object[]{"user", "/usr/data/2222/0000/1111", "open"});
-        inputHandler.send(new Object[]{"user", "/usr/data/2222/0000/2222", "open"});
-        Thread.sleep(100);
-        Assert.assertTrue(alertCount == 0);
-        inputHandler.send(new Object[]{"user", "/usr/data/2222/0000/3333", "open"});
-        Thread.sleep(100);
-        Assert.assertTrue(alertCount == 1);
-        executionPlanRuntime.shutdown();
-    }
-
-    @Test
-    public void testSlideWindow2() throws Exception{
-        alertCount = 0;
-        SiddhiManager siddhiManager = new SiddhiManager();
-
-        String cseEventStream = "define stream eventStream (timeStamp long, user string, path string, cmd string);";
-        String query = "@info(name = 'query1') from eventStream[cmd == 'open' AND str:regexp(path, '/usr/data/[0-9]+/[0-9]+/[0-9]+')]#window.externalTime(timeStamp,1 sec)"
-                + " select user, path, cmd, count(path) as cnt"
-                + " group by user"
-                + " having cnt > 3 insert all events into outputStream;";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + query);
-
-        executionPlanRuntime.addCallback("query1", new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                alertCount++;
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        });
-
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("eventStream");
-        executionPlanRuntime.start();
-        long curTime = DateTimeUtil.humanDateToMilliseconds("2015-09-17 00:00:00,000");
-        inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/0000", "open"});
-        Thread.sleep(1100);
-        inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/1111", "open"});
-        Thread.sleep(100);
-        inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/2222", "open"});
-        Thread.sleep(100);
-        inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/3333", "open"});
-        Thread.sleep(100);
-        Assert.assertTrue(alertCount == 1);
-        inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/5555", "open"});
-        Thread.sleep(100);
-        Assert.assertTrue(alertCount == 2);
-        executionPlanRuntime.shutdown();
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiStream.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiStream.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiStream.java
deleted file mode 100644
index 0027bce..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiStream.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.siddhi;
-
-import org.apache.eagle.policy.siddhi.SiddhiPolicyEvaluator;
-import org.junit.Assert;
-import org.junit.Test;
-
-
-public class TestSiddhiStream {
-	
-	@Test
-	public void test() {
-		String rule = "from hiveAccessLogStream[sensitivityType=='PHONE_NUMBER'] select user,timestamp,resource,define stream hdfsAuditLogEventStream(eagleAlertContext object, allowed string,cmd string,dst string,host string,securityZone string,sensitivityType string,src string,timestamp long,user string); @info(name = 'query') from hdfsAuditLogEventStream[cmd=='open'] select * insert into outputStream ; insert into outputStream;";
-		Assert.assertFalse(SiddhiPolicyEvaluator.addContextFieldIfNotExist(rule).equals(rule));
-		
-		rule = "from hiveAccessLogStream[sensitivityType=='PHONE_NUMBER'] select    * insert into outputStream;";
-		Assert.assertTrue(SiddhiPolicyEvaluator.addContextFieldIfNotExist(rule).equals(rule));
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestAggregation.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestAggregation.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestAggregation.java
deleted file mode 100644
index 71f9691..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestAggregation.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.state;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-
-import org.wso2.siddhi.core.util.EventPrinter;
-
-public class TestAggregation {
-    @Test
-    public void test01DownSampling() throws Exception {
-        String stream = "define stream jmxMetric(cpu double, memory int, bytesIn int, bytesOut long, timestamp long);";
-        String query = "@info(name = 'downSample') "
-                + "from jmxMetric#window.timeBatch(1 sec) "
-                + "select "
-                + " min(cpu) as minCpu, max(cpu) as maxCpu, avg(cpu) as avgCpu, "
-                + " min(memory) as minMem, max(memory) as maxMem, avg(memory) as avgMem, "
-                + " min(bytesIn) as minBytesIn, max(bytesIn) as maxBytesIn, avg(bytesIn) as avgBytesIn, sum(bytesIn) as totalBytesIn, "
-                + " min(bytesOut) as minBytesOut, max(bytesOut) as maxBytesOut, avg(bytesOut) as avgBytesOut, sum(bytesOut) as totalBytesOut, "
-                + " timestamp as timeWindowEnds "
-                + " INSERT  INTO tmp;";
-
-        SiddhiManager sm = new SiddhiManager();
-        ExecutionPlanRuntime plan = sm.createExecutionPlanRuntime(stream + query);
-
-        final AtomicInteger counter = new AtomicInteger();
-        plan.addCallback("downSample", new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-                int count = counter.incrementAndGet();
-                if (count == 1) {
-                    Assert.assertEquals(6000L, inEvents[0].getData(9));
-                } else if(count == 2) {
-                    Assert.assertEquals(6000L, inEvents[0].getData(9));
-                }
-            }
-        });
-        InputHandler input = plan.getInputHandler("jmxMetric");
-
-        plan.start();
-        sendEvent(input);
-        Thread.sleep(100);
-        sendEvent(input);
-        Thread.sleep(1000);
-        sendEvent(input);
-        Thread.sleep(1000);
-        sendEvent(input);
-        Thread.sleep(200);
-        plan.shutdown();
-    }
-
-    // send 3 events
-    private void sendEvent(InputHandler input) throws Exception {
-        int len = 3;
-        Event[] events = new Event[len];
-        for (int i = 0; i < len; i++) {
-            long externalTs = System.currentTimeMillis();
-            // cpu int, memory int, bytesIn long, bytesOut long, timestamp long
-            events[i] = new Event(externalTs + i, new Object[] {
-                    15.0,
-                    15,
-                    1000,
-                    2000L,
-                    externalTs + i
-            });
-        }
-
-        for (Event e : events) {
-            input.send(e);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiExpiredEvents.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiExpiredEvents.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiExpiredEvents.java
deleted file mode 100644
index e0be82c..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiExpiredEvents.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- *
- *  * Licensed to the Apache Software Foundation (ASF) under one or more
- *  * contributor license agreements.  See the NOTICE file distributed with
- *  * this work for additional information regarding copyright ownership.
- *  * The ASF licenses this file to You under the Apache License, Version 2.0
- *  * (the "License"); you may not use this file except in compliance with
- *  * the License.  You may obtain a copy of the License at
- *  *
- *  *    http://www.apache.org/licenses/LICENSE-2.0
- *  *
- *  * Unless required by applicable law or agreed to in writing, software
- *  * distributed under the License is distributed on an "AS IS" BASIS,
- *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  * See the License for the specific language governing permissions and
- *  * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.state;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.util.EventPrinter;
-
-/**
- * Created by yonzhang on 11/25/15.
- */
-public class TestSiddhiExpiredEvents {
-    @Test
-    public void testExpiredEventsInLengthWindow() throws Exception{
-        SiddhiManager siddhiManager = new SiddhiManager();
-
-        String cseEventStream = "define stream TempStream (user string, cmd string);";
-        String query = "@info(name = 'query1') from TempStream#window.length(3) "
-                + " select *"
-                + " insert all events into DelayedTempStream";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + query);
-
-        executionPlanRuntime.addCallback("query1", new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        });
-
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("TempStream");
-        executionPlanRuntime.start();
-        inputHandler.send(new Object[]{"user", "open1"});
-        inputHandler.send(new Object[]{"user", "open2"});
-        inputHandler.send(new Object[]{"user", "open3"});
-        inputHandler.send(new Object[]{"user", "open4"});
-        inputHandler.send(new Object[]{"user", "open5"});
-        inputHandler.send(new Object[]{"user", "open6"});
-        Thread.sleep(1000);
-        executionPlanRuntime.shutdown();
-    }
-
-    @Test
-    public void testExpiredEventsInLengthBatchWindow() throws Exception{
-        SiddhiManager siddhiManager = new SiddhiManager();
-
-        String cseEventStream = "define stream TempStream (user string, cmd string);";
-        String query = "@info(name = 'query1') from TempStream#window.lengthBatch(2) "
-                + " select *"
-                + " insert all events into DelayedTempStream";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + query);
-
-        executionPlanRuntime.addCallback("query1", new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        });
-
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("TempStream");
-        executionPlanRuntime.start();
-        inputHandler.send(new Object[]{"user", "open1"});
-        inputHandler.send(new Object[]{"user", "open2"});
-        inputHandler.send(new Object[]{"user", "open3"});
-        inputHandler.send(new Object[]{"user", "open4"});
-        inputHandler.send(new Object[]{"user", "open5"});
-        inputHandler.send(new Object[]{"user", "open6"});
-        Thread.sleep(1000);
-        executionPlanRuntime.shutdown();
-    }
-
-    @Test
-    public void testExpireEvents2() throws Exception{
-        SiddhiManager siddhiManager = new SiddhiManager();
-
-        String cseEventStream = "define stream TempStream (user string, cmd string);";
-        String query = "@info(name = 'query1') from TempStream#window.length(4) "
-                + " select user, cmd, count(user) as cnt " +
-                " group by user " +
-                "having cnt > 2 "
-                + " insert all events into DelayedTempStream";
-
-        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + query);
-
-        executionPlanRuntime.addCallback("query1", new QueryCallback() {
-            @Override
-            public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
-                EventPrinter.print(timeStamp, inEvents, removeEvents);
-            }
-        });
-
-        InputHandler inputHandler = executionPlanRuntime.getInputHandler("TempStream");
-        executionPlanRuntime.start();
-        inputHandler.send(new Object[]{"user", "open1"});
-        inputHandler.send(new Object[]{"user", "open2"});
-        inputHandler.send(new Object[]{"user", "open3"});
-        inputHandler.send(new Object[]{"user", "open4"});
-        inputHandler.send(new Object[]{"user", "open5"});
-//        inputHandler.send(new Object[]{"user", "open6"});
-//        inputHandler.send(new Object[]{"user", "open7"});
-//        inputHandler.send(new Object[]{"user", "open8"});
-//        inputHandler.send(new Object[]{"user", "open9"});
-        Thread.sleep(1000);
-        executionPlanRuntime.shutdown();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/application.conf
deleted file mode 100644
index 524b867..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/application.conf
+++ /dev/null
@@ -1,33 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
-  "eagleProps" : {
-    "site" : "sandbox",
-    "application" : "UnitTest",
-    "eagleService": {
-      "host": "localhost",
-      "port": 38080,
-      "username": "admin",
-      "password": "secret"
-    }
-  },
-  "dynamicConfigSource" : {
-    "enabled" : true,
-    "initDelayMillis" : 0,
-    "delayMillis" : 30000,
-    "ignoreDeleteFromSource" : true
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/log4j.properties b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/log4j.properties
deleted file mode 100644
index 71a5dac..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,34 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-log4j.rootLogger=INFO, DRFA, stdout
-eagle.log.dir=./logs
-eagle.log.file=eagle.log
-
-# standard output
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
-
-# Daily Rolling File Appender
-log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
-log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
-# 30-day backup
-#log4j.appender.DRFA.MaxBackupIndex=30
-log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
-
-# Pattern format: Date LogLevel LoggerName LogMessage
-log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/str.siddhiext
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/str.siddhiext b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/str.siddhiext
deleted file mode 100644
index 435b4c3..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/str.siddhiext
+++ /dev/null
@@ -1,39 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-charAt=org.wso2.siddhi.extension.string.CharAtFunctionExtension
-coalesce=org.wso2.siddhi.extension.string.CoalesceFunctionExtension
-concat=org.wso2.siddhi.extension.string.ConcatFunctionExtension
-length=org.wso2.siddhi.extension.string.LengthFunctionExtension
-lower=org.wso2.siddhi.extension.string.LowerFunctionExtension
-regexp=org.wso2.siddhi.extension.string.RegexpFunctionExtension
-repeat=org.wso2.siddhi.extension.string.RepeatFunctionExtension
-replaceAll=org.wso2.siddhi.extension.string.ReplaceAllFunctionExtension
-replaceFirst=org.wso2.siddhi.extension.string.ReplaceFirstFunctionExtension
-reverse=org.wso2.siddhi.extension.string.ReverseFunctionExtension
-strcmp=org.wso2.siddhi.extension.string.StrcmpFunctionExtension
-substr=org.wso2.siddhi.extension.string.SubstrFunctionExtension
-trim=org.wso2.siddhi.extension.string.TrimFunctionExtension
-upper=org.wso2.siddhi.extension.string.UpperFunctionExtension
-hex=org.wso2.siddhi.extension.string.HexFunctionExtension
-unhex=org.wso2.siddhi.extension.string.UnhexFunctionExtension
-contains=org.wso2.siddhi.extension.string.ContainsFunctionExtension
-
-# Eagle Siddhi Extension
-equalsIgnoreCase=org.apache.eagle.policy.siddhi.extension.EqualsIgnoreCaseExtension
-containsIgnoreCase=org.apache.eagle.policy.siddhi.extension.ContainsIgnoreCaseExtension
-regexpIgnoreCase=org.apache.eagle.policy.siddhi.extension.RegexpIgnoreCaseFunctionExtension
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/unittest.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/unittest.conf b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/unittest.conf
deleted file mode 100644
index 1d18b67..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/unittest.conf
+++ /dev/null
@@ -1,65 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
-  "envContextConfig" : {
-    "env" : "storm",
-    "mode" : "local",
-    "topologyName" : "auditLogProcessTopology",
-    "stormConfigFile" : "security-auditlog-storm.yaml",
-    "parallelismConfig" : {
-      "kafkaMsgConsumer" : 2,
-      "hdfsAuditLogAlertExecutor*" : 3
-    }
-  },
-  "dataSourceConfig": {
-    "flavor" : "stormkafka",
-    "topic" : "hdfs_audit_log",
-    "zkConnection" : "localhost:2181",
-    "zkConnectionTimeoutMS" : 15000,
-    "consumerGroupId" : "EagleConsumer",
-    "fetchSize" : 1048586,
-    "deserializerClass" : "org.apache.eagle.security.auditlog.HdfsAuditLogKafkaDeserializer",
-    "transactionZKServers" : "localhost",
-    "transactionZKPort" : 2181,
-    "transactionZKRoot" : "/brokers/topics",
-    "transactionStateUpdateMS" : 2000
-  },
-  "alertExecutorConfigs" : {
-     "hdfsAuditLogAlertExecutor" : {
-       "parallelism" : 2,
-       "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
-       "needValidation" : "true"
-     }
-  },
-  "eagleProps" : {
-    "site": "site1",
-    "application": "hdfsAuditLog",
-    "eagleService": {
-       "host": "localhost",
-       "port": 38080
-    },
-  	"dataJoinPollIntervalSec" : 30,
-    "env"       : "test",
-    "mail.host" : "mailHost.com",
-	  "mail.smtp.port":"25",
-    "mail.debug" : "true"
-  },
-  "dynamicConfigSource" : {
-  	"enabled" : false,
-  	"initDelayMillis" : 0,
-  	"delayMillis" : 1000
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-service/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-service/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert-service/pom.xml
index e2a9222..31d6d2b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert-service/pom.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert-service/pom.xml
@@ -31,16 +31,6 @@
 	<name>eagle-alert-service</name>
 
 	<dependencies>
-		<dependency>
-			<groupId>org.apache.eagle</groupId>
-			<artifactId>eagle-alert-base</artifactId>
-			<version>${project.version}</version>
-		</dependency>
-		<dependency>
-			<groupId>org.apache.eagle</groupId>
-			<artifactId>eagle-alert-process</artifactId>
-			<version>${project.version}</version>
-		</dependency>
 	  	<dependency>
   			<groupId>org.apache.eagle</groupId>
   			<artifactId>eagle-service-base</artifactId>
@@ -59,6 +49,11 @@
 			<artifactId>eagle-metadata-base</artifactId>
 			<version>${project.version}</version>
 		</dependency>
+		<dependency>
+			<groupId>org.apache.eagle</groupId>
+			<artifactId>eagle-stream-process-base</artifactId>
+			<version>${project.version}</version>
+		</dependency>
 	</dependencies>
 </project>
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiddhiAlertPolicyValidateProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiddhiAlertPolicyValidateProvider.java b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiddhiAlertPolicyValidateProvider.java
deleted file mode 100644
index 7f5bddd..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiddhiAlertPolicyValidateProvider.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.service.alert;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.service.generic.GenericEntityServiceResource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
-import org.apache.eagle.policy.siddhi.AttributeType;
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.common.DateTimeUtil;
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-
-public class SiddhiAlertPolicyValidateProvider extends AlertPolicyValidateProvider{
-
-	public String type;
-	public List<String> streamNames;
-	public String policyDefinition;	
-	public static Logger LOG = LoggerFactory.getLogger(PolicyValidateResource.class);
-	public static final String EXECUTION_PLAN_NAME = "query";
-	
-	@SuppressWarnings({"unchecked"})
-	public String getStreamDef(String streamName) {
-		GenericEntityServiceResource resource = new GenericEntityServiceResource();
-		String startTime = "1969-01-01 00:00:00";
-		String endTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(Long.MAX_VALUE);
-		int pageSize = 1000;
-		String query = Constants.ALERT_STREAM_SCHEMA_SERVICE_ENDPOINT_NAME + "[@streamName=\"" + streamName + "\"]{*}";
-		GenericServiceAPIResponseEntity<AlertStreamSchemaEntity> streamResponse = resource.search(query, startTime, endTime, pageSize, null, false, false, 0L, 0, true, 0, null, false);
-		List<AlertStreamSchemaEntity> list = streamResponse.getObj();
-		
-		Map<String, String> map = new HashMap<String, String>();
-		for(AlertStreamSchemaEntity entity : list){
-			map.put(entity.getTags().get("attrName"), entity.getAttrType());
-		}
-		StringBuilder sb = new StringBuilder();
-		sb.append("dataobj object,");
-		for(Map.Entry<String, String> entry : map.entrySet()){
-			String attrName = entry.getKey();
-			sb.append(attrName);
-			sb.append(" ");
-			String attrType = entry.getValue();
-			if(attrType.equalsIgnoreCase(AttributeType.STRING.name())){
-				sb.append("string");
-			}else if(attrType.equalsIgnoreCase(AttributeType.INTEGER.name())){
-				sb.append("int");
-			}else if(attrType.equalsIgnoreCase(AttributeType.LONG.name())){
-				sb.append("long");
-			}else if(attrType.equalsIgnoreCase(AttributeType.BOOL.name())){
-				sb.append("bool");
-			}else{
-				LOG.error("AttrType is not recognized, ignore : " + attrType);
-			}
-			sb.append(",");
-		}
-		if(sb.length() > 0){
-			sb.deleteCharAt(sb.length()-1);
-		}
-		
-		String siddhiStreamDefFormat = "define stream " + streamName + " (" + "%s" + ");";
-		String streamDef = String.format(siddhiStreamDefFormat, sb.toString());
-		return streamDef;
-	}
-	
-	@SuppressWarnings({ "rawtypes", "unchecked" })
-	@Override
-	public GenericServiceAPIResponseEntity validate() {
-		GenericServiceAPIResponseEntity result = new GenericServiceAPIResponseEntity();
-		SiddhiManager siddhiManager = new SiddhiManager();
-		ExecutionPlanRuntime executionPlanRuntime = null;
-		try {				
-			String streamDefs = new String();
-			for(String streamName : streamNames){
-				//String streamDef = SiddhiStreamMetadataUtils.convertToStreamDef(streamName);
-				//We don't use SiddhiStreamMetadataUtils, for it only consume one dataSource
-				String streamDef = getStreamDef(streamName);
-				LOG.info("Siddhi stream definition : " + streamDef);
-				streamDefs += streamDef;
-			}		
-			
-			String executionPlan = streamDefs + " @info(name = '" + EXECUTION_PLAN_NAME + "') " +  policyDefinition;
-			executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
-		}
-		catch (Exception ex) {
-			result.setSuccess(false);
-			result.setException(ex);
-			return result;
-		}
-		finally {
-			if (executionPlanRuntime != null) {
-				executionPlanRuntime.shutdown();
-			}
-		}
-		result.setSuccess(true);
-		return result;
-	}
-	
-	@Override
-	public String PolicyType() {
-		return Constants.policyType.siddhiCEPEngine.name();
-	}
-	
-	@Override
-	public List<Module> BindingModules() {
-		Module module = new SimpleModule("policyValidate").registerSubtypes(new NamedType(SiddhiAlertPolicyValidateProvider.class, PolicyType()));
-		return Arrays.asList(module);	
-	}
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiteApplicationObject.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiteApplicationObject.java b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiteApplicationObject.java
deleted file mode 100644
index 41ece1d..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiteApplicationObject.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- *  Unless required by applicable law or agreed to in writing, software
- *  distributed under the License is distributed on an "AS IS" BASIS,
- *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- *  See the License for the specific language governing permissions and
- *  limitations under the License.
- *
- */
-
-package org.apache.eagle.service.alert;
-
-
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.alert.entity.SiteApplicationServiceEntity;
-
-import java.util.List;
-import java.util.Map;
-
-public class SiteApplicationObject extends TaggedLogAPIEntity {
-
-    public Boolean getEnabled() {
-        return enabled;
-    }
-
-    public void setEnabled(Boolean enabled) {
-        this.enabled = enabled;
-        valueChanged("enabled");
-    }
-
-    public List<SiteApplicationServiceEntity> getApplications() {
-        return applications;
-    }
-
-    public void setApplications(List<SiteApplicationServiceEntity> applications) {
-        this.applications = applications;
-        valueChanged("applicationList");
-    }
-
-    @Override
-    public Map<String, String> getTags() {
-        return tags;
-    }
-
-    @Override
-    public void setTags(Map<String, String> tags) {
-        this.tags = tags;
-        valueChanged("tags");
-    }
-
-    Map<String, String> tags;
-    Boolean enabled;
-    List<SiteApplicationServiceEntity> applications;
-}
\ No newline at end of file