You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ra...@apache.org on 2016/06/01 06:00:03 UTC
[03/18] incubator-eagle git commit: EAGLE-324 Init branch-v0.5
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestStormParallelism.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestStormParallelism.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestStormParallelism.java
new file mode 100644
index 0000000..279a041
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestStormParallelism.java
@@ -0,0 +1,161 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.topology;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+
+/**
+ * Since 4/29/16.
+ */
+@SuppressWarnings({"serial", "rawtypes"})
+public class TestStormParallelism {
+ /**
+ * When running this test, check the following through jstack and the log:
+ * 1) for blue-spout, num of executors is 2, # of tasks is 2
+ *
+ * Expected:
+ *
+ * a. 2 threads uniquely named Thread-*-blue-spout-executor[*,*]
+ * b. each thread will have single task
+ *
+ * 2) for green-bolt, num of executors is 2, # of tasks is 4
+ *
+ * Expected:
+ *
+ * a. 2 threads uniquely named Thread-*-green-bolt-executor[*,*]
+ * b. each thread will have 2 tasks
+ *
+ * 3) for yellow-bolt, num of executors is 6, # of tasks is 6
+ *
+ * Expected:
+ *
+ * a. 6 threads uniquely named Thread-*-yellow-bolt-executor[*,*]
+ * b. each thread will have 1 task
+ *
+ *
+ * Continue to think:
+ *
+ * For the alert engine, if we use multiple tasks per component instead of one task per component,
+ * what will the parallelism mechanism affect?
+ *
+ * @throws Exception
+ */
+ @Ignore
+ @Test
+ public void testParallelism() throws Exception{
+ Config conf = new Config();
+ conf.setNumWorkers(2); // use two worker processes
+ TopologyBuilder topologyBuilder = new TopologyBuilder();
+ topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // parallelism hint
+
+ topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
+ .setNumTasks(4)
+ .shuffleGrouping("blue-spout");
+
+ topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)
+ .shuffleGrouping("green-bolt");
+
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("mytopology", conf, topologyBuilder.createTopology());
+ // Block so the executor threads can be inspected; an interrupt ends the wait.
+ try {
+ while(true) {
+ Thread.sleep(1000);
+ }
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ private static class BlueSpout extends BaseRichSpout{
+ static int count = 0;
+ public BlueSpout(){
+ }
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("a"));
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ count++;
+ System.out.println("# of spout objects " + count + ", current spout " + this);
+ }
+
+ @Override
+ public void nextTuple() {
+
+ }
+ }
+
+ private static class GreenBolt extends BaseRichBolt{
+ static int count;
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ count++;
+ System.out.println("# of green bolt objects " + count + ", current green bolt " + this);
+ }
+
+ @Override
+ public void execute(Tuple input) {
+
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("a"));
+ }
+ }
+
+ private static class YellowBolt extends BaseRichBolt{
+ static int count;
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ count++;
+ System.out.println("# of yellow bolt objects " + count + ", current yellow bolt " + this);
+ }
+
+ @Override
+ public void execute(Tuple input) {
+
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("a"));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestStormStreamIdRouting.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestStormStreamIdRouting.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestStormStreamIdRouting.java
new file mode 100644
index 0000000..9d3892a
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestStormStreamIdRouting.java
@@ -0,0 +1,140 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.topology;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+
+
+/**
+ * Since 4/29/16.
+ */
+@SuppressWarnings({"serial", "rawtypes", "unused"})
+public class TestStormStreamIdRouting {
+ @Ignore
+ @Test
+ public void testRoutingByStreamId() throws Exception{
+ Config conf = new Config();
+ conf.setNumWorkers(2); // use two worker processes
+ TopologyBuilder topologyBuilder = new TopologyBuilder();
+ topologyBuilder.setSpout("blue-spout", new BlueSpout()); // no parallelism hint given
+
+ topologyBuilder.setBolt("green-bolt-1", new GreenBolt(1))
+ .shuffleGrouping("blue-spout", "green-bolt-stream-1");
+ topologyBuilder.setBolt("green-bolt-2", new GreenBolt(2))
+ .shuffleGrouping("blue-spout", "green-bolt-stream-2");
+
+ LocalCluster cluster = new LocalCluster();
+ cluster.submitTopology("mytopology", conf, topologyBuilder.createTopology());
+ // Block so the per-stream bolt output can be observed; an interrupt ends the wait.
+ try {
+ while(true) {
+ Thread.sleep(1000);
+ }
+ } finally {
+ cluster.shutdown();
+ }
+ }
+
+ private static class BlueSpout extends BaseRichSpout {
+ int count = 0;
+ private SpoutOutputCollector collector;
+ public BlueSpout(){
+ }
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declareStream("green-bolt-stream-1", new Fields("a"));
+ declarer.declareStream("green-bolt-stream-2", new Fields("a"));
+ }
+
+ @Override
+ public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+ this.collector = collector;
+ }
+
+ @Override
+ public void nextTuple() {
+ if(count % 2 == 0) {
+ this.collector.emit("green-bolt-stream-1", Arrays.asList("testdata" + count));
+ count++;
+ }else{
+ this.collector.emit("green-bolt-stream-2", Arrays.asList("testdata" + count));
+ count++;
+ }
+ try{
+ Thread.sleep(10000);
+ }catch(Exception ex){
+
+ }
+ }
+ }
+
+ private static class GreenBolt extends BaseRichBolt {
+ private int id;
+ public GreenBolt(int id){
+ this.id = id;
+ }
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ }
+
+ @Override
+ public void execute(Tuple input) {
+ System.out.println("bolt " + id + " received data " + input.getString(0));
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("a"));
+ }
+ }
+
+ private static class YellowBolt extends BaseRichBolt{
+ @Override
+ public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+ }
+
+ @Override
+ public void execute(Tuple input) {
+
+ }
+
+ @Override
+ public void declareOutputFields(OutputFieldsDeclarer declarer) {
+ declarer.declare(new Fields("a"));
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestTuple2StreamConverter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestTuple2StreamConverter.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestTuple2StreamConverter.java
new file mode 100644
index 0000000..b0e5c4a
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestTuple2StreamConverter.java
@@ -0,0 +1,62 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.topology;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.eagle.alert.coordination.model.Tuple2StreamConverter;
+import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Since 5/3/16.
+ */
+public class TestTuple2StreamConverter {
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Test
+ public void test(){
+ Tuple2StreamMetadata metadata = new Tuple2StreamMetadata();
+ Set activeStreamNames = new HashSet<>();
+ activeStreamNames.add("defaultStringStream");
+ metadata.setStreamNameSelectorCls("org.apache.eagle.alert.engine.scheme.PlainStringStreamNameSelector");
+ metadata.setStreamNameSelectorProp(new Properties());
+ metadata.getStreamNameSelectorProp().put("userProvidedStreamName", "defaultStringStream");
+ metadata.setActiveStreamNames(activeStreamNames);
+ metadata.setTimestampColumn("timestamp");
+ Tuple2StreamConverter convert = new Tuple2StreamConverter(metadata);
+ String topic = "testTopic";
+ Map m = new HashMap<>();
+ m.put("value", "IAmPlainString");
+ long t = System.currentTimeMillis();
+ m.put("timestamp", t);
+ List<Object> ret = convert.convert(Arrays.asList(topic, m));
+ Assert.assertEquals(topic, ret.get(0));
+ Assert.assertEquals("defaultStringStream", ret.get(1));
+ Assert.assertEquals(t, ret.get(2));
+ Assert.assertEquals(m, ret.get(3));
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestUnitTopologyMain.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestUnitTopologyMain.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestUnitTopologyMain.java
new file mode 100644
index 0000000..7132f5a
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestUnitTopologyMain.java
@@ -0,0 +1,57 @@
+/*
+ *
+ * * Licensed to the Apache Software Foundation (ASF) under one or more
+ * * contributor license agreements. See the NOTICE file distributed with
+ * * this work for additional information regarding copyright ownership.
+ * * The ASF licenses this file to You under the Apache License, Version 2.0
+ * * (the "License"); you may not use this file except in compliance with
+ * * the License. You may obtain a copy of the License at
+ * *
+ * * http://www.apache.org/licenses/LICENSE-2.0
+ * *
+ * * Unless required by applicable law or agreed to in writing, software
+ * * distributed under the License is distributed on an "AS IS" BASIS,
+ * * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * * See the License for the specific language governing permissions and
+ * * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.topology;
+
+import org.apache.eagle.alert.engine.runner.UnitTopologyRunner;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * Since 5/4/16.
+ */
+public class TestUnitTopologyMain {
+ @Ignore
+ @Test
+ public void testTopologyRun(){
+ testTopologyRun("/application-test.conf");
+ }
+
+ public void testTopologyRun(String configResourceName){
+ ConfigFactory.invalidateCaches();
+ System.setProperty("config.resource", configResourceName);
+ System.out.print("Set config.resource = "+configResourceName);
+ Config config = ConfigFactory.load();
+ String topologyId = config.getString("topology.name");
+ MockMetadataChangeNotifyService changeNotifyService =
+ new MockMetadataChangeNotifyService(topologyId,"alertEngineSpout");
+ new UnitTopologyRunner(changeNotifyService).run(topologyId,config);
+ }
+
+ public static void main(String[] args){
+ if(args.length>0) {
+ new TestUnitTopologyMain().testTopologyRun(args[0]);
+ } else {
+ new TestUnitTopologyMain().testTopologyRun();
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/utils/CompressionUtilsTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/utils/CompressionUtilsTest.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/utils/CompressionUtilsTest.java
new file mode 100644
index 0000000..0861597
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/utils/CompressionUtilsTest.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+public class CompressionUtilsTest {
+ private final static Logger LOG = LoggerFactory.getLogger(CompressionUtilsTest.class);
+
+ @Test
+ public void testCompressAndDecompress() throws IOException {
+ // Use an explicit charset so the round trip does not depend on the platform default.
+ String value = "http://www.apache.org/licenses/LICENSE-2.0";
+ byte[] original = value.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+ byte[] compressed = CompressionUtils.compress(original);
+ byte[] decompressed = CompressionUtils.decompress(compressed);
+ LOG.info("original size: {}",original.length);
+ LOG.info("compressed size: {}",compressed.length);
+ LOG.info("decompressed size: {}",decompressed.length);
+ Assert.assertArrayEquals(original, decompressed);
+ String decompressedValue = new String(decompressed, java.nio.charset.StandardCharsets.UTF_8);
+ Assert.assertEquals(value,decompressedValue);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/utils/TimePeriodUtilsTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/utils/TimePeriodUtilsTest.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/utils/TimePeriodUtilsTest.java
new file mode 100644
index 0000000..cad53d4
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/utils/TimePeriodUtilsTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.utils;
+
+import java.text.ParseException;
+
+import org.apache.eagle.alert.utils.DateTimeUtil;
+import org.apache.eagle.alert.utils.TimePeriodUtils;
+import org.joda.time.Period;
+import org.joda.time.Seconds;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class TimePeriodUtilsTest {
+ @Test
+ public void testJodaTimePeriod() throws ParseException {
+ String periodText = "PT10m";
+ Period period = new Period(periodText);
+ int seconds = period.toStandardSeconds().getSeconds();
+ Assert.assertEquals(600, seconds);
+ Assert.assertEquals(60, period.toStandardSeconds().dividedBy(10).getSeconds());
+ }
+
+ @Test
+ public void testJodaTimePeriodBySeconds() throws ParseException {
+ String periodText = "PT10s";
+ Period period = new Period(periodText);
+ int seconds = period.toStandardSeconds().getSeconds();
+ Assert.assertEquals(10, seconds);
+ }
+
+ @Test
+ public void testFormatSecondsByPeriod15M() throws ParseException {
+ // Each case expects the timestamp rounded down to the start of its 15-minute bucket.
+ Period period = new Period("PT15m");
+ Seconds seconds = period.toStandardSeconds();
+ Assert.assertEquals(15*60,seconds.getSeconds());
+
+ long time = DateTimeUtil.humanDateToSeconds("2015-07-01 13:56:12");
+ long expect = DateTimeUtil.humanDateToSeconds("2015-07-01 13:45:00");
+ long result = TimePeriodUtils.formatSecondsByPeriod(time,seconds);
+ Assert.assertEquals(expect,result);
+
+ time = DateTimeUtil.humanDateToSeconds("2015-07-01 03:14:59");
+ expect = DateTimeUtil.humanDateToSeconds("2015-07-01 03:00:00");
+ result = TimePeriodUtils.formatSecondsByPeriod(time, seconds);
+ Assert.assertEquals(expect,result);
+
+ time = DateTimeUtil.humanDateToSeconds("2015-07-01 03:30:59");
+ expect = DateTimeUtil.humanDateToSeconds("2015-07-01 03:30:00");
+ result = TimePeriodUtils.formatSecondsByPeriod(time, seconds);
+ Assert.assertEquals(expect,result);
+ }
+
+ @Test
+ public void testFormatSecondsByPeriod1H() throws ParseException {
+
+ Period period = new Period("PT1h");
+ Seconds seconds = period.toStandardSeconds();
+ Assert.assertEquals(60*60,seconds.getSeconds());
+
+ long time = DateTimeUtil.humanDateToSeconds("2015-07-01 13:56:12");
+ long expect = DateTimeUtil.humanDateToSeconds("2015-07-01 13:00:00");
+ long result = TimePeriodUtils.formatSecondsByPeriod(time,seconds);
+ Assert.assertEquals(expect,result);
+
+ time = DateTimeUtil.humanDateToSeconds("2015-07-01 03:14:59");
+ expect = DateTimeUtil.humanDateToSeconds("2015-07-01 03:00:00");
+ result = TimePeriodUtils.formatSecondsByPeriod(time, seconds);
+ Assert.assertEquals(expect,result);
+
+ time = DateTimeUtil.humanDateToSeconds("2015-07-01 03:30:59");
+ expect = DateTimeUtil.humanDateToSeconds("2015-07-01 03:00:00");
+ result = TimePeriodUtils.formatSecondsByPeriod(time, seconds);
+ Assert.assertEquals(expect,result);
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/application-integration.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/application-integration.conf b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/application-integration.conf
new file mode 100644
index 0000000..13d4ee1
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/application-integration.conf
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+{
+ "topology" : {
+ "name" : "alertUnitTopology_1",
+ "numOfTotalWorkers": 20,
+ "numOfSpoutTasks" : 1,
+ "numOfRouterBolts" : 4,
+ "numOfAlertBolts" : 10,
+ "numOfPublishTasks" : 1,
+ "localMode" : "true"
+ },
+ "spout" : {
+ "kafkaBrokerZkQuorum": "localhost:2181",
+ "kafkaBrokerZkBasePath": "/brokers",
+ "stormKafkaUseSameZkQuorumWithKafkaBroker": true,
+ "stormKafkaTransactionZkQuorum": "",
+ "stormKafkaTransactionZkPath": "/consumers",
+ "stormKafkaEagleConsumer": "eagle_consumer",
+ "stormKafkaStateUpdateIntervalMs": 2000,
+ "stormKafkaFetchSizeBytes": 1048586,
+ },
+ "zkConfig" : {
+ "zkQuorum" : "localhost:2181",
+ "zkRoot" : "/alert",
+ "zkSessionTimeoutMs" : 10000,
+ "connectionTimeoutMs" : 10000,
+ "zkRetryTimes" : 3,
+ "zkRetryInterval" : 3000
+ },
+ "dynamicConfigSource" : {
+ "initDelayMillis": 3000,
+ "delayMillis" : 10000
+ },
+ "metadataService": {
+ "context" : "/api",
+ "host" : "localhost",
+ "port" : 8080
+ },
+ "coordinatorService": {
+ "host": "localhost",
+ "port": "9090",
+ "context" : "/api"
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/application-test-backup.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/application-test-backup.conf b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/application-test-backup.conf
new file mode 100755
index 0000000..2c99a68
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/application-test-backup.conf
@@ -0,0 +1,71 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+{
+ "topology" : {
+ "name" : "alertUnitTopology_bug",
+ "numOfSpoutTasks" : 3,
+ "numOfRouterBolts" : 6,
+ "numOfAlertBolts" : 6,
+ "numOfPublishTasks" : 1,
+ "numOfTotalWorkers":1,
+ "messageTimeoutSecs": 30, // topology.message.timeout.secs: 30 by default
+ "localMode" : true
+ },
+ "spout" : {
+ "kafkaBrokerZkQuorum": "10.64.243.71:2181",
+ "kafkaBrokerZkBasePath": "/brokers",
+ "stormKafkaUseSameZkQuorumWithKafkaBroker": true,
+ "stormKafkaTransactionZkQuorum": "",
+ "stormKafkaTransactionZkPath": "/consumers",
+ "stormKafkaEagleConsumer": "eagle_consumer",
+ "stormKafkaStateUpdateIntervalMs": 2000,
+ "stormKafkaFetchSizeBytes": 1048586,
+ },
+ "zkConfig" : {
+ "zkQuorum" : "10.64.243.71:2181",
+ "zkRoot" : "/alert",
+ "zkSessionTimeoutMs" : 10000,
+ "connectionTimeoutMs" : 10000,
+ "zkRetryTimes" : 3,
+ "zkRetryInterval" : 3000
+ },
+ "dynamicConfigSource" : {
+ "initDelayMillis": 3000,
+ "delayMillis" : 10000
+ },
+ "metadataService": {
+ "context" : "/api",
+ "host" : "localhost",
+ "port" : 8080
+ },
+ "metric":{
+ "tags":{
+ "topologyName":"alertUnitTopology_1"
+ }
+ "sink": {
+// "kafka": {
+// "topic": "alert_metric_test"
+// "bootstrap.servers": "localhost:9092"
+// }
+ "logger": {
+ "level":"DEBUG"
+ }
+ "elasticsearch": {
+ "hosts": ["10.64.223.222:9200"]
+ "index": "alert_metric_test"
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/application-test.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/application-test.conf b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/application-test.conf
new file mode 100755
index 0000000..f4a797e
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/application-test.conf
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+{
+ "topology" : {
+ "name" : "alertUnitTopology_1_test",
+ "numOfSpoutTasks" : 3,
+ "numOfRouterBolts" : 6,
+ "numOfAlertBolts" : 6,
+ "numOfPublishTasks" : 1,
+ "numOfTotalWorkers":1,
+ "messageTimeoutSecs": 30, // topology.message.timeout.secs: 30 by default
+ "localMode" : true
+ },
+ "spout" : {
+ "kafkaBrokerZkQuorum": "localhost:2181",
+ "kafkaBrokerZkBasePath": "/brokers",
+ "stormKafkaUseSameZkQuorumWithKafkaBroker": true,
+ "stormKafkaTransactionZkQuorum": "",
+ "stormKafkaTransactionZkPath": "/consumers",
+ "stormKafkaEagleConsumer": "eagle_consumer",
+ "stormKafkaStateUpdateIntervalMs": 2000,
+ "stormKafkaFetchSizeBytes": 1048586,
+ },
+ "zkConfig" : {
+ "zkQuorum" : "localhost:2181",
+ "zkRoot" : "/alert",
+ "zkSessionTimeoutMs" : 10000,
+ "connectionTimeoutMs" : 10000,
+ "zkRetryTimes" : 3,
+ "zkRetryInterval" : 3000
+ },
+ "dynamicConfigSource" : {
+ "initDelayMillis": 3000,
+ "delayMillis" : 10000
+ },
+ "metadataService": {
+ "context" : "/api",
+ "host" : "localhost",
+ "port" : 8080
+ },
+ "metric":{
+ "sink": {
+// "kafka": {
+// "topic": "alert_metric_test"
+// "bootstrap.servers": "localhost:9092"
+// }
+ "logger": {
+ "level":"INFO"
+ }
+ "elasticsearch": {
+ "hosts": ["10.64.223.222:9200"]
+ "index": "alert_metric_test"
+ }
+ }
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/application-integration-2.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/application-integration-2.conf b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/application-integration-2.conf
new file mode 100644
index 0000000..bb998cd
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/application-integration-2.conf
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+{
+ "topology" : {
+ "name" : "alertUnitTopology_2",
+ "numOfSpoutTasks" : 1,
+ "numOfRouterBolts" : 4,
+ "numOfAlertBolts" : 10,
+ "numOfPublishTasks" : 1,
+ "numOfTotalWorkers": 20,
+ "localMode" : "true"
+ },
+ "spout" : {
+ "kafkaBrokerZkQuorum": "localhost:2181",
+ "kafkaBrokerZkBasePath": "/brokers",
+ "stormKafkaUseSameZkQuorumWithKafkaBroker": true,
+ "stormKafkaTransactionZkQuorum": "",
+ "stormKafkaTransactionZkPath": "/consumers",
+ "stormKafkaEagleConsumer": "eagle_consumer",
+ "stormKafkaStateUpdateIntervalMs": 2000,
+ "stormKafkaFetchSizeBytes": 1048586,
+ },
+ "zkConfig" : {
+ "zkQuorum" : "localhost:2181",
+ "zkRoot" : "/alert",
+ "zkSessionTimeoutMs" : 10000,
+ "connectionTimeoutMs" : 10000,
+ "zkRetryTimes" : 3,
+ "zkRetryInterval" : 3000
+ },
+ "dynamicConfigSource" : {
+ "initDelayMillis": 3000,
+ "delayMillis" : 10000
+ },
+ "metadataService": {
+ "context" : "/api",
+ "host" : "localhost",
+ "port" : 8080
+ },
+ "coordinatorService": {
+ "host": "localhost",
+ "port": "9090",
+ "context" : "/api"
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/datasources.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/datasources.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/datasources.json
new file mode 100644
index 0000000..946acd6
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/datasources.json
@@ -0,0 +1,37 @@
+[
+{
+ "name": "eslog_datasource",
+ "type": "KAFKA",
+ "properties": {
+ },
+ "topic": "eslogs",
+ "schemeCls": "org.apache.eagle.alert.engine.scheme.JsonScheme",
+ "codec": {
+ "streamNameSelectorProp": {
+ "userProvidedStreamName" : "esStream",
+ "streamNameFormat":"%s"
+ },
+ "streamNameSelectorCls":"org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector",
+ "timestampColumn": "timestamp",
+ "timestampFormat":""
+ }
+},
+{
+ "name": "bootfailure_datasource",
+ "type": "KAFKA",
+ "properties": {
+ },
+ "topic": "bootfailures",
+ "schemeCls": "org.apache.eagle.alert.engine.scheme.JsonScheme",
+ "codec": {
+ "streamNameSelectorProp": {
+ "userProvidedStreamName" : "ifStream",
+ "streamNameFormat":"%s"
+ },
+ "streamNameSelectorCls":"org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector",
+ "timestampColumn": "timestamp",
+ "timestampFormat":""
+ }
+}
+
+]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/policies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/policies.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/policies.json
new file mode 100644
index 0000000..4702575
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/policies.json
@@ -0,0 +1,39 @@
+[
+{
+ "name": "logStreamJoinBootFailure",
+ "description" : "policy to correlate ES log events with boot failures by instanceUuid",
+ "inputStreams": [
+ "esStream",
+ "ifStream"
+ ],
+ "outputStreams": [
+ "log_stream_join_output"
+ ],
+ "definition": {
+ "type": "siddhi",
+ "value": " from esStream#window.externalTime(timestamp, 20 min) as a join ifStream#window.externalTime(timestamp, 5 min) as b on a.instanceUuid == b.instanceUuid select logLevel, a.host as aHost, a.component, a.message as logMessage, b.message as failMessage, a.timestamp as t1, b.timestamp as t2, b.host as bHost, count(1) as errorCount insert into log_stream_join_output; "
+ },
+ "partitionSpec": [
+ {
+ "streamId" : "esStream",
+ "type" : "GROUPBY",
+ "columns" : [
+ "instanceUuid"
+ ],
+ "sortSpec": {
+ "windowPeriod" : "PT1M"
+ }
+ },
+ {
+ "streamId" : "ifStream",
+ "type" : "GROUPBY",
+ "columns" : [
+ "instanceUuid"
+ ],
+ "sortSpec": {
+ "windowPeriod" : "PT1M"
+ }
+ }
+ ]
+}
+]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/publishments.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/publishments.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/publishments.json
new file mode 100644
index 0000000..7b531fc
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/publishments.json
@@ -0,0 +1,17 @@
+[
+{
+ "name":"log-stream-join-output",
+ "type":"org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher",
+ "inputStreams": [
+ "log_stream_join_output"
+ ],
+ "properties": {
+ "subject":"UMP Test Alert",
+ "template":"",
+ "sender": "sender@corp.com",
+ "recipients": "receiver@corp.com",
+ "smtp.server":"mailhost.com"
+ },
+ "dedupIntervalMin" : "PT1M"
+}
+]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/streamdefinitions.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/streamdefinitions.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/streamdefinitions.json
new file mode 100644
index 0000000..0a26a28
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/streamdefinitions.json
@@ -0,0 +1,93 @@
+[
+{
+ "streamId": "esStream",
+ "dataSource" : "eslog_datasource",
+ "description":"the data stream for es log of different modules",
+ "validate": false,
+ "timeseries":false,
+ "columns": [
+ {
+ "name": "instanceUuid",
+ "type" : "string",
+ "defaultValue": "",
+ "required":true
+ },
+ {
+ "name": "timestamp",
+ "type" : "long",
+ "defaultValue": 0,
+ "required":true
+ },
+ {
+ "name": "logLevel",
+ "type" : "string",
+ "defaultValue": "ERROR",
+ "required": true
+ },
+ {
+ "name": "message",
+ "type" : "string",
+ "defaultValue": "",
+ "required":true
+ },
+ {
+ "name": "reqId",
+ "type" : "string",
+ "defaultValue": "",
+ "required":true
+ },
+ {
+ "name": "host",
+ "type" : "string",
+ "defaultValue": "",
+ "required":true
+ },
+ {
+ "name": "component",
+ "type" : "string",
+ "defaultValue": "nova",
+ "required":true
+ }
+ ]
+}
+,
+{
+ "streamId": "ifStream",
+ "dataSource" : "bootfailure_datasource",
+ "description":"the data stream for boot failure(instance fault)",
+ "validate": false,
+ "timeseries":false,
+ "columns": [
+ {
+ "name": "instanceUuid",
+ "type" : "string",
+ "defaultValue": "",
+ "required":true
+ },
+ {
+ "name": "timestamp",
+ "type" : "long",
+ "defaultValue": 0,
+ "required":true
+ },
+ {
+ "name": "reqId",
+ "type" : "string",
+ "defaultValue": "",
+ "required": true
+ },
+ {
+ "name": "message",
+ "type" : "string",
+ "defaultValue": "",
+ "required":true
+ },
+ {
+ "name": "host",
+ "type" : "string",
+ "defaultValue": "",
+ "required":true
+ }
+ ]
+}
+]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/topologies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/topologies.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/topologies.json
new file mode 100644
index 0000000..9aa8716
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/topologies.json
@@ -0,0 +1,31 @@
+[
+{
+ "name": "alertUnitTopology_2",
+ "numOfSpout":1,
+ "numOfGroupBolt": 4,
+ "numOfAlertBolt": 10,
+ "spoutId": "alertEngineSpout",
+ "groupNodeIds" : [
+ "streamRouterBolt0",
+ "streamRouterBolt1",
+ "streamRouterBolt2",
+ "streamRouterBolt3"
+ ],
+ "alertBoltIds": [
+ "alertBolt0",
+ "alertBolt1",
+ "alertBolt2",
+ "alertBolt3",
+ "alertBolt4",
+ "alertBolt5",
+ "alertBolt6",
+ "alertBolt7",
+ "alertBolt8",
+ "alertBolt9"
+ ],
+ "pubBoltId" : "alertPublishBolt",
+ "spoutParallelism": 1,
+ "groupParallelism": 1,
+ "alertParallelism": 1
+}
+]
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/datasources.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/datasources.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/datasources.json
new file mode 100644
index 0000000..77a280c
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/datasources.json
@@ -0,0 +1,19 @@
+[
+{
+ "name": "perfmon_datasource",
+ "type": "KAFKA",
+ "properties": {
+ },
+ "topic": "perfmon_metrics",
+ "schemeCls": "org.apache.eagle.alert.engine.scheme.JsonScheme",
+ "codec": {
+ "streamNameSelectorProp": {
+ "fieldNamesToInferStreamName" : "metric",
+ "streamNameFormat":"%s"
+ },
+ "streamNameSelectorCls":"org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector",
+ "timestampColumn": "timestamp",
+ "timestampFormat":""
+ }
+}
+]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/log4j.properties b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/log4j.properties
new file mode 100644
index 0000000..3ba587d
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/log4j.properties
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements. See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License. You may obtain a copy of the License at
+#
+# http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
+
+#log4j.logger.org.apache.eagle.alert.metric=DEBUG
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/policies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/policies.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/policies.json
new file mode 100644
index 0000000..5edece9
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/policies.json
@@ -0,0 +1,54 @@
+[
+{
+ "name": "perfmon_cpu_host_check",
+ "description" : "policy to check host perfmon_cpu",
+ "inputStreams": [
+ "perfmon_cpu_stream"
+ ],
+ "outputStreams": [
+ "perfmon_cpu_check_output"
+ ],
+ "definition": {
+ "type": "siddhi",
+ "value": "from perfmon_cpu_stream[value > 90.0] select * group by host insert into perfmon_cpu_check_output;"
+ },
+ "partitionSpec": [
+ {
+ "streamId" : "perfmon_cpu_stream",
+ "type" : "GROUPBY",
+ "columns" : [
+ "host"
+ ],
+ "sortSpec": {
+ "windowPeriod" : "PT1M"
+ }
+ }
+ ]
+},
+{
+ "name": "perfmon_cpu_pool_check",
+ "description" : "policy to check pool perfmon_cpu",
+ "inputStreams": [
+ "perfmon_cpu_stream"
+ ],
+ "outputStreams": [
+ "perfmon_cpu_check_output"
+ ],
+ "definition": {
+ "type": "siddhi",
+ "value": "from perfmon_cpu_stream[value > 75.0] select * group by pool insert into perfmon_cpu_check_output;"
+ },
+ "partitionSpec": [
+ {
+ "streamId" : "perfmon_cpu_stream",
+ "type" : "GROUPBY",
+ "columns" : [
+ "pool"
+ ],
+ "sortSpec": {
+ "windowPeriod" : "PT1M"
+ }
+ }
+ ]
+}
+]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/publishments.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/publishments.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/publishments.json
new file mode 100644
index 0000000..b3840a5
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/publishments.json
@@ -0,0 +1,29 @@
+[
+{
+ "name":"test-stream-output",
+ "type":"org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher",
+ "policyIds": [
+ "perfmon_cpu_host_check", "perfmon_cpu_pool_check"
+ ],
+ "properties": {
+ "subject":"UMP Test Alert",
+ "template":"",
+ "sender": "sender@corp.com",
+ "recipients": "receiver@corp.com",
+ "smtp.server":"mailhost.com",
+ "connection": "plaintext",
+ "smtp.port": "25"
+ },
+ "dedupIntervalMin" : "PT0M"
+},
+{
+ "type": "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher",
+ "name":"kafka-testAlertStream",
+ "policyIds": ["perfmon_cpu_host_check"],
+ "dedupIntervalMin": "PT1M",
+ "properties":{
+ "kafka_broker":"sandbox.hortonworks.com:6667",
+ "topic":"test_kafka"
+ }
+}
+]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/publishments2.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/publishments2.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/publishments2.json
new file mode 100644
index 0000000..277ba75
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/publishments2.json
@@ -0,0 +1,19 @@
+[
+{
+ "name":"test-stream-output",
+ "type":"org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher",
+ "policyIds": [
+ "policy1", "policy2"
+ ],
+ "properties": {
+ "subject":"UMP Test Alert",
+ "template":"",
+ "sender": "sender@corp.com",
+ "recipients": "receiver@corp.com",
+ "smtp.server":"mailhost.com",
+ "connection": "plaintext",
+ "smtp.port": "25"
+ },
+ "dedupIntervalMin" : "PT0M"
+}
+]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/sample_perfmon_data.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/sample_perfmon_data.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/sample_perfmon_data.json
new file mode 100644
index 0000000..c63b9ff
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/sample_perfmon_data.json
@@ -0,0 +1,3 @@
+[
+{"host": "", "timestamp" : "", "metric" : "", "pool": "", "value": 1.0, "colo": "phx"}
+]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/streamdefinitions.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/streamdefinitions.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/streamdefinitions.json
new file mode 100644
index 0000000..cbeae19
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/streamdefinitions.json
@@ -0,0 +1,44 @@
+[
+{
+ "streamId": "perfmon_cpu_stream",
+ "dataSource" : "perfmon_datasource",
+ "description":"the data stream for perfmon cpu metrics",
+ "validate": false,
+ "timeseries":false,
+ "columns": [
+ {
+ "name": "host",
+ "type" : "string",
+ "defaultValue": "",
+ "required":true
+ },
+ {
+ "name": "timestamp",
+ "type" : "long",
+ "defaultValue": 0,
+ "required":true
+ },{
+ "name": "metric",
+ "type" : "string",
+ "defaultValue": "perfmon_cpu",
+ "required": true
+ },{
+ "name": "pool",
+ "type" : "string",
+ "defaultValue": "raptor_general",
+ "required":true
+ },{
+ "name": "value",
+ "type" : "double",
+ "defaultValue": 0.0,
+ "required":true
+ },
+ {
+ "name": "colo",
+ "type" : "string",
+ "defaultValue": "",
+ "required":true
+ }
+ ]
+}
+]
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testAlertBoltSpec.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testAlertBoltSpec.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testAlertBoltSpec.json
new file mode 100644
index 0000000..fd255cd
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testAlertBoltSpec.json
@@ -0,0 +1,92 @@
+{
+ "version": "version1",
+ "topologyName": "testTopology",
+ "boltPoliciesMap": {
+ "alertBolt0": [
+ {
+ "name": "policy1",
+ "description": null,
+ "inputStreams": [
+ "testTopic3Stream"
+ ],
+ "outputStreams": [
+ "testAlertStream"
+ ],
+ "definition": {
+ "type": "siddhi",
+ "value": "from testTopic3Stream[value=='xyz'] select value insert into testAlertStream;"
+ },
+ "partitionSpec": [
+ {
+ "streamId": "testTopic3Stream",
+ "type": "GROUPBY",
+ "columns": [
+ "value"
+ ],
+ "sortSpec": {
+ "windowPeriod": "PT10S",
+ "windowMargin": 1000
+ }
+ }
+ ],
+ "parallelismHint": 0
+ },
+ {
+ "name": "policy2",
+ "description": null,
+ "inputStreams": [
+ "testTopic4Stream"
+ ],
+ "outputStreams": [
+ "testAlertStream"
+ ],
+ "definition": {
+ "type": "siddhi",
+ "value": "from testTopic4Stream[value=='xyz'] select value insert into testAlertStream;"
+ },
+ "partitionSpec": [
+ {
+ "streamId": "testTopic4Stream",
+ "type": "GROUPBY",
+ "columns": [
+ "value"
+ ],
+ "sortSpec": {
+ "windowPeriod": "PT10S",
+ "windowMargin": 1000
+ }
+ }
+ ],
+ "parallelismHint": 0
+ },
+ {
+ "name": "policy3",
+ "description": null,
+ "inputStreams": [
+ "testTopic5Stream"
+ ],
+ "outputStreams": [
+ "testAlertStream"
+ ],
+ "definition": {
+ "type": "siddhi",
+ "value": "from testTopic5Stream[value=='xyz'] select value insert into testAlertStream;"
+ },
+ "partitionSpec": [
+ {
+ "streamId": "testTopic5Stream",
+ "type": "GROUPBY",
+ "columns": [
+ "value"
+ ],
+ "sortSpec": {
+ "windowPeriod": "PT10S",
+ "windowMargin": 1000
+ }
+ }
+ ],
+ "parallelismHint": 0
+ }
+ ]
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testPublishSpec.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testPublishSpec.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testPublishSpec.json
new file mode 100644
index 0000000..220c8fe
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testPublishSpec.json
@@ -0,0 +1,32 @@
+{
+ "version": "version1",
+ "topologyName": "testTopology",
+ "boltId": "alertPublishBolt",
+ "publishments": [
+ {
+ "type": "org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher",
+ "name":"email-testAlertStream",
+ "policyIds": ["policy1", "policy2", "policy3"],
+ "dedupIntervalMin": "PT1M",
+ "properties":{
+ "subject":"UMP Test Alert",
+ "template":"",
+ "sender": "sender@corp.com",
+ "recipients": "receiver@corp.com",
+ "smtp.server":"mailhost.com",
+ "connection": "plaintext",
+ "smtp.port": "25"
+ }
+ }
+/* {
+ "type": "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher",
+ "name":"kafka-testAlertStream",
+ "policyIds": ["testPolicy"],
+ "dedupIntervalMin": "PT1M",
+ "properties":{
+ "kafka_broker":"sandbox.hortonworks.com:6667",
+ "topic":"test_kafka"
+ }
+ }*/
+ ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testPublishSpec2.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testPublishSpec2.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testPublishSpec2.json
new file mode 100644
index 0000000..0a16540
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testPublishSpec2.json
@@ -0,0 +1,32 @@
+{
+ "version": "version1",
+ "topologyName": "testTopology",
+ "boltId": "alertPublishBolt",
+ "publishments": [
+ {
+ "type": "org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher",
+ "name":"email-testAlertStream",
+ "policyIds": ["policy1"],
+ "dedupIntervalMin": "PT2M",
+ "properties":{
+ "subject":"UMP Test Alert",
+ "template":"",
+ "sender": "sender@corp.com",
+ "recipients": "receiver@corp.com",
+ "smtp.server":"mailhost.com",
+ "connection": "plaintext",
+ "smtp.port": "25"
+ }
+ }
+// {
+// "type": "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher",
+// "name":"kafka-testAlertStream",
+// "policyIds": ["testPolicy"],
+// "dedupIntervalMin": "PT1M",
+// "properties":{
+// "kafka_broker":"sandbox.hortonworks.com:6667",
+// "topic":"test_kafka"
+// }
+// }
+ ]
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testSpoutSpec.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testSpoutSpec.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testSpoutSpec.json
new file mode 100644
index 0000000..c97ec43
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testSpoutSpec.json
@@ -0,0 +1,139 @@
+{
+ "version": null,
+ "topologyId": "testTopology",
+ "kafka2TupleMetadataMap": {
+ "testTopic5": {
+ "type": null,
+ "name": "testTopic5",
+ "properties": null,
+ "topic": "testTopic5",
+ "schemeCls": "org.apache.eagle.alert.engine.scheme.JsonScheme",
+ "codec": null
+ },
+ "testTopic3": {
+ "type": null,
+ "name": "testTopic3",
+ "properties": null,
+ "topic": "testTopic3",
+ "schemeCls": "org.apache.eagle.alert.engine.scheme.PlainStringScheme",
+ "codec": null
+ },
+ "testTopic4": {
+ "type": null,
+ "name": "testTopic4",
+ "properties": null,
+ "topic": "testTopic4",
+ "schemeCls": "org.apache.eagle.alert.engine.scheme.PlainStringScheme",
+ "codec": null
+ }
+ },
+ "tuple2StreamMetadataMap": {
+ "testTopic5": {
+ "activeStreamNames": [
+ "testTopic5Stream"
+ ],
+ "streamNameSelectorProp": {
+ "userProvidedStreamName": "testTopic5Stream"
+ },
+ "streamNameSelectorCls": "org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector",
+ "timestampColumn": "timestamp",
+ "timestampFormat": null
+ },
+ "testTopic3": {
+ "activeStreamNames": [
+ "testTopic3Stream"
+ ],
+ "streamNameSelectorProp": {
+ "userProvidedStreamName": "testTopic3Stream"
+ },
+ "streamNameSelectorCls": "org.apache.eagle.alert.engine.scheme.PlainStringStreamNameSelector",
+ "timestampColumn": "timestamp",
+ "timestampFormat": null
+ },
+ "testTopic4": {
+ "activeStreamNames": [
+ "testTopic4Stream"
+ ],
+ "streamNameSelectorProp": {
+ "userProvidedStreamName": "testTopic4Stream"
+ },
+ "streamNameSelectorCls": "org.apache.eagle.alert.engine.scheme.PlainStringStreamNameSelector",
+ "timestampColumn": "timestamp",
+ "timestampFormat": null
+ }
+ },
+ "streamRepartitionMetadataMap": {
+ "testTopic5": [
+ {
+ "topicName": "testTopic5",
+ "streamId": "defaultStringStream",
+ "groupingStrategies": [
+ {
+ "partition": {
+ "streamId": "testTopic5Stream",
+ "type": "GROUPBY",
+ "columns": [
+ "value"
+ ],
+ "sortSpec": {
+ "windowPeriod": "PT10S",
+ "windowMargin": 1000
+ }
+ },
+ "numTotalParticipatingRouterBolts": 1,
+ "startSequence": 0,
+ "totalTargetBoltIds": []
+ }
+ ]
+ }
+ ],
+ "testTopic3": [
+ {
+ "topicName": "testTopic3",
+ "streamId": "defaultStringStream",
+ "groupingStrategies": [
+ {
+ "partition": {
+ "streamId": "testTopic3Stream",
+ "type": "GROUPBY",
+ "columns": [
+ "value"
+ ],
+ "sortSpec": {
+ "windowPeriod": "PT10S",
+ "windowMargin": 1000
+ }
+ },
+ "numTotalParticipatingRouterBolts": 1,
+ "startSequence": 0,
+ "totalTargetBoltIds": []
+ }
+ ]
+ }
+ ],
+ "testTopic4": [
+ {
+ "topicName": "testTopic4",
+ "streamId": "defaultStringStream",
+ "groupingStrategies": [
+ {
+ "partition": {
+ "streamId": "testTopic4Stream",
+ "type": "GROUPBY",
+ "columns": [
+ "value"
+ ],
+ "sortSpec": {
+ "windowPeriod": "PT10S",
+ "windowMargin": 1000
+ }
+ },
+ "numTotalParticipatingRouterBolts": 1,
+ "startSequence": 0,
+ "totalTargetBoltIds": []
+ }
+ ]
+ }
+ ]
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testStreamDefinitionsSpec.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testStreamDefinitionsSpec.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testStreamDefinitionsSpec.json
new file mode 100644
index 0000000..6b3ccff
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testStreamDefinitionsSpec.json
@@ -0,0 +1,47 @@
+{
+ "testTopic5Stream": {
+ "streamId": "testTopic5Stream",
+ "dataSource": null,
+ "description": null,
+ "validate": false,
+ "timeseries": false,
+ "columns": [
+ {
+ "name": "value",
+ "type": "STRING",
+ "defaultValue": null,
+ "required": false
+ }
+ ]
+ },
+ "testTopic4Stream": {
+ "streamId": "testTopic4Stream",
+ "dataSource": null,
+ "description": null,
+ "validate": false,
+ "timeseries": false,
+ "columns": [
+ {
+ "name": "value",
+ "type": "STRING",
+ "defaultValue": null,
+ "required": false
+ }
+ ]
+ },
+ "testTopic3Stream": {
+ "streamId": "testTopic3Stream",
+ "dataSource": null,
+ "description": null,
+ "validate": false,
+ "timeseries": false,
+ "columns": [
+ {
+ "name": "value",
+ "type": "STRING",
+ "defaultValue": null,
+ "required": false
+ }
+ ]
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testStreamRouterBoltSpec.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testStreamRouterBoltSpec.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testStreamRouterBoltSpec.json
new file mode 100644
index 0000000..f4e72bf
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testStreamRouterBoltSpec.json
@@ -0,0 +1,123 @@
+{
+ "version": null,
+ "topologyName": "testTopology",
+ "routerSpecs": [
+ {
+ "streamId": "testTopic3Stream",
+ "partition": {
+ "streamId": "testTopic3Stream",
+ "type": "GROUPBY",
+ "columns": [
+ "value"
+ ],
+ "sortSpec": {
+ "windowPeriod": "PT10S",
+ "windowMargin": 1000
+ }
+ },
+ "targetQueue": [
+ {
+ "partition": {
+ "streamId": "testTopic3Stream",
+ "type": "GROUPBY",
+ "columns": [
+ "value"
+ ],
+ "sortSpec": {
+ "windowPeriod": "PT10S",
+ "windowMargin": 1000
+ }
+ },
+ "workers": [
+ {
+ "topologyName": "testTopology",
+ "boltId": "alertBolt0"
+ },
+ {
+ "topologyName": "testTopology",
+ "boltId": "alertBolt1"
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "streamId": "testTopic4Stream",
+ "partition": {
+ "streamId": "testTopic4Stream",
+ "type": "GROUPBY",
+ "columns": [
+ "value"
+ ],
+ "sortSpec": {
+ "windowPeriod": "PT10S",
+ "windowMargin": 1000
+ }
+ },
+ "targetQueue": [
+ {
+ "partition": {
+ "streamId": "testTopic4Stream",
+ "type": "GROUPBY",
+ "columns": [
+ "value"
+ ],
+ "sortSpec": {
+ "windowPeriod": "PT10S",
+ "windowMargin": 1000
+ }
+ },
+ "workers": [
+ {
+ "topologyName": "testTopology",
+ "boltId": "alertBolt0"
+ },
+ {
+ "topologyName": "testTopology",
+ "boltId": "alertBolt1"
+ }
+ ]
+ }
+ ]
+ },
+ {
+ "streamId": "testTopic5Stream",
+ "partition": {
+ "streamId": "testTopic5Stream",
+ "type": "GROUPBY",
+ "columns": [
+ "value"
+ ],
+ "sortSpec": {
+ "windowPeriod": "PT10S",
+ "windowMargin": 1000
+ }
+ },
+ "targetQueue": [
+ {
+ "partition": {
+ "streamId": "testTopic5Stream",
+ "type": "GROUPBY",
+ "columns": [
+ "value"
+ ],
+ "sortSpec": {
+ "windowPeriod": "PT10S",
+ "windowMargin": 1000
+ }
+ },
+ "workers": [
+ {
+ "topologyName": "testTopology",
+ "boltId": "alertBolt0"
+ },
+ {
+ "topologyName": "testTopology",
+ "boltId": "alertBolt1"
+ }
+ ]
+ }
+ ]
+ }
+ ]
+}
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/topic.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/topic.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/topic.json
new file mode 100644
index 0000000..b49d6ad
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/topic.json
@@ -0,0 +1 @@
+nn_jmx_metric_sandbox
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/topologies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/topologies.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/topologies.json
new file mode 100644
index 0000000..411cc48
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/topologies.json
@@ -0,0 +1,31 @@
+[
+{
+ "name": "alertUnitTopology_1",
+ "numOfSpout":1,
+ "numOfAlertBolt": 10,
+ "numOfGroupBolt": 4,
+ "spoutId": "alertEngineSpout",
+ "groupNodeIds" : [
+ "streamRouterBolt0",
+ "streamRouterBolt1",
+ "streamRouterBolt2",
+ "streamRouterBolt3"
+ ],
+ "alertBoltIds": [
+ "alertBolt0",
+ "alertBolt1",
+ "alertBolt2",
+ "alertBolt3",
+ "alertBolt4",
+ "alertBolt5",
+ "alertBolt6",
+ "alertBolt7",
+ "alertBolt8",
+ "alertBolt9"
+ ],
+ "pubBoltId" : "alertPublishBolt",
+ "spoutParallelism": 1,
+ "groupParallelism": 1,
+ "alertParallelism": 1
+}
+]
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/pom.xml b/eagle-core/eagle-alert/alert/alert-engine/pom.xml
new file mode 100644
index 0000000..4e68b4c
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/pom.xml
@@ -0,0 +1,110 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- ~ /* ~ * Licensed to the Apache Software Foundation (ASF) under one
+ or more ~ * contributor license agreements. See the NOTICE file distributed
+ with ~ * this work for additional information regarding copyright ownership.
+ ~ * The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ * (the "License"); you may not use this file except in compliance with
+ ~ * the License. You may obtain a copy of the License at ~ * ~ * http://www.apache.org/licenses/LICENSE-2.0
+ ~ * ~ * Unless required by applicable law or agreed to in writing, software
+ ~ * distributed under the License is distributed on an "AS IS" BASIS, ~ *
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ * See the License for the specific language governing permissions and ~
+ * limitations under the License. ~ */ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+    <modelVersion>4.0.0</modelVersion>
+    <!-- The parent supplies this POM's groupId; it has to be org.apache.eagle because alert-metadata-service depends on org.apache.eagle:alert-engine-base. -->
+    <parent>
+        <groupId>org.apache.eagle</groupId>
+        <artifactId>alert-parent</artifactId>
+        <version>0.0.1-SNAPSHOT</version>
+    </parent>
+
+    <artifactId>alert-engine-parent</artifactId>
+    <packaging>pom</packaging>
+
+    <modules>
+        <module>alert-engine-base</module>
+    </modules>
+    <!-- Declared on the aggregator so the modules above inherit them; versions are omitted and must be managed by an ancestor POM. -->
+    <dependencies>
+        <dependency>
+            <groupId>org.apache.eagle</groupId>
+            <artifactId>alert-common</artifactId>
+            <version>${project.version}</version>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-kafka</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.storm</groupId>
+            <artifactId>storm-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>${kafka.artifact.id}</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.sun.jersey</groupId>
+            <artifactId>jersey-client</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.codehaus.jackson</groupId>
+            <artifactId>jackson-jaxrs</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>com.netflix.archaius</groupId>
+            <artifactId>archaius-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.wso2.siddhi</groupId>
+            <artifactId>siddhi-core</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.wso2.siddhi</groupId>
+            <artifactId>siddhi-query-api</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.wso2.siddhi</groupId>
+            <artifactId>siddhi-query-compiler</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.wso2.siddhi</groupId>
+            <artifactId>siddhi-extension-regex</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>org.wso2.siddhi</groupId>
+            <artifactId>siddhi-extension-string</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>junit</groupId>
+            <artifactId>junit</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>org.apache.kafka</groupId>
+            <artifactId>kafka-clients</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
+            <groupId>com.101tec</groupId>
+            <artifactId>zkclient</artifactId>
+        </dependency>
+        <dependency>
+            <groupId>joda-time</groupId>
+            <artifactId>joda-time</artifactId>
+        </dependency>
+        <!--<dependency>-->
+        <!--<groupId>org.slf4j</groupId>-->
+        <!--<artifactId>slf4j-log4j12</artifactId>-->
+        <!--</dependency>-->
+        <!--<dependency>-->
+        <!--<groupId>org.slf4j</groupId>-->
+        <!--<artifactId>slf4j-api</artifactId>-->
+        <!--</dependency>-->
+        <dependency>
+            <groupId>com.typesafe</groupId>
+            <artifactId>config</artifactId>
+        </dependency>
+    </dependencies>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/.gitignore
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/.gitignore b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/.gitignore
new file mode 100644
index 0000000..1dd3331
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/.gitignore
@@ -0,0 +1 @@
+/target/
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/pom.xml b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/pom.xml
new file mode 100644
index 0000000..83749b5
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/pom.xml
@@ -0,0 +1,113 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!-- ~ /* ~ * Licensed to the Apache Software Foundation (ASF) under one
+ or more ~ * contributor license agreements. See the NOTICE file distributed
+ with ~ * this work for additional information regarding copyright ownership.
+ ~ * The ASF licenses this file to You under the Apache License, Version 2.0
+ ~ * (the "License"); you may not use this file except in compliance with
+ ~ * the License. You may obtain a copy of the License at ~ * ~ * http://www.apache.org/licenses/LICENSE-2.0
+ ~ * ~ * Unless required by applicable law or agreed to in writing, software
+ ~ * distributed under the License is distributed on an "AS IS" BASIS, ~ *
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ ~ * See the License for the specific language governing permissions and ~
+ * limitations under the License. ~ */ -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+
+ <parent>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>alert-metadata-parent</artifactId>
+ <version>0.0.1-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>alert-metadata-service</artifactId>
+ <packaging>war</packaging>
+
+ <dependencies>
+ <!-- Storm depends on org.ow2.asm:asm:4.0 while Jersey depends on asm:asm:3.0, so org.ow2.asm:asm is -->
+ <!-- excluded from alert-engine-base's transitive dependencies (presumably to stop the two ASM versions clashing). -->
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>alert-engine-base</artifactId>
+ <version>${project.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.ow2.asm</groupId>
+ <artifactId>asm</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.eagle</groupId>
+ <artifactId>alert-metadata</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-server</artifactId>
+ <!--<exclusions> -->
+ <!--<exclusion> -->
+ <!--<groupId>asm</groupId> -->
+ <!--<artifactId>asm</artifactId> -->
+ <!--</exclusion> -->
+ <!--</exclusions> -->
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey.contribs</groupId>
+ <artifactId>jersey-multipart</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-mapper-asl</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-jaxrs</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.codehaus.jackson</groupId>
+ <artifactId>jackson-xc</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.fasterxml.jackson.core</groupId>
+ <artifactId>jackson-databind</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.typesafe</groupId>
+ <artifactId>config</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.slf4j</groupId>
+ <artifactId>slf4j-api</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.tomcat.embed</groupId>
+ <artifactId>tomcat-embed-core</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>io.swagger</groupId>
+ <artifactId>swagger-jaxrs</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>com.sun.jersey</groupId>
+ <artifactId>jersey-servlet</artifactId>
+ </dependency>
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.eclipse.jetty</groupId>
+ <artifactId>jetty-maven-plugin</artifactId>
+ <configuration>
+ <scanIntervalSeconds>5</scanIntervalSeconds>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+</project>
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/alert/metadata/resource/MetadataResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/alert/metadata/resource/MetadataResource.java b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/alert/metadata/resource/MetadataResource.java
new file mode 100644
index 0000000..94ec767
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/alert/metadata/resource/MetadataResource.java
@@ -0,0 +1,225 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.metadata.resource;
+
+import java.util.List;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.PublishmentType;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
+import org.apache.eagle.alert.metadata.impl.MetadataDaoFactory;
+
+/**
+ * @since Apr 11, 2016
+ *
+ */
+@Path("/metadata")
+@Produces("application/json")
+@Consumes("application/json")
+public class MetadataResource {
+
+ private IMetadataDao dao = MetadataDaoFactory.getInstance().getMetadataDao();
+
+ @Path("/clusters")
+ @GET
+ public List<StreamingCluster> listClusters() {
+ return dao.listClusters();
+ }
+
+ @Path("/clear")
+ @POST
+ public OpResult clear() {
+ return dao.clear();
+ }
+
+ @Path("/export")
+ @GET
+ public Models export() {
+ return dao.export();
+ }
+
+ @Path("/import")
+ @GET
+ public OpResult importModels(Models model) {
+ return dao.importModels(model);
+ }
+
+ @Path("/clusters")
+ @POST
+ public OpResult addCluster(StreamingCluster cluster) {
+ return dao.addCluster(cluster);
+ }
+
+ @Path("/clusters/{clusterId}")
+ @DELETE
+ public OpResult removeCluster(@PathParam("clusterId") String clusterId) {
+ return dao.removeCluster(clusterId);
+ }
+
+ @Path("/streams")
+ @GET
+ public List<StreamDefinition> listStreams() {
+ return dao.listStreams();
+ }
+
+ @Path("/streams")
+ @POST
+ public OpResult createStream(StreamDefinition stream) {
+ return dao.createStream(stream);
+ }
+
+ @Path("/streams/{streamId}")
+ @DELETE
+ public OpResult removeStream(@PathParam("streamId") String streamId) {
+ return dao.removeStream(streamId);
+ }
+
+ @Path("/datasources")
+ @GET
+ public List<Kafka2TupleMetadata> listDataSources() {
+ return dao.listDataSources();
+ }
+
+ @Path("/datasources")
+ @POST
+ public OpResult addDataSource(Kafka2TupleMetadata dataSource) {
+ return dao.addDataSource(dataSource);
+ }
+
+ @Path("/datasources/{datasourceId}")
+ @DELETE
+ public OpResult removeDataSource(@PathParam("datasourceId") String datasourceId) {
+ return dao.removeDataSource(datasourceId);
+ }
+
+ @Path("/policies")
+ @GET
+ public List<PolicyDefinition> listPolicies() {
+ return dao.listPolicies();
+ }
+
+ @Path("/policies")
+ @POST
+ public OpResult addPolicy(PolicyDefinition policy) {
+ return dao.addPolicy(policy);
+ }
+
+ @Path("/policies/{policyId}")
+ @DELETE
+ public OpResult removePolicy(@PathParam("policyId") String policyId) {
+ return dao.removePolicy(policyId);
+ }
+
+ @Path("/publishments")
+ @GET
+ public List<Publishment> listPublishment() {
+ return dao.listPublishment();
+ }
+
+ @Path("/publishments")
+ @POST
+ public OpResult addPublishment(Publishment publishment) {
+ return dao.addPublishment(publishment);
+ }
+
+ @Path("/publishments/{pubId}")
+ @DELETE
+ public OpResult removePublishment(@PathParam("pubId") String pubId) {
+ return dao.removePublishment(pubId);
+ }
+
+ @Path("/publishmentTypes")
+ @GET
+ public List<PublishmentType> listPublishmentType() {
+ return dao.listPublishmentType();
+ }
+
+ @Path("/publishmentTypes")
+ @POST
+ public OpResult addPublishmentType(PublishmentType publishmentType) {
+ return dao.addPublishmentType(publishmentType);
+ }
+
+ @Path("/publishmentTypes/{pubType}")
+ @DELETE
+ public OpResult removePublishmentType(@PathParam("pubType") String pubType) {
+ return dao.removePublishmentType(pubType);
+ }
+
+ @Path("/schedulestates/{versionId}")
+ @GET
+ public ScheduleState listScheduleState(@PathParam("versionId") String versionId) {
+ return dao.getScheduleState(versionId);
+ }
+
+ @Path("/schedulestates")
+ @GET
+ public ScheduleState latestScheduleState() {
+ return dao.getScheduleState();
+ }
+
+ @Path("/schedulestates")
+ @POST
+ public OpResult addScheduleState(ScheduleState state) {
+ return dao.addScheduleState(state);
+ }
+
+ @Path("/assignments")
+ @GET
+ public List<PolicyAssignment> listAssignmenets() {
+ return dao.listAssignments();
+ }
+
+ @Path("/assignments")
+ @POST
+ public OpResult addAssignmenet(PolicyAssignment pa) {
+ return dao.addAssignment(pa);
+ }
+
+ @Path("/topologies")
+ @GET
+ public List<Topology> listTopologies() {
+ return dao.listTopologies();
+ }
+
+ @Path("/topologies")
+ @POST
+ public OpResult addTopology(Topology t) {
+ return dao.addTopology(t);
+ }
+
+ @Path("/topologies/{topologyName}")
+ @DELETE
+ public OpResult removeTopology(@PathParam("topologyName") String topologyName) {
+ return dao.removeTopology(topologyName);
+ }
+
+}