Posted to commits@eagle.apache.org by ra...@apache.org on 2016/06/01 05:56:30 UTC

[15/19] incubator-eagle git commit: EAGLE-324: Init branch-v0.5

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
new file mode 100644
index 0000000..e755237
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
@@ -0,0 +1,164 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.impl.strategies;
+
+import static org.apache.eagle.alert.coordinator.CoordinatorConstants.CONFIG_ITEM_TOPOLOGY_LOAD_UPBOUND;
+
+import java.util.ArrayList;
+import java.util.Collections;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.eagle.alert.coordination.model.WorkSlot;
+import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.coordinator.CoordinatorConstants;
+import org.apache.eagle.alert.coordinator.IScheduleContext;
+import org.apache.eagle.alert.coordinator.TopologyMgmtService;
+import org.apache.eagle.alert.coordinator.TopologyMgmtService.TopologyMeta;
+import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
+import org.apache.eagle.alert.coordinator.model.TopologyUsage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * A simple strategy that only finds bolts in the same topology for the
+ * required work slots.
+ *
+ * Invariant:<br/>
+ * One slot queue lives on exactly one topology.<br/>
+ * One topology does not contain two identical partition slot queues.
+ * 
+ * @since Apr 27, 2016
+ *
+ */
+public class SameTopologySlotStrategy implements IWorkSlotStrategy {
+
+    private static final Logger LOG = LoggerFactory.getLogger(SameTopologySlotStrategy.class);
+
+    private final IScheduleContext context;
+    private final StreamGroup partitionGroup;
+    private final TopologyMgmtService mgmtService;
+
+//    private final int numOfPoliciesBoundPerBolt;
+    private final double topoLoadUpbound;
+
+    public SameTopologySlotStrategy(IScheduleContext context, StreamGroup streamPartitionGroup,
+            TopologyMgmtService mgmtService) {
+        this.context = context;
+        this.partitionGroup = streamPartitionGroup;
+        this.mgmtService = mgmtService;
+
+        Config config = ConfigFactory.load().getConfig(CoordinatorConstants.CONFIG_ITEM_COORDINATOR);
+//        numOfPoliciesBoundPerBolt = config.getInt(CoordinatorConstants.POLICIES_PER_BOLT);
+        topoLoadUpbound = config.getDouble(CONFIG_ITEM_TOPOLOGY_LOAD_UPBOUND);
+    }
+
+    /**
+     * @param isDedicated
+     *            - not used yet!
+     */
+    @Override
+    public List<WorkSlot> reserveWorkSlots(int size, boolean isDedicated, Map<String, Object> properties) {
+        Iterator<Topology> it = context.getTopologies().values().stream().filter((t) -> t.getNumOfAlertBolt() >= size)
+                .iterator();
+        // TODO: should a priority strategy be applied first?
+        List<WorkSlot> slots = new ArrayList<WorkSlot>();
+        while (it.hasNext()) {
+            Topology t = it.next();
+            if (getQueueOnTopology(size, slots, t)) {
+                break;
+            }
+        }
+
+        if (slots.size() == 0) {
+            int supportedSize = mgmtService.getNumberOfAlertBoltsInTopology();
+            if (size > supportedSize) {
+                LOG.error("can not find available slots for queue, required size {}, supported size {} !", size, supportedSize);
+                return Collections.emptyList();
+            }
+            TopologyMeta topoMeta = mgmtService.creatTopology();
+            if (topoMeta == null) {
+                LOG.error("can not create topology for given queue requirement, required size {}, requried partition group {} !", size, partitionGroup);
+                return Collections.emptyList();
+            }
+
+            context.getTopologies().put(topoMeta.topologyId, topoMeta.topology);
+            context.getTopologyUsages().put(topoMeta.topologyId, topoMeta.usage);
+            boolean placed = getQueueOnTopology(size, slots, topoMeta.topology);
+            if (!placed) {
+                LOG.error("can not find available slots from new created topology, required size {}. This indicates an error !", size);
+            }
+        }
+        return slots;
+    }
+
+    private boolean getQueueOnTopology(int size, List<WorkSlot> slots, Topology t) {
+        TopologyUsage u = context.getTopologyUsages().get(t.getName());
+        if (!isTopologyAvailable(u)) {
+            return false;
+        }
+
+        List<String> bolts = new ArrayList<String>();
+        for (AlertBoltUsage alertUsage : u.getAlertUsages().values()) {
+            if (isBoltAvailable(alertUsage)) {
+                bolts.add(alertUsage.getBoltId());
+            }
+
+            if (bolts.size() == size) {
+                break;
+            }
+        }
+
+        if (bolts.size() == size) {
+            for (String boltId : bolts) {
+                WorkSlot slot = new WorkSlot(t.getName(), boltId);
+                slots.add(slot);
+            }
+            return true;
+        }
+        return false;
+    }
+
+    private boolean isTopologyAvailable(TopologyUsage u) {
+//        for (MonitoredStream stream : u.getMonitoredStream()) {
+//            if (partition.equals(stream.getStreamParitition())) {
+//                return false;
+//            }
+//        }
+        if (u == null || u.getLoad() > topoLoadUpbound) {
+            return false;
+        }
+        
+        return true;
+    }
+
+    private boolean isBoltAvailable(AlertBoltUsage alertUsage) {
+        // FIXME: compare in more detail for the queue exclusion check
+        if (alertUsage.getQueueSize() > 0) {
+            return false;
+        }
+        // at this point the queue size is 0, so the bolt is considered free
+        return true;
+//        return alertUsage.getPolicies().size() < numOfPoliciesBoundPerBolt;
+    }
+
+}
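
For reference, a minimal sketch of how a caller might drive this strategy; the scheduleContext, streamGroup and mgmtService variables are assumed to be supplied by the coordinator:

    // hypothetical wiring; the coordinator provides the context and mgmt service
    IWorkSlotStrategy strategy =
            new SameTopologySlotStrategy(scheduleContext, streamGroup, mgmtService);
    // ask for 5 slots placed on a single topology; isDedicated is not used yet
    List<WorkSlot> slots = strategy.reserveWorkSlots(5, false, Collections.emptyMap());
    if (slots.isEmpty()) {
        // no existing topology had 5 free bolts and a new one could not be created
    }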

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java
new file mode 100644
index 0000000..e9148f5
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java
@@ -0,0 +1,93 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.model;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
+import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+
+/**
+ * @since Mar 28, 2016
+ *
+ */
+public class AlertBoltUsage {
+
+    private String boltId;
+    private List<String> policies = new ArrayList<String>();
+    // the stream partition groups scheduled for this alert bolt
+    private List<StreamGroup> partitions = new ArrayList<StreamGroup>();
+    // the slot queues scheduled for this alert bolt
+    private List<StreamWorkSlotQueue> referQueues = new ArrayList<StreamWorkSlotQueue>();
+    private double load;
+
+    public AlertBoltUsage(String anid) {
+        this.boltId = anid;
+    }
+
+    public String getBoltId() {
+        return boltId;
+    }
+
+    public void setBoltId(String boltId) {
+        this.boltId = boltId;
+    }
+
+    public List<String> getPolicies() {
+        return policies;
+    }
+
+    public void addPolicies(PolicyDefinition pd) {
+        policies.add(pd.getName());
+        // add first partition
+//        for (StreamPartition par : pd.getPartitionSpec()) {
+//            partitions.add(par);
+//        }
+    }
+
+    public double getLoad() {
+        return load;
+    }
+
+    public void setLoad(double load) {
+        this.load = load;
+    }
+
+    public List<StreamGroup> getPartitions() {
+        return partitions;
+    }
+
+    public List<StreamWorkSlotQueue> getReferQueues() {
+        return referQueues;
+    }
+    
+    public int getQueueSize() {
+        return referQueues.size();
+    }
+
+    public void addQueue(StreamGroup streamPartition, StreamWorkSlotQueue queue) {
+        this.referQueues.add(queue);
+        this.partitions.add(streamPartition);
+    }
+
+    public void removeQueue(StreamWorkSlotQueue queue) {
+        this.referQueues.remove(queue);
+    }
+
+}
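
The queue bookkeeping above is what SameTopologySlotStrategy's isBoltAvailable() checks; a small illustrative sketch, assuming streamGroup and queue objects already exist:

    AlertBoltUsage usage = new AlertBoltUsage("alertBolt_0");
    // before any queue is attached, getQueueSize() == 0 and the bolt counts as free
    usage.addQueue(streamGroup, queue);
    // now getQueueSize() == 1, so the strategy treats this bolt as occupied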

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java
new file mode 100644
index 0000000..86238d1
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java
@@ -0,0 +1,67 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.model;
+
+
+/**
+ * @since Mar 28, 2016
+ *
+ */
+public class GroupBoltUsage {
+
+    private String boltId;
+    private double load;
+    
+    public GroupBoltUsage(String boltId) {
+        this.boltId = boltId;
+    }
+
+//    private final Set<String> streams = new HashSet<String>();
+//    private final Map<String, StreamFilter> filters = new HashMap<String, StreamFilter>();
+
+//    private final Map<String, List<StreamPartition>> groupByMeta;
+
+    public double getLoad() {
+        return load;
+    }
+
+    public void setLoad(double load) {
+        this.load = load;
+    }
+
+//    public Set<String> getStreams() {
+//        return streams;
+//    }
+//
+//
+//    public Map<String, StreamFilter> getFilters() {
+//        return filters;
+//    }
+
+//    public Map<String, List<StreamPartition>> getGroupByMeta() {
+//        return groupByMeta;
+//    }
+
+    public String getBoltId() {
+        return boltId;
+    }
+
+    public void setBoltId(String boltId) {
+        this.boltId = boltId;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java
new file mode 100644
index 0000000..6eb6195
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java
@@ -0,0 +1,102 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.model;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
+
+/**
+ * @since Mar 27, 2016
+ *
+ */
+public class TopologyUsage {
+    // topo info
+    private String topoName;
+    private final Set<String> datasources = new HashSet<String>();
+    // usage information
+    private final Set<String> policies = new HashSet<String>();
+    private final Map<String, AlertBoltUsage> alertUsages = new HashMap<String, AlertBoltUsage>();
+    private final Map<String, GroupBoltUsage> groupUsages = new HashMap<String, GroupBoltUsage>();
+    private final List<MonitoredStream> monitoredStream = new ArrayList<MonitoredStream>();
+
+    private double load;
+
+    /**
+     * This represents the existing/previous metadata. <br/>
+     * There is only one group metadata shared by all group bolts in this topology.
+     */
+
+    public TopologyUsage() {
+    }
+    
+    public TopologyUsage(String name) {
+        this.topoName = name;
+    }
+    
+    public String getTopoName() {
+        return topoName;
+    }
+
+    public void setTopoName(String topoId) {
+        this.topoName = topoId;
+    }
+
+    public Set<String> getDataSources() {
+        return datasources;
+    }
+
+    public Set<String> getPolicies() {
+        return policies;
+    }
+
+    public Map<String, AlertBoltUsage> getAlertUsages() {
+        return alertUsages;
+    }
+
+    public AlertBoltUsage getAlertBoltUsage(String boltId) {
+        return alertUsages.get(boltId);
+    }
+
+    public Map<String, GroupBoltUsage> getGroupUsages() {
+        return groupUsages;
+    }
+
+    public List<MonitoredStream> getMonitoredStream() {
+        return monitoredStream;
+    }
+
+    public void addMonitoredStream(MonitoredStream par) {
+        if (!this.monitoredStream.contains(par)) {
+            this.monitoredStream.add(par);
+        }
+    }
+
+    public double getLoad() {
+        return load;
+    }
+
+    public void setLoad(double load) {
+        this.load = load;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java
new file mode 100644
index 0000000..84a4061
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/InMemScheduleConext.java
@@ -0,0 +1,148 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.provider;
+
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
+import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.coordinator.IScheduleContext;
+import org.apache.eagle.alert.coordinator.model.TopologyUsage;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+
+/**
+ * @since Mar 28, 2016
+ *
+ */
+public class InMemScheduleConext implements IScheduleContext {
+
+    private Map<String, Topology> topologies = new HashMap<String, Topology>();
+    private Map<String, TopologyUsage> usages = new HashMap<String, TopologyUsage>();
+    private Map<String, PolicyDefinition> policies = new HashMap<String, PolicyDefinition>();
+    private Map<String, Kafka2TupleMetadata> datasources = new HashMap<String, Kafka2TupleMetadata>();
+    private Map<String, PolicyAssignment> policyAssignments = new HashMap<String, PolicyAssignment>();
+    private Map<String, StreamDefinition> schemas = new HashMap<String, StreamDefinition>();
+    private Map<StreamGroup, MonitoredStream> monitoredStreams = new HashMap<StreamGroup, MonitoredStream>();
+    private Map<String, Publishment> publishments = new HashMap<String, Publishment>();
+
+    public InMemScheduleConext() {
+    }
+
+    public InMemScheduleConext(IScheduleContext context) {
+        this.topologies = new HashMap<String, Topology>(context.getTopologies());
+        this.usages = new HashMap<String, TopologyUsage>(context.getTopologyUsages());
+        this.policies = new HashMap<String, PolicyDefinition>(context.getPolicies());
+        this.datasources = new HashMap<String, Kafka2TupleMetadata>(context.getDataSourceMetadata());
+        this.policyAssignments = new HashMap<String, PolicyAssignment>(context.getPolicyAssignments());
+        this.schemas = new HashMap<String, StreamDefinition>(context.getStreamSchemas());
+        this.monitoredStreams = new HashMap<StreamGroup, MonitoredStream>(context.getMonitoredStreams());
+        this.publishments = new HashMap<String, Publishment>(context.getPublishments());
+    }
+
+    public InMemScheduleConext(Map<String, Topology> topologies2, Map<String, PolicyAssignment> assignments,
+            Map<String, Kafka2TupleMetadata> kafkaSources, Map<String, PolicyDefinition> policies2,
+            Map<String, Publishment> publishments2, Map<String, StreamDefinition> streamDefinitions,
+            Map<StreamGroup, MonitoredStream> monitoredStreamMap, Map<String, TopologyUsage> usages2) {
+        this.topologies = topologies2;
+        this.policyAssignments = assignments;
+        this.datasources = kafkaSources;
+        this.policies = policies2;
+        this.publishments = publishments2;
+        this.schemas = streamDefinitions;
+        this.monitoredStreams = monitoredStreamMap;
+        this.usages = usages2;
+    }
+
+    public Map<String, Topology> getTopologies() {
+        return topologies;
+    }
+
+    public void addTopology(Topology topo) {
+        topologies.put(topo.getName(), topo);
+    }
+
+    public Map<String, TopologyUsage> getTopologyUsages() {
+        return usages;
+    }
+
+    public void addTopologyUsages(TopologyUsage usage) {
+        usages.put(usage.getTopoName(), usage);
+    }
+
+    public Map<String, PolicyDefinition> getPolicies() {
+        return policies;
+    }
+
+    public void addPoilcy(PolicyDefinition pd) {
+        this.policies.put(pd.getName(), pd);
+    }
+
+    public Map<String, Kafka2TupleMetadata> getDatasources() {
+        return datasources;
+    }
+
+    public void setDatasources(Map<String, Kafka2TupleMetadata> datasources) {
+        this.datasources = datasources;
+    }
+
+    public void addDataSource(Kafka2TupleMetadata dataSource) {
+        this.datasources.put(dataSource.getName(), dataSource);
+    }
+
+    @Override
+    public Map<String, Kafka2TupleMetadata> getDataSourceMetadata() {
+        return datasources;
+    }
+
+    public void setPolicyOrderedTopologies(Map<String, PolicyAssignment> policyAssignments) {
+        this.policyAssignments = policyAssignments;
+    }
+
+    public Map<String, PolicyAssignment> getPolicyAssignments() {
+        return this.policyAssignments;
+    }
+
+    @Override
+    public Map<String, StreamDefinition> getStreamSchemas() {
+        return schemas;
+    }
+
+    public void addSchema(StreamDefinition schema) {
+        this.schemas.put(schema.getStreamId(), schema);
+    }
+
+    public void setStreamSchemas(Map<String, StreamDefinition> schemas) {
+        this.schemas = schemas;
+    }
+
+    @Override
+    public Map<StreamGroup, MonitoredStream> getMonitoredStreams() {
+        return monitoredStreams;
+    }
+
+    @Override
+    public Map<String, Publishment> getPublishments() {
+        return publishments;
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
new file mode 100644
index 0000000..d4d6c0c
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/provider/ScheduleContextBuilder.java
@@ -0,0 +1,400 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.provider;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.Objects;
+import java.util.Set;
+
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordination.model.WorkSlot;
+import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
+import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
+import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.coordinator.IScheduleContext;
+import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
+import org.apache.eagle.alert.coordinator.model.GroupBoltUsage;
+import org.apache.eagle.alert.coordinator.model.TopologyUsage;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.service.IMetadataServiceClient;
+import org.apache.eagle.alert.service.MetadataServiceClientImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.typesafe.config.Config;
+
+/**
+ * FIXME: this class focuses on correctness, not efficiency, for now. There might
+ * be problems when the metadata size grows too big.
+ * 
+ * @since May 3, 2016
+ *
+ */
+public class ScheduleContextBuilder {
+
+    private static final Logger LOG = LoggerFactory.getLogger(ScheduleContextBuilder.class);
+    private static final String UNIQUE_BOLT_ID = "%s-%s";// toponame-boltname
+
+    private IMetadataServiceClient client;
+
+    private Map<String, Topology> topologies;
+    private Map<String, PolicyAssignment> assignments;
+    private Map<String, Kafka2TupleMetadata> kafkaSources;
+    private Map<String, PolicyDefinition> policies;
+    private Map<String, Publishment> publishments;
+    private Map<String, StreamDefinition> streamDefinitions;
+    private Map<StreamGroup, MonitoredStream> monitoredStreamMap;
+    private Map<String, TopologyUsage> usages;
+
+    public ScheduleContextBuilder(Config config) {
+        client = new MetadataServiceClientImpl(config);
+    }
+
+    public ScheduleContextBuilder(IMetadataServiceClient client) {
+        this.client = client;
+    }
+
+    /**
+     * Build a schedule context from the metadata service.
+     * 
+     * @return
+     */
+    public IScheduleContext buildContext() {
+        topologies = listToMap(client.listTopologies());
+        kafkaSources = listToMap(client.listDataSources());
+        policies = listToMap(client.listPolicies());
+        publishments = listToMap(client.listPublishment());
+        streamDefinitions = listToMap(client.listStreams());
+
+        // TODO: See ScheduleState comments on how to improve the storage
+        ScheduleState state = client.getVersionedSpec();
+        assignments = listToMap(state == null ? new ArrayList<PolicyAssignment>() : cleanupDeprecatedAssignments(state.getAssignments()));
+
+        monitoredStreamMap = listToMap(state == null ? new ArrayList<MonitoredStream>() : cleanupDeprecatedStreamsAndAssignment(state.getMonitoredStreams()));
+
+        // build based on existing data
+        usages = buildTopologyUsage();
+
+        // copy into the schedule context now
+        return new InMemScheduleConext(topologies, assignments, kafkaSources, policies, publishments,
+                streamDefinitions, monitoredStreamMap, usages);
+    }
+
+    /**
+     * 1.
+     * <pre>
+     * Check each policy's stream group against the monitored stream groups it is assigned to.
+     * If they do not match, the policy's stream group has changed, so remove the policy assignment.
+     * If, after that, no assignment refers to a given monitored stream, that monitored stream can be removed.
+     * Log every time a removal happens.
+     * </pre>
+     * 2.
+     * <pre>
+     * If a monitored stream's queue is on a non-existent topology, remove it.
+     * </pre>
+     * @param monitoredStreams
+     * @return
+     */
+    private List<MonitoredStream> cleanupDeprecatedStreamsAndAssignment(List<MonitoredStream> monitoredStreams) {
+        List<MonitoredStream> result = new ArrayList<MonitoredStream>(monitoredStreams);
+        
+        // clear deprecated streams
+        clearMonitoredStreams(monitoredStreams);
+
+        // build queueId-> streamGroup
+        Map<String, StreamGroup> queue2StreamGroup = new HashMap<String, StreamGroup>();
+        for (MonitoredStream ms : result) {
+            for (StreamWorkSlotQueue q : ms.getQueues()) {
+                queue2StreamGroup.put(q.getQueueId(), ms.getStreamGroup());
+            }
+        }
+
+        // decide the assignment delete set
+        Set<StreamGroup> usedGroups = new HashSet<StreamGroup>();
+        Set<String> toRemove = new HashSet<String>();
+        // check if queue is still referenced by policy assignments
+        for (PolicyAssignment assignment : assignments.values()) {
+            PolicyDefinition def = policies.get(assignment.getPolicyName());
+            StreamGroup group = queue2StreamGroup.get(assignment.getQueueId());
+            if (group == null || !Objects.equals(group.getStreamPartitions(), def.getPartitionSpec())) {
+                LOG.warn(" policy assgiment {} 's policy group {} is different to the monitored stream's partition group {}, "
+                                + "this indicates a policy stream partition spec change, the assignment would be removed! ",
+                        assignment, def.getPartitionSpec(), group == null ? "'not found'" :group.getStreamPartitions());
+                toRemove.add(assignment.getPolicyName());
+            } else {
+                usedGroups.add(group);
+            }
+        }
+
+        // remove useless
+        assignments.keySet().removeAll(toRemove);
+        // remove non-referenced monitored streams
+        result.removeIf((t) -> {
+            boolean used = usedGroups.contains(t.getStreamGroup());
+            if (!used) {
+                LOG.warn("monitor stream with stream group {} is not referenced, "
+                        + "this monitored stream and its worker queu will be removed", t.getStreamGroup());
+            }
+            return !used; 
+        });
+
+        return result;
+    }
+
+    private void clearMonitoredStreams(List<MonitoredStream> monitoredStreams) {
+        Iterator<MonitoredStream> it = monitoredStreams.iterator();
+        while (it.hasNext()) {
+            MonitoredStream ms = it.next();
+            Iterator<StreamWorkSlotQueue> queueIt = ms.getQueues().iterator();
+            // remove queues whose underlying topology has changed (removed/gone down)
+            while (queueIt.hasNext()) {
+                StreamWorkSlotQueue queue = queueIt.next();
+                boolean deprecated = false;
+                for (WorkSlot ws : queue.getWorkingSlots()) {
+                    // check whether the topology and the bolt are still available
+                    if (!topologies.containsKey(ws.topologyName)
+                            || !topologies.get(ws.topologyName).getAlertBoltIds().contains(ws.boltId)) {
+                        deprecated = true;
+                        break;
+                    }
+                }
+                if (deprecated) {
+                    queueIt.remove();
+                }
+            }
+
+            if (ms.getQueues().isEmpty()) {
+                it.remove();
+            }
+        }
+    }
+
+    private List<PolicyAssignment> cleanupDeprecatedAssignments(List<PolicyAssignment> list) {
+        List<PolicyAssignment> result = new ArrayList<PolicyAssignment>(list);
+        Iterator<PolicyAssignment> paIt = result.iterator();
+        while (paIt.hasNext()) {
+            PolicyAssignment assignment = paIt.next();
+            if (!policies.containsKey(assignment.getPolicyName())) {
+                LOG.info("Policy assignment {} 's policy not found, this assignment will be removed!", assignment);
+                paIt.remove();
+            }
+        }
+        return result;
+    }
+
+    private <T, K> Map<K, T> listToMap(List<T> collections) {
+        Map<K, T> maps = new HashMap<K, T>(collections.size());
+        for (T t : collections) {
+            maps.put(getKey(t), t);
+        }
+        return maps;
+    }
+
+    /*
+     * One drawback: whenever a new metadata class is added, this code needs to be changed!
+     */
+    @SuppressWarnings("unchecked")
+    private <T, K> K getKey(T t) {
+        if (t instanceof Topology) {
+            return (K) ((Topology) t).getName();
+        } else if (t instanceof PolicyAssignment) {
+            return (K) ((PolicyAssignment) t).getPolicyName();
+        } else if (t instanceof Kafka2TupleMetadata) {
+            return (K) ((Kafka2TupleMetadata) t).getName();
+        } else if (t instanceof PolicyDefinition) {
+            return (K) ((PolicyDefinition) t).getName();
+        } else if (t instanceof Publishment) {
+            return (K) ((Publishment) t).getName();
+        } else if (t instanceof StreamDefinition) {
+            return (K) ((StreamDefinition) t).getStreamId();
+        } else if (t instanceof MonitoredStream) {
+            return (K) ((MonitoredStream) t).getStreamGroup();
+        }
+        throw new RuntimeException("unexpected key class " + t.getClass());
+    }
+
+    private Map<String, TopologyUsage> buildTopologyUsage() {
+        Map<String, TopologyUsage> usages = new HashMap<String, TopologyUsage>();
+
+        // pre-build helper data structures
+        Map<String, Set<MonitoredStream>> topo2MonitorStream = new HashMap<String, Set<MonitoredStream>>();
+        Map<String, Set<String>> topo2Policies = new HashMap<String, Set<String>>();
+        // simply assume no bolt with the same id
+        Map<String, Set<String>> bolt2Policies = new HashMap<String, Set<String>>();
+        // simply assume no bolt with the same id
+        Map<String, Set<StreamGroup>> bolt2Partition = new HashMap<String, Set<StreamGroup>>();
+        // simply assume no bolt with the same id
+        Map<String, Set<String>> bolt2QueueIds = new HashMap<String, Set<String>>();
+        Map<String, StreamWorkSlotQueue> queueMap = new HashMap<String, StreamWorkSlotQueue>();
+
+        preBuildQueue2TopoMap(topo2MonitorStream, topo2Policies, bolt2Policies, bolt2Partition, bolt2QueueIds, queueMap);
+        
+        for (Topology t : topologies.values()) {
+            TopologyUsage u = new TopologyUsage(t.getName());
+            // add group/bolt usages
+            for (String grpBolt : t.getGroupNodeIds()) {
+                GroupBoltUsage grpUsage = new GroupBoltUsage(grpBolt);
+                u.getGroupUsages().put(grpBolt, grpUsage);
+            }
+            for (String alertBolt : t.getAlertBoltIds()) {
+                String uniqueBoltId = String.format(UNIQUE_BOLT_ID, t.getName(), alertBolt);
+
+                AlertBoltUsage alertUsage = new AlertBoltUsage(alertBolt);
+                u.getAlertUsages().put(alertBolt, alertUsage);
+                // complete usage
+                addBoltUsageInfo(bolt2Policies, bolt2Partition, bolt2QueueIds, uniqueBoltId, alertUsage, queueMap);
+            }
+
+            // policy -- policy assignment
+            if (topo2Policies.containsKey(u.getTopoName())) {
+                u.getPolicies().addAll(topo2Policies.get(u.getTopoName()));
+            }
+
+            // data source
+            buildTopologyDataSource(u);
+
+            // topology usage monitored stream -- from monitored streams' queue slot item info
+            if (topo2MonitorStream.containsKey(u.getTopoName())) {
+                u.getMonitoredStream().addAll(topo2MonitorStream.get(u.getTopoName()));
+            }
+
+            usages.put(u.getTopoName(), u);
+        }
+
+        return usages;
+    }
+
+    private void addBoltUsageInfo(Map<String, Set<String>> bolt2Policies,
+            Map<String, Set<StreamGroup>> bolt2Partition, Map<String, Set<String>> bolt2QueueIds, String uniqueAlertBolt,
+            AlertBoltUsage alertUsage, Map<String, StreamWorkSlotQueue> queueMap) {
+        //
+        if (bolt2Policies.containsKey(uniqueAlertBolt)) {
+            alertUsage.getPolicies().addAll(bolt2Policies.get(uniqueAlertBolt));
+        }
+        //
+        if (bolt2Partition.containsKey(uniqueAlertBolt)) {
+            alertUsage.getPartitions().addAll(bolt2Partition.get(uniqueAlertBolt));
+        }
+        //
+        if (bolt2QueueIds.containsKey(uniqueAlertBolt)) {
+            for (String qId : bolt2QueueIds.get(uniqueAlertBolt)) {
+                if (queueMap.containsKey(qId)) {
+                    alertUsage.getReferQueues().add(queueMap.get(qId));
+                } else {
+                    LOG.error(" queue id {} not found in queue map !", qId);
+                }
+            }
+        }
+    }
+
+    private void buildTopologyDataSource(TopologyUsage u) {
+        for (String policyName : u.getPolicies()) {
+            PolicyDefinition def = policies.get(policyName);
+            if (def != null) {
+                u.getDataSources().addAll(findDatasource(def));
+            } else {
+                LOG.error(" policy not find {}, but reference in topology usage {} !", policyName, u.getTopoName());
+            }
+        }
+    }
+
+    private List<String> findDatasource(PolicyDefinition def) {
+        List<String> result = new ArrayList<String>();
+        List<String> inputStreams = def.getInputStreams();
+        for (String is : inputStreams) {
+            StreamDefinition ss = this.streamDefinitions.get(is);
+            if (ss == null) {
+                LOG.error("policy {} referenced stream definition {} not found in definiton !", def.getName(), is);
+            } else {
+                result.add(ss.getDataSource());
+            }
+        }
+        return result;
+    }
+
+    private void preBuildQueue2TopoMap(
+            Map<String, Set<MonitoredStream>> topo2MonitorStream,
+            Map<String, Set<String>> topo2Policies, 
+            Map<String, Set<String>> bolt2Policies, 
+            Map<String, Set<StreamGroup>> bolt2Partition, 
+            Map<String, Set<String>> bolt2QueueIds,
+            Map<String, StreamWorkSlotQueue> queueMap) {
+        // pre-build structure
+        // why not reuse queue.getPolicies()?
+        Map<String, Set<String>> queue2Policies= new HashMap<String, Set<String>>();
+        for (PolicyAssignment pa : assignments.values()) {
+            if (!queue2Policies.containsKey(pa.getQueueId())) {
+                queue2Policies.put(pa.getQueueId(), new HashSet<String>());
+            }
+            queue2Policies.get(pa.getQueueId()).add(pa.getPolicyName());
+        }
+
+        for (MonitoredStream stream : monitoredStreamMap.values()) {
+            for (StreamWorkSlotQueue q : stream.getQueues()) {
+                queueMap.put(q.getQueueId(), q);
+                Set<String> policiesOnQ = queue2Policies.containsKey(q.getQueueId()) ? queue2Policies.get(q.getQueueId()) : new HashSet<String>();
+                
+                for (WorkSlot slot : q.getWorkingSlots()) {
+                    // topo2monitoredstream
+                    if (!topo2MonitorStream.containsKey(slot.getTopologyName())) {
+                        topo2MonitorStream.put(slot.getTopologyName(), new HashSet<MonitoredStream>());
+                    }
+                    topo2MonitorStream.get(slot.getTopologyName()).add(stream);
+                    
+                    // topo2policy
+                    if (!topo2Policies.containsKey(slot.getTopologyName())) {
+                        topo2Policies.put(slot.getTopologyName(), new HashSet<String>());
+                    }
+                    topo2Policies.get(slot.getTopologyName()).addAll(policiesOnQ);
+                    
+                    // bolt2Policy
+                    if (!bolt2Policies.containsKey(getUniqueBoltId(slot))) {
+                        bolt2Policies.put(getUniqueBoltId(slot), new HashSet<String>());
+                    }
+                    bolt2Policies.get(getUniqueBoltId(slot)).addAll(policiesOnQ);
+                    
+                    // bolt2Queue
+                    if (!bolt2QueueIds.containsKey(getUniqueBoltId(slot))) {
+                        bolt2QueueIds.put(getUniqueBoltId(slot), new HashSet<String>());
+                    }
+                    bolt2QueueIds.get(getUniqueBoltId(slot)).add(q.getQueueId());
+                    
+                    // bolt2Partition
+                    if (!bolt2Partition.containsKey(getUniqueBoltId(slot))) {
+                        bolt2Partition.put(getUniqueBoltId(slot), new HashSet<StreamGroup>());
+                    }
+                    bolt2Partition.get(getUniqueBoltId(slot)).add(stream.getStreamGroup());
+                }
+            }
+        }
+    }
+
+    private String getUniqueBoltId(WorkSlot slot) {
+        return String.format(UNIQUE_BOLT_ID, slot.getTopologyName(), slot.getBoltId());
+    }
+
+}
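
A minimal sketch of how the builder might be used; the config is assumed to carry the metadata service endpoint (see the sample application.conf later in this commit):

    // either constructor works; a pre-built IMetadataServiceClient can be passed in for tests
    ScheduleContextBuilder builder = new ScheduleContextBuilder(config);
    IScheduleContext context = builder.buildContext();
    // the returned InMemScheduleConext is a detached snapshot the scheduler can mutate
    System.out.println("topologies: " + context.getTopologies().keySet());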

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java
new file mode 100644
index 0000000..6b8495e
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/resource/CoordinatorResource.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.eagle.alert.coordinator.resource;
+
+import javax.ws.rs.GET;
+import javax.ws.rs.POST;
+import javax.ws.rs.Path;
+import javax.ws.rs.Produces;
+
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordinator.Coordinator;
+import org.apache.eagle.alert.coordinator.ScheduleOption;
+import org.apache.eagle.alert.utils.JsonUtils;
+
+/**
+ * This provides API access even when we don't have ZK as an intermediary.
+ * FIXME: more elegant status codes
+ * 
+ * @since Mar 24, 2016 <br/>
+ */
+@Path("/coordinator")
+@Produces({ "application/json" })
+public class CoordinatorResource {
+
+    // spring config here?
+    private Coordinator alertCoordinator = new Coordinator();
+
+    @GET
+    @Path("/assignments")
+    public String getAssignments() throws Exception {
+        ScheduleState state = alertCoordinator.getState();
+        return JsonUtils.writeValueAsString(state);
+    }
+
+    @POST
+    @Path("/build")
+    public String build() throws Exception {
+        ScheduleOption option = new ScheduleOption();
+        ScheduleState state = alertCoordinator.schedule(option);
+        return JsonUtils.writeValueAsString(state);
+    }
+
+    /**
+     * Manually update the topology usages, for administration
+     * 
+     * @return
+     */
+    @POST
+    @Path("/refreshUsages")
+    public String refreshUsages() {
+        // TODO
+        return "";
+    }
+
+}
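
Since the resource is mapped under /api (see the web.xml later in this commit), a schedule build can be triggered over HTTP. A hedged example using only JDK classes; host and port are taken from the sample application.conf and may differ per deployment:

    import java.io.BufferedReader;
    import java.io.InputStreamReader;
    import java.net.HttpURLConnection;
    import java.net.URL;

    public class TriggerCoordinatorBuild {
        public static void main(String[] args) throws Exception {
            URL url = new URL("http://localhost:8080/api/coordinator/build");
            HttpURLConnection conn = (HttpURLConnection) url.openConnection();
            conn.setRequestMethod("POST");
            conn.setRequestProperty("Accept", "application/json");
            try (BufferedReader reader = new BufferedReader(
                    new InputStreamReader(conn.getInputStream(), "UTF-8"))) {
                String line;
                while ((line = reader.readLine()) != null) {
                    System.out.println(line); // JSON-serialized ScheduleState
                }
            }
        }
    }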

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java
new file mode 100644
index 0000000..d7881bd
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/DynamicPolicyLoader.java
@@ -0,0 +1,81 @@
+package org.apache.eagle.alert.coordinator.trigger;
+
+import java.util.ArrayList;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.collections.CollectionUtils;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.service.IMetadataServiceClient;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Poll for policy changes and notify listeners.
+ */
+public class DynamicPolicyLoader implements Runnable{
+    private static Logger LOG = LoggerFactory.getLogger(DynamicPolicyLoader.class);
+
+    private IMetadataServiceClient client;
+    // initial cachedPolicies should be empty
+    private Map<String, PolicyDefinition> cachedPolicies = new HashMap<>();
+    private List<PolicyChangeListener> listeners = new ArrayList<>();
+
+    public DynamicPolicyLoader(IMetadataServiceClient client){
+        this.client = client;
+    }
+
+    public synchronized void addPolicyChangeListener(PolicyChangeListener listener){
+        listeners.add(listener);
+    }
+
+    /**
+     * When run for the first time, cachedPolicies is empty, so all existing policies
+     * are reported as addedPolicies.
+     */
+    @SuppressWarnings("unchecked")
+    @Override
+    public void run() {
+        // catch every exception to avoid a zombie thread
+        try {
+            List<PolicyDefinition> current = client.listPolicies();
+            Map<String, PolicyDefinition> currPolicies = new HashMap<>();
+            current.forEach(pe -> currPolicies.put(pe.getName(), pe));
+
+            Collection<String> addedPolicies = CollectionUtils.subtract(currPolicies.keySet(), cachedPolicies.keySet());
+            Collection<String> removedPolicies = CollectionUtils.subtract(cachedPolicies.keySet(), currPolicies.keySet());
+            Collection<String> potentiallyModifiedPolicies = CollectionUtils.intersection(currPolicies.keySet(), cachedPolicies.keySet());
+
+            List<String> reallyModifiedPolicies = new ArrayList<>();
+            for (String updatedPolicy : potentiallyModifiedPolicies) {
+                if (!currPolicies.get(updatedPolicy).equals(cachedPolicies.get(updatedPolicy))) {
+                    reallyModifiedPolicies.add(updatedPolicy);
+                }
+            }
+
+            boolean policyChanged = false;
+            if (addedPolicies.size() != 0 ||
+                    removedPolicies.size() != 0 ||
+                    reallyModifiedPolicies.size() != 0) {
+                policyChanged = true;
+            }
+
+            if (!policyChanged) {
+                LOG.info("policy is not changed since last run");
+                return;
+            }
+            synchronized (this) {
+                for (PolicyChangeListener listener : listeners) {
+                    listener.onPolicyChange(current, addedPolicies, removedPolicies, reallyModifiedPolicies);
+                }
+            }
+
+            // reset cached policies
+            cachedPolicies = currPolicies;
+        } catch (Throwable t) {
+            LOG.error("error loading policy, but continue to run", t);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java
new file mode 100644
index 0000000..8aa322f
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/trigger/PolicyChangeListener.java
@@ -0,0 +1,10 @@
+package org.apache.eagle.alert.coordinator.trigger;
+
+import java.util.Collection;
+import java.util.List;
+
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+
+public interface PolicyChangeListener {
+    void onPolicyChange(List<PolicyDefinition> allPolicies, Collection<String> addedPolicies, Collection<String> removedPolicies, Collection<String> modifiedPolicies);
+}
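
A minimal sketch of wiring the loader to a listener and polling on a schedule; metadataClient and LOG are assumed to exist, and the intervals mirror the metadataDynamicCheck block of the sample application.conf below:

    DynamicPolicyLoader loader = new DynamicPolicyLoader(metadataClient);
    loader.addPolicyChangeListener((allPolicies, added, removed, modified) ->
            LOG.info("policies changed: added {}, removed {}, modified {}", added, removed, modified));

    ScheduledExecutorService scheduler = Executors.newSingleThreadScheduledExecutor();
    // initDelayMillis = 1000, delayMillis = 30000 in the sample config
    scheduler.scheduleWithFixedDelay(loader, 1000, 30000, TimeUnit.MILLISECONDS);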

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/resources/application.conf
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/resources/application.conf b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/resources/application.conf
new file mode 100644
index 0000000..51cc315
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/resources/application.conf
@@ -0,0 +1,27 @@
+{
+	"coordinator" : {
+		"policiesPerBolt" : 5,
+		"boltParallelism" : 5,
+		"policyDefaultParallelism" : 5,
+		"boltLoadUpbound": 0.8,
+		"topologyLoadUpbound" : 0.8,
+		"numOfAlertBoltsPerTopology" : 5,
+		"zkConfig" : {
+			"zkQuorum" : "localhost:2181",
+			"zkRoot" : "/alert",
+			"zkSessionTimeoutMs" : 10000,
+			"connectionTimeoutMs" : 10000,
+			"zkRetryTimes" : 3,
+			"zkRetryInterval" : 3000
+		},
+		"metadataService" : {
+			"host" : "localhost",
+			"port" : 8080,
+			"context" : "/api"
+		},
+		"metadataDynamicCheck" : {
+			"initDelayMillis" : 1000,
+			"delayMillis" : 30000
+		}
+	}
+}
\ No newline at end of file
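
The coordinator reads this block through Typesafe Config, as SameTopologySlotStrategy above does via ConfigFactory.load().getConfig(...); a small sketch of reading the same values, assuming the "coordinator" key is what CoordinatorConstants.CONFIG_ITEM_COORDINATOR resolves to:

    Config coordinator = ConfigFactory.load().getConfig("coordinator");
    double topologyLoadUpbound = coordinator.getDouble("topologyLoadUpbound");   // 0.8
    int boltsPerTopology = coordinator.getInt("numOfAlertBoltsPerTopology");     // 5
    String zkQuorum = coordinator.getConfig("zkConfig").getString("zkQuorum");   // localhost:2181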

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/resources/log4j.properties b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/resources/log4j.properties
new file mode 100644
index 0000000..d4bc126
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/resources/log4j.properties
@@ -0,0 +1,21 @@
+# Licensed to the Apache Software Foundation (ASF) under one or more
+# contributor license agreements.  See the NOTICE file distributed with
+# this work for additional information regarding copyright ownership.
+# The ASF licenses this file to You under the Apache License, Version 2.0
+# (the "License"); you may not use this file except in compliance with
+# the License.  You may obtain a copy of the License at
+#
+#    http://www.apache.org/licenses/LICENSE-2.0
+#
+# Unless required by applicable law or agreed to in writing, software
+# distributed under the License is distributed on an "AS IS" BASIS,
+# WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+# See the License for the specific language governing permissions and
+# limitations under the License.
+
+log4j.rootLogger=INFO, stdout
+
+# standard output
+log4j.appender.stdout=org.apache.log4j.ConsoleAppender
+log4j.appender.stdout.layout=org.apache.log4j.PatternLayout
+log4j.appender.stdout.layout.ConversionPattern=%d{ISO8601} %p [%t] %c{2}[%L]: %m%n

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/webapp/WEB-INF/web.xml
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/webapp/WEB-INF/web.xml b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/webapp/WEB-INF/web.xml
new file mode 100644
index 0000000..1aa925e
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/webapp/WEB-INF/web.xml
@@ -0,0 +1,87 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+  ~ Licensed to the Apache Software Foundation (ASF) under one or more
+  ~ contributor license agreements.  See the NOTICE file distributed with
+  ~ this work for additional information regarding copyright ownership.
+  ~ The ASF licenses this file to You under the Apache License, Version 2.0
+  ~ (the "License"); you may not use this file except in compliance with
+  ~ the License.  You may obtain a copy of the License at
+  ~
+  ~    http://www.apache.org/licenses/LICENSE-2.0
+  ~
+  ~ Unless required by applicable law or agreed to in writing, software
+  ~ distributed under the License is distributed on an "AS IS" BASIS,
+  ~ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  ~ See the License for the specific language governing permissions and
+  ~ limitations under the License.
+  -->
+<web-app xmlns="http://java.sun.com/xml/ns/javaee"
+           xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+           xsi:schemaLocation="http://java.sun.com/xml/ns/javaee
+		  http://java.sun.com/xml/ns/javaee/web-app_3_0.xsd"
+           version="3.0">
+    <welcome-file-list>
+        <welcome-file>index.html</welcome-file>
+    </welcome-file-list>
+    <servlet>
+        <servlet-name>Jersey Web Application</servlet-name>
+        <servlet-class>com.sun.jersey.spi.container.servlet.ServletContainer</servlet-class>
+        <init-param>
+            <param-name>com.sun.jersey.config.property.packages</param-name>
+            <param-value>io.swagger.jaxrs.json,io.swagger.jaxrs.listing,org.apache.eagle.alert.coordinator.resource,org.codehaus.jackson.jaxrs</param-value>
+        </init-param>
+        <init-param>
+            <param-name>com.sun.jersey.spi.container.ContainerRequestFilters</param-name>
+            <param-value>com.sun.jersey.api.container.filter.GZIPContentEncodingFilter;com.sun.jersey.api.container.filter.PostReplaceFilter</param-value>
+        </init-param>
+        <init-param>
+            <param-name>com.sun.jersey.spi.container.ContainerResponseFilters</param-name>
+            <param-value>com.sun.jersey.api.container.filter.GZIPContentEncodingFilter</param-value>
+        </init-param>
+        <load-on-startup>1</load-on-startup>
+    </servlet>
+    <!-- Servlet for swagger initialization only, no URL mapping. -->
+    <servlet>
+        <servlet-name>swaggerConfig</servlet-name>
+        <servlet-class>io.swagger.jaxrs.config.DefaultJaxrsConfig</servlet-class>
+        <init-param>
+            <param-name>api.version</param-name>
+            <param-value>1.0.0</param-value>
+        </init-param>
+        <init-param>
+            <param-name>swagger.api.basepath</param-name>
+            <param-value>/api</param-value>
+        </init-param>
+        <load-on-startup>2</load-on-startup>
+    </servlet>
+
+    <servlet-mapping>
+        <servlet-name>Jersey Web Application</servlet-name>
+        <url-pattern>/api/*</url-pattern>
+    </servlet-mapping>
+    <filter>
+        <filter-name>CorsFilter</filter-name>
+        <!-- This should be replaced by Tomcat's built-in CORS filter; see also the metadata resource. -->
+        <filter-class>org.apache.eagle.alert.resource.SimpleCORSFiler</filter-class>
+        <init-param>
+            <param-name>cors.allowed.origins</param-name>
+            <param-value>*</param-value>
+        </init-param>
+        <init-param>
+            <param-name>cors.allowed.headers</param-name>
+            <param-value>Authorization,Origin, No-Cache, X-Requested-With, If-Modified-Since, Pragma, Last-Modified, Cache-Control, Expires, Content-Type, X-E4M-With, Accept</param-value>
+        </init-param>
+        <init-param>
+            <param-name>cors.allowed.methods</param-name>
+            <param-value>GET,POST,HEAD,OPTIONS,PUT,DELETE</param-value>
+        </init-param>
+        <init-param>
+            <param-name>cors.support.credentials</param-name>
+            <param-value>true</param-value>
+        </init-param>
+    </filter>
+    <filter-mapping>
+        <filter-name>CorsFilter</filter-name>
+        <url-pattern>/*</url-pattern>
+    </filter-mapping>
+</web-app>

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/main/webapp/index.html
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/webapp/index.html b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/webapp/index.html
new file mode 100644
index 0000000..1c4ea76
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/webapp/index.html
@@ -0,0 +1,18 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one
+  or more contributor license agreements.  See the NOTICE file
+  distributed with this work for additional information
+  regarding copyright ownership.  The ASF licenses this file
+  to you under the Apache License, Version 2.0 (the
+  "License"); you may not use this file except in compliance
+  with the License.  You may obtain a copy of the License at
+
+      http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+  -->
+UMP Coordinator service!

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java
new file mode 100644
index 0000000..78b72b3
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/CoordinatorTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.alert.coordinator;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.apache.eagle.alert.config.ConfigBusConsumer;
+import org.apache.eagle.alert.config.ConfigBusProducer;
+import org.apache.eagle.alert.config.ConfigChangeCallback;
+import org.apache.eagle.alert.config.ConfigValue;
+import org.apache.eagle.alert.config.ZKConfig;
+import org.apache.eagle.alert.config.ZKConfigBuilder;
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordinator.Coordinator;
+import org.apache.eagle.alert.coordinator.ScheduleOption;
+import org.apache.eagle.alert.service.IMetadataServiceClient;
+import org.apache.eagle.alert.service.MetadataServiceClientImpl;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * @since May 5, 2016
+ *
+ */
+public class CoordinatorTest {
+
+    @SuppressWarnings({ "resource", "unused" })
+    @Ignore
+    @Test
+    public void test() throws Exception {
+        before();
+        Config config = ConfigFactory.load().getConfig("coordinator");
+        ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
+        ConfigBusProducer producer = new ConfigBusProducer(zkConfig);
+        IMetadataServiceClient client = new MetadataServiceClientImpl(config);
+
+        Coordinator coordinator = new Coordinator(config, producer, client);
+        ScheduleOption option = new ScheduleOption();
+        ScheduleState state = coordinator.schedule(option);
+        String v = state.getVersion();
+
+        AtomicBoolean validated = new AtomicBoolean(false);
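+        // Subscribe to the topo1 spout config path; the callback should observe the version that was just scheduled.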
+        ConfigBusConsumer consumer = new ConfigBusConsumer(zkConfig, "topo1/spout", new ConfigChangeCallback() {
+            @Override
+            public void onNewConfig(ConfigValue value) {
+                String vId = value.getValue().toString();
+                Assert.assertEquals(v, vId);
+                validated.set(true);
+            }
+        });
+
+        Thread.sleep(1000);
+        Assert.assertTrue(validated.get());
+    }
+
+    @SuppressWarnings({ "resource", "unused" })
+    @Test
+    public void test_01() throws Exception {
+        before();
+        Config config = ConfigFactory.load().getConfig("coordinator");
+        ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
+        ConfigBusProducer producer = new ConfigBusProducer(zkConfig);
+        IMetadataServiceClient client = ScheduleContextBuilderTest.getSampleMetadataService();
+
+        Coordinator coordinator = new Coordinator(config, producer, client);
+        ScheduleOption option = new ScheduleOption();
+        ScheduleState state = coordinator.schedule(option);
+        String v = state.getVersion();
+
+        // TODO : assert version
+
+        CountDownLatch latch = new CountDownLatch(1);
+        AtomicBoolean validated = new AtomicBoolean(false);
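+        // Same check as the test above, but wait on a latch instead of sleeping.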
+        ConfigBusConsumer consumer = new ConfigBusConsumer(zkConfig, "topo1/spout", new ConfigChangeCallback() {
+            @Override
+            public void onNewConfig(ConfigValue value) {
+                String vId = value.getValue().toString();
+                Assert.assertEquals(v, vId);
+                validated.set(true);
+                latch.countDown();
+            }
+        });
+
+        latch.await(3, TimeUnit.SECONDS);
+        Assert.assertTrue(validated.get());
+    }
+
+    @Ignore
+    @Test
+    public void test_main() throws Exception {
+        before();
+
+        Coordinator.main(null);
+    }
+
+    @Before
+    public void before() {
+        System.setProperty("config.resource", "/test-application.conf");
+        ConfigFactory.invalidateCaches();
+        ConfigFactory.load().getConfig("coordinator");
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java
new file mode 100644
index 0000000..155a9e5
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/DynamicPolicyLoaderTest.java
@@ -0,0 +1,26 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.alert.coordinator;
+
+/**
+ * @since Apr 28, 2016
+ */
+public class DynamicPolicyLoaderTest {
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/MetadataServiceClientImplTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/MetadataServiceClientImplTest.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/MetadataServiceClientImplTest.java
new file mode 100644
index 0000000..e2ea031
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/MetadataServiceClientImplTest.java
@@ -0,0 +1,57 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.alert.coordinator;
+
+import org.apache.eagle.alert.config.ConfigBusProducer;
+import org.apache.eagle.alert.config.ZKConfig;
+import org.apache.eagle.alert.config.ZKConfigBuilder;
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordinator.Coordinator;
+import org.apache.eagle.alert.service.MetadataServiceClientImpl;
+import org.junit.Ignore;
+import org.junit.Test;
+
+import com.typesafe.config.Config;
+import com.typesafe.config.ConfigFactory;
+
+/**
+ * @since May 9, 2016
+ *
+ */
+public class MetadataServiceClientImplTest {
+
+    @Ignore
+    @Test
+    public void addScheduleState() throws Exception {
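+        // Requires a running metadata service and ZooKeeper (hence @Ignore); intended for manual runs.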
+        ConfigFactory.invalidateCaches();
+        System.setProperty("config.resource", "/test-application.conf");
+        Config config = ConfigFactory.load("test-application.conf").getConfig("coordinator");
+        MetadataServiceClientImpl client = new MetadataServiceClientImpl(config);
+
+        ScheduleState ss = new ScheduleState();
+        ss.setVersion("spec_version_1463764252582");
+
+        client.addScheduleState(ss);
+
+        client.close();
+
+        ss.setVersion("spec_version_1464764252582");
+        ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
+        ConfigBusProducer producer = new ConfigBusProducer(zkConfig);
+        Coordinator.postSchedule(client, ss, producer);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/75a8265c/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java
new file mode 100644
index 0000000..f2e67de
--- /dev/null
+++ b/eagle-core/eagle-alert/alert/alert-coordinator/src/test/java/org/apache/alert/coordinator/ScheduleContextBuilderTest.java
@@ -0,0 +1,281 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.alert.coordinator;
+
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.alert.coordinator.mock.InMemMetadataServiceClient;
+import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
+import org.apache.eagle.alert.coordination.model.ScheduleState;
+import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata;
+import org.apache.eagle.alert.coordination.model.WorkSlot;
+import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
+import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
+import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
+import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
+import org.apache.eagle.alert.coordination.model.internal.Topology;
+import org.apache.eagle.alert.coordinator.IScheduleContext;
+import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
+import org.apache.eagle.alert.coordinator.model.TopologyUsage;
+import org.apache.eagle.alert.coordinator.provider.ScheduleContextBuilder;
+import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
+import org.apache.eagle.alert.engine.coordinator.Publishment;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn;
+import org.apache.eagle.alert.engine.coordinator.StreamColumn.Type;
+import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
+import org.apache.eagle.alert.engine.coordinator.StreamPartition;
+import org.apache.eagle.alert.engine.coordinator.StreamSortSpec;
+import org.junit.Assert;
+import org.junit.Test;
+
+/**
+ * @since May 5, 2016
+ *
+ */
+public class ScheduleContextBuilderTest {
+
+    @Test
+    public void test() {
+        InMemMetadataServiceClient client = getSampleMetadataService();
+
+        ScheduleContextBuilder builder = new ScheduleContextBuilder(client);
+
+        IScheduleContext context = builder.buildContext();
+
+        // assert topology usage
+        Map<String, TopologyUsage> usages = context.getTopologyUsages();
+        Assert.assertEquals(1, usages.get(TOPO1).getMonitoredStream().size());
+        Assert.assertTrue(usages.get(TOPO1).getPolicies().contains(TEST_POLICY_1));
+
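+        // Each of the three alert bolts backing the sample queue should carry exactly one policy, one partition and one queue reference.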
+        String alertBolt0 = TOPO1 + "-alert-" + "0";
+        String alertBolt1 = TOPO1 + "-alert-" + "1";
+        String alertBolt2 = TOPO1 + "-alert-" + "2";
+        for (AlertBoltUsage u : usages.get(TOPO1).getAlertUsages().values()) {
+            if (u.getBoltId().equals(alertBolt0) || u.getBoltId().equals(alertBolt1)
+                    || u.getBoltId().equals(alertBolt2)) {
+                Assert.assertEquals(1, u.getPolicies().size());
+                Assert.assertTrue(u.getPolicies().contains(TEST_POLICY_1));
+                Assert.assertEquals(1, u.getPartitions().size());
+                Assert.assertEquals(1, u.getReferQueues().size());
+            }
+        }
+    }
+
+    @Test
+    public void test_remove_policy() {
+        InMemMetadataServiceClient client = getSampleMetadataService();
+        ScheduleContextBuilder builder = new ScheduleContextBuilder(client);
+
+        PolicyAssignment assignment1 = client.getVersionedSpec().getAssignments().get(0);
+
+        IScheduleContext context = builder.buildContext();
+        Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
+        StreamWorkSlotQueue queue = SchedulerTest.getQueue(context, assignment1.getQueueId()).getRight();
+
+        client.listPolicies().remove(0);
+        context = builder.buildContext();
+        Assert.assertFalse(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
+
+        WorkSlot slot = queue.getWorkingSlots().get(0);
+        Set<String> topoPolicies = context.getTopologyUsages().get(slot.topologyName).getPolicies();
+        Assert.assertFalse(topoPolicies.contains(TEST_POLICY_1));
+        Assert.assertEquals(0, topoPolicies.size());
+    }
+
+    @Test
+    public void test_changed_policy() {
+        InMemMetadataServiceClient client = getSampleMetadataService();
+        ScheduleContextBuilder builder = new ScheduleContextBuilder(client);
+        PolicyAssignment assignment1 = client.getVersionedSpec().getAssignments().get(0);
+
+        IScheduleContext context = builder.buildContext();
+        Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
+
+        StreamWorkSlotQueue queue = SchedulerTest.getQueue(context, assignment1.getQueueId()).getRight();
+
+        PolicyDefinition pd1 = client.listPolicies().get(0);
+        // Add a new group-by column. The partition spec must be replaced with a copy,
+        // because within a single JVM (no serialization/deserialization) the context
+        // would otherwise still reference the same object.
+        StreamPartition par = new StreamPartition(pd1.getPartitionSpec().get(0));
+        par.getColumns().add("s1");
+        pd1.getPartitionSpec().clear();
+        pd1.getPartitionSpec().add(par);
+
+        context = builder.buildContext();
+
+        // assert the policy assignment is removed
+        Assert.assertFalse(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
+        // assert the monitored stream is removed as no policy on it now.
+        Assert.assertEquals(0, context.getMonitoredStreams().size());
+        // assert the topology usage doesn't contain policy
+        WorkSlot slot = queue.getWorkingSlots().get(0);
+        TopologyUsage topologyUsage = context.getTopologyUsages().get(slot.topologyName);
+        Set<String> topoPolicies = topologyUsage.getPolicies();
+        Assert.assertFalse(topoPolicies.contains(TEST_POLICY_1));
+        Assert.assertEquals(0, topoPolicies.size());
+        // assert the topology usage doesn't contain the monitored stream
+        Assert.assertEquals(0, topologyUsage.getMonitoredStream().size());
+        // assert the alert bolt usage doesn't have the queue reference
+        Assert.assertEquals(0, topologyUsage.getAlertBoltUsage(slot.getBoltId()).getReferQueues().size());
+    }
+
+    @Test
+    public void test_renamed_topologies() {
+        InMemMetadataServiceClient client = getSampleMetadataService();
+        ScheduleContextBuilder builder = new ScheduleContextBuilder(client);
+
+        IScheduleContext context = builder.buildContext();
+        Assert.assertTrue(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
+
+        Topology t = client.listTopologies().get(0);
+        t.setName("newName");
+
+        context = builder.buildContext();
+        Assert.assertFalse(context.getPolicyAssignments().containsKey(TEST_POLICY_1));
+    }
+
+    private static final String TOPO1 = "topo1";
+    private static final String V1 = "v1";
+    private static final String COL1 = "col1";
+    private static final String OUT_STREAM1 = "out-stream1";
+    private static final String TEST_POLICY_1 = "test-policy-1";
+    private static final String TEST_STREAM_DEF_1 = "testStreamDef";
+    private static final String TEST_DATASOURCE_1 = "test-datasource-1";
+    private static StreamPartition par;
+    private static String queueId;
+    private static StreamGroup streamGroup;
+
+    public static InMemMetadataServiceClient getSampleMetadataService() {
+        InMemMetadataServiceClient client = new InMemMetadataServiceClient();
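+        // One topology, one datasource, one policy, one publishment, one stream definition and an initial schedule state.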
+        client.listTopologies().add(createSampleTopology());
+        client.listDataSources().add(createKafka2TupleMetadata());
+        // client.listSpoutMetadata().add(createS)
+        client.listPolicies().add(createPolicy());
+        client.listPublishment().add(createPublishment());
+        client.listStreams().add(createStreamDefinition());
+        client.addScheduleState(createScheduleState());
+        return client;
+    }
+
+    private static ScheduleState createScheduleState() {
+        ScheduleState ss = new ScheduleState();
+        ss.setVersion(V1);
+
+        ss.getMonitoredStreams().add(createMonitoredStream());
+        ss.getAssignments().add(createAssignment());
+
+        return ss;
+    }
+
+    private static MonitoredStream createMonitoredStream() {
+        MonitoredStream ms = new MonitoredStream(streamGroup);
+        ms.setVersion(V1);
+
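+        // Three alert-bolt slots on topo1 make up a single work slot queue for the stream group.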
+        List<WorkSlot> slots = new ArrayList<WorkSlot>();
+        WorkSlot slot0 = new WorkSlot(TOPO1, TOPO1 + "-alert-" + 0);
+        WorkSlot slot1 = new WorkSlot(TOPO1, TOPO1 + "-alert-" + 1);
+        WorkSlot slot2 = new WorkSlot(TOPO1, TOPO1 + "-alert-" + 2);
+        slots.add(slot0);
+        slots.add(slot1);
+        slots.add(slot2);
+
+        StreamWorkSlotQueue q = new StreamWorkSlotQueue(streamGroup, false, new HashMap<>(), slots);
+        ms.addQueues(q);
+        queueId = q.getQueueId();
+        return ms;
+    }
+
+    private static PolicyAssignment createAssignment() {
+        PolicyAssignment pa = new PolicyAssignment(TEST_POLICY_1, queueId);
+        return pa;
+    }
+
+    private static PolicyDefinition createPolicy() {
+        PolicyDefinition def = new PolicyDefinition();
+        def.setName(TEST_POLICY_1);
+        def.setInputStreams(Arrays.asList(TEST_STREAM_DEF_1));
+        def.setOutputStreams(Arrays.asList(OUT_STREAM1));
+        def.setParallelismHint(5);
+
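+        // Partition the input stream by COL1 and sort within a PT1M window.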
+        streamGroup = new StreamGroup();
+        par = new StreamPartition();
+        par.setStreamId(TEST_STREAM_DEF_1);
+        par.getColumns().add(COL1);
+        StreamSortSpec sortSpec = new StreamSortSpec();
+//        sortSpec.setColumn("col1");
+        sortSpec.setWindowMargin(3);
+        sortSpec.setWindowPeriod("PT1M");
+
+        par.setSortSpec(sortSpec);
+        streamGroup.addStreamPartition(par);
+
+        List<StreamPartition> lists = new ArrayList<StreamPartition>();
+        lists.add(par);
+        def.setPartitionSpec(lists);
+        return def;
+    }
+
+    private static StreamDefinition createStreamDefinition() {
+        StreamDefinition def = new StreamDefinition();
+        def.setStreamId(TEST_STREAM_DEF_1);
+        def.setDataSource(TEST_DATASOURCE_1);
+
+        StreamColumn col = new StreamColumn();
+        col.setName(COL1);
+        col.setRequired(true);
+        col.setType(Type.STRING);
+        def.getColumns().add(col);
+
+        return def;
+    }
+
+    private static Publishment createPublishment() {
+        Publishment pub = new Publishment();
+        pub.setType("KAFKA");
+        pub.setName("test-stream-output");
+        pub.setPolicyIds(Arrays.asList(TEST_POLICY_1));
+        return pub;
+    }
+
+    private static Kafka2TupleMetadata createKafka2TupleMetadata() {
+        Kafka2TupleMetadata ktm = new Kafka2TupleMetadata();
+        ktm.setName(TEST_DATASOURCE_1);
+        ktm.setSchemeCls("SchemeClass");
+        ktm.setTopic("tupleTopic");
+        ktm.setType("KAFKA");
+        ktm.setCodec(new Tuple2StreamMetadata());
+        return ktm;
+    }
+
+    private static Topology createSampleTopology() {
+        Topology t = new Topology(TOPO1, 3, 10);
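+        // Bolt IDs take the form "<name>-grp-N" and "<name>-alert-N", matching the slots created in createMonitoredStream().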
+        for (int i = 0; i < t.getNumOfGroupBolt(); i++) {
+            t.getGroupNodeIds().add(t.getName() + "-grp-" + i);
+        }
+        for (int i = 0; i < t.getNumOfAlertBolt(); i++) {
+            t.getAlertBoltIds().add(t.getName() + "-alert-" + i);
+        }
+        return t;
+    }
+
+}