You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by yo...@apache.org on 2016/08/14 06:23:10 UTC
[10/13] incubator-eagle git commit: EAGLE-341 clean inner process alert engine code
clean inner process alert engine code
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestDynamicPolicyLoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestDynamicPolicyLoader.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestDynamicPolicyLoader.java
deleted file mode 100644
index dd36cac..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestDynamicPolicyLoader.java
+++ /dev/null
@@ -1,113 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.policy;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-import org.apache.eagle.policy.DynamicPolicyLoader;
-import org.apache.eagle.policy.PolicyLifecycleMethods;
-import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-public class TestDynamicPolicyLoader {
- private final static Logger LOG = LoggerFactory.getLogger(TestDynamicPolicyLoader.class);
-
- @Test
- public void test() throws Exception{
- System.setProperty("config.resource", "/unittest.conf");
- Config config = ConfigFactory.load();
- Map<String, PolicyLifecycleMethods<AlertDefinitionAPIEntity>> policyChangeListeners = new HashMap<String, PolicyLifecycleMethods<AlertDefinitionAPIEntity>>();
- policyChangeListeners.put("testAlertExecutorId", new PolicyLifecycleMethods<AlertDefinitionAPIEntity>() {
- @Override
- public void onPolicyDeleted(Map<String, AlertDefinitionAPIEntity> deleted) {
- LOG.info("deleted : " + deleted);
- }
-
- @Override
- public void onPolicyCreated(Map<String, AlertDefinitionAPIEntity> added) {
- Assert.assertTrue(added.size() == 1);
- LOG.info("added : " + added);
- }
-
- @Override
- public void onPolicyChanged(Map<String, AlertDefinitionAPIEntity> changed) {
- Assert.assertTrue(changed.size() == 1);
- LOG.info("changed :" + changed);
- }
- });
-
- Map<String, Map<String, AlertDefinitionAPIEntity>> initialAlertDefs = new HashMap<String, Map<String, AlertDefinitionAPIEntity>>();
- initialAlertDefs.put("testAlertExecutorId", new HashMap<String, AlertDefinitionAPIEntity>());
- Map<String, AlertDefinitionAPIEntity> map = initialAlertDefs.get("testAlertExecutorId");
- map.put("policyId_1", buildTestAlertDefEntity("testProgramId", "testAlertExecutorId", "policyId_1", "siddhi", "policyDef_1"));
- map.put("policyId_3", buildTestAlertDefEntity("testProgramId", "testAlertExecutorId", "policyId_3", "siddhi", "policyDef_3"));
-
- PolicyDefinitionDAO<AlertDefinitionAPIEntity> dao = new PolicyDefinitionDAO<AlertDefinitionAPIEntity>() {
- @Override
- public Map<String, Map<String, AlertDefinitionAPIEntity>> findActivePoliciesGroupbyExecutorId(
- String site, String dataSource) {
- Map<String, Map<String, AlertDefinitionAPIEntity>> currentAlertDefs = new HashMap<String, Map<String, AlertDefinitionAPIEntity>>();
- currentAlertDefs.put("testAlertExecutorId", new HashMap<String, AlertDefinitionAPIEntity>());
- Map<String, AlertDefinitionAPIEntity> map = currentAlertDefs.get("testAlertExecutorId");
- map.put("policyId_1", buildTestAlertDefEntity("testProgramId", "testAlertExecutorId", "policyId_1", "siddhi", "policyDef_1_1"));
- map.put("policyId_2", buildTestAlertDefEntity("testProgramId", "testAlertExecutorId", "policyId_2", "siddhi", "policyDef_2"));
- return currentAlertDefs;
- }
-
- @Override
- public List<AlertDefinitionAPIEntity> findActivePolicies(String site, String dataSource) {
- return null;
- }
-
- @Override
- public void updatePolicyDetails(AlertDefinitionAPIEntity entity) { /* do nothing */ }
- };
-
- DynamicPolicyLoader<AlertDefinitionAPIEntity> loader = DynamicPolicyLoader.getInstanceOf(AlertDefinitionAPIEntity.class);
- loader.init(initialAlertDefs, dao, config);
-
- try{
- Thread.sleep(5000);
- }catch(Exception ex){
-
- }
- }
-
- public AlertDefinitionAPIEntity buildTestAlertDefEntity(String programId, String alertExecutorId, String policyId, String policyType, String policyDef) {
- AlertDefinitionAPIEntity entity = new AlertDefinitionAPIEntity();
- entity.setEnabled(true);
- Map<String, String> tags = new HashMap<String, String>();
- tags.put("programId", programId);
- tags.put("alertExecutorId", alertExecutorId);
- tags.put("policyId", policyId);
- tags.put("policyType", policyType);
- entity.setTags(tags);
- entity.setPolicyDef(policyDef);
- return entity;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistribution.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistribution.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistribution.java
deleted file mode 100644
index 77aaec3..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistribution.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.policy;
-
-import org.apache.eagle.policy.DefaultPolicyPartitioner;
-import org.junit.Test;
-
-public class TestPolicyDistribution {
- @Test
- public void test(){
- DefaultPolicyPartitioner p = new DefaultPolicyPartitioner();
- System.out.println(p.partition(2, "siddhiCEPEngine", "readtmpPII"));
- System.out.println(p.partition(2, "siddhiCEPEngine", "readtmpPII2"));
- System.out.println(p.partition(2, "siddhiCEPEngine", "tmpPIIpii"));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java
deleted file mode 100644
index 5fd374e..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyDistributionUpdater.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.policy;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-import org.apache.eagle.policy.siddhi.StreamMetadataManager;
-import org.junit.Assert;
-import org.apache.eagle.alert.entity.AbstractPolicyDefinitionEntity;
-import org.apache.eagle.alert.entity.AlertDefinitionAPIEntity;
-import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
-import org.apache.eagle.alert.executor.AlertExecutor;
-import org.apache.eagle.policy.DefaultPolicyPartitioner;
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.policy.dao.AlertStreamSchemaDAO;
-import org.apache.eagle.policy.dao.PolicyDefinitionDAO;
-import org.apache.eagle.policy.dao.PolicyDefinitionEntityDAOImpl;
-import org.apache.eagle.service.client.EagleServiceConnector;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-public class TestPolicyDistributionUpdater {
- private static Logger LOG = LoggerFactory.getLogger(TestPolicyDistributionUpdater.class);
-
- @Test
- public void testPolicyDistributionReporter() throws Exception{
- StreamMetadataManager.getInstance().reset();
- PolicyDefinitionDAO alertDao = new PolicyDefinitionEntityDAOImpl(new EagleServiceConnector(null, 1),
- Constants.ALERT_DEFINITION_SERVICE_ENDPOINT_NAME) {
- @Override
- public Map<String, Map<String, AlertDefinitionAPIEntity>> findActivePoliciesGroupbyExecutorId(String site, String dataSource) throws Exception {
- final AlertDefinitionAPIEntity entity = new AlertDefinitionAPIEntity();
- entity.setTags(new HashMap<String, String>() {{
- put(Constants.POLICY_TYPE, "siddhiCEPEngine");
- put(Constants.POLICY_ID, "policyId_1");
- }});
- Map<String, Map<String, AlertDefinitionAPIEntity>> map = new HashMap<String, Map<String, AlertDefinitionAPIEntity>>();
- map.put("alertExecutorId_1", new HashMap<String, AlertDefinitionAPIEntity>() {{
- put("policyId_1", entity);
- }});
- entity.setPolicyDef("{\"type\":\"siddhiCEPEngine\",\"expression\":\"from testStream select name insert into outputStream ;\"}");
- return map;
- }
-
- @Override
- public void updatePolicyDetails(AbstractPolicyDefinitionEntity entity) { /* do nothing */ }
- };
-
- AlertExecutor alertExecutor = new AlertExecutor("alertExecutorId_1", new DefaultPolicyPartitioner(), 1, 0, alertDao, new String[]{"testStream"}){
- public AlertStreamSchemaDAO getAlertStreamSchemaDAO(Config config){
- return new AlertStreamSchemaDAO(){
- @Override
- public List<AlertStreamSchemaEntity> findAlertStreamSchemaByApplication(String application) throws Exception {
- AlertStreamSchemaEntity entity = new AlertStreamSchemaEntity();
- entity.setTags(new HashMap<String, String>(){{
- put("application", "UnitTest");
- put("streamName", "testStream");
- put("attrName", "name");
- }});
- entity.setAttrType("string");
- return Arrays.asList(entity);
- }
- };
- }
-
- @Override
- public void report() {
- Assert.assertEquals(1, getPolicyEvaluators().size());
- LOG.info("successuflly reported");
- }
- };
-
- Config config = ConfigFactory.load();
- alertExecutor.prepareConfig(config);
- alertExecutor.init();
- Thread.sleep(100);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyPartitioner.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyPartitioner.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyPartitioner.java
deleted file mode 100644
index c3bc4c9..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/policy/TestPolicyPartitioner.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.policy;
-
-import org.apache.eagle.policy.DefaultPolicyPartitioner;
-import org.junit.Test;
-
-public class TestPolicyPartitioner {
- @Test
- public void test(){
- DefaultPolicyPartitioner p = new DefaultPolicyPartitioner();
- System.out.println(p.partition(2, "siddhiCEPEngine", "readtmpPII"));
- System.out.println(p.partition(2, "siddhiCEPEngine", "readtmpPII2"));
- System.out.println(p.partition(2, "siddhiCEPEngine", "tmpPIIpii"));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestExternalBatchWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestExternalBatchWindow.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestExternalBatchWindow.java
deleted file mode 100644
index e289793..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestExternalBatchWindow.java
+++ /dev/null
@@ -1,307 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.siddhi;
-
-import java.text.MessageFormat;
-import java.util.Calendar;
-import java.util.Date;
-import java.util.Random;
-import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Test;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.stream.output.StreamCallback;
-
-/**
- * @since Dec 23, 2015
- *
- */
-public class TestExternalBatchWindow {
-
- private static SiddhiManager siddhiManager;
-
- @BeforeClass
- public static void beforeClass() {
- siddhiManager = new SiddhiManager();
- }
-
- @AfterClass
- public static void afterClass() {
- siddhiManager.shutdown();
- }
-
- @Test
- public void test02NoMsg() throws Exception {
- ExecutionPlanRuntime runtime = simpleQueryRuntime();
-
- final AtomicBoolean recieved = new AtomicBoolean();
- runtime.addCallback("query", new QueryCallback() {
-
- @Override
- public void receive(long arg0, Event[] arg1, Event[] arg2) {
- recieved.set(true);
- System.out.println(arg1);
- }
- });
-
- InputHandler input = runtime.getInputHandler("jmxMetric");
-
- runtime.start();
- // external events' time stamp less than the window, should not have event recieved in call back.
- long now = System.currentTimeMillis();
- int length = 5;
- for (int i = 0; i < length; i++) {
- input.send(new Object[] { 15, now + i * 1000 });
- }
-
- Thread.sleep(1000);
- Assert.assertFalse("Event happens inner external time batch window, should not have event recieved in callback!", recieved.get());
-
- runtime.shutdown();
- }
-
- private ExecutionPlanRuntime simpleQueryRuntime() {
- String query = "define stream jmxMetric(cpu int, timestamp long); "
- + "@info(name='query')"
- + "from jmxMetric#window.externalTimeBatch(timestamp, 10 sec) "
- + "select avg(cpu) as avgCpu, count(1) as count insert into tmp;";
-
- return siddhiManager.createExecutionPlanRuntime(query);
- }
-
- /**
- * This case try to capture the case that the window get a chunk of event that exceed the time batch.
- * In this case, two next processor should be triggered.
- */
- @Test
- public void test03BunchChunkExceedBatch() {
- // TODO
- }
- @Test
- public void test04MultiThread() {
- // TODO
- }
-
- @Test
- public void test05ExternalJoin() {
- // TODO
- }
-
- @Test
- public void test06EdgeCase() throws Exception {
- // every 10 sec
- ExecutionPlanRuntime runtime = simpleQueryRuntime();
-
- final AtomicInteger recCount = new AtomicInteger(0);
-// final CountDownLatch latch = new CountDownLatch(2);// for debug
- runtime.addCallback("query", new QueryCallback() {
- @Override
- public void receive(long arg0, Event[] arg1, Event[] arg2) {
-// latch.countDown();
- Assert.assertEquals(1, arg1.length);
- recCount.incrementAndGet();
- int avgCpu = ((Double) arg1[0].getData()[0]).intValue();
- if (recCount.get() == 1) {
- Assert.assertEquals(15, avgCpu);
- } else if (recCount.get() == 2) {
- Assert.assertEquals(85, avgCpu);
- }
- int count = ((Long) arg1[0].getData()[1]).intValue();
- Assert.assertEquals(3, count);
- }
- });
-
- InputHandler input = runtime.getInputHandler("jmxMetric");
- runtime.start();
- // external events' time stamp less than the window, should not have event recieved in call back.
- long now = 0;
- int length = 3;
- for (int i = 0; i < length; i++) {
- input.send(new Object[] { 15, now + i * 10 });
- }
-
- // second round
- // if the trigger event mix with the last window, we should see the avgValue is not expected
- for (int i = 0; i < length; i++) {
- input.send(new Object[] { 85, now + 10000 + i * 10 }); // the first entity of the second round
- }
- // to trigger second round
- input.send(new Object[] { 10000, now + 10 * 10000 });
-
-// latch.await();// for debug
-
- Thread.sleep(1000);
-
- Assert.assertEquals(2, recCount.get());
- }
-
- @Test
- public void test07Pull76() throws Exception {
- String defaultStream = "define stream LoginEvents (myTime long, ip string, phone string,price int);";
-
- String query = " @info(name='pull76') "
- + " from LoginEvents#window.externalTimeBatch(myTime, 5 sec) "
- + " select myTime, phone, ip, price, count(ip) as cntip , "
- + " min(myTime) as mintime, max(myTime) as maxtime "
- + " insert into events ;";
-
- ExecutionPlanRuntime runtime = siddhiManager.createExecutionPlanRuntime(defaultStream + query);
-
- InputHandler inputHandler = runtime.getInputHandler("LoginEvents");
-
- runtime.addCallback("pull76", new QueryCallback() {
- @Override
- public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
- if (inEvents != null) {
- System.out.println("======================== START ===============================");
- int i = 0;
- System.out.println(" Events Size:" + inEvents.length);
- for (i = 0; i < inEvents.length; i++) {
- Event e = inEvents[i];
- System.out.println("----------------------------");
- System.out.println(new Date((Long) e.getData(0)));
- System.out.println("IP:" + e.getData(2));
- System.out.println("price :" + e.getData(3));
- System.out.println("count :" + e.getData(4));
- System.out.println("mintime :" + new Date((Long) e.getData(5)) );
- System.out.println("maxtime :" + new Date((Long) e.getData(6)) );
- System.out.println("----------------------------");
- }
- System.out.println("======================== END ===============================");
-
- }
- }
- });
-
-
- runtime.start();
-
- long start = System.currentTimeMillis();
- Calendar c = Calendar.getInstance();
- c.add(Calendar.HOUR, 1);
- c.add(Calendar.SECOND, 1);
- int i = 0;
- for (i = 0; i <= 10000; i++) {
- c.add(Calendar.SECOND, 1);
- inputHandler.send(c.getTime().getTime(),
- new Object[] { c.getTime().getTime(), new String("192.10.1.1"), "1", new Random().nextInt(1000) });
- }
- long end = System.currentTimeMillis();
- System.out.printf("End : %d ", end - start);
-
- Thread.sleep(1000);
- runtime.shutdown();
- }
-
- @Test
- public void test01DownSampling() throws Exception {
- String stream = "define stream jmxMetric(cpu int, memory int, bytesIn long, bytesOut long, timestamp long);";
- String query = "@info(name = 'downSample') "
- + "from jmxMetric#window.externalTimeBatch(timestamp, 10 sec) "
- + "select "
- + "avg(cpu) as avgCpu, max(cpu) as maxCpu, min(cpu) as minCpu, "
- + " '|' as s, "
- + " avg(memory) as avgMem, max(memory) as maxMem, min(memory) as minMem, "
- + " '|' as s1, "
- + " avg(bytesIn) as avgBytesIn, max(bytesIn) as maxBytesIn, min(bytesIn) as minBytesIn, "
- + " '|' as s2, "
- + " avg(bytesOut) as avgBytesOut, max(bytesOut) as maxBytesOut, min(bytesOut) as minBytesOut, "
- + " '|' as s3, "
- + " timestamp as timeWindowEnds, "
- + " '|' as s4, "
- + " count(1) as metric_count "
- + " INSERT INTO tmp;";
-
- SiddhiManager sm = new SiddhiManager();
- ExecutionPlanRuntime plan = sm.createExecutionPlanRuntime(stream + query);
-
- InputHandler input = plan.getInputHandler("jmxMetric");
-
- // stream call back doesn't follow the counter
- final AtomicInteger counter = new AtomicInteger();
- {
- // stream callback
- plan.addCallback("jmxMetric", new StreamCallback() {
- @Override
- public void receive(Event[] arg0) {
- counter.addAndGet(arg0.length);
- }
- });
- }
- final AtomicInteger queryWideCounter = new AtomicInteger();
- {
- plan.addCallback("downSample", new QueryCallback() {
- @Override
- public void receive(long arg0, Event[] inevents, Event[] removeevents) {
- int currentCount = queryWideCounter.addAndGet(inevents.length);
- System.out.println(MessageFormat.format("Round {0} ====", currentCount));
- System.out.println(" events count " + inevents.length);
-
- for (Event e : inevents) {
- Object[] tranformedData = e.getData();
- for (Object o : tranformedData) {
- System.out.print(o);
- System.out.print(' ');
- }
- System.out.println(" events endendend");
- }
- }
-
- });
- }
-
- plan.start();
-
- int round = 4;
- int eventsPerRound= 0;
- long externalTs = System.currentTimeMillis();
- for (int i = 0; i < round; i++) {
- eventsPerRound = sendEvent(input, i, externalTs);
- Thread.sleep(3000);
- }
- //
- sendEvent(input, round, externalTs);
-
- plan.shutdown();
- Thread.sleep(1000);
- Assert.assertEquals(round * eventsPerRound + eventsPerRound, counter.get());
- Assert.assertEquals(round, queryWideCounter.get());
- }
-
- // one round of sending events
- private int sendEvent(InputHandler input, int ite, long externalTs) throws Exception {
- int len = 3;
- Event[] events = new Event[len];
- for (int i = 0; i < len; i++) {
- // cpu int, memory int, bytesIn long, bytesOut long, timestamp long
- events[i] = new Event(externalTs,
- new Object[] { 15 + 10 * i * ite, 1500 + 10 * i * ite, 1000L, 2000L, externalTs + ite * 10000 + i * 50 });
- }
-
- input.send(events);
- return len;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiEngine.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiEngine.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiEngine.java
deleted file mode 100644
index 924ba5c..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiEngine.java
+++ /dev/null
@@ -1,243 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.siddhi;
-
-import java.lang.reflect.Field;
-
-import org.apache.eagle.alert.executor.AlertExecutor;
-import org.junit.Assert;
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.util.EventPrinter;
-
-public class TestSiddhiEngine {
- static final Logger log = LoggerFactory.getLogger(TestSiddhiEngine.class);
- int alertCount = 0;
-
- @Test
- public void TestStrContains() throws Exception {
- alertCount = 0;
- SiddhiManager siddhiManager = new SiddhiManager();
-
- String cseEventStream = "" +
- "@config(async = 'true') " +
- "define stream typeStream (cmd string, src string, dst string) ;";
- String queryString = "" +
- "@info(name = 'query1') " +
- "from typeStream[(cmd == 'rename') and (src == '/tmp/pii') and (str:contains(dst,'/user/hdfs/.Trash/Current/tmp/pii')==true)] " +
- "select cmd, src, dst " +
- "insert into outputStream ;";
-
- ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
-
- QueryCallback callback = new QueryCallback() {
- @Override
- public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
- EventPrinter.print(timeStamp, inEvents, removeEvents);
- alertCount++;
- }
- };
- executionPlanRuntime.addCallback("query1", callback);
-
- InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
- executionPlanRuntime.start();
- inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/hdfs/.Trash/Current/tmp/pii"});
- Thread.sleep(100);
- Assert.assertTrue(alertCount == 1);
- executionPlanRuntime.shutdown();
- }
-
- @Test
- public void TestRegexp() throws Exception {
- alertCount = 0;
- SiddhiManager siddhiManager = new SiddhiManager();
-
- String cseEventStream = "" +
- "@config(async = 'true') " +
- "define stream typeStream (str string, other string, num double) ;";
- String queryString = "" +
- "@info(name = 'query1') " +
- "from typeStream " +
- "select str as str1, other as other1 , num as num1, count(num) as number " +
- "having str:regexp(str1, '/usr/data/[0-9]+/[0-9]+/[0-9]+') == true " +
- "insert into outputStream ;";
-
- ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
-
- QueryCallback callback = new QueryCallback() {
- @Override
- public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
- alertCount++;
- EventPrinter.print(timeStamp, inEvents, removeEvents);
- }
- };
-
- executionPlanRuntime.addCallback("query1", callback);
-
- InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
- executionPlanRuntime.start();
- inputHandler.send(new Object[]{"/usr/data/000/001/002", "other", 1.0});
- Thread.sleep(100);
- Assert.assertTrue(alertCount == 1);
- executionPlanRuntime.shutdown();
- }
-
- @Test
- public void TestStrEqualsIgnoreCase() throws Exception {
- alertCount = 0;
- SiddhiManager siddhiManager = new SiddhiManager();
-
- String cseEventStream = "define stream typeStream (cmd string, src string, dst string) ;";
- String queryString = "" +
- "@info(name = 'query1') " +
- "from typeStream[(cmd == 'rename') and (src == '/tmp/pii') and (str:equalsIgnoreCase(dst,'/user/hdfs/.TRAsh/current/TMP/PII')==true)] " +
- "select cmd, src, dst " +
- "insert into outputStream ;";
-
- ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
-
- QueryCallback callback = new QueryCallback() {
- @Override
- public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
- alertCount++;
- EventPrinter.print(timeStamp, inEvents, removeEvents);
- }
- };
-
- executionPlanRuntime.addCallback("query1", callback);
-
- Field field = QueryCallback.class.getDeclaredField("query");
- field.setAccessible(true);
-
- InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
- executionPlanRuntime.start();
- inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/HDFS/.Trash/Current/TMP/pii"}); // match case
- inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/HDFS/.Trash///Current/TMP/pii"}); //non-match case
- Thread.sleep(100);
- Assert.assertTrue(alertCount == 1);
- executionPlanRuntime.shutdown();
- }
-
- @Test
- public void TestStrContainsIgnoreCase() throws Exception {
- alertCount = 0;
- SiddhiManager siddhiManager = new SiddhiManager();
-
- String cseEventStream = "define stream typeStream (cmd string, src string, dst string) ;";
- String queryString = "" +
- "@info(name = 'query1') " +
- "from typeStream[(cmd == 'rename') and (src == '/tmp/pii') and (str:containsIgnoreCase(dst,'.TRASH/CURRENT/tMp/pII')==true)] " +
- "select cmd, src, dst " +
- "insert into outputStream ;";
-
- ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
-
- QueryCallback callback = new QueryCallback() {
- @Override
- public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
- EventPrinter.print(timeStamp, inEvents, removeEvents);
- alertCount++;
- }
- };
-
- executionPlanRuntime.addCallback("query1", callback);
-
- Field field = QueryCallback.class.getDeclaredField("query");
- field.setAccessible(true);
-
- InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
- executionPlanRuntime.start();
- inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/hdfs/.Trash/Current/TMP/pii"}); // match case
- inputHandler.send(new Object[]{"rename", "/tmp/pii", "/user/hdfs/.Trash///Current/TMP/pii"}); //non-match case
- Thread.sleep(100);
- Assert.assertTrue(alertCount == 1);
- executionPlanRuntime.shutdown();
- }
-
- @Test
- public void TestRegexpIgnoreCase() throws Exception {
- alertCount = 0;
- SiddhiManager siddhiManager = new SiddhiManager();
-
- String cseEventStream = "define stream typeStream (str string, other string, num double) ;";
- String queryString = "" +
- "@info(name = 'query1') " +
- "from typeStream " +
- "select str as str1, other as other1 , num as num1, count(num) as number " +
- "having str:regexpIgnoreCase(str1, '/usr/DATA/[0-9]+/[0-9]+/[0-9]+') == true " +
- "insert into outputStream ;";
-
- ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
-
- QueryCallback callback = new QueryCallback() {
- @Override
- public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
- alertCount++;
- EventPrinter.print(timeStamp, inEvents, removeEvents);
- }
- };
- executionPlanRuntime.addCallback("query1", callback);
-
- InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
- executionPlanRuntime.start();
- inputHandler.send(new Object[]{"/USR/data/000/001/002", "other", 1.0});
- Thread.sleep(100);
- Assert.assertTrue(alertCount == 1);
- executionPlanRuntime.shutdown();
- }
-
- @Test
- public void TestDataObject() throws Exception {
- alertCount = 0;
- SiddhiManager siddhiManager = new SiddhiManager();
-
- String cseEventStream = "" +
- "@config(async = 'true') " +
- "define stream typeStream (dataobj object, str string, first string) ;";
- String queryString = "" +
- "@info(name = 'query1') " +
- "from typeStream " +
- "select * " +
- "having str:regexp(str, '/usr/data/[0-9]+/[0-9]+/[0-9]+') == true " +
- "insert into outputStream ;";
-
- ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + queryString);
-
- QueryCallback callback = new QueryCallback() {
- @Override
- public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
- alertCount++;
- EventPrinter.print(timeStamp, inEvents, removeEvents);
- }
- };
-
- executionPlanRuntime.addCallback("query1", callback);
-
- InputHandler inputHandler = executionPlanRuntime.getInputHandler("typeStream");
- executionPlanRuntime.start();
- inputHandler.send(new Object[]{new AlertExecutor(queryString, null, 0, 1, null, null), "/usr/data/000/001/002", "second"});
- Thread.sleep(100);
- Assert.assertTrue(alertCount == 1);
- executionPlanRuntime.shutdown();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiSlideWindow.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiSlideWindow.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiSlideWindow.java
deleted file mode 100644
index 521317c..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiSlideWindow.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.siddhi;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.util.EventPrinter;
-
-import org.apache.eagle.common.DateTimeUtil;
-
-public class TestSiddhiSlideWindow {
-
- int alertCount = 0;
-
- @Test
- public void testSlideWindow1() throws Exception{
- alertCount = 0;
- SiddhiManager siddhiManager = new SiddhiManager();
-
- String cseEventStream = "define stream eventStream (user string, path string, cmd string);";
-// String query = "@info(name = 'query1') from eventStream[cmd == 'open' AND str:regexp(path, '/usr/data/[0-9]+/[0-9]+/[0-9]+')]#window.time(1 sec)"
-// + " select user, path, cmd, count(path) as cnt"
-// + " group by user"
-// + " having cnt > 3 insert all events into outputStream;";
-
-// String query = "@info(name = 'query1') from eventStream[cmd == 'open' AND str:regexp(path, '/usr/data/[0-9]+/[0-9]+/[0-9]+')]#window.length(10)"
-// + " select user, path, cmd, count(path) as cnt"
-// + " group by user"
-// + " having cnt > 3 insert all events into outputStream;";
-
-// String query = "@info(name = 'query1') from eventStream[cmd == 'open' AND str:regexp(path, '/usr/data/[0-9]+/[0-9]+/[0-9]+')]#window.timeBatch(1 sec)"
-// + " select user, path, cmd, count(path) as cnt"
-// + " group by user"
-// + " having cnt > 3 insert all events into outputStream;";
-
- String query = "@info(name = 'query1') from eventStream[cmd == 'open' AND str:regexp(path, '/usr/data/[0-9]+/[0-9]+/[0-9]+')]#window.lengthBatch(10)"
- + " select user, path, cmd, count(path) as cnt"
- + " group by user"
- + " having cnt > 3 insert all events into outputStream;";
-
- ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + query);
-
- executionPlanRuntime.addCallback("query1", new QueryCallback() {
- @Override
- public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
- EventPrinter.print(timeStamp, inEvents, removeEvents);
- alertCount++;
- }
- });
-
- InputHandler inputHandler = executionPlanRuntime.getInputHandler("eventStream");
- executionPlanRuntime.start();
- inputHandler.send(new Object[]{"user", "/usr/data/0000/0000/0000", "open"});
- inputHandler.send(new Object[]{"user", "/usr/data/0000/0000/1111", "open"});
- inputHandler.send(new Object[]{"user", "/usr/data/0000/0000/2222", "open"});
- inputHandler.send(new Object[]{"user", "/usr/data/0000/0000/3333", "open"});
-
- inputHandler.send(new Object[]{"user", "/usr/data/1111/0000/0000", "open"});
- inputHandler.send(new Object[]{"user", "/usr/data/1111/0000/1111", "open"});
-
- inputHandler.send(new Object[]{"user", "/usr/data/2222/0000/0000", "open"});
- inputHandler.send(new Object[]{"user", "/usr/data/2222/0000/1111", "open"});
- inputHandler.send(new Object[]{"user", "/usr/data/2222/0000/2222", "open"});
- Thread.sleep(100);
- Assert.assertTrue(alertCount == 0);
- inputHandler.send(new Object[]{"user", "/usr/data/2222/0000/3333", "open"});
- Thread.sleep(100);
- Assert.assertTrue(alertCount == 1);
- executionPlanRuntime.shutdown();
- }
-
- @Test
- public void testSlideWindow2() throws Exception{
- alertCount = 0;
- SiddhiManager siddhiManager = new SiddhiManager();
-
- String cseEventStream = "define stream eventStream (timeStamp long, user string, path string, cmd string);";
- String query = "@info(name = 'query1') from eventStream[cmd == 'open' AND str:regexp(path, '/usr/data/[0-9]+/[0-9]+/[0-9]+')]#window.externalTime(timeStamp,1 sec)"
- + " select user, path, cmd, count(path) as cnt"
- + " group by user"
- + " having cnt > 3 insert all events into outputStream;";
-
- ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + query);
-
- executionPlanRuntime.addCallback("query1", new QueryCallback() {
- @Override
- public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
- alertCount++;
- EventPrinter.print(timeStamp, inEvents, removeEvents);
- }
- });
-
- InputHandler inputHandler = executionPlanRuntime.getInputHandler("eventStream");
- executionPlanRuntime.start();
- long curTime = DateTimeUtil.humanDateToMilliseconds("2015-09-17 00:00:00,000");
- inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/0000", "open"});
- Thread.sleep(1100);
- inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/1111", "open"});
- Thread.sleep(100);
- inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/2222", "open"});
- Thread.sleep(100);
- inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/3333", "open"});
- Thread.sleep(100);
- Assert.assertTrue(alertCount == 1);
- inputHandler.send(new Object[]{curTime, "user", "/usr/data/0000/0000/5555", "open"});
- Thread.sleep(100);
- Assert.assertTrue(alertCount == 2);
- executionPlanRuntime.shutdown();
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiStream.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiStream.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiStream.java
deleted file mode 100644
index 0027bce..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/siddhi/TestSiddhiStream.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.siddhi;
-
-import org.apache.eagle.policy.siddhi.SiddhiPolicyEvaluator;
-import org.junit.Assert;
-import org.junit.Test;
-
-
-public class TestSiddhiStream {
-
- @Test
- public void test() {
- String rule = "from hiveAccessLogStream[sensitivityType=='PHONE_NUMBER'] select user,timestamp,resource,define stream hdfsAuditLogEventStream(eagleAlertContext object, allowed string,cmd string,dst string,host string,securityZone string,sensitivityType string,src string,timestamp long,user string); @info(name = 'query') from hdfsAuditLogEventStream[cmd=='open'] select * insert into outputStream ; insert into outputStream;";
- Assert.assertFalse(SiddhiPolicyEvaluator.addContextFieldIfNotExist(rule).equals(rule));
-
- rule = "from hiveAccessLogStream[sensitivityType=='PHONE_NUMBER'] select * insert into outputStream;";
- Assert.assertTrue(SiddhiPolicyEvaluator.addContextFieldIfNotExist(rule).equals(rule));
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestAggregation.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestAggregation.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestAggregation.java
deleted file mode 100644
index 71f9691..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestAggregation.java
+++ /dev/null
@@ -1,98 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.state;
-
-import java.util.concurrent.atomic.AtomicInteger;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-
-import org.wso2.siddhi.core.util.EventPrinter;
-
-public class TestAggregation {
- @Test
- public void test01DownSampling() throws Exception {
- String stream = "define stream jmxMetric(cpu double, memory int, bytesIn int, bytesOut long, timestamp long);";
- String query = "@info(name = 'downSample') "
- + "from jmxMetric#window.timeBatch(1 sec) "
- + "select "
- + " min(cpu) as minCpu, max(cpu) as maxCpu, avg(cpu) as avgCpu, "
- + " min(memory) as minMem, max(memory) as maxMem, avg(memory) as avgMem, "
- + " min(bytesIn) as minBytesIn, max(bytesIn) as maxBytesIn, avg(bytesIn) as avgBytesIn, sum(bytesIn) as totalBytesIn, "
- + " min(bytesOut) as minBytesOut, max(bytesOut) as maxBytesOut, avg(bytesOut) as avgBytesOut, sum(bytesOut) as totalBytesOut, "
- + " timestamp as timeWindowEnds "
- + " INSERT INTO tmp;";
-
- SiddhiManager sm = new SiddhiManager();
- ExecutionPlanRuntime plan = sm.createExecutionPlanRuntime(stream + query);
-
- final AtomicInteger counter = new AtomicInteger();
- plan.addCallback("downSample", new QueryCallback() {
- @Override
- public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
- EventPrinter.print(timeStamp, inEvents, removeEvents);
- int count = counter.incrementAndGet();
- if (count == 1) {
- Assert.assertEquals(6000L, inEvents[0].getData(9));
- } else if(count == 2) {
- Assert.assertEquals(6000L, inEvents[0].getData(9));
- }
- }
- });
- InputHandler input = plan.getInputHandler("jmxMetric");
-
- plan.start();
- sendEvent(input);
- Thread.sleep(100);
- sendEvent(input);
- Thread.sleep(1000);
- sendEvent(input);
- Thread.sleep(1000);
- sendEvent(input);
- Thread.sleep(200);
- plan.shutdown();
- }
-
- // send 3 events
- private void sendEvent(InputHandler input) throws Exception {
- int len = 3;
- Event[] events = new Event[len];
- for (int i = 0; i < len; i++) {
- long externalTs = System.currentTimeMillis();
- // cpu int, memory int, bytesIn long, bytesOut long, timestamp long
- events[i] = new Event(externalTs + i, new Object[] {
- 15.0,
- 15,
- 1000,
- 2000L,
- externalTs + i
- });
- }
-
- for (Event e : events) {
- input.send(e);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiExpiredEvents.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiExpiredEvents.java b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiExpiredEvents.java
deleted file mode 100644
index e0be82c..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/java/org/apache/eagle/alert/state/TestSiddhiExpiredEvents.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- *
- * * Licensed to the Apache Software Foundation (ASF) under one or more
- * * contributor license agreements. See the NOTICE file distributed with
- * * this work for additional information regarding copyright ownership.
- * * The ASF licenses this file to You under the Apache License, Version 2.0
- * * (the "License"); you may not use this file except in compliance with
- * * the License. You may obtain a copy of the License at
- * *
- * * http://www.apache.org/licenses/LICENSE-2.0
- * *
- * * Unless required by applicable law or agreed to in writing, software
- * * distributed under the License is distributed on an "AS IS" BASIS,
- * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * * See the License for the specific language governing permissions and
- * * limitations under the License.
- *
- */
-
-package org.apache.eagle.alert.state;
-
-import org.junit.Assert;
-import org.junit.Test;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-import org.wso2.siddhi.core.event.Event;
-import org.wso2.siddhi.core.query.output.callback.QueryCallback;
-import org.wso2.siddhi.core.stream.input.InputHandler;
-import org.wso2.siddhi.core.util.EventPrinter;
-
-/**
- * Created by yonzhang on 11/25/15.
- */
-public class TestSiddhiExpiredEvents {
- @Test
- public void testExpiredEventsInLengthWindow() throws Exception{
- SiddhiManager siddhiManager = new SiddhiManager();
-
- String cseEventStream = "define stream TempStream (user string, cmd string);";
- String query = "@info(name = 'query1') from TempStream#window.length(3) "
- + " select *"
- + " insert all events into DelayedTempStream";
-
- ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + query);
-
- executionPlanRuntime.addCallback("query1", new QueryCallback() {
- @Override
- public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
- EventPrinter.print(timeStamp, inEvents, removeEvents);
- }
- });
-
- InputHandler inputHandler = executionPlanRuntime.getInputHandler("TempStream");
- executionPlanRuntime.start();
- inputHandler.send(new Object[]{"user", "open1"});
- inputHandler.send(new Object[]{"user", "open2"});
- inputHandler.send(new Object[]{"user", "open3"});
- inputHandler.send(new Object[]{"user", "open4"});
- inputHandler.send(new Object[]{"user", "open5"});
- inputHandler.send(new Object[]{"user", "open6"});
- Thread.sleep(1000);
- executionPlanRuntime.shutdown();
- }
-
- @Test
- public void testExpiredEventsInLengthBatchWindow() throws Exception{
- SiddhiManager siddhiManager = new SiddhiManager();
-
- String cseEventStream = "define stream TempStream (user string, cmd string);";
- String query = "@info(name = 'query1') from TempStream#window.lengthBatch(2) "
- + " select *"
- + " insert all events into DelayedTempStream";
-
- ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + query);
-
- executionPlanRuntime.addCallback("query1", new QueryCallback() {
- @Override
- public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
- EventPrinter.print(timeStamp, inEvents, removeEvents);
- }
- });
-
- InputHandler inputHandler = executionPlanRuntime.getInputHandler("TempStream");
- executionPlanRuntime.start();
- inputHandler.send(new Object[]{"user", "open1"});
- inputHandler.send(new Object[]{"user", "open2"});
- inputHandler.send(new Object[]{"user", "open3"});
- inputHandler.send(new Object[]{"user", "open4"});
- inputHandler.send(new Object[]{"user", "open5"});
- inputHandler.send(new Object[]{"user", "open6"});
- Thread.sleep(1000);
- executionPlanRuntime.shutdown();
- }
-
- @Test
- public void testExpireEvents2() throws Exception{
- SiddhiManager siddhiManager = new SiddhiManager();
-
- String cseEventStream = "define stream TempStream (user string, cmd string);";
- String query = "@info(name = 'query1') from TempStream#window.length(4) "
- + " select user, cmd, count(user) as cnt " +
- " group by user " +
- "having cnt > 2 "
- + " insert all events into DelayedTempStream";
-
- ExecutionPlanRuntime executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(cseEventStream + query);
-
- executionPlanRuntime.addCallback("query1", new QueryCallback() {
- @Override
- public void receive(long timeStamp, Event[] inEvents, Event[] removeEvents) {
- EventPrinter.print(timeStamp, inEvents, removeEvents);
- }
- });
-
- InputHandler inputHandler = executionPlanRuntime.getInputHandler("TempStream");
- executionPlanRuntime.start();
- inputHandler.send(new Object[]{"user", "open1"});
- inputHandler.send(new Object[]{"user", "open2"});
- inputHandler.send(new Object[]{"user", "open3"});
- inputHandler.send(new Object[]{"user", "open4"});
- inputHandler.send(new Object[]{"user", "open5"});
-// inputHandler.send(new Object[]{"user", "open6"});
-// inputHandler.send(new Object[]{"user", "open7"});
-// inputHandler.send(new Object[]{"user", "open8"});
-// inputHandler.send(new Object[]{"user", "open9"});
- Thread.sleep(1000);
- executionPlanRuntime.shutdown();
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/application.conf
deleted file mode 100644
index 524b867..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/application.conf
+++ /dev/null
@@ -1,33 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
- "eagleProps" : {
- "site" : "sandbox",
- "application" : "UnitTest",
- "eagleService": {
- "host": "localhost",
- "port": 38080,
- "username": "admin",
- "password": "secret"
- }
- },
- "dynamicConfigSource" : {
- "enabled" : true,
- "initDelayMillis" : 0,
- "delayMillis" : 30000,
- "ignoreDeleteFromSource" : true
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/log4j.properties b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/log4j.properties
deleted file mode 100644
index 71a5dac..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/log4j.properties
+++ /dev/null
@@ -1,34 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-log4j.rootLogger=INFO, DRFA, stdout
-eagle.log.dir=./logs
-eagle.log.file=eagle.log
-
-# standard output
-log4j.appender.stdout=org.apache.log4j.ConsoleAppender
-log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
-log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
-
-# Daily Rolling File Appender
-log4j.appender.DRFA=org.apache.log4j.DailyRollingFileAppender
-log4j.appender.DRFA.File=${eagle.log.dir}/${eagle.log.file}
-log4j.appender.DRFA.DatePattern=.yyyy-MM-dd
-# 30-day backup
-#log4j.appender.DRFA.MaxBackupIndex=30
-log4j.appender.DRFA.layout=org.apache.log4j.PatternLayout
-
-# Pattern format: Date LogLevel LoggerName LogMessage
-log4j.appender.DRFA.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/str.siddhiext
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/str.siddhiext b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/str.siddhiext
deleted file mode 100644
index 435b4c3..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/str.siddhiext
+++ /dev/null
@@ -1,39 +0,0 @@
-#
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-#
-
-charAt=org.wso2.siddhi.extension.string.CharAtFunctionExtension
-coalesce=org.wso2.siddhi.extension.string.CoalesceFunctionExtension
-concat=org.wso2.siddhi.extension.string.ConcatFunctionExtension
-length=org.wso2.siddhi.extension.string.LengthFunctionExtension
-lower=org.wso2.siddhi.extension.string.LowerFunctionExtension
-regexp=org.wso2.siddhi.extension.string.RegexpFunctionExtension
-repeat=org.wso2.siddhi.extension.string.RepeatFunctionExtension
-replaceAll=org.wso2.siddhi.extension.string.ReplaceAllFunctionExtension
-replaceFirst=org.wso2.siddhi.extension.string.ReplaceFirstFunctionExtension
-reverse=org.wso2.siddhi.extension.string.ReverseFunctionExtension
-strcmp=org.wso2.siddhi.extension.string.StrcmpFunctionExtension
-substr=org.wso2.siddhi.extension.string.SubstrFunctionExtension
-trim=org.wso2.siddhi.extension.string.TrimFunctionExtension
-upper=org.wso2.siddhi.extension.string.UpperFunctionExtension
-hex=org.wso2.siddhi.extension.string.HexFunctionExtension
-unhex=org.wso2.siddhi.extension.string.UnhexFunctionExtension
-contains=org.wso2.siddhi.extension.string.ContainsFunctionExtension
-
-# Eagle Siddhi Extension
-equalsIgnoreCase=org.apache.eagle.policy.siddhi.extension.EqualsIgnoreCaseExtension
-containsIgnoreCase=org.apache.eagle.policy.siddhi.extension.ContainsIgnoreCaseExtension
-regexpIgnoreCase=org.apache.eagle.policy.siddhi.extension.RegexpIgnoreCaseFunctionExtension
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/unittest.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/unittest.conf b/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/unittest.conf
deleted file mode 100644
index 1d18b67..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-process/src/test/resources/unittest.conf
+++ /dev/null
@@ -1,65 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements. See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
- "envContextConfig" : {
- "env" : "storm",
- "mode" : "local",
- "topologyName" : "auditLogProcessTopology",
- "stormConfigFile" : "security-auditlog-storm.yaml",
- "parallelismConfig" : {
- "kafkaMsgConsumer" : 2,
- "hdfsAuditLogAlertExecutor*" : 3
- }
- },
- "dataSourceConfig": {
- "flavor" : "stormkafka",
- "topic" : "hdfs_audit_log",
- "zkConnection" : "localhost:2181",
- "zkConnectionTimeoutMS" : 15000,
- "consumerGroupId" : "EagleConsumer",
- "fetchSize" : 1048586,
- "deserializerClass" : "org.apache.eagle.security.auditlog.HdfsAuditLogKafkaDeserializer",
- "transactionZKServers" : "localhost",
- "transactionZKPort" : 2181,
- "transactionZKRoot" : "/brokers/topics",
- "transactionStateUpdateMS" : 2000
- },
- "alertExecutorConfigs" : {
- "hdfsAuditLogAlertExecutor" : {
- "parallelism" : 2,
- "partitioner" : "org.apache.eagle.policy.DefaultPolicyPartitioner"
- "needValidation" : "true"
- }
- },
- "eagleProps" : {
- "site": "site1",
- "application": "hdfsAuditLog",
- "eagleService": {
- "host": "localhost",
- "port": 38080
- },
- "dataJoinPollIntervalSec" : 30,
- "env" : "test",
- "mail.host" : "mailHost.com",
- "mail.smtp.port":"25",
- "mail.debug" : "true"
- },
- "dynamicConfigSource" : {
- "enabled" : false,
- "initDelayMillis" : 0,
- "delayMillis" : 1000
- }
-}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-service/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-service/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert-service/pom.xml
index e2a9222..31d6d2b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert-service/pom.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert-service/pom.xml
@@ -31,16 +31,6 @@
<name>eagle-alert-service</name>
<dependencies>
- <dependency>
- <groupId>org.apache.eagle</groupId>
- <artifactId>eagle-alert-base</artifactId>
- <version>${project.version}</version>
- </dependency>
- <dependency>
- <groupId>org.apache.eagle</groupId>
- <artifactId>eagle-alert-process</artifactId>
- <version>${project.version}</version>
- </dependency>
<dependency>
<groupId>org.apache.eagle</groupId>
<artifactId>eagle-service-base</artifactId>
@@ -59,6 +49,11 @@
<artifactId>eagle-metadata-base</artifactId>
<version>${project.version}</version>
</dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>eagle-stream-process-base</artifactId>
+ <version>${project.version}</version>
+ </dependency>
</dependencies>
</project>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiddhiAlertPolicyValidateProvider.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiddhiAlertPolicyValidateProvider.java b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiddhiAlertPolicyValidateProvider.java
deleted file mode 100644
index 7f5bddd..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiddhiAlertPolicyValidateProvider.java
+++ /dev/null
@@ -1,133 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.service.alert;
-
-import java.util.Arrays;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.service.generic.GenericEntityServiceResource;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.wso2.siddhi.core.ExecutionPlanRuntime;
-import org.wso2.siddhi.core.SiddhiManager;
-
-import org.apache.eagle.policy.common.Constants;
-import org.apache.eagle.alert.entity.AlertStreamSchemaEntity;
-import org.apache.eagle.policy.siddhi.AttributeType;
-import org.apache.eagle.log.entity.GenericServiceAPIResponseEntity;
-import org.apache.eagle.common.DateTimeUtil;
-import com.fasterxml.jackson.databind.Module;
-import com.fasterxml.jackson.databind.jsontype.NamedType;
-import com.fasterxml.jackson.databind.module.SimpleModule;
-
-public class SiddhiAlertPolicyValidateProvider extends AlertPolicyValidateProvider{
-
- public String type;
- public List<String> streamNames;
- public String policyDefinition;
- public static Logger LOG = LoggerFactory.getLogger(PolicyValidateResource.class);
- public static final String EXECUTION_PLAN_NAME = "query";
-
- @SuppressWarnings({"unchecked"})
- public String getStreamDef(String streamName) {
- GenericEntityServiceResource resource = new GenericEntityServiceResource();
- String startTime = "1969-01-01 00:00:00";
- String endTime = DateTimeUtil.millisecondsToHumanDateWithSeconds(Long.MAX_VALUE);
- int pageSize = 1000;
- String query = Constants.ALERT_STREAM_SCHEMA_SERVICE_ENDPOINT_NAME + "[@streamName=\"" + streamName + "\"]{*}";
- GenericServiceAPIResponseEntity<AlertStreamSchemaEntity> streamResponse = resource.search(query, startTime, endTime, pageSize, null, false, false, 0L, 0, true, 0, null, false);
- List<AlertStreamSchemaEntity> list = streamResponse.getObj();
-
- Map<String, String> map = new HashMap<String, String>();
- for(AlertStreamSchemaEntity entity : list){
- map.put(entity.getTags().get("attrName"), entity.getAttrType());
- }
- StringBuilder sb = new StringBuilder();
- sb.append("dataobj object,");
- for(Map.Entry<String, String> entry : map.entrySet()){
- String attrName = entry.getKey();
- sb.append(attrName);
- sb.append(" ");
- String attrType = entry.getValue();
- if(attrType.equalsIgnoreCase(AttributeType.STRING.name())){
- sb.append("string");
- }else if(attrType.equalsIgnoreCase(AttributeType.INTEGER.name())){
- sb.append("int");
- }else if(attrType.equalsIgnoreCase(AttributeType.LONG.name())){
- sb.append("long");
- }else if(attrType.equalsIgnoreCase(AttributeType.BOOL.name())){
- sb.append("bool");
- }else{
- LOG.error("AttrType is not recognized, ignore : " + attrType);
- }
- sb.append(",");
- }
- if(sb.length() > 0){
- sb.deleteCharAt(sb.length()-1);
- }
-
- String siddhiStreamDefFormat = "define stream " + streamName + " (" + "%s" + ");";
- String streamDef = String.format(siddhiStreamDefFormat, sb.toString());
- return streamDef;
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public GenericServiceAPIResponseEntity validate() {
- GenericServiceAPIResponseEntity result = new GenericServiceAPIResponseEntity();
- SiddhiManager siddhiManager = new SiddhiManager();
- ExecutionPlanRuntime executionPlanRuntime = null;
- try {
- String streamDefs = new String();
- for(String streamName : streamNames){
- //String streamDef = SiddhiStreamMetadataUtils.convertToStreamDef(streamName);
- //We don't use SiddhiStreamMetadataUtils, for it only consume one dataSource
- String streamDef = getStreamDef(streamName);
- LOG.info("Siddhi stream definition : " + streamDef);
- streamDefs += streamDef;
- }
-
- String executionPlan = streamDefs + " @info(name = '" + EXECUTION_PLAN_NAME + "') " + policyDefinition;
- executionPlanRuntime = siddhiManager.createExecutionPlanRuntime(executionPlan);
- }
- catch (Exception ex) {
- result.setSuccess(false);
- result.setException(ex);
- return result;
- }
- finally {
- if (executionPlanRuntime != null) {
- executionPlanRuntime.shutdown();
- }
- }
- result.setSuccess(true);
- return result;
- }
-
- @Override
- public String PolicyType() {
- return Constants.policyType.siddhiCEPEngine.name();
- }
-
- @Override
- public List<Module> BindingModules() {
- Module module = new SimpleModule("policyValidate").registerSubtypes(new NamedType(SiddhiAlertPolicyValidateProvider.class, PolicyType()));
- return Arrays.asList(module);
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/0f11a591/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiteApplicationObject.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiteApplicationObject.java b/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiteApplicationObject.java
deleted file mode 100644
index 41ece1d..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert-service/src/main/java/org/apache/eagle/service/alert/SiteApplicationObject.java
+++ /dev/null
@@ -1,62 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- *
- */
-
-package org.apache.eagle.service.alert;
-
-
-import org.apache.eagle.log.base.taggedlog.TaggedLogAPIEntity;
-import org.apache.eagle.alert.entity.SiteApplicationServiceEntity;
-
-import java.util.List;
-import java.util.Map;
-
-public class SiteApplicationObject extends TaggedLogAPIEntity {
-
- public Boolean getEnabled() {
- return enabled;
- }
-
- public void setEnabled(Boolean enabled) {
- this.enabled = enabled;
- valueChanged("enabled");
- }
-
- public List<SiteApplicationServiceEntity> getApplications() {
- return applications;
- }
-
- public void setApplications(List<SiteApplicationServiceEntity> applications) {
- this.applications = applications;
- valueChanged("applicationList");
- }
-
- @Override
- public Map<String, String> getTags() {
- return tags;
- }
-
- @Override
- public void setTags(Map<String, String> tags) {
- this.tags = tags;
- valueChanged("tags");
- }
-
- Map<String, String> tags;
- Boolean enabled;
- List<SiteApplicationServiceEntity> applications;
-}
\ No newline at end of file