You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ra...@apache.org on 2016/06/01 06:00:03 UTC

[03/18] incubator-eagle git commit: EAGLE-324 Init branch-v0.5

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestStormParallelism.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestStormParallelism.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestStormParallelism.java
new file mode 100644
index 0000000..279a041
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestStormParallelism.java
@@ -0,0 +1,161 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.topology;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+
+/**
+ * Since 4/29/16.
+ */
+@SuppressWarnings({"serial", "rawtypes"})
+public class TestStormParallelism {
+    /**
+     * When run this test, please check the following through jstack and log
+     * 1) for blue-spout, num of executors is 2, # of tasks is 2
+     *
+     * Expected:
+     *
+     * a. 2 threads uniquely named Thread-*-blue-spout-executor[*,*]
+     * b. each thread will have single task
+     *
+     * 2) for green-bolt, num of executors is 2, # of tasks is 4
+     *
+     * Expected:
+     *
+     * a. 2 threads uniquely named Thread-*-green-bolt-executor[*,*]
+     * b. each thread will have 2 tasks
+     *
+     * 3) for yellow-bolt, num of executors is 6, # of tasks is 6
+     *
+     * Expected:
+     *
+     * a. 6 threads uniquely named Thread-*-yellow-bolt-executor[*,*]
+     * b. each thread will have 1 tasks
+     *
+     *
+     * Continue to think:
+     *
+     * For alter engine, if we use multiple tasks per component instead of one task per component,
+     * what will the parallelism mechanism affect?
+     *
+     * @throws Exception
+     */
+    @Ignore
+    @Test
+    public void testParallelism() throws Exception{
+        Config conf = new Config();
+        conf.setNumWorkers(2); // use two worker processes
+        TopologyBuilder topologyBuilder = new TopologyBuilder();
+        topologyBuilder.setSpout("blue-spout", new BlueSpout(), 2); // parallelism hint
+
+        topologyBuilder.setBolt("green-bolt", new GreenBolt(), 2)
+                .setNumTasks(4)
+                .shuffleGrouping("blue-spout");
+
+        topologyBuilder.setBolt("yellow-bolt", new YellowBolt(), 6)
+                .shuffleGrouping("green-bolt");
+
+        LocalCluster cluster = new LocalCluster();
+        cluster.submitTopology("mytopology", new HashMap(), topologyBuilder.createTopology());
+
+        while(true) {
+            try {
+                Thread.sleep(1000);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        }
+    }
+
+    private static class BlueSpout extends BaseRichSpout{
+        static int count = 0;
+        public BlueSpout(){
+        }
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("a"));
+        }
+
+        @Override
+        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+            count++;
+            System.out.println("# of spout objects " + count + ", current spout " + this);
+        }
+
+        @Override
+        public void nextTuple() {
+
+        }
+    }
+
+    private static class GreenBolt extends BaseRichBolt{
+        static int count;
+        @Override
+        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+            count++;
+            System.out.println("# of green bolt objects " + count + ", current green bolt " + this);
+        }
+
+        @Override
+        public void execute(Tuple input) {
+
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("a"));
+        }
+    }
+
+    private static class YellowBolt extends BaseRichBolt{
+        static int count;
+        @Override
+        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+            count++;
+            System.out.println("# of yellow bolt objects " + count + ", current yellow bolt " + this);
+        }
+
+        @Override
+        public void execute(Tuple input) {
+
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("a"));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestStormStreamIdRouting.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestStormStreamIdRouting.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestStormStreamIdRouting.java
new file mode 100644
index 0000000..9d3892a
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestStormStreamIdRouting.java
@@ -0,0 +1,140 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.topology;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.junit.Ignore;
+import org.junit.Test;
+
+import backtype.storm.Config;
+import backtype.storm.LocalCluster;
+import backtype.storm.spout.SpoutOutputCollector;
+import backtype.storm.task.OutputCollector;
+import backtype.storm.task.TopologyContext;
+import backtype.storm.topology.OutputFieldsDeclarer;
+import backtype.storm.topology.TopologyBuilder;
+import backtype.storm.topology.base.BaseRichBolt;
+import backtype.storm.topology.base.BaseRichSpout;
+import backtype.storm.tuple.Fields;
+import backtype.storm.tuple.Tuple;
+
+
+/**
+ * Manual experiment for routing by stream id: a spout emits alternately on two
+ * named streams and each bolt subscribes to exactly one of them.
+ *
+ * Since 4/29/16.
+ */
+@SuppressWarnings({"serial", "rawtypes", "unused"})
+public class TestStormStreamIdRouting {
+    /**
+     * Starts the topology in a {@link LocalCluster} and then blocks so the bolt
+     * output can be observed on stdout. The method never returns normally, which
+     * is why the test is {@code @Ignore}d.
+     *
+     * @throws Exception if the waiting thread is interrupted
+     */
+    @Ignore
+    @Test
+    public void testRoutingByStreamId() throws Exception{
+        Config conf = new Config();
+        conf.setNumWorkers(2); // use two worker processes
+        TopologyBuilder topologyBuilder = new TopologyBuilder();
+        topologyBuilder.setSpout("blue-spout", new BlueSpout()); // no parallelism hint: Storm's default applies
+
+        topologyBuilder.setBolt("green-bolt-1", new GreenBolt(1))
+                .shuffleGrouping("blue-spout", "green-bolt-stream-1");
+        topologyBuilder.setBolt("green-bolt-2", new GreenBolt(2))
+                .shuffleGrouping("blue-spout", "green-bolt-stream-2");
+
+        LocalCluster cluster = new LocalCluster();
+        // Submit the Config built above; an empty map would silently drop the worker setting.
+        cluster.submitTopology("mytopology", conf, topologyBuilder.createTopology());
+
+        // Keep the JVM alive; an interrupt is propagated instead of being swallowed.
+        while (true) {
+            Thread.sleep(1000);
+        }
+    }
+
+    /** Spout that alternates between the two declared streams, one tuple per call. */
+    private static class BlueSpout extends BaseRichSpout {
+        int count = 0;
+        private SpoutOutputCollector collector;
+        public BlueSpout(){
+        }
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declareStream("green-bolt-stream-1", new Fields("a"));
+            declarer.declareStream("green-bolt-stream-2", new Fields("a"));
+        }
+
+        @Override
+        public void open(Map conf, TopologyContext context, SpoutOutputCollector collector) {
+            this.collector = collector;
+        }
+
+        @Override
+        public void nextTuple() {
+            // Even counts go to stream 1, odd counts go to stream 2.
+            String streamId = (count % 2 == 0) ? "green-bolt-stream-1" : "green-bolt-stream-2";
+            this.collector.emit(streamId, Arrays.<Object>asList("testdata" + count));
+            count++;
+            try {
+                // Throttle to one tuple every 10 seconds so the output stays readable.
+                Thread.sleep(10000);
+            } catch (InterruptedException ex) {
+                // Restore the interrupt flag so the caller can observe the interruption.
+                Thread.currentThread().interrupt();
+            }
+        }
+    }
+
+    /** Bolt that prints every tuple it receives together with its own id. */
+    private static class GreenBolt extends BaseRichBolt {
+        private final int id;
+        public GreenBolt(int id){
+            this.id = id;
+        }
+        @Override
+        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        }
+
+        @Override
+        public void execute(Tuple input) {
+            System.out.println("bolt " + id + " received data " + input.getString(0));
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("a"));
+        }
+    }
+
+    /** No-op bolt; not wired into the topology built by this test. */
+    private static class YellowBolt extends BaseRichBolt{
+        @Override
+        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+        }
+
+        @Override
+        public void execute(Tuple input) {
+            // intentionally empty
+        }
+
+        @Override
+        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+            declarer.declare(new Fields("a"));
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestTuple2StreamConverter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestTuple2StreamConverter.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestTuple2StreamConverter.java
new file mode 100644
index 0000000..b0e5c4a
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestTuple2StreamConverter.java
@@ -0,0 +1,62 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.topology;
+
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Properties;
+import java.util.Set;
+
+import org.apache.eagle.alert.coordination.model.Tuple2StreamConverter;
+import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Checks that {@link Tuple2StreamConverter} turns a (topic, event map) pair into a
+ * (topic, stream name, timestamp, event map) list.
+ *
+ * Since 5/3/16.
+ */
+public class TestTuple2StreamConverter {
+    @SuppressWarnings({ "unchecked", "rawtypes" })
+    @Test
+    public void test(){
+        final String topic = "testTopic";
+        final String streamName = "defaultStringStream";
+
+        // Selector configuration: always resolve to the user-provided stream name.
+        Properties selectorProps = new Properties();
+        selectorProps.put("userProvidedStreamName", streamName);
+        Set activeStreams = new HashSet<>();
+        activeStreams.add(streamName);
+
+        Tuple2StreamMetadata meta = new Tuple2StreamMetadata();
+        meta.setStreamNameSelectorCls("org.apache.eagle.alert.engine.scheme.PlainStringStreamNameSelector");
+        meta.setStreamNameSelectorProp(selectorProps);
+        meta.setActiveStreamNames(activeStreams);
+        meta.setTimestampColumn("timestamp");
+        Tuple2StreamConverter converter = new Tuple2StreamConverter(meta);
+
+        // Incoming event: a payload plus the column named as timestamp column above.
+        long now = System.currentTimeMillis();
+        Map event = new HashMap<>();
+        event.put("value", "IAmPlainString");
+        event.put("timestamp", now);
+
+        List<Object> converted = converter.convert(Arrays.asList(topic, event));
+
+        Assert.assertEquals(topic, converted.get(0));
+        Assert.assertEquals("defaultStringStream", converted.get(1));
+        Assert.assertEquals(now, converted.get(2));
+        Assert.assertEquals(event, converted.get(3));
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestUnitTopologyMain.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestUnitTopologyMain.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestUnitTopologyMain.java
new file mode 100644
index 0000000..7132f5a
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/topology/TestUnitTopologyMain.java
@@ -0,0 +1,57 @@
+/*
+ *
+ *  * Licensed to the Apache Software Foundation (ASF) under one or more
+ *  * contributor license agreements.  See the NOTICE file distributed with
+ *  * this work for additional information regarding copyright ownership.
+ *  * The ASF licenses this file to You under the Apache License, Version 2.0
+ *  * (the "License"); you may not use this file except in compliance with
+ *  * the License.  You may obtain a copy of the License at
+ *  *
+ *  *    http://www.apache.org/licenses/LICENSE-2.0
+ *  *
+ *  * Unless required by applicable law or agreed to in writing, software
+ *  * distributed under the License is distributed on an "AS IS" BASIS,
+ *  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  * See the License for the specific language governing permissions and
+ *  * limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.engine.topology;
+
+import org.apache.eagle.alert.engine.runner.UnitTopologyRunner;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * Launcher that runs the unit topology from a Typesafe Config resource, either
+ * as an (ignored) JUnit test or from the command line via {@link #main(String[])}.
+ *
+ * Since 5/4/16.
+ */
+public class TestUnitTopologyMain {
+    /** Runs the topology with the default resource {@code /application-test.conf}. */
+    @Ignore
+    @Test
+    public void testTopologyRun(){
+        testTopologyRun("/application-test.conf");
+    }
+
+    /**
+     * Loads the given config resource and runs the topology named by its
+     * {@code topology.name} entry.
+     *
+     * @param configResourceName value assigned to the {@code config.resource} system property
+     */
+    public void testTopologyRun(String configResourceName){
+        // Drop cached configs so the system property set below is honoured by load().
+        ConfigFactory.invalidateCaches();
+        System.setProperty("config.resource", configResourceName);
+        // println rather than print, so this line does not run into the following log output
+        System.out.println("Set config.resource = "+configResourceName);
+        Config config = ConfigFactory.load();
+        String topologyId = config.getString("topology.name");
+        MockMetadataChangeNotifyService changeNotifyService =
+                new MockMetadataChangeNotifyService(topologyId,"alertEngineSpout");
+        new UnitTopologyRunner(changeNotifyService).run(topologyId,config);
+    }
+
+    /**
+     * Command-line entry point.
+     *
+     * @param args optional; {@code args[0]} is used as the config resource name when present
+     */
+    public static void main(String[] args){
+        if(args.length>0) {
+            new TestUnitTopologyMain().testTopologyRun(args[0]);
+        } else {
+            new TestUnitTopologyMain().testTopologyRun();
+        }
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/utils/CompressionUtilsTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/utils/CompressionUtilsTest.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/utils/CompressionUtilsTest.java
new file mode 100644
index 0000000..0861597
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/utils/CompressionUtilsTest.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.utils;
+
+import org.junit.Assert;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.io.IOException;
+
+/**
+ * Round-trip test for {@code CompressionUtils}: compressing and then decompressing
+ * must give back the original bytes.
+ */
+public class CompressionUtilsTest {
+    private final static Logger LOG = LoggerFactory.getLogger(CompressionUtilsTest.class);
+
+    @Test
+    public void testCompressAndDecompress() throws IOException {
+        String value = "http://www.apache.org/licenses/LICENSE-2.0";
+        // Pin the charset so the test does not depend on the platform default encoding.
+        byte[] original = value.getBytes(java.nio.charset.StandardCharsets.UTF_8);
+        byte[] compressed = CompressionUtils.compress(original);
+        byte[] decompressed = CompressionUtils.decompress(compressed);
+
+        LOG.info("original size: {}",original.length);
+        LOG.info("compressed size: {}",compressed.length);
+        LOG.info("decompressed size: {}",decompressed.length);
+
+        // Compare at byte level first, then at string level.
+        Assert.assertArrayEquals(original, decompressed);
+        String decompressedValue = new String(decompressed, java.nio.charset.StandardCharsets.UTF_8);
+        Assert.assertEquals(value,decompressedValue);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/utils/TimePeriodUtilsTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/utils/TimePeriodUtilsTest.java b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/utils/TimePeriodUtilsTest.java
new file mode 100644
index 0000000..cad53d4
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/java/org/apache/eagle/alert/engine/utils/TimePeriodUtilsTest.java
@@ -0,0 +1,91 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.utils;
+
+import java.text.ParseException;
+
+import org.apache.eagle.alert.utils.DateTimeUtil;
+import org.apache.eagle.alert.utils.TimePeriodUtils;
+import org.joda.time.Period;
+import org.joda.time.Seconds;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * Tests for Joda {@link Period} parsing and for
+ * {@link TimePeriodUtils#formatSecondsByPeriod}.
+ */
+public class TimePeriodUtilsTest {
+    @Test
+    public void testJodaTimePeriod() throws ParseException {
+        String periodText = "PT10m";
+        Period period = new Period(periodText);
+        int seconds = period.toStandardSeconds().getSeconds();
+        Assert.assertEquals(600, seconds);
+        Assert.assertEquals(60, period.toStandardSeconds().dividedBy(10).getSeconds());
+    }
+
+    @Test
+    public void testJodaTimePeriodBySeconds() throws ParseException {
+        String periodText = "PT10s";
+        Period period = new Period(periodText);
+        int seconds = period.toStandardSeconds().getSeconds();
+        Assert.assertEquals(10, seconds);
+    }
+
+    @Test
+    public void testFormatSecondsByPeriod15M() throws ParseException {
+        Period period = new Period("PT15m");
+        Seconds seconds = period.toStandardSeconds();
+        Assert.assertEquals(15*60,seconds.getSeconds());
+
+        assertFormatted("2015-07-01 13:45:00", "2015-07-01 13:56:12", seconds);
+        assertFormatted("2015-07-01 03:00:00", "2015-07-01 03:14:59", seconds);
+    }
+
+    @Test
+    public void testFormatSecondsByPeriod1H() throws ParseException {
+        Period period = new Period("PT1h");
+        Seconds seconds = period.toStandardSeconds();
+        Assert.assertEquals(60*60,seconds.getSeconds());
+
+        assertFormatted("2015-07-01 13:00:00", "2015-07-01 13:56:12", seconds);
+        assertFormatted("2015-07-01 03:00:00", "2015-07-01 03:14:59", seconds);
+        assertFormatted("2015-07-01 03:00:00", "2015-07-01 03:30:59", seconds);
+    }
+
+    /**
+     * Asserts that {@link TimePeriodUtils#formatSecondsByPeriod} maps {@code input}
+     * to {@code expected} for the given period.
+     *
+     * @param expected expected result as a human-readable date
+     * @param input    time to format as a human-readable date
+     * @param period   period passed to {@code formatSecondsByPeriod}
+     * @throws ParseException if either date string cannot be parsed
+     */
+    private static void assertFormatted(String expected, String input, Seconds period) throws ParseException {
+        long time = DateTimeUtil.humanDateToSeconds(input);
+        long expect = DateTimeUtil.humanDateToSeconds(expected);
+        long result = TimePeriodUtils.formatSecondsByPeriod(time, period);
+        Assert.assertEquals(expect, result);
+    }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/application-integration.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/application-integration.conf b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/application-integration.conf
new file mode 100644
index 0000000..13d4ee1
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/application-integration.conf
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+{
+  "topology" : {
+    "name" : "alertUnitTopology_1",
+    "numOfTotalWorkers": 20,
+    "numOfSpoutTasks" : 1,
+    "numOfRouterBolts" : 4,
+    "numOfAlertBolts" : 10,
+    "numOfPublishTasks" : 1,
+    "localMode" : "true"
+  },
+  "spout" : {
+    "kafkaBrokerZkQuorum": "localhost:2181",
+    "kafkaBrokerZkBasePath": "/brokers",
+    "stormKafkaUseSameZkQuorumWithKafkaBroker": true,
+    "stormKafkaTransactionZkQuorum": "",
+    "stormKafkaTransactionZkPath": "/consumers",
+    "stormKafkaEagleConsumer": "eagle_consumer",
+    "stormKafkaStateUpdateIntervalMs": 2000,
+    "stormKafkaFetchSizeBytes": 1048586,
+  },
+  "zkConfig" : {
+    "zkQuorum" : "localhost:2181",
+    "zkRoot" : "/alert",
+    "zkSessionTimeoutMs" : 10000,
+    "connectionTimeoutMs" : 10000,
+    "zkRetryTimes" : 3,
+    "zkRetryInterval" : 3000
+  },
+  "dynamicConfigSource" : {
+    "initDelayMillis": 3000,
+    "delayMillis" : 10000
+  },
+  "metadataService": {
+	"context" : "/api",
+	"host" : "localhost",
+	"port" : 8080
+  },
+  "coordinatorService": {
+  	"host": "localhost",
+  	"port": "9090",
+  	"context" : "/api"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/application-test-backup.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/application-test-backup.conf b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/application-test-backup.conf
new file mode 100755
index 0000000..2c99a68
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/application-test-backup.conf
@@ -0,0 +1,71 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+{
+  "topology" : {
+    "name" : "alertUnitTopology_bug",
+    "numOfSpoutTasks" : 3,
+    "numOfRouterBolts" : 6,
+    "numOfAlertBolts" : 6,
+    "numOfPublishTasks" : 1,
+    "numOfTotalWorkers":1,
+    "messageTimeoutSecs": 30,     // topology.message.timeout.secs: 30 by default
+    "localMode" : true
+  },
+  "spout" : {
+    "kafkaBrokerZkQuorum": "10.64.243.71:2181",
+    "kafkaBrokerZkBasePath": "/brokers",
+    "stormKafkaUseSameZkQuorumWithKafkaBroker": true,
+    "stormKafkaTransactionZkQuorum": "",
+    "stormKafkaTransactionZkPath": "/consumers",
+    "stormKafkaEagleConsumer": "eagle_consumer",
+    "stormKafkaStateUpdateIntervalMs": 2000,
+    "stormKafkaFetchSizeBytes": 1048586,
+  },
+  "zkConfig" : {
+    "zkQuorum" : "10.64.243.71:2181",
+    "zkRoot" : "/alert",
+    "zkSessionTimeoutMs" : 10000,
+    "connectionTimeoutMs" : 10000,
+    "zkRetryTimes" : 3,
+    "zkRetryInterval" : 3000
+  },
+  "dynamicConfigSource" : {
+    "initDelayMillis": 3000,
+    "delayMillis" : 10000
+  },
+  "metadataService": {
+    "context" : "/api",
+    "host" : "localhost",
+    "port" : 8080
+  },
+  "metric":{
+    "tags":{
+      "topologyName":"alertUnitTopology_1"
+    }
+    "sink": {
+//      "kafka": {
+//        "topic": "alert_metric_test"
+//        "bootstrap.servers": "localhost:9092"
+//      }
+      "logger": {
+        "level":"DEBUG"
+      }
+      "elasticsearch": {
+        "hosts": ["10.64.223.222:9200"]
+        "index": "alert_metric_test"
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/application-test.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/application-test.conf b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/application-test.conf
new file mode 100755
index 0000000..f4a797e
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/application-test.conf
@@ -0,0 +1,68 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+{
+  "topology" : {
+    "name" : "alertUnitTopology_1_test",
+    "numOfSpoutTasks" : 3,
+    "numOfRouterBolts" : 6,
+    "numOfAlertBolts" : 6,
+    "numOfPublishTasks" : 1,
+    "numOfTotalWorkers":1,
+    "messageTimeoutSecs": 30,     // topology.message.timeout.secs: 30 by default
+    "localMode" : true
+  },
+  "spout" : {
+    "kafkaBrokerZkQuorum": "localhost:2181",
+    "kafkaBrokerZkBasePath": "/brokers",
+    "stormKafkaUseSameZkQuorumWithKafkaBroker": true,
+    "stormKafkaTransactionZkQuorum": "",
+    "stormKafkaTransactionZkPath": "/consumers",
+    "stormKafkaEagleConsumer": "eagle_consumer",
+    "stormKafkaStateUpdateIntervalMs": 2000,
+    "stormKafkaFetchSizeBytes": 1048586,
+  },
+  "zkConfig" : {
+    "zkQuorum" : "localhost:2181",
+    "zkRoot" : "/alert",
+    "zkSessionTimeoutMs" : 10000,
+    "connectionTimeoutMs" : 10000,
+    "zkRetryTimes" : 3,
+    "zkRetryInterval" : 3000
+  },
+  "dynamicConfigSource" : {
+    "initDelayMillis": 3000,
+    "delayMillis" : 10000
+  },
+  "metadataService": {
+    "context" : "/api",
+    "host" : "localhost",
+    "port" : 8080
+  },
+  "metric":{
+    "sink": {
+//      "kafka": {
+//        "topic": "alert_metric_test"
+//        "bootstrap.servers": "localhost:9092"
+//      }
+      "logger": {
+        "level":"INFO"
+      }
+      "elasticsearch": {
+        "hosts": ["10.64.223.222:9200"]
+        "index": "alert_metric_test"
+      }
+    }
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/application-integration-2.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/application-integration-2.conf b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/application-integration-2.conf
new file mode 100644
index 0000000..bb998cd
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/application-integration-2.conf
@@ -0,0 +1,57 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+{
+  "topology" : {
+    "name" : "alertUnitTopology_2",
+    "numOfSpoutTasks" : 1,
+    "numOfRouterBolts" : 4,
+    "numOfAlertBolts" : 10,
+    "numOfPublishTasks" : 1,
+    "numOfTotalWorkers": 20,
+    "localMode" : "true"
+  },
+  "spout" : {
+    "kafkaBrokerZkQuorum": "localhost:2181",
+    "kafkaBrokerZkBasePath": "/brokers",
+    "stormKafkaUseSameZkQuorumWithKafkaBroker": true,
+    "stormKafkaTransactionZkQuorum": "",
+    "stormKafkaTransactionZkPath": "/consumers",
+    "stormKafkaEagleConsumer": "eagle_consumer",
+    "stormKafkaStateUpdateIntervalMs": 2000,
+    "stormKafkaFetchSizeBytes": 1048586,
+  },
+  "zkConfig" : {
+    "zkQuorum" : "localhost:2181",
+    "zkRoot" : "/alert",
+    "zkSessionTimeoutMs" : 10000,
+    "connectionTimeoutMs" : 10000,
+    "zkRetryTimes" : 3,
+    "zkRetryInterval" : 3000
+  },
+  "dynamicConfigSource" : {
+    "initDelayMillis": 3000,
+    "delayMillis" : 10000
+  },
+  "metadataService": {
+	"context" : "/api",
+	"host" : "localhost",
+	"port" : 8080
+  },
+  "coordinatorService": {
+  	"host": "localhost",
+  	"port": "9090",
+  	"context" : "/api"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/datasources.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/datasources.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/datasources.json
new file mode 100644
index 0000000..946acd6
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/datasources.json
@@ -0,0 +1,37 @@
+[
+{
+	"name": "eslog_datasource",
+	"type": "KAFKA",
+	"properties": {
+	},
+	"topic": "eslogs",
+	"schemeCls": "org.apache.eagle.alert.engine.scheme.JsonScheme",
+	"codec": {
+		"streamNameSelectorProp": {
+			"userProvidedStreamName" : "esStream",
+			"streamNameFormat":"%s"
+		},
+		"streamNameSelectorCls":"org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector",
+		"timestampColumn": "timestamp",
+		"timestampFormat":""
+	}
+},
+{
+	"name": "bootfailure_datasource",
+	"type": "KAFKA",
+	"properties": {
+	},
+	"topic": "bootfailures",
+	"schemeCls": "org.apache.eagle.alert.engine.scheme.JsonScheme",
+	"codec": {
+		"streamNameSelectorProp": {
+			"userProvidedStreamName" : "ifStream",
+			"streamNameFormat":"%s"
+		},
+		"streamNameSelectorCls":"org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector",
+		"timestampColumn": "timestamp",
+		"timestampFormat":""
+	}
+}
+
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/policies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/policies.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/policies.json
new file mode 100644
index 0000000..4702575
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/policies.json
@@ -0,0 +1,39 @@
+[
+{
+	"name": "logStreamJoinBootFailure",
+	"description" : "policy to join es log stream with boot failure stream on instanceUuid",
+	"inputStreams": [
+		"esStream",
+		"ifStream"
+	],
+	"outputStreams": [
+		"log_stream_join_output"
+	],
+	"definition": {
+		"type": "siddhi",
+		"value": " from esStream#window.externalTime(timestamp, 20 min) as a join ifStream#window.externalTime(timestamp, 5 min) as b on a.instanceUuid == b.instanceUuid select logLevel, a.host as aHost, a.component, a.message as logMessage, b.message as failMessage, a.timestamp as t1, b.timestamp as t2, b.host as bHost, count(1) as errorCount insert into log_stream_join_output;  "
+	},
+	"partitionSpec": [
+		{
+			"streamId" : "esStream",
+			"type" : "GROUPBY",
+			"columns" : [
+				"instanceUuid"
+			],
+			"sortSpec": {
+				"windowPeriod" : "PT1M"
+			}
+		},
+		{
+			"streamId" : "ifStream",
+			"type" : "GROUPBY",
+			"columns" : [
+				"instanceUuid"
+			],
+			"sortSpec": {
+				"windowPeriod" : "PT1M"
+			}
+		}
+	]
+}
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/publishments.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/publishments.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/publishments.json
new file mode 100644
index 0000000..7b531fc
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/publishments.json
@@ -0,0 +1,17 @@
+[
+{
+	"name":"log-stream-join-output",
+	"type":"org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher",
+	"inputStreams": [
+		"log_stream_join_output"
+	],
+	"properties": {
+		"subject":"UMP Test Alert",
+        "template":"",
+	  "sender": "sender@corp.com",
+	  "recipients": "receiver@corp.com",
+	  "smtp.server":"mailhost.com"
+	},
+	"dedupIntervalMin" : "1"
+}
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/streamdefinitions.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/streamdefinitions.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/streamdefinitions.json
new file mode 100644
index 0000000..0a26a28
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/streamdefinitions.json
@@ -0,0 +1,93 @@
+[
+{
+	"streamId": "esStream",
+	"dataSource" : "eslog_datasource",
+	"description":"the data stream for es log of different modules",
+	"validate": false,
+	"timeseries":false,
+	"columns": [
+		{
+			"name": "instanceUuid",
+			"type" : "string",
+			"defaultValue": "",
+			"required":true
+		},
+		{
+			"name": "timestamp",
+			"type" : "long",
+			"defaultValue": 0,
+			"required":true
+		},
+		{
+			"name": "logLevel",
+			"type" : "string",
+			"defaultValue": "ERROR",
+			"required": true
+		},
+		{
+			"name": "message",
+			"type" : "string",
+			"defaultValue": "",
+			"required":true
+		},
+		{
+			"name": "reqId",
+			"type" : "string",
+			"defaultValue": "",
+			"required":true
+		},
+		{
+			"name": "host",
+			"type" : "string",
+			"defaultValue": "",
+			"required":true
+		},
+		{
+			"name": "component",
+			"type" : "string",
+			"defaultValue": "nova",
+			"required":true
+		}
+	]
+}
+,
+{
+	"streamId": "ifStream",
+	"dataSource" : "bootfailure_datasource",
+	"description":"the data stream for boot failure(instance fault)",
+	"validate": false,
+	"timeseries":false,
+	"columns": [
+		{
+			"name": "instanceUuid",
+			"type" : "string",
+			"defaultValue": "",
+			"required":true
+		},
+		{
+			"name": "timestamp",
+			"type" : "long",
+			"defaultValue": 0,
+			"required":true
+		},
+		{
+			"name": "reqId",
+			"type" : "string",
+			"defaultValue": "",
+			"required": true
+		},
+		{
+			"name": "message",
+			"type" : "string",
+			"defaultValue": "",
+			"required":true
+		},
+		{
+			"name": "host",
+			"type" : "string",
+			"defaultValue": "",
+			"required":true
+		}
+	]
+}
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/topologies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/topologies.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/topologies.json
new file mode 100644
index 0000000..9aa8716
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/correlation/topologies.json
@@ -0,0 +1,31 @@
+[
+{
+	"name": "alertUnitTopology_2",
+	"numOfSpout":1,
+	"numOfGroupBolt": 4,
+	"numOfAlertBolt": 10,
+	"spoutId": "alertEngineSpout",
+	"groupNodeIds" : [
+		"streamRouterBolt0",
+		"streamRouterBolt1",
+		"streamRouterBolt2",
+		"streamRouterBolt3"
+	],
+	"alertBoltIds": [
+		"alertBolt0",
+		"alertBolt1",
+		"alertBolt2",
+		"alertBolt3",
+		"alertBolt4",
+		"alertBolt5",
+		"alertBolt6",
+		"alertBolt7",
+		"alertBolt8",
+		"alertBolt9"
+	],
+	"pubBoltId" : "alertPublishBolt",
+	"spoutParallelism": 1,
+	"groupParallelism": 1,
+	"alertParallelism": 1
+}
+]

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/datasources.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/datasources.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/datasources.json
new file mode 100644
index 0000000..77a280c
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/datasources.json
@@ -0,0 +1,19 @@
+[
+{
+	"name": "perfmon_datasource",
+	"type": "KAFKA",
+	"properties": {
+	},
+	"topic": "perfmon_metrics",
+	"schemeCls": "org.apache.eagle.alert.engine.scheme.JsonScheme",
+	"codec": {
+		"streamNameSelectorProp": {
+			"fieldNamesToInferStreamName" : "metric",
+			"streamNameFormat":"%s"
+		},
+		"streamNameSelectorCls":"org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector",
+		"timestampColumn": "timestamp",
+		"timestampFormat":""
+	}
+}
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/log4j.properties b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/log4j.properties
new file mode 100644
index 0000000..3ba587d
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/log4j.properties
@@ -0,0 +1,23 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
+
+#log4j.logger.org.apache.eagle.alert.metric=DEBUG
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/policies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/policies.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/policies.json
new file mode 100644
index 0000000..5edece9
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/policies.json
@@ -0,0 +1,54 @@
+[
+{
+	"name": "perfmon_cpu_host_check",
+	"description" : "policy to check host perfmon_cpu",
+	"inputStreams": [
+		"perfmon_cpu_stream"
+	],
+	"outputStreams": [
+		"perfmon_cpu_check_output"
+	],
+	"definition": {
+		"type": "siddhi",
+		"value": "from perfmon_cpu_stream[value > 90.0] select * group by host insert into perfmon_cpu_check_output;"
+	},
+	"partitionSpec": [
+		{
+			"streamId" : "perfmon_cpu_stream",
+			"type" : "GROUPBY",
+			"columns" : [
+				"host"
+			],
+			"sortSpec": {
+				"windowPeriod" : "PT1M"
+			}
+		}
+	]
+},
+{
+	"name": "perfmon_cpu_pool_check",
+	"description" : "policy to check pool perfmon_cpu",
+	"inputStreams": [
+		"perfmon_cpu_stream"
+	],
+	"outputStreams": [
+		"perfmon_cpu_check_output"
+	],
+	"definition": {
+		"type": "siddhi",
+		"value": "from perfmon_cpu_stream[value > 75.0] select * group by pool insert into perfmon_cpu_check_output;"
+	},
+	"partitionSpec": [
+		{
+			"streamId" : "perfmon_cpu_stream",
+			"type" : "GROUPBY",
+			"columns" : [
+				"pool"
+			],
+			"sortSpec": {
+				"windowPeriod" : "PT1M"
+			}
+		}
+	]
+}
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/publishments.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/publishments.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/publishments.json
new file mode 100644
index 0000000..b3840a5
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/publishments.json
@@ -0,0 +1,29 @@
+[
+{
+	"name":"test-stream-output",
+	"type":"org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher",
+	"policyIds": [
+		"perfmon_cpu_host_check", "perfmon_cpu_pool_check"
+	],
+	"properties": {
+	  "subject":"UMP Test Alert",
+	  "template":"",
+	  "sender": "sender@corp.com",
+	  "recipients": "receiver@corp.com",
+	  "smtp.server":"mailhost.com",
+	  "connection": "plaintext",
+	  "smtp.port": "25"
+	},
+	"dedupIntervalMin" : "PT0M"
+},
+{
+  "type": "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher",
+  "name":"kafka-testAlertStream",
+  "policyIds": ["perfmon_cpu_host_check"],
+  "dedupIntervalMin": "PT1M",
+  "properties":{
+    "kafka_broker":"sandbox.hortonworks.com:6667",
+    "topic":"test_kafka"
+  }
+}
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/publishments2.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/publishments2.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/publishments2.json
new file mode 100644
index 0000000..277ba75
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/publishments2.json
@@ -0,0 +1,19 @@
+[
+{
+	"name":"test-stream-output",
+	"type":"org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher",
+	"policyIds": [
+		"policy1", "policy2"
+	],
+	"properties": {
+	  "subject":"UMP Test Alert",
+	  "template":"",
+	  "sender": "sender@corp.com",
+	  "recipients": "receiver@corp.com",
+	  "smtp.server":"mailhost.com",
+	  "connection": "plaintext",
+	  "smtp.port": "25"
+	},
+	"dedupIntervalMin" : "PT0M"
+}
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/sample_perfmon_data.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/sample_perfmon_data.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/sample_perfmon_data.json
new file mode 100644
index 0000000..c63b9ff
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/sample_perfmon_data.json
@@ -0,0 +1,3 @@
+[
+{"host": "", "timestamp" : "", "metric" : "", "pool": "", "value": 1.0, "colo": "phx"}
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/streamdefinitions.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/streamdefinitions.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/streamdefinitions.json
new file mode 100644
index 0000000..cbeae19
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/streamdefinitions.json
@@ -0,0 +1,44 @@
+[
+{
+	"streamId": "perfmon_cpu_stream",
+	"dataSource" : "perfmon_datasource",
+	"description":"the data stream for perfmon cpu metrics",
+	"validate": false,
+	"timeseries":false,
+	"columns": [
+		{
+			"name": "host",
+			"type" : "string",
+			"defaultValue": "",
+			"required":true
+		},
+		{
+			"name": "timestamp",
+			"type" : "long",
+			"defaultValue": 0,
+			"required":true
+		},{
+			"name": "metric",
+			"type" : "string",
+			"defaultValue": "perfmon_cpu",
+			"required": true
+		},{
+			"name": "pool",
+			"type" : "string",
+			"defaultValue": "raptor_general",
+			"required":true
+		},{
+			"name": "value",
+			"type" : "double",
+			"defaultValue": 0.0,
+			"required":true
+		},
+		{
+			"name": "colo",
+			"type" : "string",
+			"defaultValue": "",
+			"required":true
+		}
+	]
+}
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testAlertBoltSpec.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testAlertBoltSpec.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testAlertBoltSpec.json
new file mode 100644
index 0000000..fd255cd
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testAlertBoltSpec.json
@@ -0,0 +1,92 @@
+{
+  "version": "version1",
+  "topologyName": "testTopology",
+  "boltPoliciesMap": {
+    "alertBolt0": [
+      {
+        "name": "policy1",
+        "description": null,
+        "inputStreams": [
+          "testTopic3Stream"
+        ],
+        "outputStreams": [
+          "testAlertStream"
+        ],
+        "definition": {
+          "type": "siddhi",
+          "value": "from testTopic3Stream[value=='xyz'] select value insert into testAlertStream;"
+        },
+        "partitionSpec": [
+          {
+            "streamId": "testTopic3Stream",
+            "type": "GROUPBY",
+            "columns": [
+              "value"
+            ],
+            "sortSpec": {
+              "windowPeriod": "PT10S",
+              "windowMargin": 1000
+            }
+          }
+        ],
+        "parallelismHint": 0
+      },
+      {
+        "name": "policy2",
+        "description": null,
+        "inputStreams": [
+          "testTopic4Stream"
+        ],
+        "outputStreams": [
+          "testAlertStream"
+        ],
+        "definition": {
+          "type": "siddhi",
+          "value": "from testTopic4Stream[value=='xyz'] select value insert into testAlertStream;"
+        },
+        "partitionSpec": [
+          {
+            "streamId": "testTopic4Stream",
+            "type": "GROUPBY",
+            "columns": [
+              "value"
+            ],
+            "sortSpec": {
+              "windowPeriod": "PT10S",
+              "windowMargin": 1000
+            }
+          }
+        ],
+        "parallelismHint": 0
+      },
+      {
+        "name": "policy3",
+        "description": null,
+        "inputStreams": [
+          "testTopic5Stream"
+        ],
+        "outputStreams": [
+          "testAlertStream"
+        ],
+        "definition": {
+          "type": "siddhi",
+          "value": "from testTopic5Stream[value=='xyz'] select value insert into testAlertStream;"
+        },
+        "partitionSpec": [
+          {
+            "streamId": "testTopic5Stream",
+            "type": "GROUPBY",
+            "columns": [
+              "value"
+            ],
+            "sortSpec": {
+              "windowPeriod": "PT10S",
+              "windowMargin": 1000
+            }
+          }
+        ],
+        "parallelismHint": 0
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testPublishSpec.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testPublishSpec.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testPublishSpec.json
new file mode 100644
index 0000000..220c8fe
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testPublishSpec.json
@@ -0,0 +1,32 @@
+{
+  "version": "version1",
+  "topologyName": "testTopology",
+  "boltId": "alertPublishBolt",
+  "publishments": [
+    {
+      "type": "org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher",
+      "name":"email-testAlertStream",
+      "policyIds": ["policy1", "policy2", "policy3"],
+      "dedupIntervalMin": "PT1M",
+      "properties":{
+        "subject":"UMP Test Alert",
+        "template":"",
+	  "sender": "sender@corp.com",
+	  "recipients": "receiver@corp.com",
+	  "smtp.server":"mailhost.com",
+        "connection": "plaintext",
+        "smtp.port": "25"
+      }
+    }
+/*    {
+      "type": "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher",
+      "name":"kafka-testAlertStream",
+      "policyIds": ["testPolicy"],
+      "dedupIntervalMin": "PT1M",
+      "properties":{
+        "kafka_broker":"sandbox.hortonworks.com:6667",
+        "topic":"test_kafka"
+      }
+    }*/
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testPublishSpec2.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testPublishSpec2.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testPublishSpec2.json
new file mode 100644
index 0000000..0a16540
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testPublishSpec2.json
@@ -0,0 +1,32 @@
+{
+  "version": "version1",
+  "topologyName": "testTopology",
+  "boltId": "alertPublishBolt",
+  "publishments": [
+    {
+      "type": "org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher",
+      "name":"email-testAlertStream",
+      "policyIds": ["policy1"],
+      "dedupIntervalMin": "PT2M",
+      "properties":{
+        "subject":"UMP Test Alert",
+        "template":"",
+	  "sender": "sender@corp.com",
+	  "recipients": "receiver@corp.com",
+	  "smtp.server":"mailhost.com",
+        "connection": "plaintext",
+        "smtp.port": "25"
+      }
+    }
+//    {
+//      "type": "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher",
+//      "name":"kafka-testAlertStream",
+//      "policyIds": ["testPolicy"],
+//      "dedupIntervalMin": "PT1M",
+//      "properties":{
+//        "kafka_broker":"sandbox.hortonworks.com:6667",
+//        "topic":"test_kafka"
+//      }
+//    }
+  ]
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testSpoutSpec.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testSpoutSpec.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testSpoutSpec.json
new file mode 100644
index 0000000..c97ec43
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testSpoutSpec.json
@@ -0,0 +1,139 @@
+{
+  "version": null,
+  "topologyId": "testTopology",
+  "kafka2TupleMetadataMap": {
+    "testTopic5": {
+      "type": null,
+      "name": "testTopic5",
+      "properties": null,
+      "topic": "testTopic5",
+      "schemeCls": "org.apache.eagle.alert.engine.scheme.JsonScheme",
+      "codec": null
+    },
+    "testTopic3": {
+      "type": null,
+      "name": "testTopic3",
+      "properties": null,
+      "topic": "testTopic3",
+      "schemeCls": "org.apache.eagle.alert.engine.scheme.PlainStringScheme",
+      "codec": null
+    },
+    "testTopic4": {
+      "type": null,
+      "name": "testTopic4",
+      "properties": null,
+      "topic": "testTopic4",
+      "schemeCls": "org.apache.eagle.alert.engine.scheme.PlainStringScheme",
+      "codec": null
+    }
+  },
+  "tuple2StreamMetadataMap": {
+    "testTopic5": {
+      "activeStreamNames": [
+        "testTopic5Stream"
+      ],
+      "streamNameSelectorProp": {
+        "userProvidedStreamName": "testTopic5Stream"
+      },
+      "streamNameSelectorCls": "org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector",
+      "timestampColumn": "timestamp",
+      "timestampFormat": null
+    },
+    "testTopic3": {
+      "activeStreamNames": [
+        "testTopic3Stream"
+      ],
+      "streamNameSelectorProp": {
+        "userProvidedStreamName": "testTopic3Stream"
+      },
+      "streamNameSelectorCls": "org.apache.eagle.alert.engine.scheme.PlainStringStreamNameSelector",
+      "timestampColumn": "timestamp",
+      "timestampFormat": null
+    },
+    "testTopic4": {
+      "activeStreamNames": [
+        "testTopic4Stream"
+      ],
+      "streamNameSelectorProp": {
+        "userProvidedStreamName": "testTopic4Stream"
+      },
+      "streamNameSelectorCls": "org.apache.eagle.alert.engine.scheme.PlainStringStreamNameSelector",
+      "timestampColumn": "timestamp",
+      "timestampFormat": null
+    }
+  },
+  "streamRepartitionMetadataMap": {
+    "testTopic5": [
+      {
+        "topicName": "testTopic5",
+        "streamId": "defaultStringStream",
+        "groupingStrategies": [
+          {
+            "partition": {
+              "streamId": "testTopic5Stream",
+              "type": "GROUPBY",
+              "columns": [
+                "value"
+              ],
+              "sortSpec": {
+                "windowPeriod": "PT10S",
+                "windowMargin": 1000
+              }
+            },
+            "numTotalParticipatingRouterBolts": 1,
+            "startSequence": 0,
+            "totalTargetBoltIds": []
+          }
+        ]
+      }
+    ],
+    "testTopic3": [
+      {
+        "topicName": "testTopic3",
+        "streamId": "defaultStringStream",
+        "groupingStrategies": [
+          {
+            "partition": {
+              "streamId": "testTopic3Stream",
+              "type": "GROUPBY",
+              "columns": [
+                "value"
+              ],
+              "sortSpec": {
+                "windowPeriod": "PT10S",
+                "windowMargin": 1000
+              }
+            },
+            "numTotalParticipatingRouterBolts": 1,
+            "startSequence": 0,
+            "totalTargetBoltIds": []
+          }
+        ]
+      }
+    ],
+    "testTopic4": [
+      {
+        "topicName": "testTopic4",
+        "streamId": "defaultStringStream",
+        "groupingStrategies": [
+          {
+            "partition": {
+              "streamId": "testTopic4Stream",
+              "type": "GROUPBY",
+              "columns": [
+                "value"
+              ],
+              "sortSpec": {
+                "windowPeriod": "PT10S",
+                "windowMargin": 1000
+              }
+            },
+            "numTotalParticipatingRouterBolts": 1,
+            "startSequence": 0,
+            "totalTargetBoltIds": []
+          }
+        ]
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testStreamDefinitionsSpec.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testStreamDefinitionsSpec.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testStreamDefinitionsSpec.json
new file mode 100644
index 0000000..6b3ccff
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testStreamDefinitionsSpec.json
@@ -0,0 +1,47 @@
+{
+  "testTopic5Stream": {
+    "streamId": "testTopic5Stream",
+    "dataSource": null,
+    "description": null,
+    "validate": false,
+    "timeseries": false,
+    "columns": [
+      {
+        "name": "value",
+        "type": "STRING",
+        "defaultValue": null,
+        "required": false
+      }
+    ]
+  },
+  "testTopic4Stream": {
+    "streamId": "testTopic4Stream",
+    "dataSource": null,
+    "description": null,
+    "validate": false,
+    "timeseries": false,
+    "columns": [
+      {
+        "name": "value",
+        "type": "STRING",
+        "defaultValue": null,
+        "required": false
+      }
+    ]
+  },
+  "testTopic3Stream": {
+    "streamId": "testTopic3Stream",
+    "dataSource": null,
+    "description": null,
+    "validate": false,
+    "timeseries": false,
+    "columns": [
+      {
+        "name": "value",
+        "type": "STRING",
+        "defaultValue": null,
+        "required": false
+      }
+    ]
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testStreamRouterBoltSpec.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testStreamRouterBoltSpec.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testStreamRouterBoltSpec.json
new file mode 100644
index 0000000..f4e72bf
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/testStreamRouterBoltSpec.json
@@ -0,0 +1,123 @@
+{
+  "version": null,
+  "topologyName": "testTopology",
+  "routerSpecs": [
+    {
+      "streamId": "testTopic3Stream",
+      "partition": {
+        "streamId": "testTopic3Stream",
+        "type": "GROUPBY",
+        "columns": [
+          "value"
+        ],
+        "sortSpec": {
+          "windowPeriod": "PT10S",
+          "windowMargin": 1000
+        }
+      },
+      "targetQueue": [
+        {
+          "partition": {
+            "streamId": "testTopic3Stream",
+            "type": "GROUPBY",
+            "columns": [
+              "value"
+            ],
+            "sortSpec": {
+              "windowPeriod": "PT10S",
+              "windowMargin": 1000
+            }
+          },
+          "workers": [
+            {
+              "topologyName": "testTopology",
+              "boltId": "alertBolt0"
+            },
+            {
+              "topologyName": "testTopology",
+              "boltId": "alertBolt1"
+            }
+          ]
+        }
+      ]
+    },
+    {
+      "streamId": "testTopic4Stream",
+      "partition": {
+        "streamId": "testTopic4Stream",
+        "type": "GROUPBY",
+        "columns": [
+          "value"
+        ],
+        "sortSpec": {
+          "windowPeriod": "PT10S",
+          "windowMargin": 1000
+        }
+      },
+      "targetQueue": [
+        {
+          "partition": {
+            "streamId": "testTopic4Stream",
+            "type": "GROUPBY",
+            "columns": [
+              "value"
+            ],
+            "sortSpec": {
+              "windowPeriod": "PT10S",
+              "windowMargin": 1000
+            }
+          },
+          "workers": [
+            {
+              "topologyName": "testTopology",
+              "boltId": "alertBolt0"
+            },
+            {
+              "topologyName": "testTopology",
+              "boltId": "alertBolt1"
+            }
+          ]
+        }
+      ]
+    },
+    {
+      "streamId": "testTopic5Stream",
+      "partition": {
+        "streamId": "testTopic5Stream",
+        "type": "GROUPBY",
+        "columns": [
+          "value"
+        ],
+        "sortSpec": {
+          "windowPeriod": "PT10S",
+          "windowMargin": 1000
+        }
+      },
+      "targetQueue": [
+        {
+          "partition": {
+            "streamId": "testTopic5Stream",
+            "type": "GROUPBY",
+            "columns": [
+              "value"
+            ],
+            "sortSpec": {
+              "windowPeriod": "PT10S",
+              "windowMargin": 1000
+            }
+          },
+          "workers": [
+            {
+              "topologyName": "testTopology",
+              "boltId": "alertBolt0"
+            },
+            {
+              "topologyName": "testTopology",
+              "boltId": "alertBolt1"
+            }
+          ]
+        }
+      ]
+    }
+  ]
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/topic.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/topic.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/topic.json
new file mode 100644
index 0000000..b49d6ad
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/topic.json
@@ -0,0 +1 @@
+nn_jmx_metric_sandbox
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/topologies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/topologies.json b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/topologies.json
new file mode 100644
index 0000000..411cc48
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/alert-engine-base/src/test/resources/topologies.json
@@ -0,0 +1,31 @@
+[
+{
+	"name": "alertUnitTopology_1",
+	"numOfSpout":1,
+	"numOfAlertBolt": 10,
+	"numOfGroupBolt": 4,
+	"spoutId": "alertEngineSpout",
+	"groupNodeIds" : [
+		"streamRouterBolt0",
+		"streamRouterBolt1",
+		"streamRouterBolt2",
+		"streamRouterBolt3"
+	],
+	"alertBoltIds": [
+		"alertBolt0",
+		"alertBolt1",
+		"alertBolt2",
+		"alertBolt3",
+		"alertBolt4",
+		"alertBolt5",
+		"alertBolt6",
+		"alertBolt7",
+		"alertBolt8",
+		"alertBolt9"
+	],
+	"pubBoltId" : "alertPublishBolt",
+	"spoutParallelism": 1,
+	"groupParallelism": 1,
+	"alertParallelism": 1
+}
+]

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-engine/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-engine/pom.xml b/eagle-core/eagle-alert/alert/alert-engine/pom.xml
new file mode 100644
index 0000000..4e68b4c
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-engine/pom.xml
@@ -0,0 +1,110 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+  agreements. See the NOTICE file distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file to You under the Apache License,
+  Version 2.0 (the "License"); you may not use this file except in compliance with the
+  License. You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software distributed under the
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+  either express or implied. See the License for the specific language governing permissions
+  and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>io.sherlock</groupId>
+		<artifactId>alert-parent</artifactId>
+		<version>0.0.1-SNAPSHOT</version>
+	</parent>
+
+	<artifactId>alert-engine-parent</artifactId>
+	<packaging>pom</packaging>
+
+	<modules>
+		<module>alert-engine-base</module>
+	</modules>
+
+	<dependencies>
+		<dependency>
+			<groupId>io.sherlock</groupId>
+			<artifactId>alert-common</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.storm</groupId>
+			<artifactId>storm-kafka</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.storm</groupId>
+			<artifactId>storm-core</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>${kafka.artifact.id}</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>com.sun.jersey</groupId>
+			<artifactId>jersey-client</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.codehaus.jackson</groupId>
+			<artifactId>jackson-jaxrs</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>com.netflix.archaius</groupId>
+			<artifactId>archaius-core</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.wso2.siddhi</groupId>
+			<artifactId>siddhi-core</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.wso2.siddhi</groupId>
+			<artifactId>siddhi-query-api</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.wso2.siddhi</groupId>
+			<artifactId>siddhi-query-compiler</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.wso2.siddhi</groupId>
+			<artifactId>siddhi-extension-regex</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.wso2.siddhi</groupId>
+			<artifactId>siddhi-extension-string</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.kafka</groupId>
+			<artifactId>kafka-clients</artifactId>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>com.101tec</groupId>
+			<artifactId>zkclient</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>joda-time</groupId>
+			<artifactId>joda-time</artifactId>
+		</dependency>
+		<!--<dependency>-->
+			<!--<groupId>org.slf4j</groupId>-->
+			<!--<artifactId>slf4j-log4j12</artifactId>-->
+		<!--</dependency>-->
+		<!--<dependency>-->
+			<!--<groupId>org.slf4j</groupId>-->
+			<!--<artifactId>slf4j-api</artifactId>-->
+		<!--</dependency>-->
+		<dependency>
+			<groupId>com.typesafe</groupId>
+			<artifactId>config</artifactId>
+		</dependency>
+	</dependencies>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/.gitignore
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/.gitignore b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/.gitignore
new file mode 100644
index 0000000..1dd3331
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/.gitignore
@@ -0,0 +1,2 @@
+/target/
+/target/

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/pom.xml b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/pom.xml
new file mode 100644
index 0000000..83749b5
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/pom.xml
@@ -0,0 +1,113 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more contributor license
+  agreements. See the NOTICE file distributed with this work for additional information
+  regarding copyright ownership. The ASF licenses this file to You under the Apache License,
+  Version 2.0 (the "License"); you may not use this file except in compliance with the
+  License. You may obtain a copy of the License at
+      http://www.apache.org/licenses/LICENSE-2.0
+  Unless required by applicable law or agreed to in writing, software distributed under the
+  License is distributed on an "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND,
+  either express or implied. See the License for the specific language governing permissions
+  and limitations under the License. -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+	<modelVersion>4.0.0</modelVersion>
+
+	<parent>
+		<groupId>org.apache.eagle</groupId>
+		<artifactId>alert-metadata-parent</artifactId>
+		<version>0.0.1-SNAPSHOT</version>
+	</parent>
+
+	<artifactId>alert-metadata-service</artifactId>
+	<packaging>war</packaging>
+
+	<dependencies>
+		<!-- Storm depends on org.ow2.asm:asm:4.0 -->
+		<!-- Jersey depends on asm:asm:3.0 -->
+		<dependency>
+			<groupId>org.apache.eagle</groupId>
+			<artifactId>alert-engine-base</artifactId>
+			<version>${project.version}</version>
+			<exclusions>
+				<exclusion>
+					<groupId>org.ow2.asm</groupId>
+					<artifactId>asm</artifactId>
+				</exclusion>
+			</exclusions>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.eagle</groupId>
+			<artifactId>alert-metadata</artifactId>
+			<version>${project.version}</version>
+		</dependency>
+
+		<dependency>
+			<groupId>junit</groupId>
+			<artifactId>junit</artifactId>
+			<scope>test</scope>
+		</dependency>
+		<dependency>
+			<groupId>com.sun.jersey</groupId>
+			<artifactId>jersey-server</artifactId>
+			<!--<exclusions> -->
+			<!--<exclusion> -->
+			<!--<groupId>asm</groupId> -->
+			<!--<artifactId>asm</artifactId> -->
+			<!--</exclusion> -->
+			<!--</exclusions> -->
+		</dependency>
+		<dependency>
+			<groupId>com.sun.jersey.contribs</groupId>
+			<artifactId>jersey-multipart</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.codehaus.jackson</groupId>
+			<artifactId>jackson-mapper-asl</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.codehaus.jackson</groupId>
+			<artifactId>jackson-jaxrs</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.codehaus.jackson</groupId>
+			<artifactId>jackson-xc</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>com.fasterxml.jackson.core</groupId>
+			<artifactId>jackson-databind</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>com.typesafe</groupId>
+			<artifactId>config</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.slf4j</groupId>
+			<artifactId>slf4j-api</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.tomcat.embed</groupId>
+			<artifactId>tomcat-embed-core</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>io.swagger</groupId>
+			<artifactId>swagger-jaxrs</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>com.sun.jersey</groupId>
+			<artifactId>jersey-servlet</artifactId>
+		</dependency>
+	</dependencies>
+
+	<build>
+		<plugins>
+			<plugin>
+				<groupId>org.eclipse.jetty</groupId>
+				<artifactId>jetty-maven-plugin</artifactId>
+				<configuration>
+					<scanIntervalSeconds>5</scanIntervalSeconds>
+				</configuration>
+			</plugin>
+		</plugins>
+	</build>
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/614c118e/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/alert/metadata/resource/MetadataResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/alert/metadata/resource/MetadataResource.java b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/alert/metadata/resource/MetadataResource.java
new file mode 100644
index 0000000..94ec767
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/alert/metadata/resource/MetadataResource.java
@@ -0,0 +1,262 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.metadata.resource;
+
+import java.util.List;
+
+import javax.ws.rs.Consumes;
+import javax.ws.rs.DELETE;
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.PathParam;
+import javax.ws.rs.Produces;
+
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.PublishmentType;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
+import org.apache.eagle.alert.metadata.impl.MetadataDaoFactory;
+
+/**
+ * JAX-RS resource rooted at {@code /metadata} that serves alert metadata as JSON.
+ * Every endpoint is a thin delegate to the {@link IMetadataDao} supplied by
+ * {@link MetadataDaoFactory}; this class adds no validation or transformation.
+ *
+ * @since Apr 11, 2016
+ */
+@Path("/metadata")
+@Produces("application/json")
+@Consumes("application/json")
+public class MetadataResource {
+
+    // DAO behind every endpoint, looked up once when the resource is instantiated.
+    private IMetadataDao dao = MetadataDaoFactory.getInstance().getMetadataDao();
+
+    /** GET /metadata/clusters: returns the DAO's cluster list. */
+    @Path("/clusters")
+    @GET
+    public List<StreamingCluster> listClusters() {
+        return dao.listClusters();
+    }
+
+    /** POST /metadata/clear: delegates to {@code dao.clear()} and returns its result. */
+    @Path("/clear")
+    @POST
+    public OpResult clear() {
+        return dao.clear();
+    }
+
+    /** GET /metadata/export: returns the {@link Models} produced by {@code dao.export()}. */
+    @Path("/export")
+    @GET
+    public Models export() {
+        return dao.export();
+    }
+
+    /**
+     * POST /metadata/import: hands the {@link Models} request body to the DAO.
+     * Mapped to POST because the payload travels in the entity body, which a
+     * GET request cannot reliably carry.
+     */
+    @Path("/import")
+    @POST
+    public OpResult importModels(Models model) {
+        return dao.importModels(model);
+    }
+
+    /** POST /metadata/clusters: passes the posted cluster to the DAO. */
+    @Path("/clusters")
+    @POST
+    public OpResult addCluster(StreamingCluster cluster) {
+        return dao.addCluster(cluster);
+    }
+
+    /** DELETE /metadata/clusters/{clusterId}: asks the DAO to remove that cluster. */
+    @Path("/clusters/{clusterId}")
+    @DELETE
+    public OpResult removeCluster(@PathParam("clusterId") String clusterId) {
+        return dao.removeCluster(clusterId);
+    }
+
+    /** GET /metadata/streams: returns the DAO's stream definition list. */
+    @Path("/streams")
+    @GET
+    public List<StreamDefinition> listStreams() {
+        return dao.listStreams();
+    }
+
+    /** POST /metadata/streams: passes the posted stream definition to the DAO. */
+    @Path("/streams")
+    @POST
+    public OpResult createStream(StreamDefinition stream) {
+        return dao.createStream(stream);
+    }
+
+    /** DELETE /metadata/streams/{streamId}: asks the DAO to remove that stream. */
+    @Path("/streams/{streamId}")
+    @DELETE
+    public OpResult removeStream(@PathParam("streamId") String streamId) {
+        return dao.removeStream(streamId);
+    }
+
+    /** GET /metadata/datasources: returns the DAO's data source list. */
+    @Path("/datasources")
+    @GET
+    public List<Kafka2TupleMetadata> listDataSources() {
+        return dao.listDataSources();
+    }
+
+    /** POST /metadata/datasources: passes the posted data source to the DAO. */
+    @Path("/datasources")
+    @POST
+    public OpResult addDataSource(Kafka2TupleMetadata dataSource) {
+        return dao.addDataSource(dataSource);
+    }
+
+    /** DELETE /metadata/datasources/{datasourceId}: asks the DAO to remove that data source. */
+    @Path("/datasources/{datasourceId}")
+    @DELETE
+    public OpResult removeDataSource(@PathParam("datasourceId") String datasourceId) {
+        return dao.removeDataSource(datasourceId);
+    }
+
+    /** GET /metadata/policies: returns the DAO's policy definition list. */
+    @Path("/policies")
+    @GET
+    public List<PolicyDefinition> listPolicies() {
+        return dao.listPolicies();
+    }
+
+    /** POST /metadata/policies: passes the posted policy definition to the DAO. */
+    @Path("/policies")
+    @POST
+    public OpResult addPolicy(PolicyDefinition policy) {
+        return dao.addPolicy(policy);
+    }
+
+    /** DELETE /metadata/policies/{policyId}: asks the DAO to remove that policy. */
+    @Path("/policies/{policyId}")
+    @DELETE
+    public OpResult removePolicy(@PathParam("policyId") String policyId) {
+        return dao.removePolicy(policyId);
+    }
+
+    /** GET /metadata/publishments: returns the DAO's publishment list. */
+    @Path("/publishments")
+    @GET
+    public List<Publishment> listPublishment() {
+        return dao.listPublishment();
+    }
+
+    /** POST /metadata/publishments: passes the posted publishment to the DAO. */
+    @Path("/publishments")
+    @POST
+    public OpResult addPublishment(Publishment publishment) {
+        return dao.addPublishment(publishment);
+    }
+
+    /** DELETE /metadata/publishments/{pubId}: asks the DAO to remove that publishment. */
+    @Path("/publishments/{pubId}")
+    @DELETE
+    public OpResult removePublishment(@PathParam("pubId") String pubId) {
+        return dao.removePublishment(pubId);
+    }
+
+    /** GET /metadata/publishmentTypes: returns the DAO's publishment type list. */
+    @Path("/publishmentTypes")
+    @GET
+    public List<PublishmentType> listPublishmentType() {
+        return dao.listPublishmentType();
+    }
+
+    /** POST /metadata/publishmentTypes: passes the posted publishment type to the DAO. */
+    @Path("/publishmentTypes")
+    @POST
+    public OpResult addPublishmentType(PublishmentType publishmentType) {
+        return dao.addPublishmentType(publishmentType);
+    }
+
+    /** DELETE /metadata/publishmentTypes/{pubType}: asks the DAO to remove that type. */
+    @Path("/publishmentTypes/{pubType}")
+    @DELETE
+    public OpResult removePublishmentType(@PathParam("pubType") String pubType) {
+        return dao.removePublishmentType(pubType);
+    }
+
+    /** GET /metadata/schedulestates/{versionId}: returns the DAO's state for that version id. */
+    @Path("/schedulestates/{versionId}")
+    @GET
+    public ScheduleState listScheduleState(@PathParam("versionId") String versionId) {
+        return dao.getScheduleState(versionId);
+    }
+
+    /** GET /metadata/schedulestates: returns {@code dao.getScheduleState()}, presumably the latest. */
+    @Path("/schedulestates")
+    @GET
+    public ScheduleState latestScheduleState() {
+        return dao.getScheduleState();
+    }
+
+    /** POST /metadata/schedulestates: passes the posted schedule state to the DAO. */
+    @Path("/schedulestates")
+    @POST
+    public OpResult addScheduleState(ScheduleState state) {
+        return dao.addScheduleState(state);
+    }
+
+    /** GET /metadata/assignments: returns the DAO's policy assignment list. */
+    @Path("/assignments")
+    @GET
+    public List<PolicyAssignment> listAssignmenets() {
+        return dao.listAssignments();
+    }
+
+    /** POST /metadata/assignments: passes the posted policy assignment to the DAO. */
+    @Path("/assignments")
+    @POST
+    public OpResult addAssignmenet(PolicyAssignment pa) {
+        return dao.addAssignment(pa);
+    }
+
+    /** GET /metadata/topologies: returns the DAO's topology list. */
+    @Path("/topologies")
+    @GET
+    public List<Topology> listTopologies() {
+        return dao.listTopologies();
+    }
+
+    /** POST /metadata/topologies: passes the posted topology to the DAO. */
+    @Path("/topologies")
+    @POST
+    public OpResult addTopology(Topology t) {
+        return dao.addTopology(t);
+    }
+
+    /** DELETE /metadata/topologies/{topologyName}: asks the DAO to remove that topology. */
+    @Path("/topologies/{topologyName}")
+    @DELETE
+    public OpResult removeTopology(@PathParam("topologyName") String topologyName) {
+        return dao.removeTopology(topologyName);
+    }
+
+}