You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2015/11/19 11:47:51 UTC
[44/55] [abbrv] [partial] incubator-eagle git commit: [EAGLE-46]
Rename package name as "org.apache.eagle"
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/config/TestAlertDedup.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/config/TestAlertDedup.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/config/TestAlertDedup.java
deleted file mode 100644
index baf0612..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/config/TestAlertDedup.java
+++ /dev/null
@@ -1,38 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.config;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-import eagle.dataproc.core.JsonSerDeserUtils;
-
-public class TestAlertDedup {
-
- @Test
- public void test() throws Exception{
- String alertDef = "{\"alertDedupIntervalMin\":\"720\",\"emailDedupIntervalMin\":\"1440\"}";
- DeduplicatorConfig dedupConfig = JsonSerDeserUtils.deserialize(alertDef, DeduplicatorConfig.class);
- Assert.assertEquals(dedupConfig.getAlertDedupIntervalMin(), 720);
- Assert.assertEquals(dedupConfig.getEmailDedupIntervalMin(), 1440);
-
- alertDef = "null";
- dedupConfig = JsonSerDeserUtils.deserialize(alertDef, DeduplicatorConfig.class);
- Assert.assertEquals(dedupConfig, null);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/dao/TestAlertDefinitionDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/dao/TestAlertDefinitionDAOImpl.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/dao/TestAlertDefinitionDAOImpl.java
deleted file mode 100644
index aae3afb..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/dao/TestAlertDefinitionDAOImpl.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.dao;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import eagle.alert.entity.AlertDefinitionAPIEntity;
-import eagle.common.config.EagleConfigConstants;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class TestAlertDefinitionDAOImpl {
-
- public AlertDefinitionAPIEntity buildTestAlertDefEntity(String site, String programId, String alertExecutorId, String policyId, String policyType) {
- AlertDefinitionAPIEntity entity = new AlertDefinitionAPIEntity();
- entity.setEnabled(true);
- Map<String, String> tags = new HashMap<String, String>();
- tags.put("site", site);
- tags.put("programId", programId);
- tags.put("alertExecutorId", alertExecutorId);
- tags.put("policyId", policyId);
- tags.put("policyType", policyType);
- entity.setTags(tags);
- return entity;
- }
-
- @Test
- public void test() throws Exception{
- Config config = ConfigFactory.load();
- String eagleServiceHost = config.getString(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.HOST);
- int eagleServicePort = config.getInt(EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + "." + EagleConfigConstants.PORT);
-
- String site = "sandbox";
- String dataSource = "UnitTest";
- AlertDefinitionDAO dao = new AlertDefinitionDAOImpl(eagleServiceHost, eagleServicePort) {
- @Override
- public List<AlertDefinitionAPIEntity> findActiveAlertDefs(String site, String dataSource) throws Exception {
- List<AlertDefinitionAPIEntity> list = new ArrayList<AlertDefinitionAPIEntity>();
- list.add(buildTestAlertDefEntity(site, dataSource, "TestExecutor1", "TestPolicyIDA", "TestPolicyTypeA"));
- list.add(buildTestAlertDefEntity(site, dataSource, "TestExecutor1", "TestPolicyIDB", "TestPolicyTypeB"));
- list.add(buildTestAlertDefEntity(site, dataSource, "TestExecutor2", "TestPolicyIDC", "TestPolicyTypeC"));
- list.add(buildTestAlertDefEntity(site, dataSource, "TestExecutor2", "TestPolicyIDD", "TestPolicyTypeD"));
- return list;
- }
- };
-
- Map<String, Map<String, AlertDefinitionAPIEntity>> retMap = dao.findActiveAlertDefsGroupbyAlertExecutorId(site, dataSource);
-
- Assert.assertEquals(2, retMap.size());
- Assert.assertEquals(2, retMap.get("TestExecutor1").size());
- Assert.assertEquals(2, retMap.get("TestExecutor2").size());
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java
deleted file mode 100644
index 936313a..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java
+++ /dev/null
@@ -1,60 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.dao;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import junit.framework.Assert;
-
-import eagle.alert.entity.AlertStreamSchemaEntity;
-import eagle.alert.siddhi.SiddhiStreamMetadataUtils;
-import eagle.alert.siddhi.StreamMetadataManager;
-import org.junit.Test;
-
-public class TestSiddhiStreamMetadataUtils {
- @Test
- public void test() throws Exception{
- Config config = ConfigFactory.load();
- StreamMetadataManager.getInstance().init(config, new AlertStreamSchemaDAO(){
- @Override
- public List<AlertStreamSchemaEntity> findAlertStreamSchemaByDataSource(
- String dataSource) {
- return Arrays.asList(generateStreamMetadataAPIEntity("attrName1", "STRING"),
- generateStreamMetadataAPIEntity("attrName2", "LONG")
- );
- }
- });
- String siddhiStreamDef = SiddhiStreamMetadataUtils.convertToStreamDef("testStreamName");
- Assert.assertEquals("define stream " + "testStreamName" + "(eagleAlertContext object,attrName1 string,attrName2 long);", siddhiStreamDef);
- StreamMetadataManager.getInstance().reset();
- }
-
- private AlertStreamSchemaEntity generateStreamMetadataAPIEntity(final String attrName, String attrType){
- AlertStreamSchemaEntity entity = new AlertStreamSchemaEntity();
- entity.setTags(new HashMap<String, String>(){{
- put("programId", "testProgramId");
- put("streamName", "testStreamName");
- put("attrName", attrName);
- }});
- entity.setAttrType(attrType);
- return entity;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/dao/TestStreamDefinitionDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/dao/TestStreamDefinitionDAOImpl.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/dao/TestStreamDefinitionDAOImpl.java
deleted file mode 100644
index 2ff978d..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/dao/TestStreamDefinitionDAOImpl.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.dao;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import eagle.alert.entity.AlertStreamSchemaEntity;
-import eagle.alert.siddhi.StreamMetadataManager;
-import org.junit.Assert;
-import org.junit.Test;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class TestStreamDefinitionDAOImpl {
-
- public AlertStreamSchemaEntity buildTestStreamDefEntity(String programId, String streamName, String attrName) {
- AlertStreamSchemaEntity entity = new AlertStreamSchemaEntity();
- entity.setAttrType("String");
- entity.setAttrValueResolver("DefaultAttrValueResolver");
- entity.setCategory("SimpleType");
- Map<String, String> tags = new HashMap<String, String>();
- tags.put("programId", programId);
- tags.put("streamName", streamName);
- tags.put("attrName", attrName);
- entity.setTags(tags);
- return entity;
- }
-
- @Test
- public void test() throws Exception{
- Config config = ConfigFactory.load();
- AlertStreamSchemaDAO dao = new AlertStreamSchemaDAOImpl(null, null) {
- public List<AlertStreamSchemaEntity> findAlertStreamSchemaByDataSource(String dataSource) throws Exception {
- List<AlertStreamSchemaEntity> list = new ArrayList<AlertStreamSchemaEntity>();
- String programId = "UnitTest";
- list.add(buildTestStreamDefEntity(programId, "TestStream", "Attr1"));
- list.add(buildTestStreamDefEntity(programId, "TestStream", "Attr2"));
- list.add(buildTestStreamDefEntity(programId, "TestStream", "Attr3"));
- list.add(buildTestStreamDefEntity(programId, "TestStream", "Attr4"));
- return list;
- }
- };
-
- StreamMetadataManager.getInstance().init(config, dao);
- Map<String, List<AlertStreamSchemaEntity>> retMap = StreamMetadataManager.getInstance().getMetadataEntitiesForAllStreams();
- Assert.assertTrue(retMap.get("TestStream").size() == 4);
- StreamMetadataManager.getInstance().reset();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/policy/TestDynamicPolicyLoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/policy/TestDynamicPolicyLoader.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/policy/TestDynamicPolicyLoader.java
deleted file mode 100644
index fede802..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/policy/TestDynamicPolicyLoader.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.policy;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import eagle.alert.dao.AlertDefinitionDAO;
-import eagle.alert.entity.AlertDefinitionAPIEntity;
-import junit.framework.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class TestDynamicPolicyLoader {
- private final static Logger LOG = LoggerFactory.getLogger(TestDynamicPolicyLoader.class);
-
- @Test
- public void test() throws Exception{
- System.setProperty("config.resource", "/unittest.conf");
- Config config = ConfigFactory.load();
- Map<String, PolicyLifecycleMethods> policyChangeListeners = new HashMap<String, PolicyLifecycleMethods>();
- policyChangeListeners.put("testAlertExecutorId", new PolicyLifecycleMethods() {
- @Override
- public void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> deleted) {
- LOG.info("deleted : " + deleted);
- }
-
- @Override
- public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> added) {
- Assert.assertTrue(added.size() == 1);
- LOG.info("added : " + added);
- }
-
- @Override
- public void onPolicyChanged(Map<String, AlertDefinitionAPIEntity> changed) {
- Assert.assertTrue(changed.size() == 1);
- LOG.info("changed :" + changed);
- }
- });
-
- Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs = new HashMap<String, Map<String, AlertDefinitionAPIEntity>>();
- initialAlertDefs.put("testAlertExecutorId", new HashMap<String, AlertDefinitionAPIEntity>());
- Map<String, AlertDefinitionAPIEntity> map = initialAlertDefs.get("testAlertExecutorId");
- map.put("policyId_1", buildTestAlertDefEntity("testProgramId", "testAlertExecutorId", "policyId_1", "siddhi", "policyDef_1"));
- map.put("policyId_3", buildTestAlertDefEntity("testProgramId", "testAlertExecutorId", "policyId_3", "siddhi", "policyDef_3"));
-
- AlertDefinitionDAO dao = new AlertDefinitionDAO() {
- @Override
- public Map<String, Map<String, AlertDefinitionAPIEntity>> findActiveAlertDefsGroupbyAlertExecutorId(
- String site, String dataSource) {
- Map<String, Map<String, AlertDefinitionAPIEntity>> currentAlertDefs = new HashMap<String, Map<String, AlertDefinitionAPIEntity>>();
- currentAlertDefs.put("testAlertExecutorId", new HashMap<String, AlertDefinitionAPIEntity>());
- Map<String, AlertDefinitionAPIEntity> map = currentAlertDefs.get("testAlertExecutorId");
- map.put("policyId_1", buildTestAlertDefEntity("testProgramId", "testAlertExecutorId", "policyId_1", "siddhi", "policyDef_1_1"));
- map.put("policyId_2", buildTestAlertDefEntity("testProgramId", "testAlertExecutorId", "policyId_2", "siddhi", "policyDef_2"));
- return currentAlertDefs;
- }
-
- @Override
- public List<AlertDefinitionAPIEntity> findActiveAlertDefs(String site, String dataSource) {
- return null;
- }
- };
-
- DynamicPolicyLoader loader = DynamicPolicyLoader.getInstance();
- loader.init(initialAlertDefs, dao, config);
-
- try{
- Thread.sleep(5000);
- }catch(Exception ex){
-
- }
- }
-
- public AlertDefinitionAPIEntity buildTestAlertDefEntity(String programId, String alertExecutorId, String policyId, String policyType, String policyDef) {
- AlertDefinitionAPIEntity entity = new AlertDefinitionAPIEntity();
- entity.setEnabled(true);
- Map<String, String> tags = new HashMap<String, String>();
- tags.put("programId", programId);
- tags.put("alertExecutorId", alertExecutorId);
- tags.put("policyId", policyId);
- tags.put("policyType", policyType);
- entity.setTags(tags);
- entity.setPolicyDef(policyDef);
- return entity;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/policy/TestPolicyDistribution.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/policy/TestPolicyDistribution.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/policy/TestPolicyDistribution.java
deleted file mode 100644
index 96ec8f4..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/policy/TestPolicyDistribution.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.policy;
-
-import org.junit.Test;
-
-public class TestPolicyDistribution {
- @Test
- public void test(){
- DefaultPolicyPartitioner p = new DefaultPolicyPartitioner();
- System.out.println(p.partition(2, "siddhiCEPEngine", "readtmpPII"));
- System.out.println(p.partition(2, "siddhiCEPEngine", "readtmpPII2"));
- System.out.println(p.partition(2, "siddhiCEPEngine", "tmpPIIpii"));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/siddhi/TestSiddhiEngine.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/siddhi/TestSiddhiEngine.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/siddhi/TestSiddhiEngine.java
deleted file mode 100644
index 4eb0c61..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/siddhi/TestSiddhiEngine.java
+++ /dev/null
@@ -1,248 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.siddhi;
-
-import java.lang.reflect.Field;
-import java.util.List;
-
-import eagle.executor.AlertExecutor;
-import junit.framework.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.util.EventPrinter;
-import org.wso2.siddhi.query.api.execution.query.Query;
-import org.wso2.siddhi.query.api.execution.query.selection.OutputAttribute;
-import org.wso2.siddhi.query.api.expression.Expression;
-import org.wso2.siddhi.query.api.expression.Variable;
-
-public class TestSiddhiEngine {
- static final Logger log = LoggerFactory.getLogger(TestSiddhiEngine.class);
- int alertCount = 0;
-
- @Test
- public void TestStrContains() throws Exception {
- alertCount = 0;
- SiddhiManager siddhiManager = new SiddhiManager();
-
- String cseEventStream = "" +
- "@config(async = 'true') " +
- "define stream typeStream (cmd string, src string, dst string) ;";
- String queryString = "" +
- "@info(name = 'query1') " +
- "from typeStream[(cmd == 'rename') and (src == '/tmp/pii') and (str:contains(dst,'/user/hdfs/.Trash/Current/tmp/pii')==true)] " +
- "select cmd, src, dst " +
- "insert into outputStream ;";
-
- ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
-
- QueryCallback callback = new QueryCallback() {
- @Override
- public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
- EventPrinter.print(timeStamp, inEvents, removeEvents);
- alertCount++;
- }
- };
- executionPlanRuntime.addCallback("query1", callback);
-
- InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
- executionPlanRuntime.start();
- inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/hdfs/.Trash/Current/tmp/pii"});
- Thread.sleep(100);
- Assert.assertTrue(alertCount == 1);
- executionPlanRuntime.shutdown();
- }
-
- @Test
- public void TestRegexp() throws Exception {
- alertCount = 0;
- SiddhiManager siddhiManager = new SiddhiManager();
-
- String cseEventStream = "" +
- "@config(async = 'true') " +
- "define stream typeStream (str string, other string, num double) ;";
- String queryString = "" +
- "@info(name = 'query1') " +
- "from typeStream " +
- "select str as str1, other as other1 , num as num1, count(num) as number " +
- "having str:regexp(str1, '/usr/data/[0-9]+/[0-9]+/[0-9]+') == true " +
- "insert into outputStream ;";
-
- ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
-
- QueryCallback callback = new QueryCallback() {
- @Override
- public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
- alertCount++;
- EventPrinter.print(timeStamp, inEvents, removeEvents);
- }
- };
-
- executionPlanRuntime.addCallback("query1", callback);
-
- InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
- executionPlanRuntime.start();
- inputHandler.send(new Object[]{"/usr/data/000/001/002", "other", 1.0});
- Thread.sleep(100);
- Assert.assertTrue(alertCount == 1);
- executionPlanRuntime.shutdown();
- }
-
- @Test
- public void TestStrEqualsIgnoreCase() throws Exception {
- alertCount = 0;
- SiddhiManager siddhiManager = new SiddhiManager();
-
- String cseEventStream = "define stream typeStream (cmd string, src string, dst string) ;";
- String queryString = "" +
- "@info(name = 'query1') " +
- "from typeStream[(cmd == 'rename') and (src == '/tmp/pii') and (str:equalsIgnoreCase(dst,'/user/hdfs/.TRAsh/current/TMP/PII')==true)] " +
- "select cmd, src, dst " +
- "insert into outputStream ;";
-
- ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
-
- QueryCallback callback = new QueryCallback() {
- @Override
- public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
- alertCount++;
- EventPrinter.print(timeStamp, inEvents, removeEvents);
- }
- };
-
- executionPlanRuntime.addCallback("query1", callback);
-
- Field field = QueryCallback.class.getDeclaredField("query");
- field.setAccessible(true);
-
- InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
- executionPlanRuntime.start();
- inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/HDFS/.Trash/Current/TMP/pii"}); // match case
- inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/HDFS/.Trash///Current/TMP/pii"}); //non-match case
- Thread.sleep(100);
- Assert.assertTrue(alertCount == 1);
- executionPlanRuntime.shutdown();
- }
-
- @Test
- public void TestStrContainsIgnoreCase() throws Exception {
- alertCount = 0;
- SiddhiManager siddhiManager = new SiddhiManager();
-
- String cseEventStream = "define stream typeStream (cmd string, src string, dst string) ;";
- String queryString = "" +
- "@info(name = 'query1') " +
- "from typeStream[(cmd == 'rename') and (src == '/tmp/pii') and (str:containsIgnoreCase(dst,'.TRASH/CURRENT/tMp/pII')==true)] " +
- "select cmd, src, dst " +
- "insert into outputStream ;";
-
- ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
-
- QueryCallback callback = new QueryCallback() {
- @Override
- public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
- EventPrinter.print(timeStamp, inEvents, removeEvents);
- alertCount++;
- }
- };
-
- executionPlanRuntime.addCallback("query1", callback);
-
- Field field = QueryCallback.class.getDeclaredField("query");
- field.setAccessible(true);
-
- InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
- executionPlanRuntime.start();
- inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/hdfs/.Trash/Current/TMP/pii"}); // match case
- inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/hdfs/.Trash///Current/TMP/pii"}); //non-match case
- Thread.sleep(100);
- Assert.assertTrue(alertCount == 1);
- executionPlanRuntime.shutdown();
- }
-
- @Test
- public void TestRegexpIgnoreCase() throws Exception {
- alertCount = 0;
- SiddhiManager siddhiManager = new SiddhiManager();
-
- String cseEventStream = "define stream typeStream (str string, other string, num double) ;";
- String queryString = "" +
- "@info(name = 'query1') " +
- "from typeStream " +
- "select str as str1, other as other1 , num as num1, count(num) as number " +
- "having str:regexpIgnoreCase(str1, '/usr/DATA/[0-9]+/[0-9]+/[0-9]+') == true " +
- "insert into outputStream ;";
-
- ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
-
- QueryCallback callback = new QueryCallback() {
- @Override
- public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
- alertCount++;
- EventPrinter.print(timeStamp, inEvents, removeEvents);
- }
- };
- executionPlanRuntime.addCallback("query1", callback);
-
- InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
- executionPlanRuntime.start();
- inputHandler.send(new Object[]{"/USR/data/000/001/002", "other", 1.0});
- Thread.sleep(100);
- Assert.assertTrue(alertCount == 1);
- executionPlanRuntime.shutdown();
- }
-
- @Test
- public void TestDataObject() throws Exception {
- alertCount = 0;
- SiddhiManager siddhiManager = new SiddhiManager();
-
- String cseEventStream = "" +
- "@config(async = 'true') " +
- "define stream typeStream (dataobj object, str string, first string) ;";
- String queryString = "" +
- "@info(name = 'query1') " +
- "from typeStream " +
- "select * " +
- "having str:regexp(str, '/usr/data/[0-9]+/[0-9]+/[0-9]+') == true " +
- "insert into outputStream ;";
-
- ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
-
- QueryCallback callback = new QueryCallback() {
- @Override
- public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
- alertCount++;
- EventPrinter.print(timeStamp, inEvents, removeEvents);
- }
- };
-
- executionPlanRuntime.addCallback("query1", callback);
-
- InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
- executionPlanRuntime.start();
- inputHandler.send(new Object[]{new AlertExecutor(queryString, null, 0, 1, null, null), "/usr/data/000/001/002", "second"});
- Thread.sleep(100);
- Assert.assertTrue(alertCount == 1);
- executionPlanRuntime.shutdown();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/siddhi/TestSiddhiSlideWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/siddhi/TestSiddhiSlideWindow.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/siddhi/TestSiddhiSlideWindow.java
deleted file mode 100644
index 7bac0e4..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/siddhi/TestSiddhiSlideWindow.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.siddhi;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.util.EventPrinter;
-
-import eagle.common.DateTimeUtil;
-
-public class TestSiddhiSlideWindow {
-
- int alertCount = 0;
-
- @Test
- public void testSlideWindow1() throws Exception{
- alertCount = 0;
- SiddhiManager siddhiManager = new SiddhiManager();
-
- String cseEventStream = "define stream eventStream (user string, path string, cmd string);";
-// String query = "@info(name = 'query1') from eventStream[cmd == 'open' AND str:regexp(path, '/usr/data/[0-9]+/[0-9]+/[0-9]+')]#window.time(1 sec)"
-// + " select user, path, cmd, count(path) as cnt"
-// + " group by user"
-// + " having cnt > 3 insert all events into outputStream;";
-
-// String query = "@info(name = 'query1') from eventStream[cmd == 'open' AND str:regexp(path, '/usr/data/[0-9]+/[0-9]+/[0-9]+')]#window.length(10)"
-// + " select user, path, cmd, count(path) as cnt"
-// + " group by user"
-// + " having cnt > 3 insert all events into outputStream;";
-
-// String query = "@info(name = 'query1') from eventStream[cmd == 'open' AND str:regexp(path, '/usr/data/[0-9]+/[0-9]+/[0-9]+')]#window.timeBatch(1 sec)"
-// + " select user, path, cmd, count(path) as cnt"
-// + " group by user"
-// + " having cnt > 3 insert all events into outputStream;";
-
- String query = "@info(name = 'query1') from eventStream[cmd == 'open' AND str:regexp(path, '/usr/data/[0-9]+/[0-9]+/[0-9]+')]#window.lengthBatch(10)"
- + " select user, path, cmd, count(path) as cnt"
- + " group by user"
- + " having cnt > 3 insert all events into outputStream;";
-
- ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + query);
-
- executionPlanRuntime.addCallback("query1", new QueryCallback() {
- @Override
- public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
- EventPrinter.print(timeStamp, inEvents, removeEvents);
- alertCount++;
- }
- });
-
- InputHandler inputHandler = executionPlanRuntime.getInputHandler("eventStream");
- executionPlanRuntime.start();
- inputHandler.send(new Object[]{"user", "/usr/data/0000/0000/0000", "open"});
- inputHandler.send(new Object[]{"user", "/usr/data/0000/0000/1111", "open"});
- inputHandler.send(new Object[]{"user", "/usr/data/0000/0000/2222", "open"});
- inputHandler.send(new Object[]{"user", "/usr/data/0000/0000/3333", "open"});
-
- inputHandler.send(new Object[]{"user", "/usr/data/1111/0000/0000", "open"});
- inputHandler.send(new Object[]{"user", "/usr/data/1111/0000/1111", "open"});
-
- inputHandler.send(new Object[]{"user", "/usr/data/2222/0000/0000", "open"});
- inputHandler.send(new Object[]{"user", "/usr/data/2222/0000/1111", "open"});
- inputHandler.send(new Object[]{"user", "/usr/data/2222/0000/2222", "open"});
- Thread.sleep(100);
- Assert.assertTrue(alertCount == 0);
- inputHandler.send(new Object[]{"user", "/usr/data/2222/0000/3333", "open"});
- Thread.sleep(100);
- Assert.assertTrue(alertCount == 1);
- executionPlanRuntime.shutdown();
- }
-
- @Test
- public void testSlideWindow2() throws Exception{
- alertCount = 0;
- SiddhiManager siddhiManager = new SiddhiManager();
-
- String cseEventStream = "define stream eventStream (timeStamp long, user string, path string, cmd string);";
- String query = "@info(name = 'query1') from eventStream[cmd == 'open' AND str:regexp(path, '/usr/data/[0-9]+/[0-9]+/[0-9]+')]#window.externalTime(timeStamp,1 sec)"
- + " select user, path, cmd, count(path) as cnt"
- + " group by user"
- + " having cnt > 3 insert all events into outputStream;";
-
- ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + query);
-
- executionPlanRuntime.addCallback("query1", new QueryCallback() {
- @Override
- public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
- alertCount++;
- EventPrinter.print(timeStamp, inEvents, removeEvents);
- }
- });
-
- InputHandler inputHandler = executionPlanRuntime.getInputHandler("eventStream");
- executionPlanRuntime.start();
- long curTime = DateTimeUtil.humanDateToMilliseconds("2015-09-17 00:00:00,000");
- inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/0000", "open"});
- Thread.sleep(1100);
- inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/1111", "open"});
- Thread.sleep(100);
- inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/2222", "open"});
- Thread.sleep(100);
- inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/3333", "open"});
- Thread.sleep(100);
- Assert.assertTrue(alertCount == 1);
- inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/5555", "open"});
- Thread.sleep(100);
- Assert.assertTrue(alertCount == 2);
- executionPlanRuntime.shutdown();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/siddhi/TestSiddhiStream.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/siddhi/TestSiddhiStream.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/siddhi/TestSiddhiStream.java
deleted file mode 100644
index 9ad1654..0000000
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/eagle/alert/siddhi/TestSiddhiStream.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package eagle.alert.siddhi;
-
-import org.junit.Assert;
-import org.junit.Test;
-
-
-public class TestSiddhiStream {
-
- @Test
- public void test() {
- String rule = "from hiveAccessLogStream[sensitivityType=='PHONE_NUMBER'] select user,timestamp,resource,sensitivityType insert into outputStream;";
- Assert.assertFalse(SiddhiPolicyEvaluator.addContextFieldIfNotExist(rule).equals(rule));
-
- rule = "from hiveAccessLogStream[sensitivityType=='PHONE_NUMBER'] select * insert into outputStream;";
- Assert.assertTrue(SiddhiPolicyEvaluator.addContextFieldIfNotExist(rule).equals(rule));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
new file mode 100644
index 0000000..76c43ce
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/cep/TestSiddhiEvaluator.java
@@ -0,0 +1,128 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.cep;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.dao.AlertDefinitionDAO;
+import org.apache.eagle.alert.dao.AlertDefinitionDAOImpl;
+import org.apache.eagle.alert.dao.AlertStreamSchemaDAO;
+import org.apache.eagle.alert.dao.AlertStreamSchemaDAOImpl;
+import org.apache.eagle.alert.entity.AlertAPIEntity;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
+import org.apache.eagle.alert.siddhi.EagleAlertContext;
+import org.apache.eagle.alert.siddhi.SiddhiPolicyDefinition;
+import org.apache.eagle.alert.siddhi.SiddhiPolicyEvaluator;
+import org.apache.eagle.alert.siddhi.StreamMetadataManager;
+import org.apache.eagle.dataproc.core.ValuesArray;
+import org.apache.eagle.datastream.Collector;
+import org.apache.eagle.datastream.Tuple2;
+import org.apache.eagle.executor.AlertExecutor;
+import junit.framework.Assert;
+import org.junit.Test;
+
+import java.util.*;
+
+public class TestSiddhiEvaluator {
+
+ int alertCount = 0;
+
+ public AlertStreamSchemaEntity createStreamMetaEntity(String attrName, String type) { // one attribute row of hdfsAuditLogEventStream
+     final Map<String, String> schemaTags = new HashMap<String, String>();
+     schemaTags.put("dataSource", "hdfsAuditLog");
+     schemaTags.put("streamName", "hdfsAuditLogEventStream");
+     schemaTags.put("attrName", attrName);
+     final AlertStreamSchemaEntity schemaEntity = new AlertStreamSchemaEntity();
+     schemaEntity.setTags(schemaTags);
+     schemaEntity.setAttrType(type);
+     return schemaEntity;
+ }
+
+ @Test
+ public void test() throws Exception{ // end-to-end: one event matching the policy filter must produce exactly one alert
+     Config config = ConfigFactory.load("unittest.conf");
+     AlertStreamSchemaDAO streamDao = new AlertStreamSchemaDAOImpl(null, null) { // stub: schema is served from memory
+         @Override
+         public List<AlertStreamSchemaEntity> findAlertStreamSchemaByDataSource(String dataSource) throws Exception {
+             List<AlertStreamSchemaEntity> list = new ArrayList<AlertStreamSchemaEntity>();
+             list.add(createStreamMetaEntity("cmd", "string"));
+             list.add(createStreamMetaEntity("dst", "string"));
+             list.add(createStreamMetaEntity("src", "string"));
+             list.add(createStreamMetaEntity("host", "string"));
+             list.add(createStreamMetaEntity("user", "string"));
+             list.add(createStreamMetaEntity("timestamp", "long"));
+             list.add(createStreamMetaEntity("securityZone", "string"));
+             list.add(createStreamMetaEntity("sensitivityType", "string"));
+             list.add(createStreamMetaEntity("allowed", "string"));
+             return list;
+         }
+     };
+     StreamMetadataManager.getInstance().init(config, streamDao);
+
+     Map<String, Object> data1 = new TreeMap<String, Object>(){{ // the single input event; cmd matches the filter below
+         put("cmd", "open");
+         put("dst", "");
+         put("src", "");
+         put("host", "");
+         put("user", "");
+         put("timestamp", String.valueOf(System.currentTimeMillis()));
+         put("securityZone", "");
+         put("sensitivityType", "");
+         put("allowed", "true");
+     }};
+     final SiddhiPolicyDefinition policyDef = new SiddhiPolicyDefinition();
+     policyDef.setType("SiddhiCEPEngine");
+     String expression = "from hdfsAuditLogEventStream[cmd=='open'] " +
+         "select * " +
+         "insert into outputStream ;";
+     policyDef.setExpression(expression);
+     SiddhiPolicyEvaluator evaluator = new SiddhiPolicyEvaluator(config, "testPolicy", policyDef, new String[]{"hdfsAuditLogEventStream"});
+     EagleAlertContext context = new EagleAlertContext();
+
+     AlertDefinitionDAO alertDao = new AlertDefinitionDAOImpl(null, null) { // stub: serves no alert definitions
+         @Override
+         public Map<String, Map<String, AlertDefinitionAPIEntity>> findActiveAlertDefsGroupbyAlertExecutorId(String site, String dataSource) throws Exception {
+             return null;
+         }
+     };
+
+     context.alertExecutor = new AlertExecutor("alertExecutorId", null, 3, 1, alertDao, new String[]{"hdfsAuditLogEventStream"}) {
+         @Override
+         public Map<String, String> getDimensions(String policyId) {
+             return new HashMap<String, String>();
+         }
+
+         @Override
+         public void runMetricReporter() {} // metric reporting is disabled for this test
+     };
+     context.alertExecutor.prepareConfig(config);
+     context.alertExecutor.init();
+     context.evaluator = evaluator;
+     context.policyId = "testPolicy";
+     context.outputCollector = new Collector<Tuple2<String, AlertAPIEntity>> () { // counts every emitted alert
+         @Override
+         public void collect(Tuple2<String, AlertAPIEntity> stringAlertAPIEntityTuple2) {
+             alertCount++;
+         }
+     };
+     evaluator.evaluate(new ValuesArray(context, "hdfsAuditLogEventStream", data1));
+     Thread.sleep(2 * 1000); // NOTE(review): presumably the alert is delivered asynchronously - confirm
+     Assert.assertEquals(1, alertCount); // expected value first, so a failure message reads correctly
+     StreamMetadataManager.getInstance().reset();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/config/TestAlertDedup.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/config/TestAlertDedup.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/config/TestAlertDedup.java
new file mode 100644
index 0000000..4295bdc
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/config/TestAlertDedup.java
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.config;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+import org.apache.eagle.dataproc.core.JsonSerDeserUtils;
+
+public class TestAlertDedup {
+
+ @Test
+ public void test() throws Exception{ // DeduplicatorConfig must deserialize from JSON; the JSON literal "null" must map to null
+     String alertDef = "{\"alertDedupIntervalMin\":\"720\",\"emailDedupIntervalMin\":\"1440\"}";
+     DeduplicatorConfig dedupConfig = JsonSerDeserUtils.deserialize(alertDef, DeduplicatorConfig.class);
+     Assert.assertEquals(720, dedupConfig.getAlertDedupIntervalMin()); // expected first, then actual
+     Assert.assertEquals(1440, dedupConfig.getEmailDedupIntervalMin());
+
+     alertDef = "null";
+     dedupConfig = JsonSerDeserUtils.deserialize(alertDef, DeduplicatorConfig.class);
+     Assert.assertNull(dedupConfig);
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java
new file mode 100644
index 0000000..231c57d
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestAlertDefinitionDAOImpl.java
@@ -0,0 +1,72 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.dao;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import org.apache.eagle.common.config.EagleConfigConstants;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestAlertDefinitionDAOImpl {
+
+ public AlertDefinitionAPIEntity buildTestAlertDefEntity(String site, String programId, String alertExecutorId, String policyId, String policyType) { // enabled definition carrying the five tags
+     final Map<String, String> defTags = new HashMap<String, String>();
+     defTags.put("site", site);
+     defTags.put("programId", programId);
+     defTags.put("alertExecutorId", alertExecutorId);
+     defTags.put("policyId", policyId);
+     defTags.put("policyType", policyType);
+     final AlertDefinitionAPIEntity alertDef = new AlertDefinitionAPIEntity();
+     alertDef.setEnabled(true);
+     alertDef.setTags(defTags);
+     return alertDef;
+ }
+
+ @Test
+ public void test() throws Exception{ // four stubbed definitions must group into two executors holding two policies each
+     final Config conf = ConfigFactory.load();
+     final String servicePrefix = EagleConfigConstants.EAGLE_PROPS + "." + EagleConfigConstants.EAGLE_SERVICE + ".";
+     final String host = conf.getString(servicePrefix + EagleConfigConstants.HOST);
+     final int port = conf.getInt(servicePrefix + EagleConfigConstants.PORT);
+     final String testSite = "sandbox";
+     final String testDataSource = "UnitTest";
+     AlertDefinitionDAO stubDao = new AlertDefinitionDAOImpl(host, port) { // stub: definitions are built in memory
+         @Override
+         public List<AlertDefinitionAPIEntity> findActiveAlertDefs(String site, String dataSource) throws Exception {
+             List<AlertDefinitionAPIEntity> defs = new ArrayList<AlertDefinitionAPIEntity>();
+             defs.add(buildTestAlertDefEntity(site, dataSource, "TestExecutor1", "TestPolicyIDA", "TestPolicyTypeA"));
+             defs.add(buildTestAlertDefEntity(site, dataSource, "TestExecutor1", "TestPolicyIDB", "TestPolicyTypeB"));
+             defs.add(buildTestAlertDefEntity(site, dataSource, "TestExecutor2", "TestPolicyIDC", "TestPolicyTypeC"));
+             defs.add(buildTestAlertDefEntity(site, dataSource, "TestExecutor2", "TestPolicyIDD", "TestPolicyTypeD"));
+             return defs;
+         }
+     };
+
+     Map<String, Map<String, AlertDefinitionAPIEntity>> grouped = stubDao.findActiveAlertDefsGroupbyAlertExecutorId(testSite, testDataSource);
+
+     Assert.assertEquals(2, grouped.size());
+     Assert.assertEquals(2, grouped.get("TestExecutor1").size());
+     Assert.assertEquals(2, grouped.get("TestExecutor2").size());
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java
new file mode 100644
index 0000000..b7f40fa
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestSiddhiStreamMetadataUtils.java
@@ -0,0 +1,60 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.dao;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import junit.framework.Assert;
+
+import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
+import org.apache.eagle.alert.siddhi.SiddhiStreamMetadataUtils;
+import org.apache.eagle.alert.siddhi.StreamMetadataManager;
+import org.junit.Test;
+
+public class TestSiddhiStreamMetadataUtils {
+ @Test
+ public void test() throws Exception{ // the stream definition must start with eagleAlertContext, then the attrs with lower-cased types
+     final Config conf = ConfigFactory.load();
+     final AlertStreamSchemaDAO stubDao = new AlertStreamSchemaDAO(){ // stub: two attributes served from memory
+         @Override
+         public List<AlertStreamSchemaEntity> findAlertStreamSchemaByDataSource(String dataSource) {
+             final AlertStreamSchemaEntity first = generateStreamMetadataAPIEntity("attrName1", "STRING");
+             final AlertStreamSchemaEntity second = generateStreamMetadataAPIEntity("attrName2", "LONG");
+             return Arrays.asList(first, second);
+         }
+     };
+     StreamMetadataManager.getInstance().init(conf, stubDao);
+     final String actualDef = SiddhiStreamMetadataUtils.convertToStreamDef("testStreamName");
+     Assert.assertEquals("define stream " + "testStreamName" + "(eagleAlertContext object,attrName1 string,attrName2 long);", actualDef);
+     StreamMetadataManager.getInstance().reset();
+ }
+
+ private AlertStreamSchemaEntity generateStreamMetadataAPIEntity(final String attrName, String attrType){ // one schema row of testStreamName
+     HashMap<String, String> tags = new HashMap<String, String>(); // plain map: no anonymous subclass capturing the test instance
+     tags.put("programId", "testProgramId");
+     tags.put("streamName", "testStreamName");
+     tags.put("attrName", attrName);
+     AlertStreamSchemaEntity entity = new AlertStreamSchemaEntity();
+     entity.setTags(tags);
+     entity.setAttrType(attrType);
+     return entity;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestStreamDefinitionDAOImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestStreamDefinitionDAOImpl.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestStreamDefinitionDAOImpl.java
new file mode 100644
index 0000000..80aacd6
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/dao/TestStreamDefinitionDAOImpl.java
@@ -0,0 +1,66 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.dao;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
+import org.apache.eagle.alert.siddhi.StreamMetadataManager;
+import org.junit.Assert;
+import org.junit.Test;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+public class TestStreamDefinitionDAOImpl {
+
+ public AlertStreamSchemaEntity buildTestStreamDefEntity(String programId, String streamName, String attrName) { // String-typed schema row for the given stream attribute
+     final Map<String, String> schemaTags = new HashMap<String, String>();
+     schemaTags.put("programId", programId);
+     schemaTags.put("streamName", streamName);
+     schemaTags.put("attrName", attrName);
+     final AlertStreamSchemaEntity schemaEntity = new AlertStreamSchemaEntity();
+     schemaEntity.setAttrType("String");
+     schemaEntity.setAttrValueResolver("DefaultAttrValueResolver");
+     schemaEntity.setCategory("SimpleType");
+     schemaEntity.setTags(schemaTags);
+     return schemaEntity;
+ }
+
+ @Test
+ public void test() throws Exception{ // four stubbed attributes of TestStream must all be exposed by StreamMetadataManager
+     Config config = ConfigFactory.load();
+     AlertStreamSchemaDAO dao = new AlertStreamSchemaDAOImpl(null, null) { // stub: schema is built in memory
+         @Override
+         public List<AlertStreamSchemaEntity> findAlertStreamSchemaByDataSource(String dataSource) throws Exception {
+             List<AlertStreamSchemaEntity> list = new ArrayList<AlertStreamSchemaEntity>();
+             String programId = "UnitTest";
+             list.add(buildTestStreamDefEntity(programId, "TestStream", "Attr1"));
+             list.add(buildTestStreamDefEntity(programId, "TestStream", "Attr2"));
+             list.add(buildTestStreamDefEntity(programId, "TestStream", "Attr3"));
+             list.add(buildTestStreamDefEntity(programId, "TestStream", "Attr4"));
+             return list;
+         }
+     };
+     StreamMetadataManager.getInstance().init(config, dao);
+     Map<String, List<AlertStreamSchemaEntity>> retMap = StreamMetadataManager.getInstance().getMetadataEntitiesForAllStreams();
+     Assert.assertEquals(4, retMap.get("TestStream").size()); // assertEquals reports the actual size on failure
+     StreamMetadataManager.getInstance().reset();
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestDynamicPolicyLoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestDynamicPolicyLoader.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestDynamicPolicyLoader.java
new file mode 100644
index 0000000..091a6a6
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestDynamicPolicyLoader.java
@@ -0,0 +1,105 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.policy;
+
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+import org.apache.eagle.alert.dao.AlertDefinitionDAO;
+import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
+import junit.framework.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class TestDynamicPolicyLoader {
+ private final static Logger LOG = LoggerFactory.getLogger(TestDynamicPolicyLoader.class);
+
+ @Test
+ public void test() throws Exception{ // initial defs {1,3} vs DAO defs {1 (changed),2}: listener should see created/changed/deleted
+     System.setProperty("config.resource", "/unittest.conf");
+     Config config = ConfigFactory.load();
+     Map<String, PolicyLifecycleMethods> policyChangeListeners = new HashMap<String, PolicyLifecycleMethods>();
+     policyChangeListeners.put("testAlertExecutorId", new PolicyLifecycleMethods() { // NOTE(review): this map is never passed to the loader in this method - confirm intent
+         @Override
+         public void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> deleted) {
+             LOG.info("deleted : " + deleted);
+         }
+
+         @Override
+         public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> added) {
+             Assert.assertTrue(added.size() == 1);
+             LOG.info("added : " + added);
+         }
+
+         @Override
+         public void onPolicyChanged(Map<String, AlertDefinitionAPIEntity> changed) {
+             Assert.assertTrue(changed.size() == 1);
+             LOG.info("changed :" + changed);
+         }
+     });
+
+     Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs = new HashMap<String, Map<String, AlertDefinitionAPIEntity>>();
+     initialAlertDefs.put("testAlertExecutorId", new HashMap<String, AlertDefinitionAPIEntity>());
+     Map<String, AlertDefinitionAPIEntity> map = initialAlertDefs.get("testAlertExecutorId");
+     map.put("policyId_1", buildTestAlertDefEntity("testProgramId", "testAlertExecutorId", "policyId_1", "siddhi", "policyDef_1"));
+     map.put("policyId_3", buildTestAlertDefEntity("testProgramId", "testAlertExecutorId", "policyId_3", "siddhi", "policyDef_3"));
+
+     AlertDefinitionDAO dao = new AlertDefinitionDAO() { // stub: the "current" definitions the loader will fetch
+         @Override
+         public Map<String, Map<String, AlertDefinitionAPIEntity>> findActiveAlertDefsGroupbyAlertExecutorId(
+                 String site, String dataSource) {
+             Map<String, Map<String, AlertDefinitionAPIEntity>> currentAlertDefs = new HashMap<String, Map<String, AlertDefinitionAPIEntity>>();
+             currentAlertDefs.put("testAlertExecutorId", new HashMap<String, AlertDefinitionAPIEntity>());
+             Map<String, AlertDefinitionAPIEntity> map = currentAlertDefs.get("testAlertExecutorId");
+             map.put("policyId_1", buildTestAlertDefEntity("testProgramId", "testAlertExecutorId", "policyId_1", "siddhi", "policyDef_1_1"));
+             map.put("policyId_2", buildTestAlertDefEntity("testProgramId", "testAlertExecutorId", "policyId_2", "siddhi", "policyDef_2"));
+             return currentAlertDefs;
+         }
+
+         @Override
+         public List<AlertDefinitionAPIEntity> findActiveAlertDefs(String site, String dataSource) {
+             return null;
+         }
+     };
+
+     DynamicPolicyLoader loader = DynamicPolicyLoader.getInstance();
+     loader.init(initialAlertDefs, dao, config);
+
+     try{
+         Thread.sleep(5000); // NOTE(review): presumably gives the loader time to poll the DAO - confirm
+     }catch(InterruptedException ex){
+         Thread.currentThread().interrupt(); // restore the interrupt flag instead of silently swallowing it
+     }
+ }
+
+ public AlertDefinitionAPIEntity buildTestAlertDefEntity(String programId, String alertExecutorId, String policyId, String policyType, String policyDef) { // enabled definition with four tags plus policyDef
+     final Map<String, String> defTags = new HashMap<String, String>();
+     defTags.put("programId", programId);
+     defTags.put("alertExecutorId", alertExecutorId);
+     defTags.put("policyId", policyId);
+     defTags.put("policyType", policyType);
+     final AlertDefinitionAPIEntity alertDef = new AlertDefinitionAPIEntity();
+     alertDef.setEnabled(true);
+     alertDef.setTags(defTags);
+     alertDef.setPolicyDef(policyDef);
+     return alertDef;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistribution.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistribution.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistribution.java
new file mode 100644
index 0000000..367187d
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistribution.java
@@ -0,0 +1,29 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.policy;
+
+import org.junit.Test;
+
+public class TestPolicyDistribution {
+    @Test
+    public void test() { // smoke test only: prints the partition returned for each sample key, asserts nothing
+        final DefaultPolicyPartitioner partitioner = new DefaultPolicyPartitioner();
+        for (String key : new String[]{"readtmpPII", "readtmpPII2", "tmpPIIpii"}) {
+            System.out.println(partitioner.partition(2, "siddhiCEPEngine", key));
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiEngine.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiEngine.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiEngine.java
new file mode 100644
index 0000000..8dcc20b
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiEngine.java
@@ -0,0 +1,243 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.siddhi;
+
+import java.lang.reflect.Field;
+
+import org.apache.eagle.executor.AlertExecutor;
+import junit.framework.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.query.output.callback.QueryCallback;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+import org.wso2.siddhi.core.util.EventPrinter;
+
+/** Runs small Siddhi execution plans and checks how often the "query1" callback fires for the str:* functions. */
+public class TestSiddhiEngine {
+    static final Logger log = LoggerFactory.getLogger(TestSiddhiEngine.class);
+    volatile int alertCount = 0; // volatile: the callback may run on a Siddhi thread for async streams, the test thread reads it
+
+    /** str:contains must fire once for a rename whose destination contains the trash path. */
+    @Test
+    public void TestStrContains() throws Exception {
+        alertCount = 0;
+        SiddhiManager siddhiManager = new SiddhiManager();
+
+        String cseEventStream = "@config(async = 'true') " +
+                "define stream typeStream (cmd string, src string, dst string) ;";
+        String queryString = "@info(name = 'query1') " +
+                "from typeStream[(cmd == 'rename') and (src == '/tmp/pii') and (str:contains(dst,'/user/hdfs/.Trash/Current/tmp/pii')==true)] " +
+                "select cmd, src, dst " +
+                "insert into outputStream ;";
+
+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
+        try {
+            executionPlanRuntime.addCallback("query1", new QueryCallback() {
+                @Override
+                public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                    EventPrinter.print(timeStamp, inEvents, removeEvents);
+                    alertCount++;
+                }
+            });
+
+            InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
+            executionPlanRuntime.start();
+            inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/hdfs/.Trash/Current/tmp/pii"});
+            Thread.sleep(100);
+            Assert.assertEquals(1, alertCount);
+        } finally {
+            executionPlanRuntime.shutdown();
+        }
+    }
+
+    /** str:regexp in a having clause must fire once for a path matching the numeric directory pattern. */
+    @Test
+    public void TestRegexp() throws Exception {
+        alertCount = 0;
+        SiddhiManager siddhiManager = new SiddhiManager();
+
+        String cseEventStream = "@config(async = 'true') " +
+                "define stream typeStream (str string, other string, num double) ;";
+        String queryString = "@info(name = 'query1') " +
+                "from typeStream " +
+                "select str as str1, other as other1 , num as num1, count(num) as number " +
+                "having str:regexp(str1, '/usr/data/[0-9]+/[0-9]+/[0-9]+') == true " +
+                "insert into outputStream ;";
+
+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
+        try {
+            executionPlanRuntime.addCallback("query1", new QueryCallback() {
+                @Override
+                public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                    alertCount++;
+                    EventPrinter.print(timeStamp, inEvents, removeEvents);
+                }
+            });
+
+            InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
+            executionPlanRuntime.start();
+            inputHandler.send(new Object[]{"/usr/data/000/001/002", "other", 1.0});
+            Thread.sleep(100);
+            Assert.assertEquals(1, alertCount);
+        } finally {
+            executionPlanRuntime.shutdown();
+        }
+    }
+
+    /**
+     * str:equalsIgnoreCase: of the two rename events sent, only the destination that equals the
+     * pattern ignoring case may fire; the variant with extra slashes must not.
+     */
+    @Test
+    public void TestStrEqualsIgnoreCase() throws Exception {
+        alertCount = 0;
+        SiddhiManager siddhiManager = new SiddhiManager();
+
+        String cseEventStream = "define stream typeStream (cmd string, src string, dst string) ;";
+        String queryString = "@info(name = 'query1') " +
+                "from typeStream[(cmd == 'rename') and (src == '/tmp/pii') and (str:equalsIgnoreCase(dst,'/user/hdfs/.TRAsh/current/TMP/PII')==true)] " +
+                "select cmd, src, dst " +
+                "insert into outputStream ;";
+
+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
+        try {
+            executionPlanRuntime.addCallback("query1", new QueryCallback() {
+                @Override
+                public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                    alertCount++;
+                    EventPrinter.print(timeStamp, inEvents, removeEvents);
+                }
+            });
+
+            InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
+            executionPlanRuntime.start();
+            inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/HDFS/.Trash/Current/TMP/pii"}); // match case
+            inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/HDFS/.Trash///Current/TMP/pii"}); //non-match case
+            Thread.sleep(100);
+            Assert.assertEquals(1, alertCount);
+        } finally {
+            executionPlanRuntime.shutdown();
+        }
+    }
+
+    /**
+     * str:containsIgnoreCase: of the two rename events sent, only the destination that contains
+     * the pattern ignoring case may fire; the variant with extra slashes must not.
+     */
+    @Test
+    public void TestStrContainsIgnoreCase() throws Exception {
+        alertCount = 0;
+        SiddhiManager siddhiManager = new SiddhiManager();
+
+        String cseEventStream = "define stream typeStream (cmd string, src string, dst string) ;";
+        String queryString = "@info(name = 'query1') " +
+                "from typeStream[(cmd == 'rename') and (src == '/tmp/pii') and (str:containsIgnoreCase(dst,'.TRASH/CURRENT/tMp/pII')==true)] " +
+                "select cmd, src, dst " +
+                "insert into outputStream ;";
+
+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
+        try {
+            executionPlanRuntime.addCallback("query1", new QueryCallback() {
+                @Override
+                public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                    EventPrinter.print(timeStamp, inEvents, removeEvents);
+                    alertCount++;
+                }
+            });
+
+            InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
+            executionPlanRuntime.start();
+            inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/hdfs/.Trash/Current/TMP/pii"}); // match case
+            inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/hdfs/.Trash///Current/TMP/pii"}); //non-match case
+            Thread.sleep(100);
+            Assert.assertEquals(1, alertCount);
+        } finally {
+            executionPlanRuntime.shutdown();
+        }
+    }
+
+    /** str:regexpIgnoreCase must match an event path starting with "/USR/data" against a "/usr/DATA" pattern. */
+    @Test
+    public void TestRegexpIgnoreCase() throws Exception {
+        alertCount = 0;
+        SiddhiManager siddhiManager = new SiddhiManager();
+        String cseEventStream = "define stream typeStream (str string, other string, num double) ;";
+        String queryString = "@info(name = 'query1') " +
+                "from typeStream " +
+                "select str as str1, other as other1 , num as num1, count(num) as number " +
+                "having str:regexpIgnoreCase(str1, '/usr/DATA/[0-9]+/[0-9]+/[0-9]+') == true " +
+                "insert into outputStream ;";
+
+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
+        try {
+            executionPlanRuntime.addCallback("query1", new QueryCallback() {
+                @Override
+                public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                    alertCount++;
+                    EventPrinter.print(timeStamp, inEvents, removeEvents);
+                }
+            });
+
+            InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
+            executionPlanRuntime.start();
+            inputHandler.send(new Object[]{"/USR/data/000/001/002", "other", 1.0});
+            Thread.sleep(100);
+            Assert.assertEquals(1, alertCount);
+        } finally {
+            executionPlanRuntime.shutdown();
+        }
+    }
+
+    /** A stream with an "object" attribute must still deliver a select * result filtered by str:regexp. */
+    @Test
+    public void TestDataObject() throws Exception {
+        alertCount = 0;
+        SiddhiManager siddhiManager = new SiddhiManager();
+
+        String cseEventStream = "@config(async = 'true') " +
+                "define stream typeStream (dataobj object, str string, first string) ;";
+        String queryString = "@info(name = 'query1') " +
+                "from typeStream " +
+                "select * " +
+                "having str:regexp(str, '/usr/data/[0-9]+/[0-9]+/[0-9]+') == true " +
+                "insert into outputStream ;";
+
+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
+        try {
+            executionPlanRuntime.addCallback("query1", new QueryCallback() {
+                @Override
+                public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                    alertCount++;
+                    EventPrinter.print(timeStamp, inEvents, removeEvents);
+                }
+            });
+
+            InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
+            executionPlanRuntime.start();
+            // The first attribute carries an arbitrary Java object (an AlertExecutor instance) through the stream.
+            inputHandler.send(new Object[]{new AlertExecutor(queryString, null, 0, 1, null, null), "/usr/data/000/001/002", "second"});
+            Thread.sleep(100);
+            Assert.assertEquals(1, alertCount);
+        } finally {
+            executionPlanRuntime.shutdown();
+        }
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiSlideWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiSlideWindow.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiSlideWindow.java
new file mode 100644
index 0000000..521317c
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiSlideWindow.java
@@ -0,0 +1,129 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.siddhi;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.query.output.callback.QueryCallback;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+import org.wso2.siddhi.core.util.EventPrinter;
+
+import org.apache.eagle.common.DateTimeUtil;
+
+/** Checks Siddhi window queries that count matching "open" events per user and emit once the count exceeds 3. */
+public class TestSiddhiSlideWindow {
+
+    // Bumped by every query callback invocation; each test resets it before sending events.
+    int alertCount = 0;
+
+    /**
+     * window.lengthBatch(10): nothing may be emitted for the first nine matching events,
+     * and exactly one callback is expected once the tenth event completes the batch.
+     */
+    @Test
+    public void testSlideWindow1() throws Exception{
+        alertCount = 0;
+        SiddhiManager siddhiManager = new SiddhiManager();
+
+        String cseEventStream = "define stream eventStream (user string, path string, cmd string);";
+        // The same query was previously tried with window.time(1 sec), window.length(10) and window.timeBatch(1 sec).
+        String query = "@info(name = 'query1') from eventStream[cmd == 'open' AND str:regexp(path, '/usr/data/[0-9]+/[0-9]+/[0-9]+')]#window.lengthBatch(10)"
+                + " select user, path, cmd, count(path) as cnt"
+                + " group by user"
+                + " having cnt > 3 insert all events into outputStream;";
+
+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + query);
+
+        try {
+            executionPlanRuntime.addCallback("query1", new QueryCallback() {
+                @Override
+                public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                    EventPrinter.print(timeStamp, inEvents, removeEvents);
+                    alertCount++;
+                }
+            });
+
+            InputHandler inputHandler = executionPlanRuntime.getInputHandler("eventStream");
+            executionPlanRuntime.start();
+            inputHandler.send(new Object[]{"user", "/usr/data/0000/0000/0000", "open"});
+            inputHandler.send(new Object[]{"user", "/usr/data/0000/0000/1111", "open"});
+            inputHandler.send(new Object[]{"user", "/usr/data/0000/0000/2222", "open"});
+            inputHandler.send(new Object[]{"user", "/usr/data/0000/0000/3333", "open"});
+
+            inputHandler.send(new Object[]{"user", "/usr/data/1111/0000/0000", "open"});
+            inputHandler.send(new Object[]{"user", "/usr/data/1111/0000/1111", "open"});
+
+            inputHandler.send(new Object[]{"user", "/usr/data/2222/0000/0000", "open"});
+            inputHandler.send(new Object[]{"user", "/usr/data/2222/0000/1111", "open"});
+            inputHandler.send(new Object[]{"user", "/usr/data/2222/0000/2222", "open"});
+            Thread.sleep(100);
+            Assert.assertEquals(0, alertCount);
+            inputHandler.send(new Object[]{"user", "/usr/data/2222/0000/3333", "open"});
+            Thread.sleep(100);
+            Assert.assertEquals(1, alertCount);
+        } finally {
+            // Release the runtime even when an assertion above fails.
+            executionPlanRuntime.shutdown();
+        }
+    }
+
+    /** window.externalTime(timeStamp, 1 sec): every event carries the same timestamp, so the 4th and 5th events each fire once. */
+    @Test
+    public void testSlideWindow2() throws Exception{
+        alertCount = 0;
+        SiddhiManager siddhiManager = new SiddhiManager();
+
+        String cseEventStream = "define stream eventStream (timeStamp long, user string, path string, cmd string);";
+        String query = "@info(name = 'query1') from eventStream[cmd == 'open' AND str:regexp(path, '/usr/data/[0-9]+/[0-9]+/[0-9]+')]#window.externalTime(timeStamp,1 sec)"
+                + " select user, path, cmd, count(path) as cnt"
+                + " group by user"
+                + " having cnt > 3 insert all events into outputStream;";
+
+        ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + query);
+
+        try {
+            executionPlanRuntime.addCallback("query1", new QueryCallback() {
+                @Override
+                public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
+                    alertCount++;
+                    EventPrinter.print(timeStamp, inEvents, removeEvents);
+                }
+            });
+
+            InputHandler inputHandler = executionPlanRuntime.getInputHandler("eventStream");
+            executionPlanRuntime.start();
+            long curTime = DateTimeUtil.humanDateToMilliseconds("2015-09-17 00:00:00,000");
+            inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/0000", "open"});
+            Thread.sleep(1100);
+            inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/1111", "open"});
+            Thread.sleep(100);
+            inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/2222", "open"});
+            Thread.sleep(100);
+            inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/3333", "open"});
+            Thread.sleep(100);
+            Assert.assertEquals(1, alertCount);
+            inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/5555", "open"});
+            Thread.sleep(100);
+            Assert.assertEquals(2, alertCount);
+        } finally {
+            executionPlanRuntime.shutdown();
+        }
+    }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiStream.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiStream.java b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiStream.java
new file mode 100644
index 0000000..beb790a
--- /dev/null
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiStream.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.siddhi;
+
+import org.junit.Assert;
+import org.junit.Test;
+
+
+public class TestSiddhiStream {
+    // Feeds addContextFieldIfNotExist one rule with an explicit select list and one with "select *".
+    @Test
+    public void test() {
+        final String explicitSelectRule = "from hiveAccessLogStream[sensitivityType=='PHONE_NUMBER'] select user,timestamp,resource,sensitivityType insert into outputStream;";
+        Assert.assertFalse(SiddhiPolicyEvaluator.addContextFieldIfNotExist(explicitSelectRule).equals(explicitSelectRule));
+        // The wildcard rule is expected to come back unchanged.
+        final String wildcardSelectRule = "from hiveAccessLogStream[sensitivityType=='PHONE_NUMBER'] select * insert into outputStream;";
+        Assert.assertTrue(SiddhiPolicyEvaluator.addContextFieldIfNotExist(wildcardSelectRule).equals(wildcardSelectRule));
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/afe86834/eagle-core/eagle-alert/eagle-alert-process/src/test/resources/str.siddhiext
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/eagle-alert-process/src/test/resources/str.siddhiext b/eagle-core/eagle-alert/eagle-alert-process/src/test/resources/str.siddhiext
index 7ad0018..576001b 100644
--- a/eagle-core/eagle-alert/eagle-alert-process/src/test/resources/str.siddhiext
+++ b/eagle-core/eagle-alert/eagle-alert-process/src/test/resources/str.siddhiext
@@ -30,6 +30,6 @@ trim=org.wso2.siddhi.extension.string.TrimFunctionExtension
upper=org.wso2.siddhi.extension.string.UpperFunctionExtension
hex=org.wso2.siddhi.extension.string.HexFunctionExtension
unhex=org.wso2.siddhi.extension.string.UnhexFunctionExtension
-equalsIgnoreCase=eagle.alert.siddhi.extension.EqualsIgnoreCaseExtension
-containsIgnoreCase=eagle.alert.siddhi.extension.ContainsIgnoreCaseExtension
-regexpIgnoreCase=eagle.alert.siddhi.extension.RegexpIgnoreCaseFunctionExtension
\ No newline at end of file
+equalsIgnoreCase=org.apache.eagle.alert.siddhi.extension.EqualsIgnoreCaseExtension
+containsIgnoreCase=org.apache.eagle.alert.siddhi.extension.ContainsIgnoreCaseExtension
+regexpIgnoreCase=org.apache.eagle.alert.siddhi.extension.RegexpIgnoreCaseFunctionExtension
\ No newline at end of file