You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@eagle.apache.org by ha...@apache.org on 2016/06/02 07:07:59 UTC

[20/46] incubator-eagle git commit: [EAGLE-325] Initialize next-gen alert engine code on branch-0.5

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
deleted file mode 100644
index ccb4624..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/Coordinator.java
+++ /dev/null
@@ -1,212 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator;
-
-import java.text.MessageFormat;
-import java.util.Collection;
-import java.util.List;
-import java.util.concurrent.Executors;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.eagle.alert.config.ConfigBusProducer;
-import org.apache.eagle.alert.config.ConfigValue;
-import org.apache.eagle.alert.config.ZKConfig;
-import org.apache.eagle.alert.config.ZKConfigBuilder;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordinator.provider.ScheduleContextBuilder;
-import org.apache.eagle.alert.coordinator.trigger.DynamicPolicyLoader;
-import org.apache.eagle.alert.coordinator.trigger.PolicyChangeListener;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.service.IMetadataServiceClient;
-import org.apache.eagle.alert.service.MetadataServiceClientImpl;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-/**
- * Coordinator is a standalone java application which listens to policy changes and uses
- * a schedule algorithm to distribute policies: 1) reacting to shutdown events 2) starting
- * a non-daemon thread to pull policies and figure out if policies are changed.
- * @since Mar 24, 2016
- */
-public class Coordinator {
-    private static final String COORDINATOR = "coordinator";
-    /**
-     * {@link ZKMetadataChangeNotifyService}
-     *  /alert/{topologyName}/spout
-     *                  /router
-     *                  /alert
-     *                  /publisher
-     */
-    private static final String ZK_ALERT_CONFIG_SPOUT = "{0}/spout";
-    private static final String ZK_ALERT_CONFIG_ROUTER = "{0}/router";
-    private static final String ZK_ALERT_CONFIG_ALERT = "{0}/alert";
-    private static final String ZK_ALERT_CONFIG_PUBLISHER = "{0}/publisher";
-
-    private static final Logger LOG = LoggerFactory.getLogger(Coordinator.class);
-
-    private static final String METADATA_SERVICE_HOST = "metadataService.host";
-    private static final String METADATA_SERVICE_PORT = "metadataService.port";
-    private static final String METADATA_SERVICE_CONTEXT = "metadataService.context";
-    private static final String DYNAMIC_POLICY_LOADER_INIT_MILLS = "metadataDynamicCheck.initDelayMillis";
-    private static final String DYNAMIC_POLICY_LOADER_DELAY_MILLS = "metadataDynamicCheck.delayMillis";
-
-    private volatile ScheduleState currentState = null;
-    private final ConfigBusProducer producer;
-    private final IMetadataServiceClient client;
-    private final Config config;
-
-    public Coordinator() {
-        config = ConfigFactory.load().getConfig(COORDINATOR);
-        ZKConfig zkConfig = ZKConfigBuilder.getZKConfig(config);
-        producer = new ConfigBusProducer(zkConfig);
-        client = new MetadataServiceClientImpl(config);
-    }
-
-    public Coordinator(Config config, ConfigBusProducer producer, IMetadataServiceClient client) {
-        this.config = config;
-        this.producer = producer;
-        this.client = client;
-    }
-
-    /** Runs one scheduling pass, persists and announces the result, and keeps it as the current state. */
-    public ScheduleState schedule(ScheduleOption option) {
-        IScheduleContext context = new ScheduleContextBuilder(client).buildContext();
-        TopologyMgmtService mgmtService = new TopologyMgmtService();
-        IPolicyScheduler scheduler = PolicySchedulerFactory.createScheduler();
-        scheduler.init(context, mgmtService);
-        ScheduleState state = scheduler.schedule(option);
-
-        // persist & notify
-        postSchedule(client, state, producer);
-        currentState = state;
-        return state;
-    }
-
-    /** Persists the state via the client, then sends its version to the config path of every spec key. */
-    public static void postSchedule(IMetadataServiceClient client, ScheduleState state, ConfigBusProducer producer) {
-        // persist state
-        client.addScheduleState(state);
-        // TODO, see ScheduleState comments on how to better store these configs
-        // (store policy assignment, store monitored stream)
-        // notify: one shared value carrying the state version is sent to each path
-        ConfigValue value = new ConfigValue();
-        value.setValue(state.getVersion());
-        value.setValueVersionId(true);
-        for (String topo : state.getSpoutSpecs().keySet()) {
-            producer.send(MessageFormat.format(ZK_ALERT_CONFIG_SPOUT, topo), value);
-        }
-        for (String topo : state.getGroupSpecs().keySet()) {
-            producer.send(MessageFormat.format(ZK_ALERT_CONFIG_ROUTER, topo), value);
-        }
-        for (String topo : state.getAlertSpecs().keySet()) {
-            producer.send(MessageFormat.format(ZK_ALERT_CONFIG_ALERT, topo), value);
-        }
-        for (String topo : state.getPublishSpecs().keySet()) {
-            producer.send(MessageFormat.format(ZK_ALERT_CONFIG_PUBLISHER, topo), value);
-        }
-    }
-
-    /** Returns the state stored by the last {@link #schedule} call, or null if it has not run yet. */
-    public ScheduleState getState() {
-        return currentState;
-    }
-
-    /** Shuts down background threads and releases various resources. */
-    private static class CoordinatorShutdownHook implements Runnable {
-        private static final Logger LOG = LoggerFactory.getLogger(CoordinatorShutdownHook.class);
-        private final ScheduledExecutorService executorSrv;
-
-        public CoordinatorShutdownHook(ScheduledExecutorService executorSrv) {
-            this.executorSrv = executorSrv;
-        }
-
-        @Override
-        public void run() {
-            LOG.info("start shutdown coordinator ...");
-            LOG.info("Step 1 shutdown dynamic policy loader thread ");
-            // we should catch every exception to make best effort for clean shutdown
-            try {
-                executorSrv.shutdown();
-                executorSrv.awaitTermination(2000, TimeUnit.MILLISECONDS);
-            } catch (InterruptedException e) {
-                // restore the flag so the interruption is not silently lost
-                Thread.currentThread().interrupt();
-                LOG.error("error shutdown dynamic policy loader", e);
-            } catch (Throwable t) {
-                LOG.error("error shutdown dynamic policy loader", t);
-            } finally {
-                executorSrv.shutdownNow();
-            }
-        }
-    }
-
-    /** Re-runs scheduling with a default {@link ScheduleOption} whenever a policy change is reported. */
-    private static class PolicyChangeHandler implements PolicyChangeListener {
-        private static final Logger LOG = LoggerFactory.getLogger(PolicyChangeHandler.class);
-        private final IMetadataServiceClient client;
-        private final ConfigBusProducer producer;
-
-        public PolicyChangeHandler(Config config) {
-            this.client = new MetadataServiceClientImpl(config);
-            // one producer per handler (as Coordinator keeps one per instance), not a new one per change
-            this.producer = new ConfigBusProducer(ZKConfigBuilder.getZKConfig(config));
-        }
-
-        @Override
-        public void onPolicyChange(List<PolicyDefinition> allPolicies, Collection<String> addedPolicies,
-                Collection<String> removedPolicies, Collection<String> modifiedPolicies) {
-            LOG.info("policy changed ... ");
-            LOG.info("allPolicies: {}, addedPolicies: {}, removedPolicies: {}, modifiedPolicies: {}",
-                    allPolicies, addedPolicies, removedPolicies, modifiedPolicies);
-
-            IScheduleContext context = new ScheduleContextBuilder(client).buildContext();
-            TopologyMgmtService mgmtService = new TopologyMgmtService();
-            IPolicyScheduler scheduler = PolicySchedulerFactory.createScheduler();
-            scheduler.init(context, mgmtService);
-            ScheduleState state = scheduler.schedule(new ScheduleOption());
-
-            postSchedule(client, state, producer);
-            // NOTE(review): extra empty notification on the bare "spout" path - purpose not evident here
-            producer.send("spout", new ConfigValue());
-        }
-    }
-
-    public static void main(String[] args) throws Exception {
-        Config config = ConfigFactory.load().getConfig(COORDINATOR);
-        // build dynamic policy loader
-        String host = config.getString(METADATA_SERVICE_HOST);
-        int port = config.getInt(METADATA_SERVICE_PORT);
-        String context = config.getString(METADATA_SERVICE_CONTEXT);
-        IMetadataServiceClient client = new MetadataServiceClientImpl(host, port, context);
-        DynamicPolicyLoader loader = new DynamicPolicyLoader(client);
-        loader.addPolicyChangeListener(new PolicyChangeHandler(config));
-
-        // schedule dynamic policy loader
-        long initDelayMillis = config.getLong(DYNAMIC_POLICY_LOADER_INIT_MILLS);
-        long delayMillis = config.getLong(DYNAMIC_POLICY_LOADER_DELAY_MILLS);
-        ScheduledExecutorService scheduleSrv = Executors.newScheduledThreadPool(1);
-        scheduleSrv.scheduleAtFixedRate(loader, initDelayMillis, delayMillis, TimeUnit.MILLISECONDS);
-        Runtime.getRuntime().addShutdownHook(new Thread(new CoordinatorShutdownHook(scheduleSrv)));
-        LOG.info("Eagle Coordinator started ...");
-        // park the main thread: joining itself does not return unless the thread is interrupted
-        Thread.currentThread().join();
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java
deleted file mode 100644
index 3458b3e..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/CoordinatorConstants.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator;
-
-/**
- * Names of the configuration items used by the alert coordinator.
- * @since Apr 22, 2016
- */
-public class CoordinatorConstants {
-    public static final String CONFIG_ITEM_COORDINATOR = "coordinator";
-    public static final String BOLT_PARALLELISM = "boltParallelism";
-    public static final String POLICIES_PER_BOLT = "policiesPerBolt";
-    public static final String POLICY_DEFAULT_PARALLELISM = "policyDefaultParallelism";
-    public static final String NUM_OF_ALERT_BOLTS_PER_TOPOLOGY = "numOfAlertBoltsPerTopology";
-    public static final String CONFIG_ITEM_BOLT_LOAD_UPBOUND = "boltLoadUpbound";
-    public static final String CONFIG_ITEM_TOPOLOGY_LOAD_UPBOUND = "topologyLoadUpbound";
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IPolicyScheduler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IPolicyScheduler.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IPolicyScheduler.java
deleted file mode 100644
index 5e61443..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IPolicyScheduler.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator;
-
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-
-/**
- * Scheduler contract: takes a schedule context and produces a {@link ScheduleState}.
- * @since Mar 24, 2016
- */
-public interface IPolicyScheduler {
-    /** Supplies the schedule context and topology management service the scheduler works with. */
-    void init(IScheduleContext context, TopologyMgmtService mgmtService);
-
-    /**
-     * Build the assignments for all.
-     */
-    ScheduleState schedule(ScheduleOption option);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IScheduleContext.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IScheduleContext.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IScheduleContext.java
deleted file mode 100644
index 0cde22d..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/IScheduleContext.java
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator;
-
-import java.util.Map;
-
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.coordinator.model.TopologyUsage;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-
-/**
- * Bundle of metadata maps handed to {@link IPolicyScheduler#init}.
- * NOTE(review): the String keys are presumably entity names/ids - confirm with the implementation.
- * @since Mar 28, 2016
- */
-public interface IScheduleContext {
-    Map<String, Topology> getTopologies();
-
-    Map<String, PolicyDefinition> getPolicies();
-
-    // data source
-    Map<String, Kafka2TupleMetadata> getDataSourceMetadata();
-
-    Map<String, StreamDefinition> getStreamSchemas();
-
-    Map<String, TopologyUsage> getTopologyUsages();
-
-    Map<String, PolicyAssignment> getPolicyAssignments();
-
-    Map<StreamGroup, MonitoredStream> getMonitoredStreams();
-
-    Map<String, Publishment> getPublishments();
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/PolicySchedulerFactory.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/PolicySchedulerFactory.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/PolicySchedulerFactory.java
deleted file mode 100644
index 8bccb53..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/PolicySchedulerFactory.java
+++ /dev/null
@@ -1,30 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator;
-
-import org.apache.eagle.alert.coordinator.impl.GreedyPolicyScheduler;
-
-/**
- * Factory for {@link IPolicyScheduler}; every call builds a new {@link GreedyPolicyScheduler}.
- * @since Mar 24, 2016
- */
-public class PolicySchedulerFactory {
-    public static IPolicyScheduler createScheduler() {
-        IPolicyScheduler scheduler = new GreedyPolicyScheduler();
-        return scheduler;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ScheduleOption.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ScheduleOption.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ScheduleOption.java
deleted file mode 100644
index 6c04e61..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/ScheduleOption.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator;
-
-/**
- * A runtime option for one schedule processing.
- *
- * Could be used for configuration override. Every value is zero until its
- * setter is called.
- *
- * @since Apr 19, 2016
- */
-public class ScheduleOption {
-    private double topoLoadUpbound;
-    private double boltLoadUpbound;
-    private int policyDefaultParallelism;
-    private int boltParallelism;
-    private int policiesPerBolt;
-
-    public int getPoliciesPerBolt() {
-        return this.policiesPerBolt;
-    }
-
-    public int getBoltParallelism() {
-        return this.boltParallelism;
-    }
-
-    public int getPolicyDefaultParallelism() {
-        return this.policyDefaultParallelism;
-    }
-
-    public double getBoltLoadUpbound() {
-        return this.boltLoadUpbound;
-    }
-
-    public double getTopoLoadUpbound() {
-        return this.topoLoadUpbound;
-    }
-
-    public void setPoliciesPerBolt(int policiesPerBolt) {
-        this.policiesPerBolt = policiesPerBolt;
-    }
-
-    public void setBoltParallelism(int boltParallelism) {
-        this.boltParallelism = boltParallelism;
-    }
-
-    public void setPolicyDefaultParallelism(int policyDefaultParallelism) {
-        this.policyDefaultParallelism = policyDefaultParallelism;
-    }
-
-    public void setBoltLoadUpbound(double boltLoadUpbound) {
-        this.boltLoadUpbound = boltLoadUpbound;
-    }
-
-    public void setTopoLoadUpbound(double topoLoadUpbound) {
-        this.topoLoadUpbound = topoLoadUpbound;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java
deleted file mode 100644
index 4ae07f5..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/TopologyMgmtService.java
+++ /dev/null
@@ -1,83 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator;
-
-import static org.apache.eagle.alert.coordinator.CoordinatorConstants.CONFIG_ITEM_COORDINATOR;
-import static org.apache.eagle.alert.coordinator.CoordinatorConstants.NUM_OF_ALERT_BOLTS_PER_TOPOLOGY;
-
-import java.util.Collections;
-import java.util.List;
-
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.coordinator.model.TopologyUsage;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-/**
- * Topology management stub: reads bolt settings from the "coordinator" config
- * section; creating and listing topologies are not implemented yet.
- * @since Mar 29, 2016
- */
-public class TopologyMgmtService {
-
-    public static class StormClusterMeta {
-        public String clusterId;
-        public String nimbusHost;
-        public String nimbusPort;
-        public String stormVersion;
-    }
-
-    public static class TopologyMeta {
-        public String topologyId;
-        public Topology topology;
-        public TopologyUsage usage;
-
-        public String clusterId;
-        public String nimbusHost;
-        public String nimbusPort;
-    }
-
-    @SuppressWarnings("unused")
-    private final int boltParallelism;
-    private final int numberOfBoltsPerTopology;
-
-    public TopologyMgmtService() {
-        Config coordinatorConf = ConfigFactory.load().getConfig(CONFIG_ITEM_COORDINATOR);
-        this.boltParallelism = coordinatorConf.getInt(CoordinatorConstants.BOLT_PARALLELISM);
-        this.numberOfBoltsPerTopology = coordinatorConf.getInt(NUM_OF_ALERT_BOLTS_PER_TOPOLOGY);
-    }
-
-    public int getNumberOfAlertBoltsInTopology() {
-        return this.numberOfBoltsPerTopology;
-    }
-
-    public List<TopologyMeta> listTopologies() {
-        // TODO
-        return Collections.emptyList();
-    }
-
-    /**
-     * TODO: call topology mgmt API to create a topology
-     *
-     * @throws UnsupportedOperationException always, until this is implemented
-     */
-    public TopologyMeta creatTopology() {
-        // TODO
-        throw new UnsupportedOperationException("not supported yet!");
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
deleted file mode 100644
index a9b6c00..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/GreedyPolicyScheduler.java
+++ /dev/null
@@ -1,340 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.impl;
-
-import static org.apache.eagle.alert.coordinator.CoordinatorConstants.CONFIG_ITEM_BOLT_LOAD_UPBOUND;
-import static org.apache.eagle.alert.coordinator.CoordinatorConstants.CONFIG_ITEM_COORDINATOR;
-import static org.apache.eagle.alert.coordinator.CoordinatorConstants.POLICIES_PER_BOLT;
-import static org.apache.eagle.alert.coordinator.CoordinatorConstants.POLICY_DEFAULT_PARALLELISM;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
-import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.coordinator.IPolicyScheduler;
-import org.apache.eagle.alert.coordinator.IScheduleContext;
-import org.apache.eagle.alert.coordinator.ScheduleOption;
-import org.apache.eagle.alert.coordinator.TopologyMgmtService;
-import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
-import org.apache.eagle.alert.coordinator.model.TopologyUsage;
-import org.apache.eagle.alert.coordinator.provider.InMemScheduleConext;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.utils.JsonUtils;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-/**
- * A simple greedy assigner. <br/>
- * A greedy assigner simply loop the policies, find the most suitable topology
- * to locate the policy first, then assign the topics to corresponding
- * spouts/group-by bolts.
- * 
- * <br/>
- * For each given policy, the greedy steps are
- * <ul>
- * <li>1. Find the same topology that already serve the policy without exceed the load</li>
- * <li>2. Find the topology that already take the source traffic without exceed the load</li>
- * <li>3. Find the topology that available to place source topic without exceed the load</li>
- * <li>4. Create a new topology and locate the policy</li>
- * <li>Route table generated after all policies assigned</li>
- * <ul>
- * <br/>
- * 
- * @since Mar 24, 2016
- *
- */
-public class GreedyPolicyScheduler implements IPolicyScheduler {
-
-    private static final Logger LOG = LoggerFactory.getLogger(GreedyPolicyScheduler.class);
-
-    private int policiesPerBolt;
-    private int policyDefaultParallelism;
-    private int initialQueueSize;
-    private double boltLoadUpbound;
-
-    // copied context for scheduling
-    private IScheduleContext context;
-
-    private TopologyMgmtService mgmtService;
-
-    private ScheduleState state;
-
-    public GreedyPolicyScheduler() {
-        Config config = ConfigFactory.load().getConfig(CONFIG_ITEM_COORDINATOR);
-        policiesPerBolt = config.getInt(POLICIES_PER_BOLT);
-        policyDefaultParallelism = config.getInt(POLICY_DEFAULT_PARALLELISM);
-        initialQueueSize = policyDefaultParallelism;
-        boltLoadUpbound = config.getDouble(CONFIG_ITEM_BOLT_LOAD_UPBOUND);
-    }
-
-    public synchronized ScheduleState schedule(ScheduleOption option) {
-        // FIXME: never re-assign right now: sticky mode
-        // TODO: how to identify the over-heat nodes? not yet done #Scale of policies
-        // Answer: Use configured policiesPerBolt and configured bolt load up-bound
-        // FIXME: Here could be strategy to define the topology priorities
-        List<WorkItem> workSets = findWorkingSets();
-        /**
-         * <pre>
-         * <ul>
-         * <li>how to support take multiple "dumped" topology that consuming the same input as one available sets?</li>
-         * Answer: spout spec generated after policy assigned
-         * <li>How to define the input traffic partition?</li>
-         * Answer: input traffic might not be supported right now.
-         * <li>How to support traffic partition between topology?</li> 
-         * Answer: two possible place: a global route table will be generated, those target not in current topology tuples will be dropped. This make the partition for tuple to alert
-         * <li>How to support add topology on demand by evaluate the available topology bandwidth(need topology level load)?</li>
-         * Answer: Use configured topology load up-bound, when topology load is available, will adopt
-         * <ul>
-         * <pre>
-         */
-        List<ScheduleResult> results = new ArrayList<ScheduleResult>();
-        Map<String, PolicyAssignment> newAssignments = new HashMap<String, PolicyAssignment>();
-        for (WorkItem item : workSets) {
-            ScheduleResult r = schedulePolicy(item, newAssignments);
-            results.add(r);
-        }
-
-        state = generateMonitorMetadata(workSets, newAssignments);
-        if (LOG.isDebugEnabled()) {
-            LOG.debug("calculated schedule state: {}", JsonUtils.writeValueAsString(state));
-        }
-        return state;
-    }
-
-    private List<WorkItem> findWorkingSets() {
-        // find the unassigned definition
-        List<WorkItem> workSets = new LinkedList<WorkItem>();
-        for (PolicyDefinition def : context.getPolicies().values()) {
-            int expectParal = def.getParallelismHint();
-            if (expectParal == 0) {
-                expectParal = policyDefaultParallelism;
-            }
-            // how to handle expand of an policy in a smooth transition manner
-            PolicyAssignment assignment = context.getPolicyAssignments().get(def.getName());
-            if (assignment != null) {
-                LOG.info("policy {} already allocated", def.getName());
-                continue;
-            }
-
-            WorkItem item = new WorkItem(def, expectParal);
-            workSets.add(item);
-        }
-        LOG.info("work set calculation: {}", workSets);
-        return workSets;
-    }
-
-    private ScheduleState generateMonitorMetadata(List<WorkItem> expandworkSets,
-            Map<String, PolicyAssignment> newAssignments) {
-        MonitorMetadataGenerator generator = new MonitorMetadataGenerator(context);
-        return generator.generate(expandworkSets);
-    }
-
-    private void placePolicy(PolicyDefinition def, AlertBoltUsage alertBoltUsage, Topology targetTopology,
-            TopologyUsage usage) {
-        String policyName = def.getName();
-
-        // topology usage update
-        alertBoltUsage.addPolicies(def);
-
-        // update alert policy
-        usage.getPolicies().add(policyName);
-
-        // update source topics
-        updateDataSource(usage, def);
-        
-        // update group-by
-        updateGrouping(usage, def);
-    }
-
-    private void updateGrouping(TopologyUsage usage, PolicyDefinition def) {
-        // groupByMeta is removed since groupspec generate doesnt need it now. 
-//        List<StreamPartition> policyPartitionSpec = def.getPartitionSpec();
-//        Map<String, List<StreamPartition>> groupByMeta = usage.getGroupByMeta();
-//        for (StreamPartition par : policyPartitionSpec) {
-//            List<StreamPartition> partitions = groupByMeta.get(par.getStreamId());
-//            if (partitions == null) {
-//                partitions = new ArrayList<StreamPartition>();
-//                // de-dup of the partition on the list?
-//                groupByMeta.put(par.getStreamId(), partitions);
-//            }
-//            if (!partitions.contains(par)) {
-//                partitions.add(par);
-//            }
-//        }
-    }
-
-    private void updateDataSource(TopologyUsage usage, PolicyDefinition def) {
-        List<String> datasources = findDatasource(def);
-        usage.getDataSources().addAll(datasources);
-    }
-
-    private List<String> findDatasource(PolicyDefinition def) {
-        List<String> result = new ArrayList<String>();
-
-        List<String> inputStreams = def.getInputStreams();
-        Map<String, StreamDefinition> schemaMaps = context.getStreamSchemas();
-        for (String is : inputStreams) {
-            StreamDefinition ss = schemaMaps.get(is);
-            result.add(ss.getDataSource());
-        }
-        return result;
-    }
-
-    /**
-     * For each given policy, the greedy steps are
-     * <ul>
-     * <li>1. Find the same topology that already serve the policy</li>
-     * <li>2. Find the topology that already take the source traffic</li>
-     * <li>3. Find the topology that available to place source topic</li>
-     * <li>4. Create a new topology and locate the policy</li>
-     * <li>Route table generated after all policies assigned</li>
-     * <ul>
-     * <br/>
-     * 
-     * @param newAssignments
-     */
-    private ScheduleResult schedulePolicy(WorkItem item, Map<String, PolicyAssignment> newAssignments) {
-        LOG.info(" schedule for {}", item );
-
-        String policyName = item.def.getName();
-        StreamGroup policyStreamPartition = new StreamGroup();
-        if (item.def.getPartitionSpec().isEmpty()) {
-            LOG.error(" policy {} partition spec is empty! ", policyName);
-            ScheduleResult result = new ScheduleResult();
-            result.policyName = policyName;
-            result.code = 400;
-            result.message = "policy doesn't have partition spec";
-            return result;
-        }
-        policyStreamPartition.addStreamPartitions(item.def.getPartitionSpec());
-
-        MonitoredStream targetdStream = context.getMonitoredStreams().get(policyStreamPartition);
-        if (targetdStream == null) {
-            targetdStream = new MonitoredStream(policyStreamPartition);
-            context.getMonitoredStreams().put(policyStreamPartition, targetdStream);
-        }
-
-        ScheduleResult result = new ScheduleResult();
-        result.policyName = policyName;
-
-        StreamWorkSlotQueue queue = findWorkSlotQueue(targetdStream, item.def);
-        if (queue == null) {
-            result.code = 400;
-            result.message = String.format("unable to allocate work queue resource for policy %s !", policyName);
-        } else {
-            placePolicyToQueue(item.def, queue, newAssignments);
-            result.code = 200;
-            result.message = "OK";
-        }
-
-        LOG.info(" schedule result : {}", result);
-        return result;
-    }
-
-    private void placePolicyToQueue(PolicyDefinition def, StreamWorkSlotQueue queue,
-            Map<String, PolicyAssignment> newAssignments) {
-        for (WorkSlot slot : queue.getWorkingSlots()) {
-            Topology targetTopology = context.getTopologies().get(slot.getTopologyName());
-            TopologyUsage usage = context.getTopologyUsages().get(slot.getTopologyName());
-            AlertBoltUsage alertBoltUsage = usage.getAlertBoltUsage(slot.getBoltId());
-            placePolicy(def, alertBoltUsage, targetTopology, usage);
-        }
-//        queue.placePolicy(def);
-        PolicyAssignment assignment = new PolicyAssignment(def.getName(), queue.getQueueId());
-        context.getPolicyAssignments().put(def.getName(), assignment);
-        newAssignments.put(def.getName(), assignment);
-    }
-
-    private StreamWorkSlotQueue findWorkSlotQueue(MonitoredStream targetdStream, PolicyDefinition def) {
-        StreamWorkSlotQueue targetQueue = null;
-        for (StreamWorkSlotQueue queue : targetdStream.getQueues()) {
-            if (isQueueAvailable(queue, def)) {
-                targetQueue = queue;
-                break;
-            }
-        }
-
-        if (targetQueue == null) {
-            WorkQueueBuilder builder = new WorkQueueBuilder(context, mgmtService);
-            // TODO : get the properties from policy definiton
-            targetQueue = builder.createQueue(targetdStream, false, getQueueSize(def.getParallelismHint()),
-                    new HashMap<String, Object>());
-        }
-        return targetQueue;
-    }
-
-    /**
-     * Some strategy to generate correct size in Startegy of queue builder
-     * 
-     * @param hint
-     * @return
-     */
-    private int getQueueSize(int hint) {
-        return initialQueueSize;
-    }
-
-    private boolean isQueueAvailable(StreamWorkSlotQueue queue, PolicyDefinition def) {
-        if (queue.getQueueSize() < def.getParallelismHint()) {
-            return false;
-        }
-
-        for (WorkSlot slot : queue.getWorkingSlots()) {
-            TopologyUsage u = context.getTopologyUsages().get(slot.getTopologyName());
-            AlertBoltUsage usage = u.getAlertBoltUsage(slot.getBoltId());
-            if (!isBoltAvailable(usage, def)) {
-                return false;
-            }
-        }
-        return true;
-    }
-
-    private boolean isBoltAvailable(AlertBoltUsage boltUsage, PolicyDefinition def) {
-        // overload or over policy # or already contains
-        if (boltUsage == null || boltUsage.getLoad() > boltLoadUpbound
-                || boltUsage.getPolicies().size() > policiesPerBolt || boltUsage.getPolicies().contains(def.getName())) {
-            return false;
-        }
-        return true;
-    }
-
-    public void init(IScheduleContext context, TopologyMgmtService mgmtService) {
-        this.context = new InMemScheduleConext(context);
-        this.mgmtService = mgmtService;
-    }
-    
-    public IScheduleContext getContext() {
-        return context;
-    }
-
-    public ScheduleState getState() {
-        return state;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
deleted file mode 100644
index 40f16e9..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/MonitorMetadataGenerator.java
+++ /dev/null
@@ -1,287 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.impl;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.alert.coordination.model.AlertBoltSpec;
-import org.apache.eagle.alert.coordination.model.Kafka2TupleMetadata;
-import org.apache.eagle.alert.coordination.model.PolicyWorkerQueue;
-import org.apache.eagle.alert.coordination.model.PublishSpec;
-import org.apache.eagle.alert.coordination.model.RouterSpec;
-import org.apache.eagle.alert.coordination.model.ScheduleState;
-import org.apache.eagle.alert.coordination.model.SpoutSpec;
-import org.apache.eagle.alert.coordination.model.StreamRepartitionMetadata;
-import org.apache.eagle.alert.coordination.model.StreamRepartitionStrategy;
-import org.apache.eagle.alert.coordination.model.StreamRouterSpec;
-import org.apache.eagle.alert.coordination.model.Tuple2StreamMetadata;
-import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.coordinator.IScheduleContext;
-import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
-import org.apache.eagle.alert.coordinator.model.TopologyUsage;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-import org.apache.eagle.alert.engine.coordinator.Publishment;
-import org.apache.eagle.alert.engine.coordinator.StreamDefinition;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * @since Apr 26, 2016
- * Given current policy placement, figure out monitor metadata
- * 
- * TODO: refactor to eliminate the duplicate of stupid if-notInMap-then-create....
- * FIXME: too many duplicated code logic : check null; add list to map; add to list.. 
- */
-public class MonitorMetadataGenerator {
-
-    private static final Logger LOG = LoggerFactory.getLogger(MonitorMetadataGenerator.class);
-
-    private IScheduleContext context;
-
-    public MonitorMetadataGenerator(IScheduleContext context) {
-        this.context = context;
-    }
-
-    public ScheduleState generate(List<WorkItem> expandworkSets) {
-        // topologyId -> SpoutSpec
-        Map<String, SpoutSpec> topoSpoutSpecsMap = generateSpoutMonitorMetadata();
-
-        // grp-by meta spec(sort & grp)
-        Map<String, RouterSpec> groupSpecsMap = generateGroupbyMonitorMetadata();
-
-        // alert bolt spec
-        Map<String, AlertBoltSpec> alertSpecsMap = generateAlertMonitorMetadata();
-
-        Map<String, PublishSpec> publishSpecsMap = generatePublishMetadata();
-
-        String uniqueVersion = generateVersion();
-        ScheduleState status = new ScheduleState(uniqueVersion, 
-                topoSpoutSpecsMap, 
-                groupSpecsMap, 
-                alertSpecsMap,
-                publishSpecsMap, 
-                context.getPolicyAssignments().values(), 
-                context.getMonitoredStreams().values(),
-                context.getPolicies().values(),
-                context.getStreamSchemas().values());
-        return status;
-    }
-
-    private Map<String, PublishSpec> generatePublishMetadata() {
-        Map<String, PublishSpec> pubSpecs = new HashMap<String, PublishSpec>();
-        // prebuild policy to publishment map
-        Map<String, List<Publishment>> policyToPub = new HashMap<String, List<Publishment>>();
-        for (Publishment pub : context.getPublishments().values()) {
-            for (String policyId : pub.getPolicyIds()) {
-                List<Publishment> policyPubs = policyToPub.get(policyId);
-                if (policyPubs == null) {
-                    policyPubs = new ArrayList<>();
-                    policyToPub.put(policyId, policyPubs);
-                }
-                policyPubs.add(pub);
-            }
-        }
-
-        // build per topology
-        for (TopologyUsage u : context.getTopologyUsages().values()) {
-            PublishSpec pubSpec = pubSpecs.get(u.getTopoName());
-            if (pubSpec == null) {
-                pubSpec = new PublishSpec(u.getTopoName(), context.getTopologies().get(u.getTopoName()).getPubBoltId());
-                pubSpecs.put(u.getTopoName(), pubSpec);
-            }
-
-            for (String p : u.getPolicies()) {
-                PolicyDefinition definition = context.getPolicies().get(p);
-                if (definition == null) {
-                    continue;
-                }
-                if (policyToPub.containsKey(p)) {
-                    for (Publishment pub : policyToPub.get(p)) {
-                        pubSpec.addPublishment(pub);
-                    }
-                }
-            }
-        }
-        return pubSpecs;
-    }
-
-    /**
-     * FIXME: add auto-increment version number?
-     */
-    private String generateVersion() {
-        return "spec_version_" + System.currentTimeMillis();
-    }
-
-    private Map<String, AlertBoltSpec> generateAlertMonitorMetadata() {
-        Map<String, AlertBoltSpec> alertSpecs = new HashMap<String, AlertBoltSpec>();
-        for (TopologyUsage u : context.getTopologyUsages().values()) {
-            AlertBoltSpec alertSpec = alertSpecs.get(u.getTopoName());
-            if (alertSpec == null) {
-                alertSpec = new AlertBoltSpec(u.getTopoName());
-                alertSpecs.put(u.getTopoName(), alertSpec);
-            }
-            for (AlertBoltUsage boltUsage : u.getAlertUsages().values()) {
-                for (String policyName : boltUsage.getPolicies()) {
-                    PolicyDefinition definition = context.getPolicies().get(policyName);
-                    alertSpec.addBoltPolicy(boltUsage.getBoltId(), definition.getName());
-                }
-            }
-        }
-        return alertSpecs;
-    }
-
-    private Map<String, RouterSpec> generateGroupbyMonitorMetadata() {
-        Map<String, RouterSpec> groupSpecsMap = new HashMap<String, RouterSpec>();
-        for (TopologyUsage u : context.getTopologyUsages().values()) {
-            RouterSpec spec = groupSpecsMap.get(u.getTopoName());
-            if (spec == null) {
-                spec = new RouterSpec(u.getTopoName());
-                groupSpecsMap.put(u.getTopoName(), spec);
-            }
-            
-            for (MonitoredStream ms : u.getMonitoredStream()) {
-                // mutiple stream on the same policy group : for correlation group case:
-                for (StreamPartition partiton : ms.getStreamGroup().getStreamPartitions()) {
-                    StreamRouterSpec routeSpec = new StreamRouterSpec();
-                    routeSpec.setPartition(partiton);
-                    routeSpec.setStreamId(partiton.getStreamId());
-
-                    for (StreamWorkSlotQueue sq : ms.getQueues()) {
-                        PolicyWorkerQueue queue = new PolicyWorkerQueue();
-                        queue.setWorkers(sq.getWorkingSlots());
-                        queue.setPartition(partiton);
-                        routeSpec.addQueue(queue);
-                    }
-
-                    spec.addRouterSpec(routeSpec);
-                }
-            }
-        }
-
-        return groupSpecsMap;
-    }
-
-    private Map<String, SpoutSpec> generateSpoutMonitorMetadata() {
-        Map<String, StreamWorkSlotQueue> queueMap = buildQueueMap();
-        
-        Map<String, SpoutSpec> topoSpoutSpecsMap = new HashMap<String, SpoutSpec>();
-        // streamName -> StreamDefinition
-        Map<String, StreamDefinition> streamSchemaMap = context.getStreamSchemas();
-        Map<String, Kafka2TupleMetadata> datasourcesMap = context.getDataSourceMetadata();
-        for (TopologyUsage usage : context.getTopologyUsages().values()) {
-            Topology topo = context.getTopologies().get(usage.getTopoName());
-
-            // based on data source schemas
-            // generate topic -> Kafka2TupleMetadata
-            // generate topic -> Tuple2StreamMetadata (actually the schema selector)
-            Map<String, Kafka2TupleMetadata> dss = new HashMap<String, Kafka2TupleMetadata>();
-            Map<String, Tuple2StreamMetadata> tss = new HashMap<String, Tuple2StreamMetadata>();
-            for (String dataSourceId : usage.getDataSources()) {
-                Kafka2TupleMetadata ds = datasourcesMap.get(dataSourceId);
-                dss.put(ds.getTopic(), ds);
-                tss.put(ds.getTopic(), ds.getCodec());
-            }
-
-            // generate topicId -> StreamRepartitionMetadata
-            Map<String, List<StreamRepartitionMetadata>> streamsMap = new HashMap<String, List<StreamRepartitionMetadata>>();
-            for (String policyName : usage.getPolicies()) {
-                PolicyDefinition def = context.getPolicies().get(policyName);
-                
-                PolicyAssignment assignment = context.getPolicyAssignments().get(policyName);
-                if (assignment == null) {
-                    LOG.error(" can not find assignment for policy {} ! ", policyName);
-                    continue;
-                }
-
-                for (StreamPartition policyStreamPartition : def.getPartitionSpec()) {
-                    String stream = policyStreamPartition.getStreamId();
-                    StreamDefinition schema = streamSchemaMap.get(stream);
-                    String topic = datasourcesMap.get(schema.getDataSource()).getTopic();
-
-                    // add stream name to tuple metadata
-                    if (tss.containsKey(topic)) {
-                        Tuple2StreamMetadata tupleMetadata = tss.get(topic);
-                        tupleMetadata.getActiveStreamNames().add(stream);
-                    }
-
-                    // grouping strategy
-                    StreamRepartitionStrategy gs = new StreamRepartitionStrategy();
-                    gs.partition = policyStreamPartition;
-                    gs.numTotalParticipatingRouterBolts = queueMap.get(assignment.getQueueId()).getNumberOfGroupBolts();
-                    gs.startSequence = queueMap.get(assignment.getQueueId()).getTopologyGroupStartIndex(topo.getName());
-                    gs.totalTargetBoltIds = new ArrayList<String>(topo.getGroupNodeIds());
-
-                    // add to map
-                    addGroupingStrategy(streamsMap, stream, schema, topic, schema.getDataSource(), gs);
-                }
-            }
-
-            SpoutSpec spoutSpec = new SpoutSpec(topo.getName(), streamsMap, tss, dss);
-            topoSpoutSpecsMap.put(topo.getName(), spoutSpec);
-        }
-        return topoSpoutSpecsMap;
-    }
-
-    /**
-     * Work queue not a root level object, thus we need to build a map from
-     * MonitoredStream for later quick lookup
-     * 
-     * @return
-     */
-    private Map<String, StreamWorkSlotQueue> buildQueueMap() {
-        Map<String, StreamWorkSlotQueue> queueMap = new HashMap<String, StreamWorkSlotQueue>();
-        for (MonitoredStream ms : context.getMonitoredStreams().values()) {
-            for (StreamWorkSlotQueue queue : ms.getQueues()) {
-                queueMap.put(queue.getQueueId(), queue);
-            }
-        }
-        return queueMap;
-    }
-
-    private void addGroupingStrategy(Map<String, List<StreamRepartitionMetadata>> streamsMap, String stream,
-            StreamDefinition schema, String topicName, String datasourceName, StreamRepartitionStrategy gs) {
-        List<StreamRepartitionMetadata> dsStreamMeta;
-        if (streamsMap.containsKey(topicName)) {
-            dsStreamMeta = streamsMap.get(topicName);
-        } else {
-            dsStreamMeta = new ArrayList<StreamRepartitionMetadata>();
-            streamsMap.put(topicName, dsStreamMeta);
-        }
-        StreamRepartitionMetadata targetSm = null;
-        for (StreamRepartitionMetadata sm : dsStreamMeta) {
-            if (stream.equalsIgnoreCase(sm.getStreamId())) {
-                targetSm = sm;
-                break;
-            }
-        }
-        if (targetSm == null) {
-            targetSm = new StreamRepartitionMetadata(datasourceName, schema.getStreamId());
-            dsStreamMeta.add(targetSm);
-        }
-        if (!targetSm.groupingStrategies.contains(gs)) {
-            targetSm.addGroupStrategy(gs);
-        }
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java
deleted file mode 100644
index ea96d79..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/ScheduleResult.java
+++ /dev/null
@@ -1,42 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.impl;
-
-import java.util.List;
-
-import org.apache.eagle.alert.coordination.model.internal.PolicyAssignment;
-import org.apache.eagle.alert.engine.coordinator.StreamPartition;
-
-/**
- * Schedule result for one policy
- * 
- * 
- * @since Apr 26, 2016
- *
- */
-public class ScheduleResult {
-    int code;
-    String message;
-    String policyName;
-    StreamPartition partition;
-    int index;
-    List<PolicyAssignment> topoliciesScheduled;
-
-    public String toString() {
-        return String.format("policy: %s, result code: %d ", policyName, code, message);
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkItem.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkItem.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkItem.java
deleted file mode 100644
index baa489d..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkItem.java
+++ /dev/null
@@ -1,33 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.impl;
-
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-
-public class WorkItem {
-    public final PolicyDefinition def;
-    public final int requestParallelism;
-
-    public WorkItem(PolicyDefinition def, int workNum) {
-        this.def = def;
-        this.requestParallelism = workNum;
-    }
-
-    public String toString() {
-        return "policy name: " + def.getName() + "(" + requestParallelism + ")";
-    }
-}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java
deleted file mode 100644
index a32b8fb..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/WorkQueueBuilder.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.impl;
-
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
-import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
-import org.apache.eagle.alert.coordinator.IScheduleContext;
-import org.apache.eagle.alert.coordinator.TopologyMgmtService;
-import org.apache.eagle.alert.coordinator.impl.strategies.IWorkSlotStrategy;
-import org.apache.eagle.alert.coordinator.impl.strategies.SameTopologySlotStrategy;
-import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
-import org.apache.eagle.alert.coordinator.model.TopologyUsage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Builds {@link StreamWorkSlotQueue}s for a monitored stream: reserves the
- * work slots through a slot strategy, creates the queue, and wires the queue
- * into the usage information held by the schedule context.
- *
- * @since Apr 27, 2016
- *
- */
-public class WorkQueueBuilder {
-
-    private static final Logger LOG = LoggerFactory.getLogger(WorkQueueBuilder.class);
-
-    private final IScheduleContext context;
-    private final TopologyMgmtService mgmtService;
-
-    /**
-     * @param context
-     *            schedule context whose topologies and topology usages are read and updated
-     * @param mgmtService
-     *            topology management service handed to the slot strategy
-     */
-    public WorkQueueBuilder(IScheduleContext context, TopologyMgmtService mgmtService) {
-        this.context = context;
-        this.mgmtService = mgmtService;
-    }
-
-    /**
-     * Creates a work slot queue of the requested size for the given stream.
-     *
-     * On success the queue is registered on the alert bolt usage of every
-     * slot it occupies, the stream is registered on the owning topology
-     * usage, and the queue is added to the stream.
-     *
-     * @param stream
-     *            the monitored stream the queue is created for
-     * @param isDedicated
-     *            passed through to the slot strategy and to the queue
-     * @param size
-     *            number of work slots the queue requires
-     * @param properties
-     *            passed through to the slot strategy and to the queue
-     * @return the new queue, or {@code null} when fewer than {@code size} slots could be reserved
-     */
-    public StreamWorkSlotQueue createQueue(MonitoredStream stream, boolean isDedicated, int size,
-            Map<String, Object> properties) {
-        // FIXME: make extensible and configurable
-        IWorkSlotStrategy strategy = new SameTopologySlotStrategy(context, stream.getStreamGroup(), mgmtService);
-        List<WorkSlot> slots = strategy.reserveWorkSlots(size, isDedicated, properties);
-        if (slots.size() < size) {
-            // report both numbers; the message used to end at "required size" without any value
-            LOG.error("allocate stream work queue failed, required size {}, reserved size {}", size, slots.size());
-            return null;
-        }
-        StreamWorkSlotQueue queue = new StreamWorkSlotQueue(stream.getStreamGroup(), isDedicated, properties,
-                slots);
-        calculateGroupIndexAndCount(queue);
-        assignQueueSlots(stream, queue);// build reverse reference
-        stream.addQueues(queue);
-
-        return queue;
-    }
-
-    /**
-     * Builds the reverse references for a new queue: every alert bolt that
-     * hosts one of the queue's slots records the queue, and the topology
-     * usage of that slot records the monitored stream.
-     */
-    private void assignQueueSlots(MonitoredStream stream, StreamWorkSlotQueue queue) {
-        for (WorkSlot slot : queue.getWorkingSlots()) {
-            TopologyUsage u = context.getTopologyUsages().get(slot.getTopologyName());
-            AlertBoltUsage boltUsage = u.getAlertBoltUsage(slot.getBoltId());
-            boltUsage.addQueue(stream.getStreamGroup(), queue);
-            u.addMonitoredStream(stream);
-        }
-    }
-
-    /**
-     * Computes, for each distinct topology that hosts a slot of the queue,
-     * the start index of that topology's group bolts, and the total number
-     * of group bolts over all those topologies. Topologies are visited in
-     * the order their slots appear in the queue; each one gets the running
-     * total as its start index before its own group bolt count is added.
-     */
-    private void calculateGroupIndexAndCount(StreamWorkSlotQueue queue) {
-        Map<String, Integer> result = new HashMap<String, Integer>();
-        int total = 0;
-        for (WorkSlot slot : queue.getWorkingSlots()) {
-            if (result.containsKey(slot.getTopologyName())) {
-                // topology already accounted for by an earlier slot
-                continue;
-            }
-            result.put(slot.getTopologyName(), total);
-            total += context.getTopologies().get(slot.getTopologyName()).getNumOfGroupBolt();
-        }
-
-        queue.setNumberOfGroupBolts(total);
-        queue.setTopoGroupStartIndex(result);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java
deleted file mode 100644
index 28df3c4..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/IWorkSlotStrategy.java
+++ /dev/null
@@ -1,32 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.impl.strategies;
-
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-
-/**
- * Strategy for reserving the work slots that back a stream work slot queue.
- *
- * @since Apr 27, 2016
- *
- */
-public interface IWorkSlotStrategy {
-
-    /**
-     * Reserves work slots for a queue.
-     *
-     * NOTE(review): the contract below is derived from the one implementation
-     * ({@code SameTopologySlotStrategy}) and the one caller
-     * ({@code WorkQueueBuilder}) in this module - confirm before relying on
-     * it for new implementations.
-     *
-     * @param size
-     *            number of work slots requested
-     * @param isDedicated
-     *            whether the queue is dedicated; the existing implementation documents it as not used yet
-     * @param properties
-     *            additional queue properties; the existing implementation does not read them
-     * @return the reserved slots; the caller treats a list shorter than
-     *         {@code size} as a failed reservation, and the existing
-     *         implementation returns an empty list in that case
-     */
-    List<WorkSlot> reserveWorkSlots(int size, boolean isDedicated, Map<String, Object> properties);
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
deleted file mode 100644
index e755237..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/impl/strategies/SameTopologySlotStrategy.java
+++ /dev/null
@@ -1,164 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.impl.strategies;
-
-import static org.apache.eagle.alert.coordinator.CoordinatorConstants.CONFIG_ITEM_TOPOLOGY_LOAD_UPBOUND;
-
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.Iterator;
-import java.util.List;
-import java.util.Map;
-
-import org.apache.eagle.alert.coordination.model.WorkSlot;
-import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
-import org.apache.eagle.alert.coordination.model.internal.Topology;
-import org.apache.eagle.alert.coordinator.CoordinatorConstants;
-import org.apache.eagle.alert.coordinator.IScheduleContext;
-import org.apache.eagle.alert.coordinator.TopologyMgmtService;
-import org.apache.eagle.alert.coordinator.TopologyMgmtService.TopologyMeta;
-import org.apache.eagle.alert.coordinator.model.AlertBoltUsage;
-import org.apache.eagle.alert.coordinator.model.TopologyUsage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.typesafe.config.Config;
-import com.typesafe.config.ConfigFactory;
-
-/**
- * Slot strategy that takes all requested work slots from alert bolts of one
- * single topology.
- *
- * Intended invariants:<br/>
- * A slot queue lives on exactly one topology.<br/>
- * A topology does not host two slot queues for the same partition.
- *
- * @since Apr 27, 2016
- *
- */
-public class SameTopologySlotStrategy implements IWorkSlotStrategy {
-
-    private static final Logger LOG = LoggerFactory.getLogger(SameTopologySlotStrategy.class);
-
-    private final IScheduleContext context;
-    private final StreamGroup partitionGroup;
-    private final TopologyMgmtService mgmtService;
-
-    // topologies whose load is above this bound are not considered for new queues
-    private final double topoLoadUpbound;
-
-    /**
-     * Reads the topology load upper bound from the coordinator section of
-     * the default typesafe configuration.
-     *
-     * @param context
-     *            schedule context providing topologies and their usages
-     * @param streamPartitionGroup
-     *            stream group the slots are reserved for (only used in log output)
-     * @param mgmtService
-     *            service used to create a topology when no existing one fits
-     */
-    public SameTopologySlotStrategy(IScheduleContext context, StreamGroup streamPartitionGroup,
-            TopologyMgmtService mgmtService) {
-        this.context = context;
-        this.partitionGroup = streamPartitionGroup;
-        this.mgmtService = mgmtService;
-
-        Config coordinatorConfig = ConfigFactory.load().getConfig(CoordinatorConstants.CONFIG_ITEM_COORDINATOR);
-        this.topoLoadUpbound = coordinatorConfig.getDouble(CONFIG_ITEM_TOPOLOGY_LOAD_UPBOUND);
-    }
-
-    /**
-     * Reserves {@code size} slots on the first existing topology that can
-     * supply them; when none can, asks the management service for a new
-     * topology, registers it in the context and reserves the slots there.
-     *
-     * @param size
-     *            number of slots required
-     * @param isDedicated
-     *            - not used yet!
-     * @param properties
-     *            not read by this strategy
-     * @return the reserved slots, or an empty list when nothing could be reserved
-     */
-    @Override
-    public List<WorkSlot> reserveWorkSlots(int size, boolean isDedicated, Map<String, Object> properties) {
-        List<WorkSlot> reserved = new ArrayList<WorkSlot>();
-        // priority strategy first???
-        for (Topology candidate : context.getTopologies().values()) {
-            if (candidate.getNumOfAlertBolt() < size) {
-                // topology is too small to ever hold the whole queue
-                continue;
-            }
-            if (getQueueOnTopology(size, reserved, candidate)) {
-                break;
-            }
-        }
-
-        if (!reserved.isEmpty()) {
-            return reserved;
-        }
-
-        // no existing topology could take the queue: try a fresh one
-        int supportedSize = mgmtService.getNumberOfAlertBoltsInTopology();
-        if (size > supportedSize) {
-            LOG.error("can not find available slots for queue, required size {}, supported size {} !", size, supportedSize);
-            return Collections.emptyList();
-        }
-
-        TopologyMeta created = mgmtService.creatTopology();
-        if (created == null) {
-            LOG.error("can not create topology for given queue requirement, required size {}, requried partition group {} !", size, partitionGroup);
-            return Collections.emptyList();
-        }
-
-        context.getTopologies().put(created.topologyId, created.topology);
-        context.getTopologyUsages().put(created.topologyId, created.usage);
-        if (!getQueueOnTopology(size, reserved, created.topology)) {
-            LOG.error("can not find available slots from new created topology, required size {}. This indicates an error !", size);
-        }
-        return reserved;
-    }
-
-    /**
-     * Tries to take {@code size} available alert bolts from topology
-     * {@code t}. Slots are appended to {@code slots} only when the full
-     * number was found.
-     *
-     * @return {@code true} when the slots were appended, {@code false} otherwise
-     */
-    private boolean getQueueOnTopology(int size, List<WorkSlot> slots, Topology t) {
-        TopologyUsage usage = context.getTopologyUsages().get(t.getName());
-        if (!isTopologyAvailable(usage)) {
-            return false;
-        }
-
-        List<String> freeBoltIds = new ArrayList<String>();
-        for (AlertBoltUsage boltUsage : usage.getAlertUsages().values()) {
-            if (isBoltAvailable(boltUsage)) {
-                freeBoltIds.add(boltUsage.getBoltId());
-            }
-
-            if (freeBoltIds.size() == size) {
-                break;
-            }
-        }
-
-        if (freeBoltIds.size() != size) {
-            return false;
-        }
-
-        for (String boltId : freeBoltIds) {
-            slots.add(new WorkSlot(t.getName(), boltId));
-        }
-        return true;
-    }
-
-    /**
-     * A topology is usable when its usage is known and its load is not
-     * above the configured upper bound. A per-partition exclusion check used
-     * to be sketched here but is not active.
-     */
-    private boolean isTopologyAvailable(TopologyUsage u) {
-        return u != null && !(u.getLoad() > topoLoadUpbound);
-    }
-
-    /**
-     * A bolt is usable only when no queue is scheduled on it yet.
-     */
-    private boolean isBoltAvailable(AlertBoltUsage alertUsage) {
-        // FIXME : more detail to compare on queue exclusion check
-        return !(alertUsage.getQueueSize() > 0);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java
deleted file mode 100644
index e9148f5..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/AlertBoltUsage.java
+++ /dev/null
@@ -1,93 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.model;
-
-import java.util.ArrayList;
-import java.util.List;
-
-import org.apache.eagle.alert.coordination.model.internal.StreamGroup;
-import org.apache.eagle.alert.coordination.model.internal.StreamWorkSlotQueue;
-import org.apache.eagle.alert.engine.coordinator.PolicyDefinition;
-
-/**
- * Usage record of one alert bolt: the policies placed on it, the stream
- * groups and work slot queues scheduled for it, and its load.
- *
- * @since Mar 28, 2016
- *
- */
-public class AlertBoltUsage {
-
-    private String boltId;
-    private double load;
-    // names of the policies placed on this bolt
-    private List<String> policies = new ArrayList<>();
-    // the stream partition groups scheduled for this alert bolt
-    private List<StreamGroup> partitions = new ArrayList<>();
-    // the slot queues scheduled for this alert bolt
-    private List<StreamWorkSlotQueue> referQueues = new ArrayList<>();
-
-    /**
-     * @param anid
-     *            id of the alert bolt this usage describes
-     */
-    public AlertBoltUsage(String anid) {
-        this.boltId = anid;
-    }
-
-    /** @return the id of the alert bolt */
-    public String getBoltId() {
-        return this.boltId;
-    }
-
-    /** @param boltId the new alert bolt id */
-    public void setBoltId(String boltId) {
-        this.boltId = boltId;
-    }
-
-    /** @return the load value last set for this bolt */
-    public double getLoad() {
-        return this.load;
-    }
-
-    /** @param load the new load value */
-    public void setLoad(double load) {
-        this.load = load;
-    }
-
-    /** @return the live list of policy names placed on this bolt */
-    public List<String> getPolicies() {
-        return this.policies;
-    }
-
-    /**
-     * Records the name of the given policy on this bolt. Partitions are not
-     * touched here; they are only recorded by {@link #addQueue}.
-     */
-    public void addPolicies(PolicyDefinition pd) {
-        this.policies.add(pd.getName());
-    }
-
-    /** @return the live list of stream groups scheduled for this bolt */
-    public List<StreamGroup> getPartitions() {
-        return this.partitions;
-    }
-
-    /** @return the live list of queues scheduled for this bolt */
-    public List<StreamWorkSlotQueue> getReferQueues() {
-        return this.referQueues;
-    }
-
-    /** @return the number of queues scheduled for this bolt */
-    public int getQueueSize() {
-        return this.referQueues.size();
-    }
-
-    /**
-     * Records a queue and the stream group it serves on this bolt.
-     */
-    public void addQueue(StreamGroup streamPartition, StreamWorkSlotQueue queue) {
-        this.referQueues.add(queue);
-        this.partitions.add(streamPartition);
-    }
-
-    /**
-     * Removes a queue from this bolt.
-     *
-     * NOTE(review): only the queue list is updated; the stream group added
-     * by {@link #addQueue} stays in the partition list - confirm whether
-     * that is intended.
-     */
-    public void removeQueue(StreamWorkSlotQueue queue) {
-        this.referQueues.remove(queue);
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java
deleted file mode 100644
index 86238d1..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/GroupBoltUsage.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.model;
-
-
-/**
- * Usage record of one group bolt: its id and its load.
- *
- * @since Mar 28, 2016
- *
- */
-public class GroupBoltUsage {
-
-    private double load;
-    private String boltId;
-
-    /**
-     * @param boltId
-     *            id of the group bolt this usage describes
-     */
-    public GroupBoltUsage(String boltId) {
-        this.boltId = boltId;
-    }
-
-    /** @return the id of the group bolt */
-    public String getBoltId() {
-        return this.boltId;
-    }
-
-    /** @param boltId the new group bolt id */
-    public void setBoltId(String boltId) {
-        this.boltId = boltId;
-    }
-
-    /** @return the load value last set for this bolt, 0 when never set */
-    public double getLoad() {
-        return this.load;
-    }
-
-    /** @param load the new load value */
-    public void setLoad(double load) {
-        this.load = load;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/incubator-eagle/blob/737e5a2d/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java
----------------------------------------------------------------------
diff --git a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java b/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java
deleted file mode 100644
index 6eb6195..0000000
--- a/eagle-core/eagle-alert/alert/alert-coordinator/src/main/java/org/apache/eagle/alert/coordinator/model/TopologyUsage.java
+++ /dev/null
@@ -1,102 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *    http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.eagle.alert.coordinator.model;
-
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-
-import org.apache.eagle.alert.coordination.model.internal.MonitoredStream;
-
-/**
- * Usage record of one topology: its name, data sources and policies, the
- * usage of each of its alert and group bolts, the streams monitored on it,
- * and its load.
- *
- * @since Mar 27, 2016
- *
- */
-public class TopologyUsage {
-    // topo info
-    private String topoName;
-    private final Set<String> datasources = new HashSet<>();
-    // usage information
-    private final Set<String> policies = new HashSet<>();
-    // alert bolt usages keyed by bolt id
-    private final Map<String, AlertBoltUsage> alertUsages = new HashMap<>();
-    // group bolt usages; presumably keyed by bolt id as well - confirm against the code that fills the map
-    private final Map<String, GroupBoltUsage> groupUsages = new HashMap<>();
-    private final List<MonitoredStream> monitoredStream = new ArrayList<>();
-
-    private double load;
-
-    /** Creates a usage record without a topology name. */
-    public TopologyUsage() {
-    }
-
-    /**
-     * @param name
-     *            name of the topology this usage describes
-     */
-    public TopologyUsage(String name) {
-        this.topoName = name;
-    }
-
-    /** @return the topology name */
-    public String getTopoName() {
-        return this.topoName;
-    }
-
-    /** @param topoId the new topology name */
-    public void setTopoName(String topoId) {
-        this.topoName = topoId;
-    }
-
-    /** @return the live set of data sources */
-    public Set<String> getDataSources() {
-        return this.datasources;
-    }
-
-    /** @return the live set of policies */
-    public Set<String> getPolicies() {
-        return this.policies;
-    }
-
-    /** @return the live map of alert bolt usages */
-    public Map<String, AlertBoltUsage> getAlertUsages() {
-        return this.alertUsages;
-    }
-
-    /**
-     * @param boltId
-     *            id of the alert bolt to look up
-     * @return the usage stored for that bolt id, or {@code null} when there is none
-     */
-    public AlertBoltUsage getAlertBoltUsage(String boltId) {
-        return this.alertUsages.get(boltId);
-    }
-
-    /** @return the live map of group bolt usages */
-    public Map<String, GroupBoltUsage> getGroupUsages() {
-        return this.groupUsages;
-    }
-
-    /** @return the live list of streams monitored on this topology */
-    public List<MonitoredStream> getMonitoredStream() {
-        return this.monitoredStream;
-    }
-
-    /**
-     * Records a monitored stream on this topology; a stream that is already
-     * contained in the list is not added a second time.
-     */
-    public void addMonitoredStream(MonitoredStream par) {
-        if (this.monitoredStream.contains(par)) {
-            return;
-        }
-        this.monitoredStream.add(par);
-    }
-
-    /** @return the load value last set for this topology */
-    public double getLoad() {
-        return this.load;
-    }
-
-    /** @param load the new load value */
-    public void setLoad(double load) {
-        this.load = load;
-    }
-
-}