Posted to commits@eagle.apache.org by ra...@apache.org on 2016/07/21 06:40:00 UTC

[2/2] incubator-eagle git commit: [EAGLE-384] Alert Engine BugFix and Improvements

[EAGLE-384] Alert Engine BugFix and Improvements

1. Add schedule time in schedule state
2. Add publishment.toString()
3. Add coordinator test for policy update
4. Refine logging
5. Avoid exposing explicit IPs; use localhost instead
6. Ignore test cases that are empty
7. Add *.orig to the Apache Rat exclusions
8. Ignore all embedded HBase test cases

Author: ralphsu
Reviewer: ralphsu

Closes #271


Project: http://git-wip-us.apache.org/repos/asf/incubator-eagle/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-eagle/commit/f0af3e5d
Tree: http://git-wip-us.apache.org/repos/asf/incubator-eagle/tree/f0af3e5d
Diff: http://git-wip-us.apache.org/repos/asf/incubator-eagle/diff/f0af3e5d

Branch: refs/heads/develop
Commit: f0af3e5dc7cebdff426d0450bea7256a31ca77ec
Parents: 04468a9
Author: Ralph, Su <su...@gmail.com>
Authored: Thu Jul 21 14:37:43 2016 +0800
Committer: Ralph, Su <su...@gmail.com>
Committed: Thu Jul 21 14:42:46 2016 +0800

----------------------------------------------------------------------
 .../alert/coordination/model/ScheduleState.java |  10 ++
 .../model/Tuple2StreamConverter.java            |   8 +-
 .../alert/engine/coordinator/Publishment.java   |   8 +
 .../eagle/alert/coordinator/Coordinator.java    |  31 +++-
 .../coordinator/impl/GreedyPolicyScheduler.java |   3 +-
 .../provider/InMemScheduleConext.java           |   4 +
 .../provider/ScheduleContextBuilder.java        |  32 +++-
 .../coordinator/DynamicPolicyLoaderTest.java    |   4 +-
 .../coordinator/ScheduleContextBuilderTest.java |  73 +++++++-
 .../apache/alert/coordinator/SchedulerTest.java | 175 +++++++++++++++----
 .../alert/coordinator/WorkSlotStrategyTest.java |  12 +-
 .../mock/InMemMetadataServiceClient.java        |  22 ++-
 .../mock/TestTopologyMgmtService.java           |  67 ++++---
 .../src/test/resources/multi/datasources.json   |  19 ++
 .../src/test/resources/multi/policies.json      |  52 ++++++
 .../src/test/resources/multi/publishments.json  |  26 +++
 .../test/resources/multi/streamdefinitions.json | 138 +++++++++++++++
 .../src/test/resources/multi/topologies.json    |  31 ++++
 .../evaluator/nodata/NoDataPolicyHandler.java   |  20 +++
 .../publisher/impl/AlertKafkaPublisher.java     |   4 +-
 .../publisher/impl/AlertPublisherImpl.java      |   4 +
 .../publisher/impl/KafkaProducerManager.java    |  45 +++--
 .../eagle/alert/engine/runner/AlertBolt.java    |   2 +-
 .../src/main/resources/application.conf         |  10 +-
 .../eagle/alert/engine/e2e/Integration1.java    |  38 ++--
 .../eagle/alert/engine/e2e/Integration3.java    | 107 ++++++++++++
 .../engine/e2e/Integration5AbsenceAlert.java    |   5 +-
 .../eagle/alert/engine/e2e/SampleClient3.java   |  96 ++++++++++
 .../eagle/alert/engine/topology/TestBolt.java   |  11 +-
 .../resources/absence/application-absence.conf  |  14 +-
 .../src/test/resources/absence/datasources.json |  30 ++--
 .../src/test/resources/absence/policies.json    |  44 ++---
 .../test/resources/absence/publishments.json    |  36 ++--
 .../resources/absence/streamdefinitions.json    |  54 +++---
 .../src/test/resources/absence/topologies.json  |  58 +++---
 .../src/test/resources/e2e/application-e2e.conf |  60 +++++++
 .../src/test/resources/e2e/datasources.json     |  21 +++
 .../src/test/resources/e2e/policies.json        |  28 +++
 .../src/test/resources/e2e/publishments.json    |  17 ++
 .../src/test/resources/e2e/sherlock.json        |  52 ++++++
 .../test/resources/e2e/streamdefinitions.json   |  76 ++++++++
 .../src/test/resources/e2e/topologies.json      |  31 ++++
 .../src/test/resources/e2e/ump_demo_schema.json | 170 ++++++++++++++++++
 .../src/test/resources/nodata/policies.json     |   2 +-
 .../src/test/resources/nodata/publishments.json |   2 +-
 .../simple/application-integration.conf         |   2 +-
 .../src/test/resources/simple/publishments.json |   2 +-
 .../metadata/impl/MongoMetadataDaoImpl.java     |   3 +-
 .../alert/resource/impl/MongoImplTest.java      |  26 ++-
 .../src/test/resources/application-mongo.conf   |   2 +-
 .../app/AlertDropWizardConfiguration.java       |  13 ++
 .../apache/eagle/service/app/ServiceApp.java    |   9 +
 .../src/main/resources/application.conf         |   2 +-
 .../src/main/resources/log4j.properties         |   2 +-
 .../eagle-alert-parent/eagle-alert/pom.xml      |   1 +
 .../eagle/service/hbase/TestHBaseBase.java      |   1 +
 .../service/generic/TestListQueryResource.java  |   1 +
 .../eagle/storage/hbase/TestHBaseStatement.java |   1 +
 .../coprocessor/TestGroupAggregateClient.java   |   1 +
 .../TestGroupAggregateTimeSeriesClient.java     |   1 +
 60 files changed, 1547 insertions(+), 272 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/ScheduleState.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/ScheduleState.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/ScheduleState.java
index 6036f29..93d038f 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/ScheduleState.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/ScheduleState.java
@@ -62,8 +62,10 @@ public class ScheduleState {
     private String generateTime;
     private int code = 200;
     private String message = "OK";
+    private int scheduleTimeMillis;
 
     public ScheduleState() {
+        this.generateTime = String.valueOf(new Date().getTime());
     }
 
     public ScheduleState(String version, 
@@ -212,4 +214,12 @@ public class ScheduleState {
         this.streamSnapshots = streamSnapshots;
     }
 
+    public int getScheduleTimeMillis() {
+        return scheduleTimeMillis;
+    }
+
+    public void setScheduleTimeMillis(int scheduleTimeMillis) {
+        this.scheduleTimeMillis = scheduleTimeMillis;
+    }
+
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java
index f152b4c..8867fd7 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/coordination/model/Tuple2StreamConverter.java
@@ -67,8 +67,8 @@ public class Tuple2StreamConverter {
         Object timeObject = m.get(metadata.getTimestampColumn());
         long timestamp = 0L;
         if (timeObject == null) {
-            if (LOG.isWarnEnabled()) {
-                LOG.warn("continue with current timestamp since no timestamp column specified! Metadata : ", metadata);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("continue with current timestamp since no timestamp column specified! Metadata : {} ", metadata);
             }
             timestamp = System.currentTimeMillis();
         } else if (timeObject instanceof Number) {
@@ -77,8 +77,8 @@ public class Tuple2StreamConverter {
             String timestampFieldValue = (String) m.get(metadata.getTimestampColumn());
             String dateFormat = metadata.getTimestampFormat();
             if (Strings.isNullOrEmpty(dateFormat)) {
-                if (LOG.isWarnEnabled()) {
-                    LOG.warn("continue with current timestamp becuase no data format sepcified! Metadata : ", metadata);
+                if (LOG.isDebugEnabled()) {
+                    LOG.debug("continue with current timestamp becuase no data format sepcified! Metadata : {} ", metadata);
                 }
                 timestamp = System.currentTimeMillis();
             } else 
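The {} placeholder added above is what makes the metadata actually appear in the log line: SLF4J only substitutes arguments where a placeholder is present. A minimal illustration (logger and metadata variable assumed, not Eagle code):

    LOG.debug("continue with current timestamp! Metadata : ", metadata);    // no placeholder, metadata is silently dropped
    LOG.debug("continue with current timestamp! Metadata : {} ", metadata); // placeholder, metadata is rendered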

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
index b921b79..3ba2dcf 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-common/src/main/java/org/apache/eagle/alert/engine/coordinator/Publishment.java
@@ -101,4 +101,12 @@ public class Publishment {
                 .append(properties).build();
     }
 
+    @Override
+    public String toString() {
+        StringBuilder sb = new StringBuilder();
+        sb.append("Publishment[name:").append(name).append(",type:").append(type).append(",policyId:")
+                .append(policyIds).append(",properties:").append(properties);
+        return sb.toString();
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
index 9c97f9f..df4bc34 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
@@ -21,6 +21,7 @@ import java.util.Collection;
 import java.util.List;
 import java.util.concurrent.Executors;
 import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
 import java.util.concurrent.TimeUnit;
 import java.util.concurrent.atomic.AtomicBoolean;
 
@@ -39,14 +40,17 @@ import org.apache.eagle.alert.service.MetadataServiceClientImpl;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.google.common.base.Stopwatch;
 import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
 /**
- * @since Mar 24, 2016 Coordinator is a standalone java application, which
- *        listens to policy changes and use schedule algorithm to distribute
- *        policies 1) reacting to shutdown events 2) start non-daemon thread to
- *        pull policies and figure out if polices are changed
+ * TODO: To simply avoid concurrent call of schdule, make the schedule as synchronized. This is not safe when multiple
+ * instance, consider a distributed lock for prevent multiple schedule happen concurrently.
+ * 
+ * @since Mar 24, 2016 Coordinator is a standalone java application, which listens to policy changes and use schedule
+ *        algorithm to distribute policies 1) reacting to shutdown events 2) start non-daemon thread to pull policies
+ *        and figure out if polices are changed
  */
 public class Coordinator {
 
@@ -94,6 +98,7 @@ public class Coordinator {
     }
 
     public synchronized ScheduleState schedule(ScheduleOption option) {
+        Stopwatch watch = Stopwatch.createStarted();
         IScheduleContext context = new ScheduleContextBuilder(client).buildContext();
         TopologyMgmtService mgmtService = new TopologyMgmtService();
         IPolicyScheduler scheduler = PolicySchedulerFactory.createScheduler();
@@ -101,9 +106,18 @@ public class Coordinator {
         scheduler.init(context, mgmtService);
         ScheduleState state = scheduler.schedule(option);
 
+        long scheduleTime = watch.elapsed(TimeUnit.MILLISECONDS);
+        state.setScheduleTimeMillis((int) scheduleTime);// hardcode to integer
+        watch.reset();
+        watch.start();
+
         // persist & notify
         postSchedule(client, state, producer);
 
+        watch.stop();
+        long postTime = watch.elapsed(TimeUnit.MILLISECONDS);
+        LOG.info("Schedule result, schedule time {} ms, post schedule time {} ms !", scheduleTime, postTime);
+
         currentState = state;
         return state;
     }
@@ -207,7 +221,14 @@ public class Coordinator {
         // schedule dynamic policy loader
         long initDelayMillis = config.getLong(DYNAMIC_POLICY_LOADER_INIT_MILLS);
         long delayMillis = config.getLong(DYNAMIC_POLICY_LOADER_DELAY_MILLS);
-        ScheduledExecutorService scheduleSrv = Executors.newScheduledThreadPool(2);
+        ScheduledExecutorService scheduleSrv = Executors.newScheduledThreadPool(2, new ThreadFactory() {
+            @Override
+            public Thread newThread(Runnable r) {
+                Thread t = new Thread();
+                t.setDaemon(true);
+                return t;
+            }
+        });
         scheduleSrv.scheduleAtFixedRate(loader, initDelayMillis, delayMillis, TimeUnit.MILLISECONDS);
         
         // 
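For reference, the timing logic in schedule() above follows the usual Guava Stopwatch pattern; a minimal sketch with placeholder phase methods (doSchedule/doPostSchedule are illustrative names, not Eagle APIs):

    Stopwatch watch = Stopwatch.createStarted();
    doSchedule();                                              // phase 1: compute the schedule
    long scheduleMs = watch.elapsed(TimeUnit.MILLISECONDS);
    watch.reset().start();                                     // reuse the same watch for phase 2
    doPostSchedule();                                          // phase 2: persist state and notify
    long postMs = watch.stop().elapsed(TimeUnit.MILLISECONDS);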

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
index a9b6c00..6c98fa6 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
@@ -137,6 +137,7 @@ public class GreedyPolicyScheduler implements IPolicyScheduler {
                 expectParal = policyDefaultParallelism;
             }
             // how to handle expand of an policy in a smooth transition manner
+            // TODO policy fix
             PolicyAssignment assignment = context.getPolicyAssignments().get(def.getName());
             if (assignment != null) {
                 LOG.info("policy {} already allocated", def.getName());
@@ -297,7 +298,7 @@ public class GreedyPolicyScheduler implements IPolicyScheduler {
      * @return
      */
     private int getQueueSize(int hint) {
-        return initialQueueSize;
+        return initialQueueSize * ((hint + initialQueueSize - 1) / initialQueueSize);
     }
 
     private boolean isQueueAvailable(StreamWorkSlotQueue queue, PolicyDefinition def) {
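As a worked example of the new sizing rule (assuming initialQueueSize = 5): the parallelism hint is now rounded up to the nearest multiple of the initial queue size, instead of being ignored as before. Integer division does the rounding:

    getQueueSize(3);  // 5 * ((3 + 4) / 5) = 5
    getQueueSize(5);  // 5 * ((5 + 4) / 5) = 5
    getQueueSize(7);  // 5 * ((7 + 4) / 5) = 10, matching the "prompted to 10" assertion in SchedulerTest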

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java
index 84a4061..dea9419 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java
@@ -145,4 +145,8 @@ public class InMemScheduleConext implements IScheduleContext {
         return publishments;
     }
 
+    public void addPublishment(Publishment pub) {
+        this.publishments.put(pub.getName(), pub);
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
index d4d6c0c..fd09462 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
@@ -92,9 +92,16 @@ public class ScheduleContextBuilder {
 
         // TODO: See ScheduleState comments on how to improve the storage
         ScheduleState state = client.getVersionedSpec();
-        assignments = listToMap(state == null ? new ArrayList<PolicyAssignment>() : cleanupDeprecatedAssignments(state.getAssignments()));
 
-        monitoredStreamMap = listToMap(state == null ? new ArrayList<MonitoredStream>() : cleanupDeprecatedStreamsAndAssignment(state.getMonitoredStreams()));
+        // detect policy update, remove the policy assignments.
+        // definition change : the assignment would NOT change, the runtime will do reload and check
+        // stream change : the assignment would NOT change, the runtime will do reload and check
+        // data source change : the assignment would NOT change, the runtime will do reload and check
+        // parallelism change : the policies' assignment would be dropped when it's bigger than assign queue, and expect
+        //      to be assigned in scheduler.
+        assignments = listToMap(state == null ? new ArrayList<PolicyAssignment>() : detectAssignmentsChange(state.getAssignments(), state));
+
+        monitoredStreamMap = listToMap(state == null ? new ArrayList<MonitoredStream>() : detectMonitoredStreams(state.getMonitoredStreams()));
 
         // build based on existing data
         usages = buildTopologyUsage();
@@ -119,7 +126,7 @@ public class ScheduleContextBuilder {
      * @param monitoredStreams
      * @return
      */
-    private List<MonitoredStream> cleanupDeprecatedStreamsAndAssignment(List<MonitoredStream> monitoredStreams) {
+    private List<MonitoredStream> detectMonitoredStreams(List<MonitoredStream> monitoredStreams) {
         List<MonitoredStream> result = new ArrayList<MonitoredStream>(monitoredStreams);
         
         // clear deprecated streams
@@ -193,14 +200,31 @@ public class ScheduleContextBuilder {
         }
     }
 
-    private List<PolicyAssignment> cleanupDeprecatedAssignments(List<PolicyAssignment> list) {
+    private List<PolicyAssignment> detectAssignmentsChange(List<PolicyAssignment> list, ScheduleState state) {
+        // FIXME: duplciated build map ?
+        Map<String, StreamWorkSlotQueue> queueMap = new HashMap<String, StreamWorkSlotQueue>();
+        for (MonitoredStream ms : state.getMonitoredStreams()) {
+            for (StreamWorkSlotQueue q : ms.getQueues()) {
+                queueMap.put(q.getQueueId(), q);
+            }
+        }
+
         List<PolicyAssignment> result = new ArrayList<PolicyAssignment>(list);
         Iterator<PolicyAssignment> paIt = result.iterator();
         while (paIt.hasNext()) {
             PolicyAssignment assignment = paIt.next();
+
             if (!policies.containsKey(assignment.getPolicyName())) {
                 LOG.info("Policy assignment {} 's policy not found, this assignment will be removed!", assignment);
                 paIt.remove();
+            } else {
+                StreamWorkSlotQueue queue = queueMap.get(assignment.getQueueId());
+                if (queue == null
+                        || policies.get(assignment.getPolicyName()).getParallelismHint() > queue.getQueueSize()) {
+                    // queue not found or policy has hint bigger than queue (possible a poilcy update)
+                    LOG.info("Policy assignment {} 's policy doesnt match queue: {}!", assignment, queue);
+                    paIt.remove();
+                }
             }
         }
         return result;
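To make the parallelism rule above concrete, a hypothetical walk-through against a 5-slot queue (this mirrors test_changed_policy_parallelism in ScheduleContextBuilderTest further down; pd and builder are the policy and context builder from that test):

    pd.setParallelismHint(4);  // 4 <= queue.getQueueSize() == 5, assignment is kept
    builder.buildContext();
    pd.setParallelismHint(6);  // 6 >  queue.getQueueSize() == 5, assignment is dropped and left to the scheduler
    builder.buildContext();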

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java
index dcb031e..fe62b3b 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java
@@ -19,11 +19,9 @@
 
 package org.apache.alert.coordinator;
 
-import org.junit.Ignore;
-
 /**
  * Since 4/28/16.
  */
-@Ignore
+@org.junit.Ignore
 public class DynamicPolicyLoaderTest {
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java
index f2e67de..ed9d7b7 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java
@@ -38,6 +38,7 @@ import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
 import org.apache.eagle.alert.coordinator.model.TopologyUsage;
 import org.apache.eagle.alert.coordinator.provider.ScheduleContextBuilder;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition.Definition;
 import org.apache.eagle.alert.engine.coordinator.Publishment;
 import org.apache.eagle.alert.engine.coordinator.StreamColumn;
 import org.apache.eagle.alert.engine.coordinator.StreamColumn.Type;
@@ -91,7 +92,7 @@ public class ScheduleContextBuilderTest {
         Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
         StreamWorkSlotQueue queue = SchedulerTest.getQueue(context, assignment1.getQueueId()).getRight();
 
-        client.listPolicies().remove(0);
+        client.removePolicy(0);
         context = builder.buildContext();
         Assert.assertFalse(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
 
@@ -102,7 +103,7 @@ public class ScheduleContextBuilderTest {
     }
 
     @Test
-    public void test_changed_policy() {
+    public void test_changed_policy_partition() {
         InMemMetadataServiceClient client = getSampleMetadataService();
         ScheduleContextBuilder builder = new ScheduleContextBuilder(client);
         PolicyAssignment assignment1 = client.getVersionedSpec().getAssignments().get(0);
@@ -140,6 +141,56 @@ public class ScheduleContextBuilderTest {
     }
 
     @Test
+    public void test_changed_policy_parallelism() {
+        InMemMetadataServiceClient client = getSampleMetadataService();
+        ScheduleContextBuilder builder = new ScheduleContextBuilder(client);
+        PolicyAssignment assignment1 = client.getVersionedSpec().getAssignments().get(0);
+
+        IScheduleContext context = builder.buildContext();
+        Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
+
+        StreamWorkSlotQueue queue = SchedulerTest.getQueue(context, assignment1.getQueueId()).getRight();
+
+        PolicyDefinition pd1 = client.listPolicies().get(0);
+        pd1.setParallelismHint(4); // default queue is 5 , change to smaller, has no effect
+
+        context = builder.buildContext();
+        PolicyAssignment assignmentNew = context.getPolicyAssignments().values().iterator().next();
+        StreamWorkSlotQueue queueNew = SchedulerTest.getQueue(context, assignmentNew.getQueueId()).getRight();
+        Assert.assertNotNull(queueNew);
+        // just to make sure queueNew is present
+        Assert.assertEquals(queue.getQueueId(), queueNew.getQueueId());
+
+        // default queue is 5 , change to bigger 6, policy assignment removed
+        pd1.setParallelismHint(queue.getQueueSize() + 1);
+        context = builder.buildContext();
+
+        Assert.assertFalse(context.getPolicyAssignments().values().iterator().hasNext());
+    }
+
+    @Test
+    public void test_changed_policy_definition() {
+        InMemMetadataServiceClient client = getSampleMetadataService();
+        ScheduleContextBuilder builder = new ScheduleContextBuilder(client);
+        PolicyAssignment assignment1 = client.getVersionedSpec().getAssignments().get(0);
+
+        IScheduleContext context = builder.buildContext();
+        Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
+
+        StreamWorkSlotQueue queue = SchedulerTest.getQueue(context, assignment1.getQueueId()).getRight();
+
+        PolicyDefinition pd1 = client.listPolicies().get(0);
+        pd1.getDefinition().value = "define.. new...";
+
+        context = builder.buildContext();
+        PolicyAssignment assignmentNew = context.getPolicyAssignments().values().iterator().next();
+        StreamWorkSlotQueue queueNew = SchedulerTest.getQueue(context, assignmentNew.getQueueId()).getRight();
+        Assert.assertNotNull(queueNew);
+        // just to make sure queueNew is present
+        Assert.assertEquals(queue.getQueueId(), queueNew.getQueueId());
+    }
+
+    @Test
     public void test_renamed_topologies() {
         InMemMetadataServiceClient client = getSampleMetadataService();
         ScheduleContextBuilder builder = new ScheduleContextBuilder(client);
@@ -167,12 +218,11 @@ public class ScheduleContextBuilderTest {
 
     public static InMemMetadataServiceClient getSampleMetadataService() {
         InMemMetadataServiceClient client = new InMemMetadataServiceClient();
-        client.listTopologies().add(createSampleTopology());
-        client.listDataSources().add(createKafka2TupleMetadata());
-        // client.listSpoutMetadata().add(createS)
-        client.listPolicies().add(createPolicy());
-        client.listPublishment().add(createPublishment());
-        client.listStreams().add(createStreamDefinition());
+        client.addTopology(createSampleTopology());
+        client.addDataSource(createKafka2TupleMetadata());
+        client.addPolicy(createPolicy());
+        client.addPublishment(createPublishment());
+        client.addStreamDefinition(createStreamDefinition());
         client.addScheduleState(createScheduleState());
         return client;
     }
@@ -195,9 +245,15 @@ public class ScheduleContextBuilderTest {
         WorkSlot slot0 = new WorkSlot(TOPO1, TOPO1 + "-alert-" + 0);
         WorkSlot slot1 = new WorkSlot(TOPO1, TOPO1 + "-alert-" + 1);
         WorkSlot slot2 = new WorkSlot(TOPO1, TOPO1 + "-alert-" + 2);
+        WorkSlot slot3 = new WorkSlot(TOPO1, TOPO1 + "-alert-" + 3);
+        WorkSlot slot4 = new WorkSlot(TOPO1, TOPO1 + "-alert-" + 4);
+        WorkSlot slot5 = new WorkSlot(TOPO1, TOPO1 + "-alert-" + 5);
         slots.add(slot0);
         slots.add(slot1);
         slots.add(slot2);
+        slots.add(slot3);
+        slots.add(slot4);
+        slots.add(slot5);
 
         StreamWorkSlotQueue q = new StreamWorkSlotQueue(streamGroup, false, new HashMap<>(), slots);
         ms.addQueues(q);
@@ -216,6 +272,7 @@ public class ScheduleContextBuilderTest {
         def.setInputStreams(Arrays.asList(TEST_STREAM_DEF_1));
         def.setOutputStreams(Arrays.asList(OUT_STREAM1));
         def.setParallelismHint(5);
+        def.setDefinition(new Definition());
 
         streamGroup = new StreamGroup();
         par = new StreamPartition();

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java
index f90decc..53de19a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/SchedulerTest.java
@@ -16,6 +16,7 @@
  */
 package org.apache.alert.coordinator;
 
+import java.io.File;
 import java.util.ArrayList;
 import java.util.Arrays;
 import java.util.Iterator;
@@ -23,6 +24,7 @@ import java.util.List;
 import java.util.Map;
 
 import org.apache.alert.coordinator.mock.TestTopologyMgmtService;
+import org.apache.commons.io.FileUtils;
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
 import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
@@ -41,32 +43,42 @@ import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
 import org.apache.eagle.alert.coordination.model.internal.Topology;
 import org.apache.eagle.alert.coordinator.IScheduleContext;
 import org.apache.eagle.alert.coordinator.ScheduleOption;
-import org.apache.eagle.alert.coordinator.TopologyMgmtService;
 import org.apache.eagle.alert.coordinator.impl.GreedyPolicyScheduler;
+import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
+import org.apache.eagle.alert.coordinator.model.GroupBoltUsage;
 import org.apache.eagle.alert.coordinator.model.TopologyUsage;
 import org.apache.eagle.alert.coordinator.provider.InMemScheduleConext;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
 import org.apache.eagle.alert.engine.coordinator.PolicyDefinition.Definition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
 import org.apache.eagle.alert.engine.coordinator.StreamColumn;
 import org.apache.eagle.alert.engine.coordinator.StreamColumn.Type;
 import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.engine.coordinator.StreamPartition;
 import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
-import org.codehaus.jackson.map.ObjectMapper;
 import org.junit.Assert;
 import org.junit.BeforeClass;
 import org.junit.Test;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.CollectionType;
+import com.fasterxml.jackson.databind.type.SimpleType;
+import com.typesafe.config.Config;
 import com.typesafe.config.ConfigFactory;
 
 /**
+ * 
  * @since Apr 22, 2016
  *
  */
 public class SchedulerTest {
 
+    private static final String TOPO2 = "topo2";
+    private static final String TOPO1 = "topo1";
+    private static final int PARALELLISM = 5;
     private static final String STREAM2 = "stream2";
     private static final String JOIN_POLICY_1 = "join-policy-1";
     private static final String TEST_TOPIC = "test-topic";
@@ -83,12 +95,13 @@ public class SchedulerTest {
         ConfigFactory.invalidateCaches();
         System.setProperty("config.resource", "/application.conf");
     }
-    
+
     @Test
     public void test01_simple() throws Exception {
         GreedyPolicyScheduler ps = new GreedyPolicyScheduler();
-        IScheduleContext context = createScheduleContext();
-        ps.init(context, createMgmtService());
+        TestTopologyMgmtService mgmtService = createMgmtService();
+        IScheduleContext context = createScheduleContext(mgmtService);
+        ps.init(context, mgmtService);
         ps.schedule(new ScheduleOption());
 
         ScheduleState status = ps.getState();
@@ -97,7 +110,7 @@ public class SchedulerTest {
 
         LOG.info(mapper.writeValueAsString(spec));
         Assert.assertEquals(2, spec.size());
-        Assert.assertTrue(spec.containsKey("topo1"));
+        Assert.assertTrue(spec.containsKey(TOPO1));
         assertFirstPolicyScheduled(context, status);
     }
 
@@ -123,8 +136,8 @@ public class SchedulerTest {
                 Assert.assertEquals(1, streamMeta.groupingStrategies.size());
 
                 StreamRepartitionStrategy gs = streamMeta.groupingStrategies.iterator().next();
-                Assert.assertEquals(5, gs.numTotalParticipatingRouterBolts);
-                Assert.assertEquals(5, gs.totalTargetBoltIds.size());
+                Assert.assertEquals(TestTopologyMgmtService.ROUTER_BOLT_NUMBER, gs.numTotalParticipatingRouterBolts);
+                Assert.assertEquals(TestTopologyMgmtService.ROUTER_BOLT_NUMBER, gs.totalTargetBoltIds.size());
                 Assert.assertEquals(0, gs.startSequence);
 
                 Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
@@ -198,24 +211,23 @@ public class SchedulerTest {
         }
     }
 
-    private TopologyMgmtService createMgmtService() {
-        TestTopologyMgmtService.BOLT_NUMBER = 5;
-        TopologyMgmtService mgmtService = new TestTopologyMgmtService();
+    private TestTopologyMgmtService createMgmtService() {
+        TestTopologyMgmtService mgmtService = new TestTopologyMgmtService();
         return mgmtService;
     }
 
-    private InMemScheduleConext createScheduleContext() {
+    private InMemScheduleConext createScheduleContext(TestTopologyMgmtService mgmtService) {
         InMemScheduleConext context = new InMemScheduleConext();
         // topo
-        Pair<Topology, TopologyUsage> pair1 = TestTopologyMgmtService.createEmptyTopology("topo1");
-        Pair<Topology, TopologyUsage> pair2 = TestTopologyMgmtService.createEmptyTopology("topo2");
+        Pair<Topology, TopologyUsage> pair1 = mgmtService.createEmptyTopology(TOPO1);
+        Pair<Topology, TopologyUsage> pair2 = mgmtService.createEmptyTopology(TOPO2);
         context.addTopology(pair1.getLeft());
         context.addTopologyUsages(pair1.getRight());
         context.addTopology(pair2.getLeft());
         context.addTopologyUsages(pair2.getRight());
 
         // policy
-        createSamplePolicy(context, TEST_POLICY_1, STREAM1);
+        createSamplePolicy(context, TEST_POLICY_1, STREAM1, PARALELLISM);
 
         // data source
         Kafka2TupleMetadata ds = new Kafka2TupleMetadata();
@@ -269,9 +281,9 @@ public class SchedulerTest {
      */
     @Test
     public void test_schedule_add2() {
-        IScheduleContext context = createScheduleContext();
+        TestTopologyMgmtService mgmtService = createMgmtService();
+        IScheduleContext context = createScheduleContext(mgmtService);
         GreedyPolicyScheduler ps = new GreedyPolicyScheduler();
-        TopologyMgmtService mgmtService = new TopologyMgmtService();
         ps.init(context, mgmtService);
 
         ScheduleOption option = new ScheduleOption();
@@ -280,7 +292,7 @@ public class SchedulerTest {
         context = ps.getContext(); // context updated!
         assertFirstPolicyScheduled(context, status);
 
-        createSamplePolicy((InMemScheduleConext) context, TEST_POLICY_2, STREAM1);
+        createSamplePolicy((InMemScheduleConext) context, TEST_POLICY_2, STREAM1, PARALELLISM);
 
         ps.init(context, mgmtService); // reinit
         ps.schedule(option);
@@ -290,7 +302,7 @@ public class SchedulerTest {
         assertSecondPolicyCreated(context, status);
 
         // add one policy on different stream of the same topic
-        createSamplePolicy((InMemScheduleConext) context, TEST_POLICY_3, STREAM2);
+        createSamplePolicy((InMemScheduleConext) context, TEST_POLICY_3, STREAM2, PARALELLISM);
 
         ps.init(context, mgmtService); // re-init
         ps.schedule(option);
@@ -320,8 +332,8 @@ public class SchedulerTest {
                 Assert.assertEquals(1, streamMeta.groupingStrategies.size());
 
                 StreamRepartitionStrategy gs = streamMeta.groupingStrategies.iterator().next();
-                Assert.assertEquals(5, gs.numTotalParticipatingRouterBolts);
-                Assert.assertEquals(5, gs.totalTargetBoltIds.size());
+                Assert.assertEquals(TestTopologyMgmtService.ROUTER_BOLT_NUMBER, gs.numTotalParticipatingRouterBolts);
+                Assert.assertEquals(TestTopologyMgmtService.ROUTER_BOLT_NUMBER, gs.totalTargetBoltIds.size());
                 Assert.assertEquals(0, gs.startSequence);
 
                 PolicyAssignment pa1 = context.getPolicyAssignments().get(TEST_POLICY_1);
@@ -347,8 +359,8 @@ public class SchedulerTest {
                 Assert.assertEquals(1, streamMeta.groupingStrategies.size());
 
                 StreamRepartitionStrategy gs = streamMeta.groupingStrategies.iterator().next();
-                Assert.assertEquals(5, gs.numTotalParticipatingRouterBolts);
-                Assert.assertEquals(5, gs.totalTargetBoltIds.size());
+                Assert.assertEquals(TestTopologyMgmtService.ROUTER_BOLT_NUMBER, gs.numTotalParticipatingRouterBolts);
+                Assert.assertEquals(TestTopologyMgmtService.ROUTER_BOLT_NUMBER, gs.totalTargetBoltIds.size());
                 Assert.assertEquals(0, gs.startSequence);
 
                 // assert policy assignment for the three policies
@@ -362,7 +374,8 @@ public class SchedulerTest {
                 Assert.assertNotEquals(pa1.getQueueId(), pa3.getQueueId());
                 StreamWorkSlotQueue queue1 = getQueue(context, pa1.getQueueId()).getRight();
                 StreamWorkSlotQueue queue3 = getQueue(context, pa3.getQueueId()).getRight();
-                Assert.assertNotEquals(queue1.getWorkingSlots().get(0).getTopologyName(), queue3.getWorkingSlots().get(0).getTopologyName());
+                Assert.assertNotEquals(queue1.getWorkingSlots().get(0).getTopologyName(),
+                        queue3.getWorkingSlots().get(0).getTopologyName());
             }
         }
         // group spec
@@ -425,8 +438,8 @@ public class SchedulerTest {
                 Assert.assertEquals(1, streamMeta.groupingStrategies.size());
 
                 StreamRepartitionStrategy gs = streamMeta.groupingStrategies.iterator().next();
-                Assert.assertEquals(5, gs.numTotalParticipatingRouterBolts);
-                Assert.assertEquals(5, gs.totalTargetBoltIds.size());
+                Assert.assertEquals(TestTopologyMgmtService.ROUTER_BOLT_NUMBER, gs.numTotalParticipatingRouterBolts);
+                Assert.assertEquals(TestTopologyMgmtService.ROUTER_BOLT_NUMBER, gs.totalTargetBoltIds.size());
                 Assert.assertEquals(0, gs.startSequence);
 
                 // assert two policy on the same queue
@@ -492,7 +505,8 @@ public class SchedulerTest {
                 LOG.info("alert spec topology name {}", topo1);
                 for (List<String> definitions : alertSpec.getBoltPolicyIdsMap().values()) {
                     Assert.assertEquals(2, definitions.size());
-//                    List<String> names = Arrays.asList(definitions.stream().map((t) -> t.getName()).toArray(String[]::new));
+                    // List<String> names = Arrays.asList(definitions.stream().map((t) ->
+                    // t.getName()).toArray(String[]::new));
                     Assert.assertTrue(definitions.contains(TEST_POLICY_1));
                     Assert.assertTrue(definitions.contains(TEST_POLICY_2));
                 }
@@ -540,9 +554,9 @@ public class SchedulerTest {
         Assert.assertTrue(list.contains(gs2));
     }
 
-    private void createSamplePolicy(InMemScheduleConext context, String policyName, String stream) {
+    private void createSamplePolicy(InMemScheduleConext context, String policyName, String stream, int hint) {
         PolicyDefinition pd = new PolicyDefinition();
-        pd.setParallelismHint(5);
+        pd.setParallelismHint(hint);
         Definition def = new Definition();
         pd.setDefinition(def);
         pd.setName(policyName);
@@ -566,7 +580,12 @@ public class SchedulerTest {
 
     @Test
     public void test_schedule_updateParitition() {
-        // TODO
+        // This case design test is move to outter logic of ScheduleConetxtBuilder
+    }
+
+    @Test
+    public void test_schedule_updateDefinition() {
+        // This case design test is move to outter logic of ScheduleConetxtBuilder
     }
 
     @Test
@@ -577,9 +596,9 @@ public class SchedulerTest {
     @SuppressWarnings("unused")
     @Test
     public void test_schedule_multipleStream() throws Exception {
-        IScheduleContext context = createScheduleContext();
+        TestTopologyMgmtService mgmtService = new TestTopologyMgmtService();
+        IScheduleContext context = createScheduleContext(mgmtService);
         GreedyPolicyScheduler ps = new GreedyPolicyScheduler();
-        TopologyMgmtService mgmtService = new TopologyMgmtService();
 
         createJoinPolicy((InMemScheduleConext) context, JOIN_POLICY_1, Arrays.asList(STREAM1, STREAM2));
 
@@ -632,4 +651,96 @@ public class SchedulerTest {
         }
         context.addPoilcy(pd);
     }
+
+    @Test
+    public void testIrregularPolicyParallelismHint() {
+        Config config = ConfigFactory.load();
+        int defaultParallelism = config.getInt("coordinator.policyDefaultParallelism");
+        TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(5, 12);
+        InMemScheduleConext context = createScheduleContext(mgmtService);
+        // recreate test poicy
+        context.getPolicies().clear();
+        // make the hint bigger than bolt number
+        int irregularParallelism = defaultParallelism + 2;
+        createSamplePolicy(context, "irregularPolicy", STREAM1, irregularParallelism);
+        GreedyPolicyScheduler ps = new GreedyPolicyScheduler();
+
+        ps.init(context, mgmtService);
+
+        ScheduleState scheduled = ps.schedule(new ScheduleOption());
+        Assert.assertEquals(2, scheduled.getSpoutSpecs().size());
+        Assert.assertEquals(2, scheduled.getGroupSpecs().size());
+        Assert.assertEquals(2, scheduled.getAlertSpecs().size());
+        // assertion
+        RouterSpec spec = scheduled.getGroupSpecs().get(TOPO1);
+        Assert.assertTrue(spec.getRouterSpecs().size() > 0); // must be allocated
+        for (StreamRouterSpec routerSpec : spec.getRouterSpecs()) {
+            Assert.assertEquals(1, routerSpec.getTargetQueue().size());
+            // irregularParallelism is prompted to 2 * defaultParallelism = 10
+            Assert.assertEquals(10, routerSpec.getTargetQueue().get(0).getWorkers().size());
+        }
+    }
+
+    @Test
+    public void testDataSources() throws Exception {
+        InMemScheduleConext context = loadContext("/multi/");
+        TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(4, 10);
+
+        GreedyPolicyScheduler ps = new GreedyPolicyScheduler();
+        ps.init(context, mgmtService);
+
+        ScheduleState state = ps.schedule(new ScheduleOption());
+        Assert.assertNotNull(state);
+        Assert.assertEquals(2, state.getAssignments().size());
+        Assert.assertEquals(1, state.getAlertSpecs().size());
+        Assert.assertEquals(10, state.getAlertSpecs().get("alertUnitTopology_1").getBoltPolicyIdsMap().size());
+    }
+
+    private InMemScheduleConext loadContext(String base) throws Exception {
+        InMemScheduleConext context = new InMemScheduleConext();
+
+        List<Kafka2TupleMetadata> metadata = loadEntities(base + "datasources.json", Kafka2TupleMetadata.class);
+        for (Kafka2TupleMetadata k : metadata) {
+            context.addDataSource(k);
+        }
+
+        List<PolicyDefinition> policies = loadEntities(base + "policies.json", PolicyDefinition.class);
+        for (PolicyDefinition p : policies) {
+            context.addPoilcy(p);
+        }
+
+        List<Publishment> pubs = loadEntities(base + "publishments.json", Publishment.class);
+        for (Publishment pub : pubs) {
+            context.addPublishment(pub);
+        }
+
+        List<StreamDefinition> defs = loadEntities(base + "streamdefinitions.json", StreamDefinition.class);
+        for (StreamDefinition def : defs) {
+            context.addSchema(def);
+        }
+
+        List<Topology> topos = loadEntities(base + "topologies.json", Topology.class);
+        for (Topology t : topos) {
+            context.addTopology(t);
+            
+            TopologyUsage u = new TopologyUsage(t.getName());
+            for (String gnid : t.getGroupNodeIds()) {
+                u.getGroupUsages().put(gnid, new GroupBoltUsage(gnid));
+            }
+            for (String anid : t.getAlertBoltIds()) {
+                u.getAlertUsages().put(anid, new AlertBoltUsage(anid));
+            }
+            context.addTopologyUsages(u);
+        }
+
+        return context;
+    }
+
+    public static <T> List<T> loadEntities(String path, Class<T> tClz) throws Exception {
+        System.out.println(FileUtils.readFileToString(new File(SchedulerTest.class.getResource(path).getPath())));
+        JavaType type = CollectionType.construct(List.class, SimpleType.construct(tClz));
+        List<T> l = mapper.readValue(SchedulerTest.class.getResourceAsStream(path), type);
+        return l;
+    }
+
 }

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/WorkSlotStrategyTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/WorkSlotStrategyTest.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/WorkSlotStrategyTest.java
index e17ebab..52ea022 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/WorkSlotStrategyTest.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/WorkSlotStrategyTest.java
@@ -58,18 +58,18 @@ public class WorkSlotStrategyTest {
         StreamGroup group = new StreamGroup();
         group.addStreamPartition(partition);
 
-        TestTopologyMgmtService.BOLT_NUMBER = 3;
-        TestTopologyMgmtService mgmtService = new TestTopologyMgmtService();
 
-        SameTopologySlotStrategy strategy = new SameTopologySlotStrategy(context, group, mgmtService);
         {
+            TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(3, 3, "prefix-time1", true);
+            SameTopologySlotStrategy strategy = new SameTopologySlotStrategy(context, group, mgmtService);
             List<WorkSlot> slots = strategy.reserveWorkSlots(5, false, new HashMap<String, Object>());
             Assert.assertEquals(0, slots.size());
             Assert.assertEquals(1, context.getTopologies().size());
         }
 
-        TestTopologyMgmtService.BOLT_NUMBER = 5;
         {
+            TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(5, 5, "prefix-time2", true);
+            SameTopologySlotStrategy strategy = new SameTopologySlotStrategy(context, group, mgmtService);
             List<WorkSlot> slots = strategy.reserveWorkSlots(5, false, new HashMap<String, Object>());
             Assert.assertEquals(5, slots.size());
             LOG.info(slots.get(0).getTopologyName());
@@ -110,8 +110,7 @@ public class WorkSlotStrategyTest {
 
         MonitoredStream ms1 = new MonitoredStream(sg);
 
-        TestTopologyMgmtService.BOLT_NUMBER = 5;
-        TestTopologyMgmtService mgmtService = new TestTopologyMgmtService();
+        TestTopologyMgmtService mgmtService = new TestTopologyMgmtService(5, 5, "prefix-3", true);
 
         String topo1 = null;
         String bolt1 = null;
@@ -145,7 +144,6 @@ public class WorkSlotStrategyTest {
         sg2.addStreamPartition(partition2);
         MonitoredStream ms2 = new MonitoredStream(sg2);
         queue = wrb.createQueue(ms2, false, 5, new HashMap<String, Object>());
-        TestTopologyMgmtService.BOLT_NUMBER = 5;
         {
             Assert.assertEquals(5, queue.getWorkingSlots().size());
             Assert.assertEquals(2, context.getTopologies().size());

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java
index 6b88f7d..f19533a 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/InMemMetadataServiceClient.java
@@ -18,6 +18,7 @@ package org.apache.alert.coordinator.mock;
 
 import java.io.IOException;
 import java.util.ArrayList;
+import java.util.Collections;
 import java.util.Iterator;
 import java.util.List;
 import java.util.Map.Entry;
@@ -35,6 +36,9 @@ import org.apache.eagle.alert.engine.coordinator.StreamingCluster;
 import org.apache.eagle.alert.service.IMetadataServiceClient;
 
 /**
+ * According to metadata servic client semantic, change to the interface returned value should not direclty change the
+ * states.
+ * 
  * @since May 5, 2016
  *
  */
@@ -57,37 +61,41 @@ public class InMemMetadataServiceClient implements IMetadataServiceClient {
 
     @Override
     public List<StreamingCluster> listClusters() {
-        return clusters;
+        return Collections.unmodifiableList(clusters);
     }
 
     @Override
     public List<Topology> listTopologies() {
-        return topologies;
+        return Collections.unmodifiableList(topologies);
     }
 
     @Override
     public List<PolicyDefinition> listPolicies() {
-        return policies;
+        return Collections.unmodifiableList(policies);
+    }
+    
+    public void removePolicy(int idx) {
+        policies.remove(idx);
     }
 
     @Override
     public List<StreamDefinition> listStreams() {
-        return definitions;
+        return Collections.unmodifiableList(definitions);
     }
 
     @Override
     public List<Kafka2TupleMetadata> listDataSources() {
-        return datasources;
+        return Collections.unmodifiableList(datasources);
     }
 
     @Override
     public List<Publishment> listPublishment() {
-        return publishmetns;
+        return Collections.unmodifiableList(publishmetns);
     }
 
     @Override
     public List<SpoutSpec> listSpoutMetadata() {
-        return spoutSpecs;
+        return Collections.unmodifiableList(spoutSpecs);
     }
 
     @Override

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/TestTopologyMgmtService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/TestTopologyMgmtService.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/TestTopologyMgmtService.java
index fe36d18..d897454 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/TestTopologyMgmtService.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/mock/TestTopologyMgmtService.java
@@ -16,52 +16,77 @@
  */
 package org.apache.alert.coordinator.mock;
 
+import java.util.List;
+
 import org.apache.commons.lang3.tuple.Pair;
 import org.apache.eagle.alert.coordination.model.internal.Topology;
 import org.apache.eagle.alert.coordinator.TopologyMgmtService;
 import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
 import org.apache.eagle.alert.coordinator.model.GroupBoltUsage;
 import org.apache.eagle.alert.coordinator.model.TopologyUsage;
-import org.junit.Ignore;
 
-import java.util.List;
-
-@Ignore
+@org.junit.Ignore
 public class TestTopologyMgmtService extends TopologyMgmtService {
 
-    public static int BOLT_NUMBER = 5;
-    
+    public static final int ROUTER_BOLT_NUMBER = 6;
+    public static final int BOLT_NUMBER = 7;
+
+    private int boltNumber;
+    private int routerNumber;
     private int i = 0;
+    private String namePrefix = "Topology";
+    
+    // a config used to check if createTopology is enabled. FIXME: another class of mgmt service might be better
+    private boolean enableCreateTopology = false;
+
+    public TestTopologyMgmtService() {
+        boltNumber = BOLT_NUMBER;// default behaivor
+        this.routerNumber = ROUTER_BOLT_NUMBER;
+    }
+
+    public TestTopologyMgmtService(int routerNumber, int boltNumber) {
+        this.routerNumber = routerNumber;
+        this.boltNumber = boltNumber;
+    }
+
+    public TestTopologyMgmtService(int routerNumber, int boltNumber, String prefix, boolean enable) {
+        this.routerNumber = routerNumber;
+        this.boltNumber = boltNumber;
+        this.namePrefix = prefix;
+        this.enableCreateTopology = enable;
+    }
 
     @Override
     public TopologyMeta creatTopology() {
-        TopologyMeta tm = new TopologyMeta();
-        tm.topologyId = "Topoloy" + (i++);
-        tm.clusterId = "default-cluster";
-        tm.nimbusHost = "localhost";
-        tm.nimbusPort = "3000";
-        Pair<Topology, TopologyUsage> pair = TestTopologyMgmtService.createEmptyTopology(tm.topologyId);
-        tm.topology = pair.getLeft();
-        tm.usage = pair.getRight();
-        
-        return tm;
+        if (enableCreateTopology) {
+            TopologyMeta tm = new TopologyMeta();
+            tm.topologyId = namePrefix + (i++);
+            tm.clusterId = "default-cluster";
+            tm.nimbusHost = "localhost";
+            tm.nimbusPort = "3000";
+            Pair<Topology, TopologyUsage> pair = createEmptyTopology(tm.topologyId);
+            tm.topology = pair.getLeft();
+            tm.usage = pair.getRight();
+            return tm;
+        } else {
+            throw new UnsupportedOperationException("not supported yet!");
+        }
     }
 
     @Override
     public List<TopologyMeta> listTopologies() {
-        // TODO Auto-generated method stub
         return super.listTopologies();
     }
 
-    public static Pair<Topology, TopologyUsage> createEmptyTopology(String topoName) {
-        Topology t = new Topology(topoName, TestTopologyMgmtService.BOLT_NUMBER, TestTopologyMgmtService.BOLT_NUMBER);
+    public Pair<Topology, TopologyUsage> createEmptyTopology(String topoName) {
+        Topology t = new Topology(topoName, routerNumber, boltNumber);
         for (int i = 0; i < t.getNumOfGroupBolt(); i++) {
             t.getGroupNodeIds().add(t.getName() + "-grp-" + i);
         }
         for (int i = 0; i < t.getNumOfAlertBolt(); i++) {
             t.getAlertBoltIds().add(t.getName() + "-alert-" + i);
         }
-    
+
         TopologyUsage u = new TopologyUsage(topoName);
         for (String gnid : t.getGroupNodeIds()) {
             u.getGroupUsages().put(gnid, new GroupBoltUsage(gnid));
@@ -69,7 +94,7 @@ public class TestTopologyMgmtService extends TopologyMgmtService {
         for (String anid : t.getAlertBoltIds()) {
             u.getAlertUsages().put(anid, new AlertBoltUsage(anid));
         }
-    
+
         return Pair.of(t, u);
     }
 }
\ No newline at end of file

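A minimal usage sketch of the reworked mock follows (the bolt counts, the "testTopology" prefix, and the topology name are illustrative values, not taken from the tests): the four-argument constructor enables on-demand topology creation for scheduler tests, while createEmptyTopology() can be called directly to obtain a fixed-size topology skeleton together with its empty usage record.

    // illustrative values only; any router/alert bolt counts and prefix would do
    TestTopologyMgmtService mgmt = new TestTopologyMgmtService(4, 10, "testTopology", true);
    Pair<Topology, TopologyUsage> pair = mgmt.createEmptyTopology("testTopology0");
    Topology topology = pair.getLeft();    // 4 group (router) bolts, 10 alert bolts
    TopologyUsage usage = pair.getRight(); // one empty usage entry per bolt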
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/datasources.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/datasources.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/datasources.json
new file mode 100644
index 0000000..8678fe6
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/datasources.json
@@ -0,0 +1,19 @@
+[
+{
+	"name": "network_syslog_datasource",
+	"type": "KAFKA",
+	"properties": {
+	},
+	"topic": "logoutput",
+	"schemeCls": "org.apache.eagle.alert.engine.extension.SherlockEventScheme",
+	"codec": {
+		"streamNameSelectorProp": {
+			"fieldNamesToInferStreamName" : "namespace",
+			"streamNameFormat":"stream_%s"
+		},
+		"streamNameSelectorCls":"org.apache.eagle.alert.engine.scheme.JsonStringStreamNameSelector",
+		"timestampColumn": null,
+		"timestampFormat":""
+	}
+}
+]

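For orientation, a sketch of how this codec is expected to route events (the event below is fabricated for illustration, it is not part of the test data): JsonStringStreamNameSelector reads the configured field ("namespace") and applies streamNameFormat "stream_%s", so an event whose namespace is "demons1" should be assigned to stream_demons1 and evaluated by the matching policy in policies.json below, while namespace "umpns2" should map to stream_umpns2.

    {"namespace": "demons1", "dims_severity": "NOTICE", "dims_hostname": "host-1",
     "dims_facility": "local0", "dims_msgid": "m-1", "epochMillis": 1469080800000}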
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/policies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/policies.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/policies.json
new file mode 100644
index 0000000..ba7d120
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/policies.json
@@ -0,0 +1,52 @@
+[
+  {
+    "name": "syslog_severity_check_stream_demons1",
+    "description": "syslog demons1",
+    "inputStreams": [
+      "stream_demons1"
+    ],
+    "outputStreams": [
+      "syslog_severity_check_output"
+    ],
+    "definition": {
+      "type": "siddhi",
+      "value": "from syslog_stream[dims_severity == \"NOTICE\"] select 'alert' as name, namespace, epochMillis, timestamp, interface,state,version,type,origmsg, dims_severity, dims_facility, dims_hostname, dims_msgid insert into syslog_severity_check_output;"
+    },
+    "partitionSpec": [
+      {
+        "streamId": "stream_demons1",
+        "type": "GROUPBY",
+        "columns": [
+          "dims_hostname"
+        ],
+        "sortSpec": null
+      }
+    ],
+    "parallelismHint": 5
+  },
+  {
+    "name": "syslog_severity_check_stream_umpns2",
+    "description": "syslog umpn2",
+    "inputStreams": [
+      "stream_umpns2"
+    ],
+    "outputStreams": [
+      "syslog_severity_check_output"
+    ],
+    "definition": {
+      "type": "siddhi",
+      "value": "from syslog_stream[dims_severity == \"NOTICE\"] select 'alert' as name, namespace, epochMillis, program, message,  dims_severity, dims_facility, dims_hostname, dims_msgid  insert into syslog_severity_check_output;"
+    },
+    "partitionSpec": [
+      {
+        "streamId": "stream_umpns2",
+        "type": "GROUPBY",
+        "columns": [
+          "dims_hostname"
+        ],
+        "sortSpec": null
+      }
+    ],
+    "parallelismHint": 5
+  }
+]

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/publishments.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/publishments.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/publishments.json
new file mode 100644
index 0000000..d306be2
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/publishments.json
@@ -0,0 +1,26 @@
+[
+{
+  "type": "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher",
+  "name":"network-syslog-publish",
+  "policyIds": ["syslog_severity_check_stream_demons1", "syslog_severity_check_stream_umpns2"],
+  "dedupIntervalMin": "PT0M",
+  "properties":{
+    "kafka_broker":"127.0.0.1:9092",
+    "topic":"syslog_alerts",
+    "value_deserializer": "org.apache.kafka.common.serialization.ByteArrayDeserializer",
+    "value_serializer": "org.apache.kafka.common.serialization.ByteArraySerializer"
+  },
+  "serializer" : "org.apache.eagle.alert.engine.extension.SherlockAlertSerializer"
+},
+{
+  "type": "org.apache.eagle.alert.engine.publisher.impl.AlertKafkaPublisher",
+  "name":"network-syslog-publish_rawjson",
+  "policyIds": ["syslog_severity_check_stream_demons1", "syslog_severity_check_stream_umpns2"],
+  "dedupIntervalMin": "PT0M",
+  "properties":{
+    "kafka_broker":"127.0.0.1:9092",
+    "topic":"syslog_alerts_json"
+  },
+  "serializer" : "org.apache.eagle.alert.engine.publisher.impl.JsonEventSerializer"
+}
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/streamdefinitions.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/streamdefinitions.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/streamdefinitions.json
new file mode 100644
index 0000000..cee6f8c
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/streamdefinitions.json
@@ -0,0 +1,138 @@
+[
+{
+	"streamId": "stream_demons1",
+	"dataSource" : "network_syslog_datasource",
+	"description":"the data stream for syslog events",
+	"validate": false,
+	"timeseries":false,
+	"columns": [
+		{
+			"name": "dims_facility",
+			"type" : "STRING",
+			"defaultValue": "",
+			"required":true
+		},{
+			"name": "dims_severity",
+			"type" : "STRING",
+			"defaultValue": "",
+			"required": true
+		},
+		{
+			"name": "dims_hostname",
+			"type" : "STRING",
+			"defaultValue": "",
+			"required": true
+		},
+		{
+			"name": "dims_msgid",
+			"type" : "STRING",
+			"defaultValue": "",
+			"required": true
+		},{
+			"name": "timestamp",
+			"type" : "STRING",
+			"defaultValue": "",
+			"required":true
+		},{
+			"name": "state",
+			"type" : "STRING",
+			"defaultValue": "",
+			"required": true
+		},{
+			"name": "interface",
+			"type" : "STRING",
+			"defaultValue": "",
+			"required":true
+		},{
+			"name": "version",
+			"type" : "STRING",
+			"defaultValue": "",
+			"required":true
+		},{
+			"name": "type",
+			"type" : "STRING",
+			"defaultValue": "",
+			"required": true
+		},{
+			"name": "origmsg",
+			"type" : "STRING",
+			"defaultValue": "",
+			"required": true
+		},{
+			"name": "name",
+			"type" : "STRING",
+			"defaultValue": "",
+			"required": true
+		},
+		{
+			"name": "namespace",
+			"type" : "STRING",
+			"defaultValue": "",
+			"required": true
+		},{
+			"name": "epochMillis",
+			"type" : "LONG",
+			"defaultValue": 0,
+			"required": true
+		}
+	]
+},
+{
+	"streamId": "stream_umpns2",
+	"dataSource" : "network_syslog_datasource",
+	"description":"the data stream for syslog events",
+	"validate": false,
+	"timeseries":false,
+	"columns": [
+		{
+			"name": "dims_facility",
+			"type" : "STRING",
+			"defaultValue": "",
+			"required":true
+		},{
+			"name": "dims_severity",
+			"type" : "STRING",
+			"defaultValue": "",
+			"required": true
+		},
+		{
+			"name": "dims_hostname",
+			"type" : "STRING",
+			"defaultValue": "",
+			"required": true
+		},
+		{
+			"name": "dims_msgid",
+			"type" : "STRING",
+			"defaultValue": "",
+			"required": true
+		},{
+			"name": "program",
+			"type" : "STRING",
+			"defaultValue": "",
+			"required":true
+		},{
+			"name": "message",
+			"type" : "STRING",
+			"defaultValue": "",
+			"required": true
+		},{
+			"name": "name",
+			"type" : "STRING",
+			"defaultValue": "",
+			"required": true
+		},
+		{
+			"name": "namespace",
+			"type" : "STRING",
+			"defaultValue": "",
+			"required": true
+		},{
+			"name": "epochMillis",
+			"type" : "LONG",
+			"defaultValue": 0,
+			"required": true
+		}
+	]
+}
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/topologies.json
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/topologies.json b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/topologies.json
new file mode 100644
index 0000000..7b1732f
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-coordinator/src/test/resources/multi/topologies.json
@@ -0,0 +1,31 @@
+[
+{
+	"name": "alertUnitTopology_1",
+	"numOfSpout":1,
+	"numOfGroupBolt": 4,
+	"numOfAlertBolt": 10,
+	"spoutId": "alertEngineSpout",
+	"groupNodeIds" : [
+		"streamRouterBolt0",
+		"streamRouterBolt1",
+		"streamRouterBolt2",
+		"streamRouterBolt3"
+	],
+	"alertBoltIds": [
+		"alertBolt0",
+		"alertBolt1",
+		"alertBolt2",
+		"alertBolt3",
+		"alertBolt4",
+		"alertBolt5",
+		"alertBolt6",
+		"alertBolt7",
+		"alertBolt8",
+		"alertBolt9"
+	],
+	"pubBoltId" : "alertPublishBolt",
+	"spoutParallelism": 1,
+	"groupParallelism": 1,
+	"alertParallelism": 1
+}
+]
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
index 6e5beb6..0e9ab6c 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/evaluator/nodata/NoDataPolicyHandler.java
@@ -72,6 +72,26 @@ import org.slf4j.LoggerFactory;
  ],
  "parallelismHint": 2
  }
+     "name": "noDataAlertPolicy",
+     "description": "noDataAlertPolicy",
+     "inputStreams": [
+        "noDataAlertStream"
+     ],
+     "outputStreams": [
+        "noDataAlertStream_out"
+     ],
+     "definition": {
+        "type": "nodataalert",
+        "value": "PT1M,plain,1,host,host1,host2"   // or "value": "PT1M,dynamic,1,host"
+     },
+     "partitionSpec": [
+     {
+        "streamId": "noDataAlertStream",
+        "type": "GROUPBY"
+     }
+     ],
+     "parallelismHint": 2
+   }
  */
 public class NoDataPolicyHandler implements PolicyStreamHandler{
     private static final Logger LOG = LoggerFactory.getLogger(NoDataPolicyHandler.class);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
index 2566f79..ca46aeb 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertKafkaPublisher.java
@@ -79,7 +79,9 @@ public class AlertKafkaPublisher extends AbstractPublishPlugin {
             future.get(MAX_TIMEOUT_MS, TimeUnit.MILLISECONDS);
             status.successful = true;
             status.errorMessage = "";
-            LOG.info("Successfully send message to Kafka: " + brokerList);
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Successfully sent message to Kafka: " + brokerList);
+            }
         } catch (InterruptedException | ExecutionException e) {
             status.successful = false;
             status.errorMessage = String.format("Failed to send message to %s, due to:%s", brokerList, e);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
index 6baa616..f9306dc 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/AlertPublisherImpl.java
@@ -104,6 +104,10 @@ public class AlertPublisherImpl implements AlertPublisher {
         }
 
         for (Publishment publishment : added) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug(publishment.toString());
+            }
+
             AlertPublishPlugin plugin = AlertPublishPluginsFactory.createNotificationPlugin(publishment, config, conf);
             if(plugin != null) {
                 publishPluginMapping.put(publishment.getName(), plugin);

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java
index 6b7fc61..fb038ba 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/publisher/impl/KafkaProducerManager.java
@@ -21,18 +21,30 @@ import java.util.Map;
 import java.util.Properties;
 
 import org.apache.kafka.clients.producer.KafkaProducer;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * The producer is thread safe and sharing a single producer instance across threads will generally be faster than
  * having multiple instances.
  */
 public class KafkaProducerManager {
+    private static final Logger LOG = LoggerFactory.getLogger(KafkaProducerManager.class);
 
     private static final String STRING_SERIALIZER = "org.apache.kafka.common.serialization.StringSerializer";
+    private static final String STRING_DESERIALIZER = "org.apache.kafka.common.serialization.StringDeserializer";
+    
     private static final String VALUE_DESERIALIZER = "value.deserializer";
-    private static final String KEY_DESERIALIZER = "key.deserializer";
+    private static final String VALUE_DESERIALIZER_UNDERSCORE = "value_deserializer";
+    
     private static final String VALUE_SERIALIZER = "value.serializer";
+    private static final String VALUE_SERIALIZER_UNDERSCORE = "value_serializer";
+    
+    private static final String KEY_DESERIALIZER = "key.deserializer";
+    private static final String KEY_DESERIALIZER_UNDERSCORE = "key_deserializer";
+    
     private static final String KEY_SERIALIZER = "key.serializer";
+    private static final String KEY_SERIALIZER_UNDERSCORE = "key_serializer";
 
     public static final KafkaProducerManager INSTANCE = new KafkaProducerManager();
 
@@ -41,29 +53,36 @@ public class KafkaProducerManager {
         configMap.put("bootstrap.servers", brokerList);
         configMap.put("metadata.broker.list", brokerList);
 
-        if (kafkaConfig.containsKey(KEY_SERIALIZER)) {
-            configMap.put(KEY_SERIALIZER, kafkaConfig.get(KEY_SERIALIZER));
+        // key serializer
+        if (kafkaConfig.containsKey(KEY_SERIALIZER_UNDERSCORE)) {
+            configMap.put(KEY_SERIALIZER, kafkaConfig.get(KEY_SERIALIZER_UNDERSCORE));
         } else {
             configMap.put(KEY_SERIALIZER, STRING_SERIALIZER);
         }
 
-        if (kafkaConfig.containsKey(VALUE_SERIALIZER)) {
-            configMap.put(VALUE_SERIALIZER, kafkaConfig.get(VALUE_SERIALIZER));
+        if (kafkaConfig.containsKey(KEY_DESERIALIZER_UNDERSCORE)) {
+            configMap.put(KEY_DESERIALIZER, kafkaConfig.get(KEY_DESERIALIZER_UNDERSCORE));
+        } else {
+            configMap.put(KEY_DESERIALIZER, STRING_DESERIALIZER);
+        }
+
+        // value serializer
+        if (kafkaConfig.containsKey(VALUE_SERIALIZER_UNDERSCORE)) {
+            configMap.put(VALUE_SERIALIZER, kafkaConfig.get(VALUE_SERIALIZER_UNDERSCORE));
         } else {
             configMap.put(VALUE_SERIALIZER, STRING_SERIALIZER);
         }
         configMap.put("request.required.acks", "1");
-
-        if (kafkaConfig.containsKey(KEY_DESERIALIZER)) {
-            configMap.put(KEY_DESERIALIZER, kafkaConfig.get(KEY_DESERIALIZER));
+        
+        // value deserializer
+        if (kafkaConfig.containsKey(VALUE_DESERIALIZER_UNDERSCORE)) {
+            configMap.put(VALUE_DESERIALIZER, kafkaConfig.get(VALUE_DESERIALIZER_UNDERSCORE));
         } else {
-            configMap.put(KEY_DESERIALIZER, STRING_SERIALIZER);
+            configMap.put(VALUE_DESERIALIZER, STRING_DESERIALIZER);
         }
 
-        if (kafkaConfig.containsKey(VALUE_DESERIALIZER)) {
-            configMap.put(VALUE_DESERIALIZER, kafkaConfig.get(VALUE_DESERIALIZER));
-        } else {
-            configMap.put(VALUE_DESERIALIZER, STRING_SERIALIZER);
+        if (LOG.isInfoEnabled()) {
+            LOG.info("Given kafka config {}, created producer config map {}", kafkaConfig, configMap);
         }
 
         KafkaProducer<String, Object> producer = new KafkaProducer<>(configMap);

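A small sketch of the resulting key mapping (the variable names here are illustrative; the keys mirror the handling above): publishment properties use underscore-style keys, which KafkaProducerManager translates into the dotted Kafka producer settings, falling back to the String (de)serializers when a key is absent.

    // underscore keys as they appear in publishments.json properties
    Map<String, String> kafkaConfig = new HashMap<>();
    kafkaConfig.put("value_serializer", "org.apache.kafka.common.serialization.ByteArraySerializer");
    kafkaConfig.put("value_deserializer", "org.apache.kafka.common.serialization.ByteArrayDeserializer");
    // after mapping, the producer config is expected to contain:
    //   value.serializer   = ...ByteArraySerializer   (from value_serializer)
    //   value.deserializer = ...ByteArrayDeserializer (from value_deserializer)
    //   key.serializer     = ...StringSerializer      (default, no key_serializer given)
    //   key.deserializer   = ...StringDeserializer    (default, no key_deserializer given)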
http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
index e53b0ba..140fbff 100755
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/java/org/apache/eagle/alert/engine/runner/AlertBolt.java
@@ -67,7 +67,7 @@ public class AlertBolt extends AbstractStreamBolt implements AlertBoltSpecListen
     private volatile Map<String, PolicyDefinition> cachedPolicies = new HashMap<>(); // for one streamGroup, there are multiple policies
 
     private StreamContext streamContext;
-    private volatile Map<String, StreamDefinition> sdf;
+    private volatile Map<String, StreamDefinition> sdf = new HashMap<String, StreamDefinition>();
     private PartitionedEventSerializer serializer;
 
     public AlertBolt(String boltId, PolicyGroupEvaluator policyGroupEvaluator, Config config, IMetadataChangeNotifyService changeNotifyService){

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
index e3be1b7..094fd92 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/main/resources/application.conf
@@ -24,8 +24,8 @@
     "localMode" : "true"
   },
   "spout" : {
-    "kafkaBrokerZkQuorum": "10.254.194.245:2181",
-    "kafkaBrokerZkBasePath": "/brokers",
+    "kafkaBrokerZkQuorum": "localhost:2181",
+    "kafkaBrokerZkBasePath": "/kafka",
     "stormKafkaUseSameZkQuorumWithKafkaBroker": true,
     "stormKafkaTransactionZkQuorum": "",
     "stormKafkaTransactionZkPath": "/consumers",
@@ -34,8 +34,8 @@
     "stormKafkaFetchSizeBytes": 1048586,
   },
   "zkConfig" : {
-    "zkQuorum" : "10.254.194.245:2181",
-    "zkRoot" : "/kafka",
+    "zkQuorum" : "localhost:2181",
+    "zkRoot" : "/alert",
     "zkSessionTimeoutMs" : 10000,
     "connectionTimeoutMs" : 10000,
     "zkRetryTimes" : 3,
@@ -47,7 +47,7 @@
   },
   "metadataService": {
     "context" : "/rest",
-    "host" : "127.0.0.1",
+    "host" : "localhost",
     "port" : 8080
   },
   "coordinatorService": {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java
index ac07d19..8b28080 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration1.java
@@ -16,17 +16,16 @@
  */
 package org.apache.eagle.alert.engine.e2e;
 
-import java.util.List;
-import java.util.Properties;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ThreadFactory;
-
+import backtype.storm.utils.Utils;
+import com.fasterxml.jackson.databind.JavaType;
+import com.fasterxml.jackson.databind.ObjectMapper;
+import com.fasterxml.jackson.databind.type.CollectionType;
+import com.fasterxml.jackson.databind.type.SimpleType;
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
 import kafka.admin.AdminUtils;
 import kafka.utils.ZKStringSerializer$;
-
 import org.I0Itec.zkclient.ZkClient;
-import org.I0Itec.zkclient.ZkConnection;
 import org.apache.eagle.alert.config.ZKConfig;
 import org.apache.eagle.alert.config.ZKConfigBuilder;
 import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
@@ -38,22 +37,15 @@ import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
 import org.apache.eagle.alert.service.IMetadataServiceClient;
 import org.apache.eagle.alert.service.MetadataServiceClientImpl;
 import org.apache.eagle.alert.utils.KafkaEmbedded;
-import org.junit.AfterClass;
-import org.junit.Assert;
-import org.junit.BeforeClass;
-import org.junit.Ignore;
-import org.junit.Test;
+import org.junit.*;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import backtype.storm.utils.Utils;
-
-import com.fasterxml.jackson.databind.JavaType;
-import com.fasterxml.jackson.databind.ObjectMapper;
-import com.fasterxml.jackson.databind.type.CollectionType;
-import com.fasterxml.jackson.databind.type.SimpleType;
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
+import java.util.List;
+import java.util.Properties;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ThreadFactory;
 
 
 /**
@@ -155,9 +147,7 @@ public class Integration1 {
 
         ZkClient zkClient = new ZkClient(zkconfig.zkQuorum, 10000, 10000, ZKStringSerializer$.MODULE$);
         Properties topicConfiguration = new Properties();
-        ZkConnection zkConnection = new ZkConnection(zkconfig.zkQuorum);
-//        ZkUtils zkUtils = new ZkUtils(zkClient, zkConnection, false);
-        AdminUtils.createTopic(zkClient, topic, 1, 1, topicConfiguration);// RackAwareMode.Disabled$.MODULE$);
+        AdminUtils.createTopic(zkClient, topic, 1, 1, topicConfiguration);
     }
 
     public static void proactive_schedule(Config config) throws Exception {

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration3.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration3.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration3.java
new file mode 100644
index 0000000..cb8cda9
--- /dev/null
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration3.java
@@ -0,0 +1,107 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.engine.e2e;
+
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+
+import org.apache.eagle.alert.engine.UnitTopologyMain;
+import org.apache.eagle.alert.utils.KafkaEmbedded;
+import org.junit.AfterClass;
+import org.junit.BeforeClass;
+import org.junit.Test;
+
+import backtype.storm.utils.Utils;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * @since May 10, 2016
+ *
+ */
+public class Integration3 {
+
+    private String[] args;
+    private ExecutorService executors = Executors.newFixedThreadPool(5);
+    private static KafkaEmbedded kafka;
+
+    @BeforeClass
+    public static void setup() {
+        // FIXME : start local kafka
+    }
+
+    @AfterClass
+    public static void end() {
+        if (kafka != null) {
+            kafka.shutdown();
+        }
+    }
+
+    /**
+     * Assumption:
+     * <p>
+     * start the metadata service on port 8080 with REST context /rest
+     * 
+     * <pre>
+     * user@kafka-host:~$ $KAFKA_HOME/bin/kafka-topics.sh --create --zookeeper localhost:2181 --replication-factor 1 --partitions 1 --topic syslog_events
+     * </pre>
+     * <p>
+     * 
+     * @throws InterruptedException
+     */
+    @Test
+    public void testSeverity() throws Exception {
+        System.setProperty("config.resource", "/e2e/application-e2e.conf");
+        ConfigFactory.invalidateCaches();
+        Config config = ConfigFactory.load();
+
+        System.out.println("loading metadatas...");
+        Integration1.loadMetadatas("/e2e/", config);
+        System.out.println("loading metadatas done!");
+
+        // send sample sherlock data
+        executors.submit(() -> {
+            try {
+                SampleClient3.main(args);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        });
+
+        executors.submit(() -> {
+            try {
+                UnitTopologyMain.main(args);
+            } catch (Exception e) {
+                e.printStackTrace();
+            }
+        });
+
+        Utils.sleep(1000 * 5l);
+        while (true) {
+            Integration1.proactive_schedule(config);
+
+            Utils.sleep(1000 * 60l * 5);
+        }
+    }
+
+    @Test
+    public void testJson() throws Exception {
+        Integration1.checkAll("/e2e/");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/f0af3e5d/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java
index 52d1e5d..4d039d4 100644
--- a/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java
+++ b/eagle-core/eagle-alert-parent/eagle-alert/alert-engine/src/test/java/org/apache/eagle/alert/engine/e2e/Integration5AbsenceAlert.java
@@ -23,7 +23,6 @@ import org.apache.eagle.alert.engine.UnitTopologyMain;
 import org.apache.eagle.alert.utils.KafkaEmbedded;
 import org.junit.AfterClass;
 import org.junit.BeforeClass;
-import org.junit.Ignore;
 import org.junit.Test;
 
 import java.util.concurrent.ExecutorService;
@@ -50,7 +49,7 @@ public class Integration5AbsenceAlert {
             kafka.shutdown();
         }
     }
-    @Test @Ignore
+    @Test
     public void testTriggerAbsenceAlert() throws Exception{
         System.setProperty("config.resource", "/absence/application-absence.conf");
         ConfigFactory.invalidateCaches();
@@ -91,4 +90,4 @@ public class Integration5AbsenceAlert {
             Utils.sleep(1000 * 60l * 5);
         }
     }
-}
\ No newline at end of file
+}