You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/07/07 00:53:33 UTC

[3/6] incubator-eagle git commit: Rebase code base

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java
new file mode 100644
index 0000000..f4e82bc
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/siddhi/SiddhiPolicyTest.java
@@ -0,0 +1,247 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.siddhi;
+
+import java.util.HashSet;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.wso2.siddhi.core.ExecutionPlanRuntime;
+import org.wso2.siddhi.core.SiddhiManager;
+import org.wso2.siddhi.core.event.Event;
+import org.wso2.siddhi.core.stream.input.InputHandler;
+import org.wso2.siddhi.core.stream.output.StreamCallback;
+
+/**
+ * Exercises Siddhi queries (group-by, having, regex filter, pattern) against a sample syslog stream.
+ * @since Jun 21, 2016
+ */
+public class SiddhiPolicyTest {
+    
+    private static final Logger LOG = LoggerFactory.getLogger(SiddhiPolicyTest.class);
+    // Definition of syslog_stream; every test prepends it to its own query to form the execution plan.
+    private String streams = " define stream syslog_stream("
+            + "dims_facility string, "
+            + "dims_severity string, "
+            + "dims_hostname string, "
+            + "dims_msgid string, "
+            + "timestamp string, "
+            + "conn string, "
+            + "op string, "
+            + "msgId string, "
+            + "command string, "
+            + "name string, "
+            + "namespace string, "
+            + "epochMillis long); ";
+    private SiddhiManager sm; // created in setup(), shut down in shutdown()
+    
+    @Before
+    public void setup() {
+        this.sm = new SiddhiManager(); // runs before each test method, so every test starts with its own manager
+    }
+    
+    @After
+    public void shutdown() {
+        this.sm.shutdown(); // runs after each test method and shuts down the manager created in setup()
+    }
+
+    @Test
+    public void testPolicy_grpby() {
+        // Smoke test: no events are sent, so this only checks that the group-by plan can be created and started.
+        final String query = " from syslog_stream#window.time(1min) select name, namespace, timestamp, dims_hostname, count(*) as abortCount group by dims_hostname insert into syslog_severity_check_output; ";
+
+        final ExecutionPlanRuntime planRuntime = sm.createExecutionPlanRuntime(streams + query);
+        planRuntime.addCallback("syslog_severity_check_output", new StreamCallback() {
+            @Override
+            public void receive(Event[] outputs) {
+                // Intentionally empty: nothing is fed into the stream in this test.
+            }
+        });
+        planRuntime.start();
+        // The runtime is not shut down here; only the SiddhiManager is, in shutdown().
+    }
+
+    @Ignore
+    @Test
+    public void testPolicy_agg() throws Exception {
+        // Per-host event count over a 1-minute window; only hosts with more than 3 events reach the output stream.
+        final String query = " from syslog_stream#window.time(1min) select "
+                + "name, "
+                + "namespace, "
+                + "timestamp, "
+                + "dims_hostname, "
+                + "count(*) as abortCount "
+                + "group by dims_hostname "
+                + "having abortCount > 3 insert into syslog_severity_check_output; ";
+
+        final AtomicBoolean callbackFired = new AtomicBoolean(false);
+        final ExecutionPlanRuntime planRuntime = sm.createExecutionPlanRuntime(streams + query);
+        planRuntime.addCallback("syslog_severity_check_output", new StreamCallback() {
+            @Override
+            public void receive(Event[] outputs) {
+                callbackFired.set(true);
+                LOG.info("event array size: " + outputs.length);
+
+                // Index 3 of each output row is dims_hostname, the fourth column of the select clause.
+                final Set<String> groupedHosts = new HashSet<String>();
+                for (int idx = 0; idx < outputs.length; idx++) {
+                    groupedHosts.add((String) outputs[idx].getData()[3]);
+                }
+                LOG.info(" grouped hosts : " + groupedHosts);
+
+                // sendInput spreads 15 events over hosts 0-3 (index % 4): hosts 0-2 receive 4 each, host 3 only 3.
+                Assert.assertTrue(groupedHosts.contains("HOSTNAME-" + 0));
+                Assert.assertTrue(groupedHosts.contains("HOSTNAME-" + 1));
+                Assert.assertTrue(groupedHosts.contains("HOSTNAME-" + 2));
+                Assert.assertFalse(groupedHosts.contains("HOSTNAME-" + 3));
+            }
+        });
+        planRuntime.start();
+
+        final InputHandler input = planRuntime.getInputHandler("syslog_stream");
+        sendInput(input);
+        Thread.sleep(1000);
+
+        // Fails if the callback above never received any output.
+        Assert.assertTrue(callbackFired.get());
+        planRuntime.shutdown();
+    }
+    
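+    /* Field order of syslog_stream, copied from the stream definition; event data arrays below follow this order: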
+        + "dims_facility string, "
+        + "dims_severity string, "
+        + "dims_hostname string, "
+        + "dims_msgid string, "
+        + "timestamp string, "
+        + "conn string, "
+        + "op string, "
+        + "msgId string, "
+        + "command string, "
+        + "name string, "
+        + "namespace string, "
+        + "epochMillis long)
+     */
+    private void sendInput(InputHandler handler) throws Exception {
+        int length = 15;
+        Event[] events = new Event[length];
+        for (int i = 0; i < length; i++) {
+            Event e = new Event(12);
+            e.setTimestamp(System.currentTimeMillis());
+            e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + i%4 , "MSGID-...", "Timestamp", "conn-sss", "op-msg-Abort", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
+            
+            events[i] = e;
+        }
+
+        handler.send(events);
+
+        Thread.sleep(61 * 1000);
+
+        Event e = new Event(12);
+        e.setTimestamp(System.currentTimeMillis());
+        e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 11 , "MSGID-...", "Timestamp", "conn-sss", "op-msg", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
+        handler.send(e);
+    }
+
+    @Ignore
+    @Test
+    public void testPolicy_regex() throws Exception {
+        // The stream filter is regex:find("Abort", op); the test passes once any output event arrives.
+        final String query = " from syslog_stream[regex:find(\"Abort\", op)]#window.time(1min) select timestamp, dims_hostname, count(*) as abortCount group by dims_hostname insert into syslog_severity_check_output; ";
+
+        final AtomicBoolean outputSeen = new AtomicBoolean();
+        final ExecutionPlanRuntime planRuntime = sm.createExecutionPlanRuntime(streams + query);
+        planRuntime.addCallback("syslog_severity_check_output", new StreamCallback() {
+            @Override
+            public void receive(Event[] outputs) {
+                // The payload is not inspected; only the fact that output arrived is recorded.
+                outputSeen.set(true);
+            }
+        });
+        planRuntime.start();
+
+        final InputHandler input = planRuntime.getInputHandler("syslog_stream");
+
+        // sendInput(...) sleeps for 61 seconds between its batch and its final event.
+        sendInput(input);
+
+        Thread.sleep(1000);
+
+        Assert.assertTrue(outputSeen.get());
+
+        planRuntime.shutdown();
+    }
+
+    @Ignore
+    @Test
+    public void testPolicy_seq() throws Exception {
+        // Pattern: an event whose op matches "UPDOWN", followed within 1 min by one from the same host whose op matches "Abort".
+        final String query =
+                " from every e1=syslog_stream[regex:find(\"UPDOWN\", op)] -> "
+                + " e2=syslog_stream[dims_hostname == e1.dims_hostname and regex:find(\"Abort\", op)] within 1 min "
+                + " select e1.timestamp as timestamp, e1.op as a_op, e2.op as b_op "
+                + " insert into syslog_severity_check_output; ";
+
+        final AtomicBoolean patternMatched = new AtomicBoolean();
+        final ExecutionPlanRuntime planRuntime = sm.createExecutionPlanRuntime(streams + query);
+        planRuntime.addCallback("syslog_severity_check_output", new StreamCallback() {
+            @Override
+            public void receive(Event[] outputs) {
+                // Any output counts as a match; the selected columns are not checked.
+                patternMatched.set(true);
+            }
+        });
+        planRuntime.start();
+
+        final InputHandler input = planRuntime.getInputHandler("syslog_stream");
+
+        sendPatternInput(input);
+
+        Thread.sleep(1000);
+        Assert.assertTrue(patternMatched.get());
+
+        planRuntime.shutdown();
+    }
+
+    private void sendPatternInput(InputHandler handler) throws Exception {
+        // Sends UPDOWN, an unrelated op, then Abort for HOSTNAME-0. Each event must be sent: built-but-unsent events never reach the query.
+        Event e = new Event(12);
+        e.setTimestamp(System.currentTimeMillis());
+        e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 0 , "MSGID-...", "Timestamp", "conn-sss", "op-msg-UPDOWN", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
+        handler.send(e);
+        e = new Event(12);
+        e.setTimestamp(System.currentTimeMillis());
+        e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 0 , "MSGID-...", "Timestamp", "conn-sss", "op-msg-nothing", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
+        handler.send(e);
+        e = new Event(12);
+        e.setTimestamp(System.currentTimeMillis());
+        e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 0 , "MSGID-...", "Timestamp", "conn-sss", "op-msg-Abort", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
+        handler.send(e);
+        // After 61s (past the 1-minute bound used by testPolicy_seq) send one more event from a different host.
+        Thread.sleep(61 * 1000);
+        e = new Event(12);
+        e.setTimestamp(System.currentTimeMillis());
+        e.setData(new Object[] {"facitliy", "SEVERITY_EMERG", "HOSTNAME-" + 11 , "MSGID-...", "Timestamp", "conn-sss", "op-msg", "msgId..", "command-...", "name-", "namespace", System.currentTimeMillis()});
+        handler.send(e);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java
index 62427e0..30a0ef9 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/AlertTopologyTest.java
@@ -99,8 +99,8 @@ public class AlertTopologyTest implements Serializable{
         String topic = "testTopic3";
         int max = 1000;
         Properties configMap = new Properties();
-        configMap.put("bootstrap.servers", "sandbox.hortonworks.com:6667");
-        configMap.put("metadata.broker.list", "sandbox.hortonworks.com:6667");
+        configMap.put("bootstrap.servers", "localhost:6667");
+        configMap.put("metadata.broker.list", "localhost:6667");
         configMap.put("key.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         configMap.put("value.serializer", "org.apache.kafka.common.serialization.StringSerializer");
         configMap.put("request.required.acks", "1");

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java
index 6960537..5c5a37f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/CorrelationSpoutTest.java
@@ -181,4 +181,50 @@ public class CorrelationSpoutTest {
         }
         Assert.assertTrue(verified.get());
     }
+
+//    @Ignore
+//    @SuppressWarnings("rawtypes")
+//    @Test
+//    public void testSpout() {
+//        String topoId = "testMetadataInjection";
+//        final AtomicBoolean verified = new AtomicBoolean(false);
+//        Config config = ConfigFactory.load();
+//        CorrelationSpout spout = new CorrelationSpout(config, topoId, null, 1);
+//
+//        TopologyBuilder builder = new TopologyBuilder();
+//        // only one spout
+//        builder.setSpout("cc-spout", spout);
+//        builder.setBolt("recv-bolt", new RecvBolt()).globalGrouping("cc-spout");
+//
+//        StormTopology topology = builder.createTopology();
+//        LocalCluster cluster = new LocalCluster();
+//        cluster.submitTopology(topoId, new HashMap(), topology);
+//
+//        while (true) {
+//            try {
+//                Thread.sleep(1000);
+//            } catch (Exception e) {
+//                e.printStackTrace();
+//            }
+//        }
+//    }
+//    
+//    @SuppressWarnings("serial")
+//    private static class RecvBolt extends BaseRichBolt {
+//
+//        @Override
+//        public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
+//        }
+//
+//        @Override
+//        public void execute(Tuple input) {
+//            
+//        }
+//
+//        @Override
+//        public void declareOutputFields(OutputFieldsDeclarer declarer) {
+//        }
+//        
+//    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java
index 846c118..2dc37df 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/topology/TestBolt.java
@@ -20,31 +20,26 @@
 package org.apache.eagle.alert.engine.topology;
 
 
-import java.util.Map;
-
-import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
 import backtype.storm.task.OutputCollector;
 import backtype.storm.task.TopologyContext;
 import backtype.storm.topology.OutputFieldsDeclarer;
 import backtype.storm.topology.base.BaseRichBolt;
 import backtype.storm.tuple.Tuple;
+import org.junit.Ignore;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Map;
 
 /**
  * Created by yonzhang on 4/7/16.
  */
+@Ignore
 @SuppressWarnings({"rawtypes", "serial"})
 public class TestBolt extends BaseRichBolt {
     private static final Logger LOG = LoggerFactory.getLogger(TestBolt.class);
     private OutputCollector collector;
     private long count;
-
-    @Test
-    public void test(){
-
-    }
     @Override
     public void prepare(Map stormConf, TopologyContext context, OutputCollector collector) {
         this.collector = collector;

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/utils/CompressionUtilsTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/utils/CompressionUtilsTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/utils/CompressionUtilsTest.java
index 0861597..8317943 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/utils/CompressionUtilsTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/utils/CompressionUtilsTest.java
@@ -16,13 +16,13 @@
  */
 package org.apache.eagle.alert.engine.utils;
 
+import java.io.IOException;
+
 import org.junit.Assert;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.IOException;
-
 public class CompressionUtilsTest {
     private final static Logger LOG = LoggerFactory.getLogger(CompressionUtilsTest.class);
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/application-integration.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/application-integration.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/application-integration.conf
deleted file mode 100644
index 4192715..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/application-integration.conf
+++ /dev/null
@@ -1,57 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-{
-  "topology" : {
-    "name" : "alertUnitTopology_1",
-    "numOfTotalWorkers": 20,
-    "numOfSpoutTasks" : 1,
-    "numOfRouterBolts" : 4,
-    "numOfAlertBolts" : 10,
-    "numOfPublishTasks" : 1,
-    "localMode" : "true"
-  },
-  "spout" : {
-    "kafkaBrokerZkQuorum": "localhost:2181",
-    "kafkaBrokerZkBasePath": "/brokers",
-    "stormKafkaUseSameZkQuorumWithKafkaBroker": true,
-    "stormKafkaTransactionZkQuorum": "",
-    "stormKafkaTransactionZkPath": "/consumers",
-    "stormKafkaEagleConsumer": "eagle_consumer",
-    "stormKafkaStateUpdateIntervalMs": 2000,
-    "stormKafkaFetchSizeBytes": 1048586,
-  },
-  "zkConfig" : {
-    "zkQuorum" : "localhost:2181",
-    "zkRoot" : "/alert",
-    "zkSessionTimeoutMs" : 10000,
-    "connectionTimeoutMs" : 10000,
-    "zkRetryTimes" : 3,
-    "zkRetryInterval" : 3000
-  },
-  "dynamicConfigSource" : {
-    "initDelayMillis": 3000,
-    "delayMillis" : 10000
-  },
-  "metadataService": {
-	"context" : "/rest",
-	"host" : "localhost",
-	"port" : 8080
-  },
-  "coordinatorService": {
-  	"host": "localhost",
-  	"port": "9090",
-  	"context" : "/api"
-  }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/application-test-backup.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/application-test-backup.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/application-test-backup.conf
index 2c99a68..4cfcc75 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/application-test-backup.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/application-test-backup.conf
@@ -24,7 +24,7 @@
     "localMode" : true
   },
   "spout" : {
-    "kafkaBrokerZkQuorum": "10.64.243.71:2181",
+    "kafkaBrokerZkQuorum": "localhost:2181",
     "kafkaBrokerZkBasePath": "/brokers",
     "stormKafkaUseSameZkQuorumWithKafkaBroker": true,
     "stormKafkaTransactionZkQuorum": "",
@@ -34,7 +34,7 @@
     "stormKafkaFetchSizeBytes": 1048586,
   },
   "zkConfig" : {
-    "zkQuorum" : "10.64.243.71:2181",
+    "zkQuorum" : "localhost:2181",
     "zkRoot" : "/alert",
     "zkSessionTimeoutMs" : 10000,
     "connectionTimeoutMs" : 10000,
@@ -63,7 +63,7 @@
         "level":"DEBUG"
       }
       "elasticsearch": {
-        "hosts": ["10.64.223.222:9200"]
+        "hosts": ["localhost:9200"]
         "index": "alert_metric_test"
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/application-test.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/application-test.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/application-test.conf
index f4a797e..4cd932f 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/application-test.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/application-test.conf
@@ -60,7 +60,7 @@
         "level":"INFO"
       }
       "elasticsearch": {
-        "hosts": ["10.64.223.222:9200"]
+        "hosts": ["localhost:9200"]
         "index": "alert_metric_test"
       }
     }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/application-integration-2.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/application-integration-2.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/application-integration-2.conf
index bb998cd..80522fb 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/application-integration-2.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/application-integration-2.conf
@@ -45,13 +45,16 @@
     "delayMillis" : 10000
   },
   "metadataService": {
-	"context" : "/api",
+	"context" : "/rest",
 	"host" : "localhost",
 	"port" : 8080
   },
   "coordinatorService": {
   	"host": "localhost",
-  	"port": "9090",
-  	"context" : "/api"
+  	"port": "8080",
+  	"context" : "/rest"
+  },
+  "kafkaProducer": {
+  	"bootstrapServers": "localhost:9092"
   }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/publishments.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/publishments.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/publishments.json
index 7b531fc..6e1c136 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/publishments.json
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation/publishments.json
@@ -6,12 +6,13 @@
 		"log_stream_join_output"
 	],
 	"properties": {
-		"subject":"UMP Test Alert",
+		"subject":"Eagle Test Alert",
         "template":"",
 	  "sender": "sender@corp.com",
 	  "recipients": "receiver@corp.com",
 	  "smtp.server":"mailhost.com"
 	},
-	"dedupIntervalMin" : "1"
+	"dedupIntervalMin" : "PT1M",
+	"serializer" : "org.apache.eagle.alert.engine.publisher.impl.StringEventSerializer"
 }
 ]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation_spouttest.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation_spouttest.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation_spouttest.conf
new file mode 100644
index 0000000..2b6f767
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/correlation_spouttest.conf
@@ -0,0 +1,53 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+{
+  "topology" : {
+    "name" : "alertUnitTopology_1_test",
+    "numOfSpoutTasks" : 3,
+    "numOfRouterBolts" : 6,
+    "numOfAlertBolts" : 6,
+    "numOfPublishTasks" : 1,
+    "numOfTotalWorkers":1,
+    "messageTimeoutSecs": 30,     // topology.message.timeout.secs: 30 by default
+    "localMode" : true
+  },
+  "spout" : {
+    "kafkaBrokerZkQuorum": "localhost:2181",
+    "kafkaBrokerZkBasePath": "/brokers",
+    "stormKafkaUseSameZkQuorumWithKafkaBroker": true,
+    "stormKafkaTransactionZkQuorum": "",
+    "stormKafkaTransactionZkPath": "/consumers",
+    "stormKafkaEagleConsumer": "eagle_consumer",
+    "stormKafkaStateUpdateIntervalMs": 2000,
+    "stormKafkaFetchSizeBytes": 1048586,
+  },
+  "zkConfig" : {
+    "zkQuorum" : "localhost:2181",
+    "zkRoot" : "/alert",
+    "zkSessionTimeoutMs" : 10000,
+    "connectionTimeoutMs" : 10000,
+    "zkRetryTimes" : 3,
+    "zkRetryInterval" : 3000
+  },
+  "dynamicConfigSource" : {
+    "initDelayMillis": 3000,
+    "delayMillis" : 10000
+  },
+  "metadataService": {
+    "context" : "/rest",
+    "host" : "localhost",
+    "port" : 8080
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/datasources.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/datasources.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/datasources.json
deleted file mode 100644
index 77a280c..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/datasources.json
+++ /dev/null
@@ -1,19 +0,0 @@
-[
-{
-	"name": "perfmon_datasource",
-	"type": "KAFKA",
-	"properties": {
-	},
-	"topic": "perfmon_metrics",
-	"schemeCls": "org.apache.eagle.alert.engine.scheme.JsonScheme",
-	"codec": {
-		"streamNameSelectorProp": {
-			"fieldNamesToInferStreamName" : "metric",
-			"streamNameFormat":"%s"
-		},
-		"streamNameSelectorCls":"org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector",
-		"timestampColumn": "timestamp",
-		"timestampFormat":""
-	}
-}
-]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/log4j.properties b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/log4j.properties
index 3ba587d..8025654 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/log4j.properties
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/log4j.properties
@@ -20,4 +20,4 @@ log4j.appender.stdout=org.apache.log4j.ConsoleAppender
 log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
 log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n
 
-#log4j.logger.org.apache.eagle.alert.metric=DEBUG
\ No newline at end of file
+log4j.logger.org.apache.eagle.alert.engine.evaluator.nodata=DEBUG
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/application-nodata.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/application-nodata.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/application-nodata.conf
new file mode 100644
index 0000000..7094820
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/application-nodata.conf
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+{
+  "topology" : {
+    "name" : "alertUnitTopology_1",
+    "numOfTotalWorkers": 20,
+    "numOfSpoutTasks" : 1,
+    "numOfRouterBolts" : 4,
+    "numOfAlertBolts" : 10,
+    "numOfPublishTasks" : 1,
+    "localMode" : "true"
+  },
+  "spout" : {
+    "kafkaBrokerZkQuorum": "sandbox.hortonworks.com:2181",
+    "kafkaBrokerZkBasePath": "/brokers",
+    "stormKafkaUseSameZkQuorumWithKafkaBroker": true,
+    "stormKafkaTransactionZkQuorum": "",
+    "stormKafkaTransactionZkPath": "/consumers",
+    "stormKafkaEagleConsumer": "eagle_consumer",
+    "stormKafkaStateUpdateIntervalMs": 2000,
+    "stormKafkaFetchSizeBytes": 1048586,
+  },
+  "zkConfig" : {
+    "zkQuorum" : "sandbox.hortonworks.com:2181",
+    "zkRoot" : "/alert",
+    "zkSessionTimeoutMs" : 10000,
+    "connectionTimeoutMs" : 10000,
+    "zkRetryTimes" : 3,
+    "zkRetryInterval" : 3000
+  },
+  "dynamicConfigSource" : {
+    "initDelayMillis": 3000,
+    "delayMillis" : 10000
+  },
+  "metadataService": {
+	"context" : "/rest",
+	"host" : "localhost",
+	"port" : 8080
+  },
+  "coordinatorService": {
+  	"host": "localhost",
+  	"port": "8080",
+  	"context" : "/rest"
+  },
+  "kafkaProducer": {
+  	"bootstrapServers": "sandbox.hortonworks.com:6667"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/datasources.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/datasources.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/datasources.json
new file mode 100644
index 0000000..988318e
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/datasources.json
@@ -0,0 +1,17 @@
+[
+	{
+		"name": "noDataAlertDataSource",
+		"type": "KAFKA",
+		"properties": {},
+		"topic": "noDataAlertTopic",
+		"schemeCls": "org.apache.eagle.alert.engine.scheme.JsonScheme",
+		"codec": {
+			"streamNameSelectorProp": {
+				"userProvidedStreamName": "noDataAlertStream"
+			},
+			"streamNameSelectorCls": "org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector",
+			"timestampColumn": "timestamp",
+			"timestampFormat": ""
+		}
+	}
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/policies-provided-wisb.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/policies-provided-wisb.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/policies-provided-wisb.json
new file mode 100644
index 0000000..012fd9f
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/policies-provided-wisb.json
@@ -0,0 +1,23 @@
+[
+	{
+		"name": "noDataAlertPolicy",
+		"description": "noDataAlertPolicy",
+		"inputStreams": [
+			"noDataAlertStream"
+		],
+		"outputStreams": [
+			"noDataAlertStream_out"
+		],
+		"definition": {
+			"type": "nodataalert",
+			"value": "PT1M,plain,1,host,host1,host2,host3"
+		},
+		"partitionSpec": [
+			{
+				"streamId": "noDataAlertStream",
+				"type": "GROUPBY"
+			}
+		],
+		"parallelismHint": 2
+	}
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/policies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/policies.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/policies.json
new file mode 100644
index 0000000..74f3016
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/policies.json
@@ -0,0 +1,23 @@
+[
+	{
+		"name": "noDataAlertPolicy",
+		"description": "noDataAlertPolicy",
+		"inputStreams": [
+			"noDataAlertStream"
+		],
+		"outputStreams": [
+			"noDataAlertStream_out"
+		],
+		"definition": {
+			"type": "nodataalert",
+			"value": "PT1M,dynamic,1,host"
+		},
+		"partitionSpec": [
+			{
+				"streamId": "noDataAlertStream",
+				"type": "GROUPBY"
+			}
+		],
+		"parallelismHint": 2
+	}
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/publishments.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/publishments.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/publishments.json
new file mode 100644
index 0000000..52208ee
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/publishments.json
@@ -0,0 +1,20 @@
+[
+	{
+		"name":"test-stream-output",
+		"type":"org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher",
+		"policyIds": [
+			"noDataAlertPolicy"
+		],
+		"properties": {
+			"subject":"Eagle Test Alert",
+			"template":"",
+			"sender": "sender@corp.com",
+			"recipients": "services@corp.com",
+			"smtp.server":"smtp.mailhost.com",
+			"connection": "plaintext",
+			"smtp.port": "25"
+		},
+		"dedupIntervalMin" : "PT5M",
+		"serializer" : "org.apache.eagle.alert.engine.publisher.impl.StringEventSerializer"
+	}
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/streamdefinitions.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/streamdefinitions.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/streamdefinitions.json
new file mode 100644
index 0000000..45b6241
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/streamdefinitions.json
@@ -0,0 +1,29 @@
+[
+	{
+		"streamId": "noDataAlertStream",
+		"dataSource": "noDataAlertDataSource",
+		"description": "the data stream for testing no data alert",
+		"validate": false,
+		"timeseries": false,
+		"columns": [
+			{
+				"name": "host",
+				"type": "STRING",
+				"defaultValue": "",
+				"required": true
+			},
+			{
+				"name": "timestamp",
+				"type": "LONG",
+				"defaultValue": 0,
+				"required": true
+			},
+			{
+				"name": "value",
+				"type": "DOUBLE",
+				"defaultValue": "0.0",
+				"required": true
+			}
+		]
+	}
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/topologies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/topologies.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/topologies.json
new file mode 100644
index 0000000..411cc48
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/nodata/topologies.json
@@ -0,0 +1,31 @@
+[
+{
+	"name": "alertUnitTopology_1",
+	"numOfSpout":1,
+	"numOfAlertBolt": 10,
+	"numOfGroupBolt": 4,
+	"spoutId": "alertEngineSpout",
+	"groupNodeIds" : [
+		"streamRouterBolt0",
+		"streamRouterBolt1",
+		"streamRouterBolt2",
+		"streamRouterBolt3"
+	],
+	"alertBoltIds": [
+		"alertBolt0",
+		"alertBolt1",
+		"alertBolt2",
+		"alertBolt3",
+		"alertBolt4",
+		"alertBolt5",
+		"alertBolt6",
+		"alertBolt7",
+		"alertBolt8",
+		"alertBolt9"
+	],
+	"pubBoltId" : "alertPublishBolt",
+	"spoutParallelism": 1,
+	"groupParallelism": 1,
+	"alertParallelism": 1
+}
+]

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/policies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/policies.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/policies.json
deleted file mode 100644
index 5edece9..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/policies.json
+++ /dev/null
@@ -1,54 +0,0 @@
-[
-{
-	"name": "perfmon_cpu_host_check",
-	"description" : "policy to check host perfmon_cpu",
-	"inputStreams": [
-		"perfmon_cpu_stream"
-	],
-	"outputStreams": [
-		"perfmon_cpu_check_output"
-	],
-	"definition": {
-		"type": "siddhi",
-		"value": "from perfmon_cpu_stream[value > 90.0] select * group by host insert into perfmon_cpu_check_output;"
-	},
-	"partitionSpec": [
-		{
-			"streamId" : "perfmon_cpu_stream",
-			"type" : "GROUPBY",
-			"columns" : [
-				"host"
-			],
-			"sortSpec": {
-				"windowPeriod" : "PT1M"
-			}
-		}
-	]
-},
-{
-	"name": "perfmon_cpu_pool_check",
-	"description" : "policy to check pool perfmon_cpu",
-	"inputStreams": [
-		"perfmon_cpu_stream"
-	],
-	"outputStreams": [
-		"perfmon_cpu_check_output"
-	],
-	"definition": {
-		"type": "siddhi",
-		"value": "from perfmon_cpu_stream[value > 75.0] select * group by pool insert into perfmon_cpu_check_output;"
-	},
-	"partitionSpec": [
-		{
-			"streamId" : "perfmon_cpu_stream",
-			"type" : "GROUPBY",
-			"columns" : [
-				"pool"
-			],
-			"sortSpec": {
-				"windowPeriod" : "PT1M"
-			}
-		}
-	]
-}
-]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/publishments.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/publishments.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/publishments.json
deleted file mode 100644
index 7b9c593..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/publishments.json
+++ /dev/null
@@ -1,29 +0,0 @@
-[
-{
-	"name":"test-stream-output",
-	"type":"org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher",
-	"policyIds": [
-		"perfmon_cpu_host_check", "perfmon_cpu_pool_check"
-	],
-	"properties": {
-	  "subject":"UMP Test Alert",
-	  "template":"",
-	  "sender": "sender@corp.com",
-	  "recipients": "receiver@corp.com",
-	  "smtp.server":"mailhost.com",
-	  "connection": "plaintext",
-	  "smtp.port": "25"
-	},
-	"dedupIntervalMin" : "PT0M"
-},
-{
-  "type": "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher",
-  "name":"kafka-testAlertStream",
-  "policyIds": ["perfmon_cpu_host_check"],
-  "dedupIntervalMin": "PT1M",
-  "properties":{
-    "kafka_broker":"sandbox.hortonworks.com:6667",
-    "topic":"test_kafka"
-  }
-}
-]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/publishments1.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/publishments1.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/publishments1.json
new file mode 100644
index 0000000..2e04e78
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/publishments1.json
@@ -0,0 +1,20 @@
+[
+{
+	"name":"test-stream-output",
+	"type":"org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher",
+	"policyIds": [
+		"perfmon_cpu_host_check", "perfmon_cpu_pool_check"
+	],
+	"properties": {
+	  "subject":"UMP Test Alert",
+	  "template":"",
+	  "sender": "sender@corp.com",
+	  "recipients": "services@corp.com",
+	  "smtp.server":"sender.mailhost.com",
+	  "connection": "plaintext",
+	  "smtp.port": "25"
+	},
+	"dedupIntervalMin" : "PT1M",
+	"serializer" : "org.apache.eagle.alert.engine.publisher.impl.StringEventSerializer"
+}
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/application-integration.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/application-integration.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/application-integration.conf
new file mode 100644
index 0000000..09db673
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/application-integration.conf
@@ -0,0 +1,60 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+{
+  "topology" : {
+    "name" : "alertUnitTopology_1",
+    "numOfTotalWorkers": 20,
+    "numOfSpoutTasks" : 1,
+    "numOfRouterBolts" : 4,
+    "numOfAlertBolts" : 10,
+    "numOfPublishTasks" : 1,
+    "localMode" : "true"
+  },
+  "spout" : {
+    "kafkaBrokerZkQuorum": "localhost:2181",
+    "kafkaBrokerZkBasePath": "/brokers",
+    "stormKafkaUseSameZkQuorumWithKafkaBroker": true,
+    "stormKafkaTransactionZkQuorum": "",
+    "stormKafkaTransactionZkPath": "/consumers",
+    "stormKafkaEagleConsumer": "eagle_consumer",
+    "stormKafkaStateUpdateIntervalMs": 2000,
+    "stormKafkaFetchSizeBytes": 1048586,
+  },
+  "zkConfig" : {
+    "zkQuorum" : "localhost:2181",
+    "zkRoot" : "/alert",
+    "zkSessionTimeoutMs" : 10000,
+    "connectionTimeoutMs" : 10000,
+    "zkRetryTimes" : 3,
+    "zkRetryInterval" : 3000
+  },
+  "dynamicConfigSource" : {
+    "initDelayMillis": 3000,
+    "delayMillis" : 10000
+  },
+  "metadataService": {
+	"context" : "/rest",
+	"host" : "localhost",
+	"port" : 8080
+  },
+  "coordinatorService": {
+  	"host": "localhost",
+  	"port": "8080",
+  	"context" : "/rest"
+  },
+  "kafkaProducer": {
+  	"bootstrapServers": "localhost:9092"
+  }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/datasources.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/datasources.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/datasources.json
new file mode 100644
index 0000000..77a280c
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/datasources.json
@@ -0,0 +1,19 @@
+[
+{
+	"name": "perfmon_datasource",
+	"type": "KAFKA",
+	"properties": {
+	},
+	"topic": "perfmon_metrics",
+	"schemeCls": "org.apache.eagle.alert.engine.scheme.JsonScheme",
+	"codec": {
+		"streamNameSelectorProp": {
+			"fieldNamesToInferStreamName" : "metric",
+			"streamNameFormat":"%s"
+		},
+		"streamNameSelectorCls":"org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector",
+		"timestampColumn": "timestamp",
+		"timestampFormat":""
+	}
+}
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/policies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/policies.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/policies.json
new file mode 100644
index 0000000..5edece9
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/policies.json
@@ -0,0 +1,54 @@
+[
+{
+	"name": "perfmon_cpu_host_check",
+	"description" : "policy to check host perfmon_cpu",
+	"inputStreams": [
+		"perfmon_cpu_stream"
+	],
+	"outputStreams": [
+		"perfmon_cpu_check_output"
+	],
+	"definition": {
+		"type": "siddhi",
+		"value": "from perfmon_cpu_stream[value > 90.0] select * group by host insert into perfmon_cpu_check_output;"
+	},
+	"partitionSpec": [
+		{
+			"streamId" : "perfmon_cpu_stream",
+			"type" : "GROUPBY",
+			"columns" : [
+				"host"
+			],
+			"sortSpec": {
+				"windowPeriod" : "PT1M"
+			}
+		}
+	]
+},
+{
+	"name": "perfmon_cpu_pool_check",
+	"description" : "policy to check pool perfmon_cpu",
+	"inputStreams": [
+		"perfmon_cpu_stream"
+	],
+	"outputStreams": [
+		"perfmon_cpu_check_output"
+	],
+	"definition": {
+		"type": "siddhi",
+		"value": "from perfmon_cpu_stream[value > 75.0] select * group by pool insert into perfmon_cpu_check_output;"
+	},
+	"partitionSpec": [
+		{
+			"streamId" : "perfmon_cpu_stream",
+			"type" : "GROUPBY",
+			"columns" : [
+				"pool"
+			],
+			"sortSpec": {
+				"windowPeriod" : "PT1M"
+			}
+		}
+	]
+}
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/publishments.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/publishments.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/publishments.json
new file mode 100644
index 0000000..ab9a98e
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/publishments.json
@@ -0,0 +1,31 @@
+[
+{
+	"name":"test-stream-output",
+	"type":"org.apache.eagle.alert.engine.publisher.impl.AlertEmailPublisher",
+	"policyIds": [
+		"perfmon_cpu_host_check", "perfmon_cpu_pool_check"
+	],
+	"properties": {
+	  "subject":"Eagle Test Alert",
+	  "template":"",
+	  "sender": "sender@corp.com",
+	  "recipients": "services@corp.com",
+	  "smtp.server":"smtp.mailhost.com",
+	  "connection": "plaintext",
+	  "smtp.port": "25"
+	},
+	"dedupIntervalMin" : "PT1M",
+	"serializer" : "org.apache.eagle.alert.engine.publisher.impl.StringEventSerializer"
+},
+{
+  "type": "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher",
+  "name":"kafka-testAlertStream",
+  "policyIds": ["perfmon_cpu_host_check"],
+  "dedupIntervalMin": "PT1M",
+  "properties":{
+    "kafka_broker":"localhost:9092",
+    "topic":"test_kafka"
+  },
+  "serializer" : "org.apache.eagle.alert.engine.publisher.impl.StringEventSerializer"
+}
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/streamdefinitions.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/streamdefinitions.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/streamdefinitions.json
new file mode 100644
index 0000000..d93822e
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/streamdefinitions.json
@@ -0,0 +1,44 @@
+[
+{
+	"streamId": "perfmon_cpu_stream",
+	"dataSource" : "perfmon_datasource",
+	"description":"the data stream for perfmon cpu metrics",
+	"validate": false,
+	"timeseries":false,
+	"columns": [
+		{
+			"name": "host",
+			"type" : "STRING",
+			"defaultValue": "",
+			"required":true
+		},
+		{
+			"name": "timestamp",
+			"type" : "LONG",
+			"defaultValue": 0,
+			"required":true
+		},{
+			"name": "metric",
+			"type" : "STRING",
+			"defaultValue": "perfmon_cpu",
+			"required": true
+		},{
+			"name": "pool",
+			"type" : "STRING",
+			"defaultValue": "raptor_general",
+			"required":true
+		},{
+			"name": "value",
+			"type" : "DOUBLE",
+			"defaultValue": 0.0,
+			"required":true
+		},
+		{
+			"name": "colo",
+			"type" : "STRING",
+			"defaultValue": "",
+			"required":true
+		}
+	]
+}
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/topologies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/topologies.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/topologies.json
new file mode 100644
index 0000000..411cc48
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/simple/topologies.json
@@ -0,0 +1,31 @@
+[
+{
+	"name": "alertUnitTopology_1",
+	"numOfSpout":1,
+	"numOfAlertBolt": 10,
+	"numOfGroupBolt": 4,
+	"spoutId": "alertEngineSpout",
+	"groupNodeIds" : [
+		"streamRouterBolt0",
+		"streamRouterBolt1",
+		"streamRouterBolt2",
+		"streamRouterBolt3"
+	],
+	"alertBoltIds": [
+		"alertBolt0",
+		"alertBolt1",
+		"alertBolt2",
+		"alertBolt3",
+		"alertBolt4",
+		"alertBolt5",
+		"alertBolt6",
+		"alertBolt7",
+		"alertBolt8",
+		"alertBolt9"
+	],
+	"pubBoltId" : "alertPublishBolt",
+	"spoutParallelism": 1,
+	"groupParallelism": 1,
+	"alertParallelism": 1
+}
+]

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/streamdefinitions.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/streamdefinitions.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/streamdefinitions.json
deleted file mode 100644
index cbeae19..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/streamdefinitions.json
+++ /dev/null
@@ -1,44 +0,0 @@
-[
-{
-	"streamId": "perfmon_cpu_stream",
-	"dataSource" : "perfmon_datasource",
-	"description":"the data stream for perfmon cpu metrics",
-	"validate": false,
-	"timeseries":false,
-	"columns": [
-		{
-			"name": "host",
-			"type" : "string",
-			"defaultValue": "",
-			"required":true
-		},
-		{
-			"name": "timestamp",
-			"type" : "long",
-			"defaultValue": 0,
-			"required":true
-		},{
-			"name": "metric",
-			"type" : "string",
-			"defaultValue": "perfmon_cpu",
-			"required": true
-		},{
-			"name": "pool",
-			"type" : "string",
-			"defaultValue": "raptor_general",
-			"required":true
-		},{
-			"name": "value",
-			"type" : "double",
-			"defaultValue": 0.0,
-			"required":true
-		},
-		{
-			"name": "colo",
-			"type" : "string",
-			"defaultValue": "",
-			"required":true
-		}
-	]
-}
-]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/topologies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/topologies.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/topologies.json
deleted file mode 100644
index 411cc48..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/resources/topologies.json
+++ /dev/null
@@ -1,31 +0,0 @@
-[
-{
-	"name": "alertUnitTopology_1",
-	"numOfSpout":1,
-	"numOfAlertBolt": 10,
-	"numOfGroupBolt": 4,
-	"spoutId": "alertEngineSpout",
-	"groupNodeIds" : [
-		"streamRouterBolt0",
-		"streamRouterBolt1",
-		"streamRouterBolt2",
-		"streamRouterBolt3"
-	],
-	"alertBoltIds": [
-		"alertBolt0",
-		"alertBolt1",
-		"alertBolt2",
-		"alertBolt3",
-		"alertBolt4",
-		"alertBolt5",
-		"alertBolt6",
-		"alertBolt7",
-		"alertBolt8",
-		"alertBolt9"
-	],
-	"pubBoltId" : "alertPublishBolt",
-	"spoutParallelism": 1,
-	"groupParallelism": 1,
-	"alertParallelism": 1
-}
-]

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/pom.xml
index 1318a31..2ce2a15 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/pom.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/pom.xml
@@ -1,14 +1,14 @@
 <?xml version="1.0" encoding="UTF-8"?>
-<!-- ~ /* ~ * Licensed to the Apache Software Foundation (ASF) under one
-	or more ~ * contributor license agreements. See the NOTICE file distributed
-	with ~ * this work for additional information regarding copyright ownership.
-	~ * The ASF licenses this file to You under the Apache License, Version 2.0
-	~ * (the "License"); you may not use this file except in compliance with
-	~ * the License. You may obtain a copy of the License at ~ * ~ * http://www.apache.org/licenses/LICENSE-2.0
-	~ * ~ * Unless required by applicable law or agreed to in writing, software
-	~ * distributed under the License is distributed on an "AS IS" BASIS, ~ *
-	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-	~ * See the License for the specific language governing permissions and ~
+<!-- ~ /* ~ * Licensed to the Apache Software Foundation (ASF) under one 
+	or more ~ * contributor license agreements. See the NOTICE file distributed 
+	with ~ * this work for additional information regarding copyright ownership. 
+	~ * The ASF licenses this file to You under the Apache License, Version 2.0 
+	~ * (the "License"); you may not use this file except in compliance with 
+	~ * the License. You may obtain a copy of the License at ~ * ~ * http://www.apache.org/licenses/LICENSE-2.0 
+	~ * ~ * Unless required by applicable law or agreed to in writing, software 
+	~ * distributed under the License is distributed on an "AS IS" BASIS, ~ * 
+	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
+	~ * See the License for the specific language governing permissions and ~ 
 	* limitations under the License. ~ */ -->
 <project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 	<modelVersion>4.0.0</modelVersion>
@@ -17,15 +17,12 @@
 		<groupId>org.apache.eagle</groupId>
 		<artifactId>alert-metadata-parent</artifactId>
 		<version>0.5.0-incubating-SNAPSHOT</version>
-		<relativePath>../pom.xml</relativePath>
 	</parent>
 
 	<artifactId>alert-metadata-service</artifactId>
 	<packaging>jar</packaging>
 
 	<dependencies>
-		<!-- Storm depends on org.ow2.asm:asm:4.0 -->
-		<!-- Jersey depends on asm:asm:3.0 -->
 		<dependency>
 			<groupId>org.apache.eagle</groupId>
 			<artifactId>alert-engine</artifactId>
@@ -48,21 +45,25 @@
 			<artifactId>junit</artifactId>
 			<scope>test</scope>
 		</dependency>
+
 		<dependency>
 			<groupId>com.sun.jersey</groupId>
 			<artifactId>jersey-server</artifactId>
-			<!--<exclusions> -->
-			<!--<exclusion> -->
-			<!--<groupId>asm</groupId> -->
-			<!--<artifactId>asm</artifactId> -->
-			<!--</exclusion> -->
-			<!--</exclusions> -->
 		</dependency>
 		<dependency>
 			<groupId>com.sun.jersey.contribs</groupId>
 			<artifactId>jersey-multipart</artifactId>
 		</dependency>
 		<dependency>
+			<groupId>com.sun.jersey</groupId>
+			<artifactId>jersey-servlet</artifactId>
+		</dependency>
+		<dependency>
+			<groupId>com.sun.jersey</groupId>
+			<artifactId>jersey-client</artifactId>
+		</dependency>
+
+		<dependency>
 			<groupId>org.codehaus.jackson</groupId>
 			<artifactId>jackson-mapper-asl</artifactId>
 		</dependency>
@@ -94,10 +95,7 @@
 			<groupId>io.swagger</groupId>
 			<artifactId>swagger-jaxrs</artifactId>
 		</dependency>
-		<dependency>
-			<groupId>com.sun.jersey</groupId>
-			<artifactId>jersey-servlet</artifactId>
-		</dependency>
+
 	</dependencies>
 
 	<build>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
index 12e4f5a..7e4dea7 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/metadata/resource/MetadataResource.java
@@ -37,7 +37,7 @@ import org.apache.eagle.alert.engine.coordinator.PublishmentType;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
 import org.apache.eagle.alert.metadata.impl.MetadataDaoFactory;
-import org.apache.eagle.alert.metadata.resource.IMetadataDao;
+import org.apache.eagle.alert.metadata.IMetadataDao;
 import org.apache.eagle.alert.metadata.resource.Models;
 import org.apache.eagle.alert.metadata.resource.OpResult;
 
@@ -57,7 +57,7 @@ public class MetadataResource {
     public List<StreamingCluster> listClusters() {
         return dao.listClusters();
     }
-
+    
     @Path("/clear")
     @POST
     public OpResult clear() {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImpl.java
index ff9a65a..f9a6450 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/java/org/apache/eagle/service/topology/resource/impl/TopologyMgmtResourceImpl.java
@@ -28,7 +28,7 @@ import org.apache.eagle.alert.engine.UnitTopologyMain;
 import org.apache.eagle.alert.engine.runner.UnitTopologyRunner;
 import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
 import org.apache.eagle.alert.metadata.impl.MetadataDaoFactory;
-import org.apache.eagle.alert.metadata.resource.IMetadataDao;
+import org.apache.eagle.alert.metadata.IMetadataDao;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/resources/application.conf
deleted file mode 100644
index ada6a4d..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/resources/application.conf
+++ /dev/null
@@ -1,21 +0,0 @@
-# Licensed to the Apache Software Foundation (ASF) under one or more
-# contributor license agreements.  See the NOTICE file distributed with
-# this work for additional information regarding copyright ownership.
-# The ASF licenses this file to You under the Apache License, Version 2.0
-# (the "License"); you may not use this file except in compliance with
-# the License.  You may obtain a copy of the License at
-#
-#    http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing, software
-# distributed under the License is distributed on an "AS IS" BASIS,
-# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-# See the License for the specific language governing permissions and
-# limitations under the License.
-
-{
-	"datastore": {
-		"metadataDao": "org.apache.eagle.alert.metadata.impl.InMemMetadataDaoImpl",
-		"connection": "localhost:27017"
-	}
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/webapp/WEB-INF/web.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/webapp/WEB-INF/web.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/webapp/WEB-INF/web.xml
deleted file mode 100644
index 10e9504..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/webapp/WEB-INF/web.xml
+++ /dev/null
@@ -1,81 +0,0 @@
-<?xml version="1.0" encoding="UTF-8"?>
-<!-- ~ Licensed to the Apache Software Foundation (ASF) under one or more 
-	~ contributor license agreements. See the NOTICE file distributed with ~ 
-	this work for additional information regarding copyright ownership. ~ The 
-	ASF licenses this file to You under the Apache License, Version 2.0 ~ (the 
-	"License"); you may not use this file except in compliance with ~ the License. 
-	You may obtain a copy of the License at ~ ~ http://www.apache.org/licenses/LICENSE-2.0 
-	~ ~ Unless required by applicable law or agreed to in writing, software ~ 
-	distributed under the License is distributed on an "AS IS" BASIS, ~ WITHOUT 
-	WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. ~ See the 
-	License for the specific language governing permissions and ~ limitations 
-	under the License. -->
-<web-app xmlns="http://java.sun.com/xml/ns/javaee" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
-	xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
-		  http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
-	version="3.0">
-	<welcome-file-list>
-		<welcome-file>index.html</welcome-file>
-	</welcome-file-list>
-	<servlet>
-		<servlet-name>Jersey Web Application</servlet-name>
-		<servlet-class>com.sun.jersey.spi.container.servlet.ServletContainer</servlet-class>
-		<init-param>
-			<param-name>com.sun.jersey.config.property.packages</param-name>
-			<param-value>io.swagger.jaxrs.json,io.swagger.jaxrs.listing,org.apache.eagle.service,org.codehaus.jackson.jaxrs</param-value>
-		</init-param>
-		<init-param>
-			<param-name>com.sun.jersey.spi.container.ContainerRequestFilters</param-name>
-			<param-value>com.sun.jersey.api.container.filter.GZIPContentEncodingFilter;com.sun.jersey.api.container.filter.PostReplaceFilter</param-value>
-		</init-param>
-		<init-param>
-			<param-name>com.sun.jersey.spi.container.ContainerResponseFilters</param-name>
-			<param-value>com.sun.jersey.api.container.filter.GZIPContentEncodingFilter</param-value>
-		</init-param>
-		<load-on-startup>1</load-on-startup>
-	</servlet>
-	<!-- Servlet for swagger initialization only, no URL mapping. -->
-	<servlet>
-		<servlet-name>swaggerConfig</servlet-name>
-		<servlet-class>io.swagger.jaxrs.config.DefaultJaxrsConfig</servlet-class>
-		<init-param>
-			<param-name>api.version</param-name>
-			<param-value>1.0.0</param-value>
-		</init-param>
-		<init-param>
-			<param-name>swagger.api.basepath</param-name>
-			<param-value>/rest</param-value>
-		</init-param>
-		<load-on-startup>2</load-on-startup>
-	</servlet>
-
-	<servlet-mapping>
-		<servlet-name>Jersey Web Application</servlet-name>
-		<url-pattern>/rest/*</url-pattern>
-	</servlet-mapping>
-	<filter>
-		<filter-name>CorsFilter</filter-name>
-		<!-- Ideally, should be tomcat embed core's CORSFilter. See @SimpleCORSFiler comments. -->
-		<filter-class>org.apache.eagle.alert.resource.SimpleCORSFiler</filter-class>
-		<init-param>
-			<param-name>cors.allowed.origins</param-name>
-			<param-value>*</param-value>
-		</init-param>
-		<init-param>
-			<param-name>cors.allowed.headers</param-name>
-			<param-value>Authorization,Origin, No-Cache, X-Requested-With, If-Modified-Since, Pragma, Last-Modified, Cache-Control, Expires, Content-Type, X-E4M-With, Accept</param-value>
-		</init-param>
-		<init-param>
-			<param-name>cors.allowed.methods</param-name>
-			<param-value>GET,POST,HEAD,OPTIONS,PUT,DELETE</param-value>
-		</init-param>
-		<init-param>
-			<param-name>cors.support.credentials</param-name>
-			<param-value>true</param-value>
-		</init-param>
-	</filter>
-	<filter-mapping>
-		<filter-name>CorsFilter</filter-name>
-		<url-pattern>/*</url-pattern>
-	</filter-mapping>
-</web-app>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/webapp/index.html
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/webapp/index.html b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/webapp/index.html
deleted file mode 100644
index 5da5b32..0000000
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/main/webapp/index.html
+++ /dev/null
@@ -1,18 +0,0 @@
-<!--
-  Licensed to the Apache Software Foundation (ASF) under one
-  or more contributor license agreements.  See the NOTICE file
-  distributed with this work for additional information
-  regarding copyright ownership.  The ASF licenses this file
-  to you under the Apache License, Version 2.0 (the
-  "License"); you may not use this file except in compliance
-  with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-  Unless required by applicable law or agreed to in writing, software
-  distributed under the License is distributed on an "AS IS" BASIS,
-  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-  See the License for the specific language governing permissions and
-  limitations under the License.
-  -->
-Hello, this is UMP alert metadata service. You are welcome!
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/resources/application.conf
new file mode 100644
index 0000000..63e649c
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata-service/src/test/resources/application.conf
@@ -0,0 +1,20 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+{
+	"datastore": {
+		"metadataDao": "org.apache.eagle.alert.metadata.impl.InMemMetadataDaoImpl"
+	}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml
index 5380963..7b789ab 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/pom.xml
@@ -10,18 +10,23 @@
 	WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied. 
 	~ * See the License for the specific language governing permissions and ~ 
 	* limitations under the License. ~ */ -->
-<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance" xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+	xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/xsd/maven-4.0.0.xsd">
 	<modelVersion>4.0.0</modelVersion>
 
 	<parent>
 		<groupId>org.apache.eagle</groupId>
 		<artifactId>alert-metadata-parent</artifactId>
 		<version>0.5.0-incubating-SNAPSHOT</version>
-		<relativePath>../pom.xml</relativePath>
 	</parent>
+
 	<artifactId>alert-metadata</artifactId>
 	<packaging>jar</packaging>
+
 	<dependencies>
+		<!-- Storm depends on org.ow2.asm:asm:4.0 -->
+		<!-- Jersey depends on asm:asm:3.0 -->
+
 		<dependency>
 			<groupId>junit</groupId>
 			<artifactId>junit</artifactId>
@@ -38,10 +43,20 @@
 			<version>${mongodb.version}</version>
 		</dependency>
 		<dependency>
+			<groupId>mysql</groupId>
+			<artifactId>mysql-connector-java</artifactId>
+			<version>${mysql-connector-java.version}</version>
+		</dependency>
+		<dependency>
+			<groupId>org.apache.ddlutils</groupId>
+			<artifactId>ddlutils</artifactId>
+			<version>${ddlutils.version}</version>
+		</dependency>
+		<dependency>
 			<groupId>de.flapdoodle.embed</groupId>
 			<artifactId>de.flapdoodle.embed.mongo</artifactId>
 			<version>1.50.5</version>
 			<scope>test</scope>
 		</dependency>
 	</dependencies>
-</project>
\ No newline at end of file
+</project>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
new file mode 100644
index 0000000..59bd4bf
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/IMetadataDao.java
@@ -0,0 +1,89 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.metadata;
+
+import java.io.Closeable;
+import java.util.List;
+
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.engine.coordinator.*;
+import org.apache.eagle.alert.metadata.resource.Models;
+import org.apache.eagle.alert.metadata.resource.OpResult;
+/** Data-access contract for alert metadata; declares per-model list/add/remove operations and extends {@link Closeable}. */
+public interface IMetadataDao extends Closeable {
+    // Topology: list, add, and remove (removal takes the topology name).
+    List<Topology> listTopologies();
+
+    OpResult addTopology(Topology t);
+
+    OpResult removeTopology(String topologyName);
+    // StreamingCluster: list, add, and remove (removal takes the cluster id).
+    List<StreamingCluster> listClusters();
+
+    OpResult addCluster(StreamingCluster cluster);
+
+    OpResult removeCluster(String clusterId);
+    // StreamDefinition: list, create, and remove (removal takes the stream id). Note the insert is named create*, not add*.
+    List<StreamDefinition> listStreams();
+
+    OpResult createStream(StreamDefinition stream);
+
+    OpResult removeStream(String streamId);
+    // Kafka2TupleMetadata data sources: list, add, and remove (removal takes the data source id).
+    List<Kafka2TupleMetadata> listDataSources();
+
+    OpResult addDataSource(Kafka2TupleMetadata dataSource);
+
+    OpResult removeDataSource(String datasourceId);
+    // PolicyDefinition: list, add, and remove (removal takes the policy id).
+    List<PolicyDefinition> listPolicies();
+
+    OpResult addPolicy(PolicyDefinition policy);
+
+    OpResult removePolicy(String policyId);
+    // Publishment: list, add, and remove (removal takes the publishment id).
+    List<Publishment> listPublishment();
+
+    OpResult addPublishment(Publishment publishment);
+
+    OpResult removePublishment(String pubId);
+    // PublishmentType: list, add, and remove (removal takes the publishment type).
+    List<PublishmentType> listPublishmentType();
+
+    OpResult addPublishmentType(PublishmentType publishmentType);
+
+    OpResult removePublishmentType(String pubType);
+    // ScheduleState: lookup by version id, a no-arg lookup (presumably the latest state; confirm per implementation), and add.
+    ScheduleState getScheduleState(String versionId);
+
+    ScheduleState getScheduleState();
+
+    OpResult addScheduleState(ScheduleState state);
+    // PolicyAssignment: list and add only; no remove operation is declared.
+    List<PolicyAssignment> listAssignments();
+
+    OpResult addAssignment(PolicyAssignment assignment);
+
+    // Test-friendly APIs: clear, export, and import the metadata models.
+    OpResult clear();
+    Models export();
+    OpResult importModels(Models models);
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java
new file mode 100644
index 0000000..1210861
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/MetadataUtils.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing, software
+ *  distributed under the License is distributed on an "AS IS" BASIS,
+ *  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *  See the License for the specific language governing permissions and
+ *  limitations under the License.
+ *
+ */
+
+package org.apache.eagle.alert.metadata;
+
+import com.typesafe.config.Config;
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.engine.coordinator.PublishmentType;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.lang.reflect.InvocationTargetException;
+import java.lang.reflect.Method;
+import java.sql.Connection;
+import java.sql.DriverManager;
+import java.sql.SQLException;
+
+public class MetadataUtils {
+
+    private static final Logger LOG = LoggerFactory.getLogger(MetadataUtils.class);
+
+    /**
+     * Resolves the string key that identifies a metadata object.
+     *
+     * <p>{@link StreamDefinition}, {@link PublishmentType}, {@link PolicyAssignment} and
+     * {@link ScheduleState} are keyed by their stream id, type, policy name and version
+     * respectively. Any other object is keyed by the result of its public no-arg
+     * {@code getName()} method, which is invoked reflectively.
+     *
+     * @param t   the metadata object to derive a key for
+     * @param <T> the type of the metadata object
+     * @return the key of the given object
+     * @throws RuntimeException if {@code getName()} is missing, inaccessible, or fails; the
+     *                          underlying reflection failure is attached as the cause
+     */
+    public static <T> String getKey(T t) {
+        if (t instanceof StreamDefinition) {
+            return ((StreamDefinition) t).getStreamId();
+        }
+        if (t instanceof PublishmentType) {
+            return ((PublishmentType) t).getType();
+        }
+        if (t instanceof PolicyAssignment) {
+            return ((PolicyAssignment) t).getPolicyName();
+        }
+        if (t instanceof ScheduleState) {
+            return ((ScheduleState) t).getVersion();
+        }
+
+        try {
+            Method m = t.getClass().getMethod("getName");
+            return (String) m.invoke(t);
+        } catch (NoSuchMethodException | SecurityException | InvocationTargetException | IllegalAccessException
+                | IllegalArgumentException e) {
+            // Keep the cause: an InvocationTargetException means getName() exists but threw,
+            // which the message alone cannot distinguish from a missing method.
+            LOG.error(" getName not found on given class :" + t.getClass().getName(), e);
+            throw new RuntimeException(String.format("no getName() found on target class %s for matching", t.getClass()
+                    .getName()), e);
+        }
+    }
+
+    /**
+     * Opens a JDBC connection using the URL read from the {@code connection} config path.
+     *
+     * @param config configuration from which the string at path {@code connection} is read
+     * @return an open connection, or {@code null} if {@link DriverManager} could not connect
+     */
+    public static Connection getJdbcConnection(Config config) {
+        String conn = config.getString("connection");
+        try {
+            return DriverManager.getConnection(conn);
+        } catch (SQLException e) {
+            // The URL is deliberately not logged: JDBC URLs may embed credentials.
+            LOG.error("Failed to open JDBC connection from config key 'connection'", e);
+            return null;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/72a1501c/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
index 218ea6b..fc1bbaa 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-metadata-parent/alert-metadata/src/main/java/org/apache/eagle/alert/metadata/impl/InMemMetadataDaoImpl.java
@@ -16,8 +16,7 @@
  */
 package org.apache.eagle.alert.metadata.impl;
 
-import java.lang.reflect.InvocationTargetException;
-import java.lang.reflect.Method;
+import java.io.IOException;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.List;
@@ -35,7 +34,8 @@ import org.apache.eagle.alert.engine.coordinator.Publishment;
 import org.apache.eagle.alert.engine.coordinator.PublishmentType;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
-import org.apache.eagle.alert.metadata.resource.IMetadataDao;
+import org.apache.eagle.alert.metadata.IMetadataDao;
+import org.apache.eagle.alert.metadata.MetadataUtils;
 import org.apache.eagle.alert.metadata.resource.Models;
 import org.apache.eagle.alert.metadata.resource.OpResult;
 import org.slf4j.Logger;
@@ -82,7 +82,7 @@ public class InMemMetadataDaoImpl implements IMetadataDao {
         Optional<T> scOp = clusters.stream().filter(new Predicate<T>() {
             @Override
             public boolean test(T t) {
-                if (getKey(t).equalsIgnoreCase(getKey(paramT))) {
+                if (MetadataUtils.getKey(t).equalsIgnoreCase(MetadataUtils.getKey(paramT))) {
                     return true;
                 }
                 return false;
@@ -102,32 +102,13 @@ public class InMemMetadataDaoImpl implements IMetadataDao {
         return result;
     }
 
-    public static <T> String getKey(T t) {
-        if (t instanceof StreamDefinition) {
-            return ((StreamDefinition) t).getStreamId();
-        }
-        if (t instanceof PublishmentType) {
-            return ((PublishmentType) t).getType();
-        }
-
-        try {
-            Method m = t.getClass().getMethod("getName");
-            return (String) m.invoke(t);
-        } catch (NoSuchMethodException | SecurityException | InvocationTargetException | IllegalAccessException
-                | IllegalArgumentException e) {
-            LOG.error(" getName not found on given class :" + t.getClass().getName());
-        }
-        throw new RuntimeException(String.format("no getName() found on target class %s for matching", t.getClass()
-                .getName()));
-    }
-
     @SuppressWarnings("unchecked")
     private synchronized <T> OpResult remove(List<T> clusters, String id) {
         T[] matched = (T[]) clusters.stream().filter(new Predicate<T>() {
 
             @Override
             public boolean test(T t) {
-                if (getKey(t).equalsIgnoreCase(id)) {
+                if (MetadataUtils.getKey(t).equalsIgnoreCase(id)) {
                     return true;
                 }
                 return false;
@@ -230,7 +211,7 @@ public class InMemMetadataDaoImpl implements IMetadataDao {
     }
 
     @Override
-    public OpResult addScheduleState(ScheduleState state) {
+    public synchronized OpResult addScheduleState(ScheduleState state) {
         // FIXME : might concurrent issue
         String toRemove = null;
         if (scheduleStates.size() > maxScheduleState) {
@@ -248,7 +229,7 @@ public class InMemMetadataDaoImpl implements IMetadataDao {
     }
 
     @Override
-    public ScheduleState getScheduleState() {
+    public synchronized ScheduleState getScheduleState() {
         if (scheduleStates.size() > 0) {
             return scheduleStates.get(scheduleStates.lastKey());
         }
@@ -286,6 +267,7 @@ public class InMemMetadataDaoImpl implements IMetadataDao {
 
     @Override
     public OpResult clear() {
+        LOG.info("clear models...");
         this.assignments.clear();
         this.clusters.clear();
         this.datasources.clear();
@@ -316,6 +298,7 @@ public class InMemMetadataDaoImpl implements IMetadataDao {
 
     @Override
     public OpResult importModels(Models models) {
+        LOG.info("clear and import models...");
         clear();
         this.assignments.addAll(models.assignments);
         this.clusters.addAll(models.clusters);
@@ -331,4 +314,8 @@ public class InMemMetadataDaoImpl implements IMetadataDao {
         return result;
     }
 
+    @Override
+    public void close() throws IOException {
+        // Intentionally empty: this implementation performs no cleanup on close.
+    }
 }