You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ni...@apache.org on 2013/11/27 19:55:21 UTC

[24/26] git commit: auto-scaler refactoring v0.1

auto-scaler refactoring v0.1


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/21aadd0a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/21aadd0a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/21aadd0a

Branch: refs/heads/master
Commit: 21aadd0a108c4a6f269a3b0eabfc0b1a2d7f2db1
Parents: ffaaeee
Author: Nirmal Fernando <ni...@apache.org>
Authored: Thu Nov 28 00:14:24 2013 +0530
Committer: Nirmal Fernando <ni...@apache.org>
Committed: Thu Nov 28 00:14:24 2013 +0530

----------------------------------------------------------------------
 .../stratos/autoscaler/ClusterContext.java      |  54 ++++-
 .../stratos/autoscaler/ClusterMonitor.java      | 154 +++++++++++++
 .../stratos/autoscaler/PartitionContext.java    |  77 +++++++
 .../algorithm/AutoscaleAlgorithm.java           |  15 +-
 .../autoscaler/algorithm/OneAfterAnother.java   |   4 +-
 .../PartitionGroupOneAfterAnother.java          |   4 +-
 .../autoscaler/algorithm/RoundRobin.java        |   4 +-
 .../cloud/controller/CloudControllerClient.java |  88 +++++---
 .../autoscaler/exception/SpawningException.java |  12 +-
 .../exception/TerminationException.java         |   4 +
 .../internal/AutoscalerServerComponent.java     |  26 ++-
 .../health/HealthEventMessageDelegator.java     |  54 ++---
 .../autoscaler/policy/PolicyManager.java        |   2 +-
 .../deployers/DeploymentPolicyDeployer.java     |   2 +-
 .../deployers/DeploymentPolicyReader.java       |   6 +-
 .../rule/AutoscalerRuleEvaluator.java           | 133 ++++++-----
 .../autoscaler/rule/ExecutorTaskScheduler.java  |  58 ++---
 .../processors/AutoscalerTopologyReceiver.java  | 223 +++++++++++++++++++
 .../topology/processors/TopologyReceiver.java   |  80 +++++++
 .../stratos/autoscaler/util/AutoscalerUtil.java |  63 +++---
 .../stratos/autoscaler/TestKnowledgeBase.java   |  62 +++++-
 .../src/test/resources/autoscaler.drl           | 124 +++++++++++
 .../test/resources/minimum-autoscaler-rule.drl  |  62 ++++++
 .../resources/test-minimum-autoscaler-rule.drl  |  60 +++++
 24 files changed, 1163 insertions(+), 208 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContext.java
index 10e09f7..ce3249b 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContext.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterContext.java
@@ -20,9 +20,15 @@
 package org.apache.stratos.autoscaler;
 
 import java.util.HashMap;
+import java.util.List;
 import java.util.Map;
 import java.util.Properties;
 
+import org.apache.stratos.messaging.domain.policy.DeploymentPolicy;
+import org.apache.stratos.messaging.domain.policy.Partition;
+import org.drools.runtime.StatefulKnowledgeSession;
+import org.drools.runtime.rule.FactHandle;
+
 /**
  * Defines cluster context properties.
  */
@@ -39,9 +45,14 @@ public class ClusterContext {
     private float requestsInFlightGradient;
     
     private int memberCount;
+    
+    private StatefulKnowledgeSession ksession;
+    private FactHandle facthandle;
 
     //This map will keep number of currently spawned instance count against partitionId
     private Map<String, Integer> partitionCountMap;
+    
+    private List<Partition> partitionsOfThisCluster;
 
     private int currentPartitionIndex;
     private int currentPartitionGroupIndex;
@@ -49,13 +60,22 @@ public class ClusterContext {
     private Properties properties;
 
     private Map<String, MemberContext> memberContextMap;
+    private DeploymentPolicy deploymentPolicy;
 
-    public ClusterContext(String clusterId, String serviceId) {
+    public ClusterContext(String clusterId, String serviceId, DeploymentPolicy deploymentPolicy) {
 
         this.clusterId = clusterId;
         this.serviceId = serviceId;
+        this.setDeploymentPolicy(deploymentPolicy);
+        if (deploymentPolicy != null) {
+            this.setPartitionsOfThisCluster(deploymentPolicy.getAllPartitions());
+        }
         memberContextMap = new HashMap<String, MemberContext>();
         partitionCountMap = new HashMap<String, Integer>();
+        
+        for (Partition partition : partitionsOfThisCluster) {
+            this.addPartitionCount(partition.getId(), 0);
+        }
         memberCount = 0;
     }
 
@@ -192,4 +212,36 @@ public class ClusterContext {
     public void setCurrentPartitionGroupIndex(int currentPartitionGroupIndex) {
         this.currentPartitionGroupIndex = currentPartitionGroupIndex;
     }
+
+    public List<Partition> getAllPartitions() {
+        return partitionsOfThisCluster;
+    }
+
+    public void setPartitionsOfThisCluster(List<Partition> partitionsOfThisCluster) {
+        this.partitionsOfThisCluster = partitionsOfThisCluster;
+    }
+
+    public StatefulKnowledgeSession getKsession() {
+        return ksession;
+    }
+
+    public void setKsession(StatefulKnowledgeSession ksession) {
+        this.ksession = ksession;
+    }
+
+    public FactHandle getFacthandle() {
+        return facthandle;
+    }
+
+    public void setFacthandle(FactHandle facthandle) {
+        this.facthandle = facthandle;
+    }
+
+    public DeploymentPolicy getDeploymentPolicy() {
+        return deploymentPolicy;
+    }
+
+    public void setDeploymentPolicy(DeploymentPolicy deploymentPolicy) {
+        this.deploymentPolicy = deploymentPolicy;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterMonitor.java
new file mode 100644
index 0000000..f795baa
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/ClusterMonitor.java
@@ -0,0 +1,154 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
+ * KIND, either express or implied.  See the License for the 
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler;
+
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.stratos.messaging.domain.policy.Partition;
+import org.drools.runtime.StatefulKnowledgeSession;
+import org.drools.runtime.rule.FactHandle;
+
+/**
+ * Is responsible for monitoring a service cluster. This runs periodically
+ * and perform minimum instance check and scaling check using the underlying
+ * rules engine.
+ * @author nirmal
+ *
+ */
+public class ClusterMonitor implements Runnable{
+
+    private String clusterId;
+    private ClusterContext clusterCtxt;
+    private List<MemberContext> memberCtxt;
+    private Map<String, PartitionContext> partitionCtxts;
+    private StatefulKnowledgeSession ksession;
+    private boolean isDestroyed;
+    
+    private FactHandle facthandle;
+    
+    public ClusterMonitor(String clusterId, ClusterContext ctxt, StatefulKnowledgeSession ksession) {
+        this.clusterId = clusterId;
+        this.clusterCtxt = ctxt;
+        this.ksession = ksession;
+        partitionCtxts = new ConcurrentHashMap<String, PartitionContext>();
+    }
+
+    public String getClusterId() {
+        return clusterId;
+    }
+
+    public void setClusterId(String clusterId) {
+        this.clusterId = clusterId;
+    }
+
+    public ClusterContext getClusterCtxt() {
+        return clusterCtxt;
+    }
+
+    public void setClusterCtxt(ClusterContext clusterCtxt) {
+        this.clusterCtxt = clusterCtxt;
+    }
+
+    public List<MemberContext> getMemberCtxt() {
+        return memberCtxt;
+    }
+
+    public void setMemberCtxt(List<MemberContext> memberCtxt) {
+        this.memberCtxt = memberCtxt;
+    }
+
+    public Map<String, PartitionContext> getPartitionCtxt() {
+        return partitionCtxts;
+    }
+
+    public void setPartitionCtxt(Map<String, PartitionContext> partitionCtxt) {
+        this.partitionCtxts = partitionCtxt;
+    }
+    
+    public void addPartitionCtxt(PartitionContext ctxt) {
+        this.partitionCtxts.put(ctxt.getPartitionId(), ctxt);
+    }
+    
+    public PartitionContext getPartitionCtxt(String id) {
+        return this.partitionCtxts.get(id);
+    }
+
+    public StatefulKnowledgeSession getKsession() {
+        return ksession;
+    }
+
+    public void setKsession(StatefulKnowledgeSession ksession) {
+        this.ksession = ksession;
+    }
+
+    public FactHandle getFacthandle() {
+        return facthandle;
+    }
+
+    public void setFacthandle(FactHandle facthandle) {
+        this.facthandle = facthandle;
+    }
+
+    @Override
+    public void run() {
+
+        while (!isDestroyed()) {
+            minInstanceCountCheck();
+            // TODO scale
+            try {
+                // TODO make this configurable
+                Thread.sleep(30000);
+            } catch (InterruptedException ignore) {
+            }
+        }
+    }
+    
+    private void minInstanceCountCheck() {
+        if(clusterCtxt != null ) {
+            ksession.setGlobal("clusterId", clusterId);
+            //TODO make this concurrent
+            for (Partition partition : clusterCtxt.getAllPartitions()) {
+                String id = partition.getId();
+                PartitionContext ctxt = partitionCtxts.get(id);
+                if(ctxt == null) {
+                    ctxt = new PartitionContext(partition);
+                }
+                ctxt.setMinimumMemberCount(partition.getPartitionMembersMin());
+                
+                AutoscalerRuleEvaluator.evaluate(ksession, facthandle, ctxt);
+            }
+        }
+    }
+
+    public void destroy() {
+        ksession.dispose();
+        setDestroyed(true);
+    }
+
+    public boolean isDestroyed() {
+        return isDestroyed;
+    }
+
+    public void setDestroyed(boolean isDestroyed) {
+        this.isDestroyed = isDestroyed;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/PartitionContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/PartitionContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/PartitionContext.java
new file mode 100644
index 0000000..3579bcf
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/PartitionContext.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
+ * KIND, either express or implied.  See the License for the 
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.stratos.messaging.domain.policy.Partition;
+
+/**
+ * @author nirmal
+ *
+ */
+public class PartitionContext {
+
+    private String partitionId;
+    private Partition partition;
+    private int currentMemberCount = 0;
+    private int minimumMemberCount = 0;
+    private List<String> memberIds = new ArrayList<String>();
+    
+    public PartitionContext(Partition partition) {
+        this.setPartition(partition);
+        this.partitionId = partition.getId();
+    }
+    
+    public String getPartitionId() {
+        return partitionId;
+    }
+    public void setPartitionId(String partitionId) {
+        this.partitionId = partitionId;
+    }
+    public int getCurrentMemberCount() {
+        return currentMemberCount;
+    }
+    public void incrementCurrentMemberCount(int count) {
+        this.currentMemberCount += count;
+    }
+    public List<String> getMemberIds() {
+        return memberIds;
+    }
+    public void setMemberIds(List<String> memberIds) {
+        this.memberIds = memberIds;
+    }
+
+    public int getMinimumMemberCount() {
+        return minimumMemberCount;
+    }
+
+    public void setMinimumMemberCount(int minimumMemberCount) {
+        this.minimumMemberCount = minimumMemberCount;
+    }
+
+    public Partition getPartition() {
+        return partition;
+    }
+
+    public void setPartition(Partition partition) {
+        this.partition = partition;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/AutoscaleAlgorithm.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/AutoscaleAlgorithm.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/AutoscaleAlgorithm.java
index aacc6bf..3a3fef5 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/AutoscaleAlgorithm.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/AutoscaleAlgorithm.java
@@ -19,16 +19,19 @@
 
 package org.apache.stratos.autoscaler.algorithm;
 
-import org.apache.stratos.autoscaler.policy.model.Partition;
-import org.apache.stratos.autoscaler.policy.model.PartitionGroup;
+import org.apache.stratos.messaging.domain.policy.Partition;
+import org.apache.stratos.messaging.domain.policy.PartitionGroup;
 
 /**
  *
  */
 public interface AutoscaleAlgorithm {
 
-public boolean scaleUpPartitionAvailable(String clusterId);
-public boolean scaleDownPartitionAvailable(String clusterId);
-public Partition getNextScaleUpPartition(PartitionGroup partition,String clusterId);
-public Partition getNextScaleDownPartition(PartitionGroup partition,String clusterId);
+    public boolean scaleUpPartitionAvailable(String clusterId);
+
+    public boolean scaleDownPartitionAvailable(String clusterId);
+
+    public Partition getNextScaleUpPartition(PartitionGroup partition, String clusterId);
+
+    public Partition getNextScaleDownPartition(PartitionGroup partition, String clusterId);
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/OneAfterAnother.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/OneAfterAnother.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/OneAfterAnother.java
index 3c06c13..309c2c9 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/OneAfterAnother.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/OneAfterAnother.java
@@ -25,8 +25,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.autoscaler.AutoscalerContext;
 import org.apache.stratos.autoscaler.ClusterContext;
-import org.apache.stratos.autoscaler.policy.model.Partition;
-import org.apache.stratos.autoscaler.policy.model.PartitionGroup;
+import org.apache.stratos.messaging.domain.policy.Partition;
+import org.apache.stratos.messaging.domain.policy.PartitionGroup;
 
 /**
  * Completes partitions in the order defined in autoscaler policy, go to next if current one reached the max limit

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/PartitionGroupOneAfterAnother.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/PartitionGroupOneAfterAnother.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/PartitionGroupOneAfterAnother.java
index dd47678..c95d621 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/PartitionGroupOneAfterAnother.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/PartitionGroupOneAfterAnother.java
@@ -27,8 +27,8 @@ import org.apache.stratos.autoscaler.AutoscalerContext;
 import org.apache.stratos.autoscaler.ClusterContext;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.apache.stratos.autoscaler.policy.PolicyManager;
-import org.apache.stratos.autoscaler.policy.model.Partition;
-import org.apache.stratos.autoscaler.policy.model.PartitionGroup;
+import org.apache.stratos.messaging.domain.policy.Partition;
+import org.apache.stratos.messaging.domain.policy.PartitionGroup;
 import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
 
 /**

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/RoundRobin.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/RoundRobin.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/RoundRobin.java
index d75f11a..47e72d4 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/RoundRobin.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/algorithm/RoundRobin.java
@@ -23,8 +23,8 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.autoscaler.AutoscalerContext;
 import org.apache.stratos.autoscaler.ClusterContext;
-import org.apache.stratos.autoscaler.policy.model.Partition;
-import org.apache.stratos.autoscaler.policy.model.PartitionGroup;
+import org.apache.stratos.messaging.domain.policy.Partition;
+import org.apache.stratos.messaging.domain.policy.PartitionGroup;
 
 import java.util.List;
 

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java
index 00ee1cb..2536007 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/client/cloud/controller/CloudControllerClient.java
@@ -25,9 +25,13 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.autoscaler.Constants;
 import org.apache.stratos.autoscaler.exception.SpawningException;
 import org.apache.stratos.autoscaler.exception.TerminationException;
-import org.apache.stratos.autoscaler.policy.model.Partition;
 import org.apache.stratos.autoscaler.util.ConfUtil;
+import org.apache.stratos.cloud.controller.stub.CloudControllerServiceIllegalArgumentExceptionException;
+import org.apache.stratos.cloud.controller.stub.CloudControllerServiceInvalidCartridgeTypeExceptionException;
+import org.apache.stratos.cloud.controller.stub.CloudControllerServiceInvalidMemberExceptionException;
 import org.apache.stratos.cloud.controller.stub.CloudControllerServiceStub;
+import org.apache.stratos.cloud.controller.stub.CloudControllerServiceUnregisteredCartridgeExceptionException;
+import org.apache.stratos.messaging.domain.policy.Partition;
 
 import java.rmi.RemoteException;
 
@@ -38,9 +42,24 @@ import java.rmi.RemoteException;
 public class CloudControllerClient {
 
     private static final Log log = LogFactory.getLog(CloudControllerClient.class);
-    private CloudControllerServiceStub stub;
+    private static CloudControllerServiceStub stub;
+    private static CloudControllerClient instance;
     
-    public CloudControllerClient(){
+    public static CloudControllerClient getInstance() {
+
+        if (instance == null) {
+            synchronized (CloudControllerClient.class) {
+                
+                if(instance == null) {
+                    instance = new CloudControllerClient();
+                }
+            }
+        }
+
+        return instance;
+    }
+    
+    private CloudControllerClient(){
     	try {
             XMLConfiguration conf = ConfUtil.getInstance().getConfiguration();
             int port = conf.getInt("autoscaler.cloudController.port", Constants.CLOUD_CONTROLLER_DEFAULT_PORT);
@@ -58,53 +77,54 @@ public class CloudControllerClient {
         log.info("Calling CC for spawning instances in cluster " + clusterId);
         log.info("Member count to be increased: " + memberCountToBeIncreased);
 
-        org.apache.stratos.messaging.domain.topology.xsd.Partition partitionTopology = new
-                org.apache.stratos.messaging.domain.topology.xsd.Partition();
-        partitionTopology.setId(partition.getId());
-
-        try {
-            for(int i =0; i< memberCountToBeIncreased; i++){
-                stub.startInstance(clusterId, partitionTopology);
-            }
-        } catch (RemoteException e) {
-            log.error("Error occurred in cloud controller side while spawning instance");
+        for(int i =0; i< memberCountToBeIncreased; i++){
+            spawnAnInstance(partition, clusterId);
         }
+        
     }
 
-    public void spawnAnInstance(Partition partition, String clusterId) throws SpawningException {
+    public static void spawnAnInstance(Partition partition, String clusterId) throws SpawningException {
 
-        log.info("Calling CC for spawning an instance in cluster " + clusterId);
-        org.apache.stratos.messaging.domain.topology.xsd.Partition partitionTopology = new
-                org.apache.stratos.messaging.domain.topology.xsd.Partition();
+        org.apache.stratos.messaging.domain.policy.xsd.Partition partitionTopology = new
+                org.apache.stratos.messaging.domain.policy.xsd.Partition();
         partitionTopology.setId(partition.getId());
-        /*locationScope.setCloud(partition.getIaas());
-        locationScope.setRegion(partition.getZone());*/
-
+        
         try {
             stub.startInstance(clusterId, partitionTopology);
+        } catch (CloudControllerServiceIllegalArgumentExceptionException e) {
+            log.error(e.getMessage());
+            throw new SpawningException(e);
+        } catch (CloudControllerServiceUnregisteredCartridgeExceptionException e) {
+            log.error(e.getMessage());
+            throw new SpawningException(e);
         } catch (RemoteException e) {
-
-            log.error("Error occurred in cloud controller side while spawning instance");
-
+            String msg = "Error occurred in cloud controller side while spawning instance";
+            log.error(msg);
+            throw new SpawningException(msg, e);
         }
     }
 
-    public void terminate(Partition partition, String clusterId) throws TerminationException {
+    public void terminate(String memberId) throws TerminationException {
         //call CC terminate method
 
-        log.info("Calling CC for terminating an instance in cluster " + clusterId);
-        org.apache.stratos.messaging.domain.topology.xsd.Partition partitionTopology = new
-                org.apache.stratos.messaging.domain.topology.xsd.Partition();
-        partitionTopology.setId(partition.getId());
-        /*locationScope.setCloud(partition.getIaas());
-            locationScope.setRegion(partition.getZone());*/
+        log.info("Calling CC for terminating member with id: " + memberId);
 
         try {
-            stub.terminateInstance(clusterId, partitionTopology);
+            stub.terminateInstance(memberId);
         } catch (RemoteException e) {
-
-            log.error("Error occurred in cloud controller side while terminating instance");
-
+            String msg = "Error occurred in cloud controller side while terminating instance";
+            log.error(msg, e);
+            throw new TerminationException(msg, e);
+
+        } catch (CloudControllerServiceIllegalArgumentExceptionException e) {
+            log.error(e.getMessage());
+            throw new TerminationException(e);
+        } catch (CloudControllerServiceInvalidMemberExceptionException e) {
+            log.error(e.getMessage());
+            throw new TerminationException(e);
+        } catch (CloudControllerServiceInvalidCartridgeTypeExceptionException e) {
+            log.error(e.getMessage());
+            throw new TerminationException(e);
         }
     }
 

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/exception/SpawningException.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/exception/SpawningException.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/exception/SpawningException.java
index bf46354..6b228b9 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/exception/SpawningException.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/exception/SpawningException.java
@@ -1,15 +1,19 @@
 package org.apache.stratos.autoscaler.exception;
 
-import java.rmi.RemoteException;
-
 /**
  *
  */
 public class SpawningException extends Exception {
 
-    public SpawningException(String exception, RemoteException message){
-        super(exception, message);
+    private static final long serialVersionUID = 4761501174753405374L;
+
+
+    public SpawningException(String message, Exception exception){
+        super(message, exception);
     }
 
 
+    public SpawningException(Exception exception){
+        super(exception);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/exception/TerminationException.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/exception/TerminationException.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/exception/TerminationException.java
index efb3f21..8c7ede7 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/exception/TerminationException.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/exception/TerminationException.java
@@ -7,4 +7,8 @@ public class TerminationException extends Throwable {
     public TerminationException(String s, Exception e) {
         super(s, e);
     }
+    
+    public TerminationException(Exception e) {
+        super(e);
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
index 35e79c5..4969670 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
@@ -23,6 +23,8 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.autoscaler.message.receiver.health.HealthEventMessageDelegator;
 import org.apache.stratos.autoscaler.message.receiver.health.HealthEventMessageReceiver;
 import org.apache.stratos.autoscaler.rule.ExecutorTaskScheduler;
+import org.apache.stratos.autoscaler.topology.processors.AutoscalerTopologyReceiver;
+import org.apache.stratos.autoscaler.topology.processors.TopologyReceiver;
 import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageDelegator;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageReceiver;
@@ -42,18 +44,20 @@ public class AutoscalerServerComponent {
     protected void activate(ComponentContext componentContext) throws Exception {
 
         // Subscribe to all topics
-        TopicSubscriber topologyTopicSubscriber = new TopicSubscriber(Constants.TOPOLOGY_TOPIC);
-        topologyTopicSubscriber.setMessageListener(new TopologyEventMessageReceiver());
-        Thread topologyTopicSubscriberThread = new Thread(topologyTopicSubscriber);
-        topologyTopicSubscriberThread.start();
-        if (log.isDebugEnabled()) {
-            log.debug("Topology event message receiver thread started");
-        }
-
-        TopologyEventMessageDelegator tropologyEventMessageDelegator = new TopologyEventMessageDelegator();
-        Thread tropologyDelegatorThread = new Thread(tropologyEventMessageDelegator);
-        tropologyDelegatorThread.start();
+//        TopicSubscriber topologyTopicSubscriber = new TopicSubscriber(Constants.TOPOLOGY_TOPIC);
+//        topologyTopicSubscriber.setMessageListener(new TopologyEventMessageReceiver());
+//        Thread topologyTopicSubscriberThread = new Thread(topologyTopicSubscriber);
+//        topologyTopicSubscriberThread.start();
+//        if (log.isDebugEnabled()) {
+//            log.debug("Topology event message receiver thread started");
+//        }
+//
+//        TopologyEventMessageDelegator tropologyEventMessageDelegator = new TopologyEventMessageDelegator();
+//        Thread tropologyDelegatorThread = new Thread(tropologyEventMessageDelegator);
+//        tropologyDelegatorThread.start();
 
+        Thread th = new Thread(new AutoscalerTopologyReceiver());
+        th.start();
         if (log.isDebugEnabled()) {
             log.debug("Topology message processor thread started");
         }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java
index 5057553..5c57135 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java
@@ -19,6 +19,7 @@
 package org.apache.stratos.autoscaler.message.receiver.health;
 
 import com.google.gson.stream.JsonReader;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.autoscaler.AutoscalerContext;
@@ -27,11 +28,13 @@ import org.apache.stratos.autoscaler.Constants;
 import org.apache.stratos.autoscaler.policy.PolicyManager;
 import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
 import org.apache.stratos.autoscaler.policy.model.LoadThresholds;
+import org.apache.stratos.messaging.domain.policy.DeploymentPolicy;
 import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 
 import javax.jms.TextMessage;
+
 import java.io.BufferedReader;
 import java.io.StringReader;
 
@@ -61,31 +64,32 @@ public class HealthEventMessageDelegator implements Runnable {
                 log.info(clusterId);
                 log.info(value);
                 log.info(eventName);
-                for (Service service :  TopologyManager.getTopology().getServices()){
-
-                    if(service.clusterExists(clusterId)){
-
-                        if(!AutoscalerContext.getInstance().clusterExists(clusterId)){
-
-                            Cluster cluster = service.getCluster(clusterId);
-                            AutoscalePolicy autoscalePolicy = PolicyManager.getInstance().getAutoscalePolicy(cluster.getAutoscalePolicyName());
-
-                            ClusterContext clusterContext = new ClusterContext(clusterId, service.getServiceName());
-
-                            LoadThresholds loadThresholds = autoscalePolicy.getLoadThresholds();
-                            float averageLimit = loadThresholds.getRequestsInFlight().getAverage();
-                            float gradientLimit = loadThresholds.getRequestsInFlight().getGradient();
-                            float secondDerivative  = loadThresholds.getRequestsInFlight().getSecondDerivative();
-
-                            clusterContext.setAverageRequestsInFlight(averageLimit);
-                            clusterContext.setRequestsInFlightGradient(gradientLimit);
-                            clusterContext.setRequestsInFlightSecondDerivative(secondDerivative);
-
-                            AutoscalerContext.getInstance().addClusterContext(clusterContext);
-                        }
-                        break;
-                    }
-                }
+//                for (Service service :  TopologyManager.getTopology().getServices()){
+//
+//                    if(service.clusterExists(clusterId)){
+//
+//                        if(!AutoscalerContext.getInstance().clusterExists(clusterId)){
+//
+//                            Cluster cluster = service.getCluster(clusterId);
+//                            AutoscalePolicy autoscalePolicy = PolicyManager.getInstance().getAutoscalePolicy(cluster.getAutoscalePolicyName());
+//                            DeploymentPolicy deploymentPolicy = PolicyManager.getInstance().getDeploymentPolicy(cluster.getDeploymentPolicyName());
+//
+//                            ClusterContext clusterContext = new ClusterContext(clusterId, service.getServiceName(), deploymentPolicy.getAllPartitions());
+//
+//                            LoadThresholds loadThresholds = autoscalePolicy.getLoadThresholds();
+//                            float averageLimit = loadThresholds.getRequestsInFlight().getAverage();
+//                            float gradientLimit = loadThresholds.getRequestsInFlight().getGradient();
+//                            float secondDerivative  = loadThresholds.getRequestsInFlight().getSecondDerivative();
+//
+//                            clusterContext.setAverageRequestsInFlight(averageLimit);
+//                            clusterContext.setRequestsInFlightGradient(gradientLimit);
+//                            clusterContext.setRequestsInFlightSecondDerivative(secondDerivative);
+//
+//                            AutoscalerContext.getInstance().addClusterContext(clusterContext);
+//                        }
+//                        break;
+//                    }
+//                }
                 if(Constants.AVERAGE_REQUESTS_IN_FLIGHT.equals(eventName)){
                     AutoscalerContext.getInstance().getClusterContext(clusterId).setAverageRequestsInFlight(value);
 

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/PolicyManager.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/PolicyManager.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/PolicyManager.java
index 093d0b8..2630f98 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/PolicyManager.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/PolicyManager.java
@@ -29,7 +29,7 @@ import java.util.Map;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.policy.model.DeploymentPolicy;
+import org.apache.stratos.messaging.domain.policy.DeploymentPolicy;
 
 /**
  * 

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/deployers/DeploymentPolicyDeployer.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/deployers/DeploymentPolicyDeployer.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/deployers/DeploymentPolicyDeployer.java
index d74a384..a219088 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/deployers/DeploymentPolicyDeployer.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/deployers/DeploymentPolicyDeployer.java
@@ -29,7 +29,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.autoscaler.policy.InvalidPolicyException;
 import org.apache.stratos.autoscaler.policy.PolicyManager;
-import org.apache.stratos.autoscaler.policy.model.DeploymentPolicy;
+import org.apache.stratos.messaging.domain.policy.DeploymentPolicy;
 
 /**
  * 

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/deployers/DeploymentPolicyReader.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/deployers/DeploymentPolicyReader.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/deployers/DeploymentPolicyReader.java
index ab8f106..603742b 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/deployers/DeploymentPolicyReader.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/policy/deployers/DeploymentPolicyReader.java
@@ -29,9 +29,9 @@ import org.apache.axis2.deployment.DeploymentException;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.autoscaler.policy.InvalidPolicyException;
-import org.apache.stratos.autoscaler.policy.model.DeploymentPolicy;
-import org.apache.stratos.autoscaler.policy.model.Partition;
-import org.apache.stratos.autoscaler.policy.model.PartitionGroup;
+import org.apache.stratos.messaging.domain.policy.DeploymentPolicy;
+import org.apache.stratos.messaging.domain.policy.PartitionGroup;
+import org.apache.stratos.messaging.domain.policy.Partition;
 
 /**
  * 

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/AutoscalerRuleEvaluator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/AutoscalerRuleEvaluator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/AutoscalerRuleEvaluator.java
index 538fb22..a360c52 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/AutoscalerRuleEvaluator.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/AutoscalerRuleEvaluator.java
@@ -20,13 +20,18 @@
 package org.apache.stratos.autoscaler.rule;
 
 import java.io.File;
+import java.util.HashMap;
+import java.util.Map;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.autoscaler.AutoscalerContext;
+import org.apache.stratos.autoscaler.ClusterContext;
+import org.apache.stratos.autoscaler.ClusterMonitor;
+import org.apache.stratos.autoscaler.PartitionContext;
 import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
 import org.apache.stratos.autoscaler.policy.PolicyManager;
-import org.apache.stratos.autoscaler.policy.model.Partition;
+import org.apache.stratos.messaging.domain.policy.Partition;
 import org.apache.stratos.messaging.domain.topology.Service;
 import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 import org.drools.KnowledgeBase;
@@ -43,6 +48,7 @@ import org.apache.stratos.autoscaler.algorithm.RoundRobin;
 import org.apache.stratos.autoscaler.util.AutoscalerUtil;
 import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.autoscaler.algorithm.PartitionGroupOneAfterAnother;
+import org.drools.runtime.rule.FactHandle;
 
 /**
  * This class is responsible for evaluating the current details of topology, statistics, and health
@@ -54,50 +60,63 @@ public class AutoscalerRuleEvaluator {
 	
 	private static AutoscalerRuleEvaluator instance = null;
 	private static final String DRL_FILE_NAME = "autoscaler.drl";
-	
-	private KnowledgeBase kbase;
-	private StatefulKnowledgeSession ksession;
+	private Map<String, ClusterMonitor> monitors;
+	private static KnowledgeBase kbase;
 
 	private AutoscalerRuleEvaluator() {
         try {
             kbase = readKnowledgeBase();
+            setMonitors(new HashMap<String, ClusterMonitor>());
         } catch (Exception e) {
             log.error("Rule evaluate error", e);
         }
     }
     
     
-    public void evaluate(Service service) throws Exception{
-        try {
+    public static void evaluate(StatefulKnowledgeSession ksession, FactHandle handle, Object obj) {
 
-            for (Cluster cluster: service.getClusters()){
-                //update cluster-context
-            	cluster.setDeploymentPolicyName("economy-deployment");
-                AutoscalerUtil.updateClusterContext(cluster);                
-            }
+        if (handle == null) {
 
-            ksession = kbase.newStatefulKnowledgeSession();
-            ksession.setGlobal("$context", AutoscalerContext.getInstance());
-            ksession.setGlobal("log", log);
-            ksession.setGlobal("$manager", PolicyManager.getInstance());
-            ksession.setGlobal("$topology", TopologyManager.getTopology());
-            ksession.setGlobal("$evaluator", this);
-			ksession.insert(service);
-			ksession.fireAllRules();
-		} catch (Exception e) {
-			throw new Exception("Rule evaluate error", e);
-		}
+            handle = ksession.insert(obj);
+        } else {
+            ksession.update(handle, obj);
+        }
+        ksession.fireAllRules();
+
+    }
+
+    public void addMonitor(ClusterMonitor monitor) {
+        monitors.put(monitor.getClusterId(), monitor);
     }
     
-	public boolean delegateSpawn(Partition partition, String clusterId) {
-		CloudControllerClient cloudControllerClient = new CloudControllerClient();
+    public ClusterMonitor getMonitor(String clusterId) {
+        return monitors.get(clusterId);
+    }
+    
+    public ClusterMonitor removeMonitor(String clusterId) {
+        return monitors.remove(clusterId);
+    }
+
+    public StatefulKnowledgeSession getStatefulSession() {
+        StatefulKnowledgeSession ksession;
+        ksession = kbase.newStatefulKnowledgeSession();
+//        ksession.setGlobal("$partitions", ctxt.getPartitionsOfThisCluster());
+//        ksession.setGlobal("log", log);
+//        ksession.setGlobal("$manager", PolicyManager.getInstance());
+//        ksession.setGlobal("$topology", TopologyManager.getTopology());
+//        ksession.setGlobal("$evaluator", this);
+        return ksession;
+//        ksession.insert(clusterCtxt);
+//        ksession.fireAllRules();
+    }
+    
+	public static boolean delegateSpawn(Partition partition, String clusterId) {
 		try {
-            int currentMemberCount = AutoscalerContext.getInstance().getClusterContext(clusterId).getMemberCount();
+//            int currentMemberCount = AutoscalerContext.getInstance().getClusterContext(clusterId).getMemberCount();
 
-            if(currentMemberCount < partition.getPartitionMembersMax())       {
-                AutoscalerContext.getInstance().getClusterContext(clusterId).increaseMemberCount(1);
-    			cloudControllerClient.spawnAnInstance(partition, clusterId);
-            }
+//            if(currentMemberCount < partition.getPartitionMembersMax())       {
+//                AutoscalerContext.getInstance().getClusterContext(clusterId).increaseMemberCount(1);
+    			CloudControllerClient.spawnAnInstance(partition, clusterId);
 
 		} catch (Throwable e) {
 			log.error("Cannot spawn an instance", e);
@@ -106,15 +125,15 @@ public class AutoscalerRuleEvaluator {
 	}
 
 	public boolean delegateTerminate(Partition partition, String clusterId) {
-		CloudControllerClient cloudControllerClient = new CloudControllerClient();
 		try {
 
-            int currentMemberCount = AutoscalerContext.getInstance().getClusterContext(clusterId).getMemberCount();
-            log.info("Current member count is " + currentMemberCount );
-            if(currentMemberCount > partition.getPartitionMembersMin())       {
-                AutoscalerContext.getInstance().getClusterContext(clusterId).decreaseMemberCount();
-                cloudControllerClient.terminate(partition, clusterId);
-            }
+//            int currentMemberCount = AutoscalerContext.getInstance().getClusterContext(clusterId).getMemberCount();
+//            log.info("Current member count is " + currentMemberCount );
+//            if(currentMemberCount > partition.getPartitionMembersMin())       {
+//                AutoscalerContext.getInstance().getClusterContext(clusterId).decreaseMemberCount();
+               //FIXME
+//                cloudControllerClient.terminate(partition, clusterId);
+//            }
 			return true;
 		} catch (Throwable e) {
 			log.error("Cannot terminate instance", e);
@@ -122,22 +141,22 @@ public class AutoscalerRuleEvaluator {
 		return false;
 	}
 
-	public boolean delegateSpawn(Partition partition, String clusterId, int memberCountToBeIncreased) {
-		CloudControllerClient cloudControllerClient = new CloudControllerClient();
-		try {
-            int currentMemberCount = AutoscalerContext.getInstance().getClusterContext(clusterId).getMemberCount();
-            log.info("Current member count is " + currentMemberCount );
-
-            if(currentMemberCount < partition.getPartitionMembersMax()) {
-                AutoscalerContext.getInstance().getClusterContext(clusterId).increaseMemberCount(memberCountToBeIncreased);
-                cloudControllerClient.spawnInstances(partition, clusterId, memberCountToBeIncreased);
-            }
-			return true;
-		} catch (Throwable e) {
-			log.error("Cannot spawn an instance", e);
-		}
-		return false;
-	}
+//	public boolean delegateSpawn(Partition partition, String clusterId, int memberCountToBeIncreased) {
+//		CloudControllerClient cloudControllerClient = new CloudControllerClient();
+//		try {
+//            int currentMemberCount = AutoscalerContext.getInstance().getClusterContext(clusterId).getMemberCount();
+//            log.info("Current member count is " + currentMemberCount );
+//
+//            if(currentMemberCount < partition.getPartitionMembersMax()) {
+//                AutoscalerContext.getInstance().getClusterContext(clusterId).increaseMemberCount(memberCountToBeIncreased);
+//                cloudControllerClient.spawnInstances(partition, clusterId, memberCountToBeIncreased);
+//            }
+//			return true;
+//		} catch (Throwable e) {
+//			log.error("Cannot spawn an instance", e);
+//		}
+//		return false;
+//	}
 
     public static synchronized AutoscalerRuleEvaluator getInstance() {
             if (instance == null) {
@@ -184,4 +203,14 @@ public class AutoscalerRuleEvaluator {
     {
     	return new PartitionGroupOneAfterAnother().getNextScaleDownPartition(clusterID);
     }
+
+
+    public Map<String, ClusterMonitor> getMonitors() {
+        return monitors;
+    }
+
+
+    public void setMonitors(Map<String, ClusterMonitor> monitors) {
+        this.monitors = monitors;
+    }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/ExecutorTaskScheduler.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/ExecutorTaskScheduler.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/ExecutorTaskScheduler.java
index 5722a57..fe79a59 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/ExecutorTaskScheduler.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/rule/ExecutorTaskScheduler.java
@@ -55,34 +55,36 @@ public class ExecutorTaskScheduler implements Runnable {
         final Runnable rulesEvaluator = new Runnable() {
             public void run() {
 
-                try {
-                    for (Service service : TopologyManager.getTopology().getServices()) {
-
-                        AutoscalerRuleEvaluator.getInstance().evaluate(service);
-                    }
-
-                    // Remove cluster context if its already removed from Topology
-                    for (String clusterContextId : AutoscalerContext.getInstance().getClusterContexes().keySet()) {
-                        boolean clusterAvailable = false;
-                        for (Service service : TopologyManager.getTopology().getServices()) {
-                            for (Cluster cluster : service.getClusters()) {
-                                if (cluster.getClusterId().equals(clusterContextId)) {
-
-                                    clusterAvailable = true;
-                                }
-                            }
-                        }
-
-                        if (!clusterAvailable) {
-                            AutoscalerContext.getInstance().removeClusterContext(clusterContextId);
-                        }
-                    }
-
-                } catch (Exception e) {
-                    log.error("Error", e);
-                    log.debug("Shutting down rule scheduler");
-                    ex.shutdownNow();
-                }
+//                try {
+//                    for (Service service : TopologyManager.getTopology().getServices()) {
+//
+//                        AutoscalerRuleEvaluator.getInstance().evaluate(service);
+//                    }
+//
+//                    // Remove cluster context if its already removed from Topology
+//                    for (String clusterContextId : AutoscalerContext.getInstance().getClusterContexes().keySet()) {
+//                        boolean clusterAvailable = false;
+//                        for (Service service : TopologyManager.getTopology().getServices()) {
+//                            for (Cluster cluster : service.getClusters()) {
+//                                if (cluster.getClusterId().equals(clusterContextId)) {
+//
+//                                    clusterAvailable = true;
+//                                }
+//                            }
+//                        }
+//
+//                        if (!clusterAvailable) {
+//                            AutoscalerContext.getInstance().removeClusterContext(clusterContextId);
+//                        }
+//                    }
+//
+//                } catch (Exception e) {
+//                    String msg = "Error while evaluating rules.";
+//                    log.error(msg, e);
+//                    throw new RuntimeException(msg, e);
+////                    log.debug("Shutting down rule scheduler");
+////                    ex.shutdownNow();
+//                }
             }
         };
         

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java
new file mode 100644
index 0000000..ca79578
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/AutoscalerTopologyReceiver.java
@@ -0,0 +1,223 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.autoscaler.topology.processors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.ClusterContext;
+import org.apache.stratos.autoscaler.ClusterMonitor;
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.stratos.autoscaler.util.AutoscalerUtil;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.listener.topology.ClusterCreatedEventListener;
+import org.apache.stratos.messaging.listener.topology.ClusterRemovedEventListener;
+import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener;
+import org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener;
+import org.apache.stratos.messaging.listener.topology.ServiceRemovedEventListener;
+import org.apache.stratos.messaging.event.topology.*;
+import org.apache.stratos.messaging.message.processor.topology.TopologyEventProcessorChain;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageDelegator;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+import java.util.Collection;
+
+/**
+ * Load balancer topology receiver.
+ */
+public class AutoscalerTopologyReceiver implements Runnable {
+
+    private static final Log log = LogFactory.getLog(AutoscalerTopologyReceiver.class);
+
+    private TopologyReceiver topologyReceiver;
+    private boolean terminated;
+
+    public AutoscalerTopologyReceiver() {
+        this.topologyReceiver = new TopologyReceiver(createMessageDelegator());
+    }
+
+    @Override
+    public void run() {
+        Thread thread = new Thread(topologyReceiver);
+        thread.start();
+        if(log.isInfoEnabled()) {
+            log.info("Load balancer topology receiver thread started");
+        }
+
+        // Keep the thread live until terminated
+        while (!terminated);
+        if(log.isInfoEnabled()) {
+            log.info("Load balancer topology receiver thread terminated");
+        }
+    }
+
+    private TopologyEventMessageDelegator createMessageDelegator() {
+        TopologyEventProcessorChain processorChain = createEventProcessorChain();
+        final TopologyEventMessageDelegator messageDelegator = new TopologyEventMessageDelegator(processorChain);
+        messageDelegator.addCompleteTopologyEventListener(new CompleteTopologyEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                try {
+                    TopologyManager.acquireReadLock();
+                    for(Service service : TopologyManager.getTopology().getServices()) {
+                        for(Cluster cluster : service.getClusters()) {
+                                addClusterToContext(cluster);
+                        }
+                    }
+                }
+                finally {
+                    TopologyManager.releaseReadLock();
+                }
+                // Complete topology is only consumed once, remove listener
+                messageDelegator.removeCompleteTopologyEventListener(this);
+            }
+
+        });
+        return messageDelegator;
+    }
+
+    private TopologyEventProcessorChain createEventProcessorChain() {
+        // Listen to topology events that affect clusters
+        TopologyEventProcessorChain processorChain = new TopologyEventProcessorChain();
+        processorChain.addEventListener(new ClusterCreatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                try {
+                    ClusterCreatedEvent e = (ClusterCreatedEvent) event;
+                    TopologyManager.acquireReadLock();
+                    Service service = TopologyManager.getTopology().getService(e.getServiceName());
+                    Cluster cluster = service.getCluster(e.getClusterId());
+                    addClusterToContext(cluster);
+                }
+                finally {
+                    TopologyManager.releaseReadLock();
+                }
+            }
+
+        });
+        
+        processorChain.addEventListener(new ClusterRemovedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+                try {
+                    ClusterRemovedEvent e = (ClusterRemovedEvent) event;
+                    TopologyManager.acquireReadLock();
+                    
+                    removeClusterFromContext(e.getClusterId());
+                }
+                finally {
+                    TopologyManager.releaseReadLock();
+                }
+            }
+
+        });
+        
+        processorChain.addEventListener(new MemberActivatedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+//                try {
+//                    TopologyManager.acquireReadLock();
+//
+//                    // Add cluster to the context when its first member is activated
+//                    MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent)event;
+//                    Cluster cluster = findCluster(memberActivatedEvent.getClusterId());
+//                    if(cluster == null) {
+//                        if(log.isErrorEnabled()) {
+//                            log.error(String.format("Cluster not found in topology: [cluster] %s", memberActivatedEvent.getClusterId()));
+//                        }
+//                    }
+//                    addClusterToContext(cluster);
+//                }
+//                finally {
+//                    TopologyManager.releaseReadLock();
+//                }
+            }
+        });
+        processorChain.addEventListener(new ServiceRemovedEventListener() {
+            @Override
+            protected void onEvent(Event event) {
+//                try {
+//                    TopologyManager.acquireReadLock();
+//
+//                    // Remove all clusters of given service from context
+//                    ServiceRemovedEvent serviceRemovedEvent = (ServiceRemovedEvent)event;
+//                    for(Service service : TopologyManager.getTopology().getServices()) {
+//                        for(Cluster cluster : service.getClusters()) {
+//                            removeClusterFromContext(cluster.getHostName());
+//                        }
+//                    }
+//                }
+//                finally {
+//                    TopologyManager.releaseReadLock();
+//                }
+            }
+        });
+        return processorChain;
+    }
+
+    private void addClusterToContext(Cluster cluster) {
+        ClusterContext ctxt = AutoscalerUtil.getClusterContext(cluster);
+        AutoscalerRuleEvaluator ruleCtxt = AutoscalerRuleEvaluator.getInstance();
+        ClusterMonitor monitor =
+                                 new ClusterMonitor(cluster.getClusterId(), ctxt,
+                                                    ruleCtxt.getStatefulSession());
+        Thread th = new Thread(monitor);
+        th.start();
+        AutoscalerRuleEvaluator.getInstance().addMonitor(monitor);
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("Cluster monitor has been added: [cluster] %s",
+                                    cluster.getClusterId()));
+        }
+    }
+
+    private void removeClusterFromContext(String clusterId) {
+        ClusterMonitor monitor = AutoscalerRuleEvaluator.getInstance().removeMonitor(clusterId);
+        monitor.destroy();
+            if(log.isDebugEnabled()) {
+                log.debug(String.format("Cluster monitor has been removed: [cluster] %s ", clusterId));
+            }
+    }
+
+    private Cluster findCluster(String clusterId) {
+        if(clusterId == null) {
+            return null;
+        }
+
+        Collection<Service> services = TopologyManager.getTopology().getServices();
+        for (Service service : services) {
+            for (Cluster cluster : service.getClusters()) {
+                if (clusterId.equals(cluster.getClusterId())) {
+                    return cluster;
+                }
+            }
+        }
+        return null;
+    }
+
+    /**
+     * Terminate load balancer topology receiver thread.
+     */
+    public void terminate() {
+        topologyReceiver.terminate();
+        terminated = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/TopologyReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/TopologyReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/TopologyReceiver.java
new file mode 100644
index 0000000..09c4a30
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/topology/processors/TopologyReceiver.java
@@ -0,0 +1,80 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.autoscaler.topology.processors;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
+import org.apache.stratos.messaging.message.processor.MessageProcessorChain;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageDelegator;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageReceiver;
+import org.apache.stratos.messaging.util.Constants;
+
+/**
+ * A thread for receiving topology information from message broker.
+ */
+public class TopologyReceiver implements Runnable {
+    private static final Log log = LogFactory.getLog(TopologyReceiver.class);
+    private TopologyEventMessageDelegator messageDelegator;
+    private TopicSubscriber topicSubscriber;
+    private boolean terminated;
+
+    public TopologyReceiver() {
+        this.messageDelegator = new TopologyEventMessageDelegator();
+    }
+
+    public TopologyReceiver(TopologyEventMessageDelegator messageDelegator) {
+        this.messageDelegator = messageDelegator;
+    }
+
+    @Override
+    public void run() {
+        try {
+            // Start topic subscriber thread
+            topicSubscriber = new TopicSubscriber(Constants.TOPOLOGY_TOPIC);
+            topicSubscriber.setMessageListener(new TopologyEventMessageReceiver());
+            Thread subscriberThread = new Thread(topicSubscriber);
+            subscriberThread.start();
+            if (log.isDebugEnabled()) {
+                log.debug("Topology event message receiver thread started");
+            }
+
+            // Start topology event message delegator thread
+            Thread receiverThread = new Thread(messageDelegator);
+            receiverThread.start();
+            if (log.isDebugEnabled()) {
+                log.debug("Topology event message delegator thread started");
+            }
+
+            // Keep the thread live until terminated
+            while (!terminated);
+        } catch (Exception e) {
+            if (log.isErrorEnabled()) {
+                log.error("Topology receiver failed", e);
+            }
+        }
+    }
+
+    public void terminate() {
+        topicSubscriber.terminate();
+        messageDelegator.terminate();
+        terminated = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
index 16143f9..3b7cef8 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
@@ -19,14 +19,11 @@
 
 package org.apache.stratos.autoscaler.util;
 
-import org.apache.stratos.autoscaler.AutoscalerContext;
 import org.apache.stratos.autoscaler.ClusterContext;
 import org.apache.stratos.autoscaler.policy.PolicyManager;
 import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.policy.model.DeploymentPolicy;
 import org.apache.stratos.autoscaler.policy.model.LoadThresholds;
-import org.apache.stratos.autoscaler.policy.model.Partition;
-import org.apache.stratos.autoscaler.policy.model.PartitionGroup;
+import org.apache.stratos.messaging.domain.policy.DeploymentPolicy;
 import org.apache.stratos.messaging.domain.topology.Cluster;
 
 
@@ -44,40 +41,40 @@ public class AutoscalerUtil {
 	 * @param cluster
 	 * @return ClusterContext - Updated ClusterContext
 	 */
-	public static ClusterContext updateClusterContext(Cluster cluster) {
-		AutoscalerContext context = AutoscalerContext.getInstance();
-		ClusterContext clusterContext = context.getClusterContext(cluster.getClusterId());
-		if (null == clusterContext) {
+    public static ClusterContext getClusterContext(Cluster cluster) {
+        // FIXME fix the following code to correctly update
+        // AutoscalerContext context = AutoscalerContext.getInstance();
+        if (null == cluster) {
+            return null;
+        }
 
-			clusterContext = new ClusterContext(cluster.getClusterId(), cluster.getServiceName());
-			AutoscalePolicy policy = PolicyManager.getInstance().getAutoscalePolicy(cluster.getAutoscalePolicyName());
+        AutoscalePolicy policy =
+                                 PolicyManager.getInstance()
+                                              .getAutoscalePolicy(cluster.getAutoscalePolicyName());
+        DeploymentPolicy deploymentPolicy =
+                                            PolicyManager.getInstance()
+                                                         .getDeploymentPolicy(cluster.getDeploymentPolicyName());
 
-            if(policy!=null){
+        ClusterContext clusterContext =
+                                        new ClusterContext(cluster.getClusterId(),
+                                                           cluster.getServiceName(),
+                                                           deploymentPolicy);
 
-                //get values from policy
-                LoadThresholds loadThresholds = policy.getLoadThresholds();
-                float averageLimit = loadThresholds.getRequestsInFlight().getAverage();
-                float gradientLimit = loadThresholds.getRequestsInFlight().getGradient();
-                float secondDerivative  = loadThresholds.getRequestsInFlight().getSecondDerivative();
+        if (policy != null) {
 
+            // get values from policy
+            LoadThresholds loadThresholds = policy.getLoadThresholds();
+            float averageLimit = loadThresholds.getRequestsInFlight().getAverage();
+            float gradientLimit = loadThresholds.getRequestsInFlight().getGradient();
+            float secondDerivative = loadThresholds.getRequestsInFlight().getSecondDerivative();
 
-                clusterContext.setRequestsInFlightGradient(gradientLimit);
-                clusterContext.setRequestsInFlightSecondDerivative(secondDerivative);
-                clusterContext.setAverageRequestsInFlight(averageLimit);
-                DeploymentPolicy deploymentPolicy = PolicyManager.getInstance().getDeploymentPolicy(cluster.getDeploymentPolicyName());
-                if(deploymentPolicy!=null){
-                	for(PartitionGroup group :deploymentPolicy.getPartitionGroups()){
-                		for (Partition partition : group.getPartitions()) {
-                            clusterContext.addPartitionCount(partition.getId(), 0);
-                    }
-                	}
-                }
-                
-             }
+            clusterContext.setRequestsInFlightGradient(gradientLimit);
+            clusterContext.setRequestsInFlightSecondDerivative(secondDerivative);
+            clusterContext.setAverageRequestsInFlight(averageLimit);
 
-			context.addClusterContext(clusterContext);
-		}
-		return clusterContext;
-	}
+        }
+
+        return clusterContext;
+    }
 
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/test/java/org/apache/stratos/autoscaler/TestKnowledgeBase.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/test/java/org/apache/stratos/autoscaler/TestKnowledgeBase.java b/components/org.apache.stratos.autoscaler/src/test/java/org/apache/stratos/autoscaler/TestKnowledgeBase.java
index b719995..cd6476c 100644
--- a/components/org.apache.stratos.autoscaler/src/test/java/org/apache/stratos/autoscaler/TestKnowledgeBase.java
+++ b/components/org.apache.stratos.autoscaler/src/test/java/org/apache/stratos/autoscaler/TestKnowledgeBase.java
@@ -19,19 +19,34 @@
 
 package org.apache.stratos.autoscaler;
 
+import java.util.ArrayList;
+import java.util.List;
+
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.policy.PolicyManager;
+import org.apache.stratos.messaging.domain.policy.Partition;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import org.drools.runtime.rule.FactHandle;
+import org.drools.KnowledgeBase;
+import org.drools.KnowledgeBaseFactory;
 import org.drools.builder.*;
 import org.drools.io.Resource;
 import org.drools.io.ResourceFactory;
+import org.drools.runtime.StatefulKnowledgeSession;
+import org.drools.runtime.StatelessKnowledgeSession;
+import org.junit.Before;
 import org.junit.Test;
 
 public class TestKnowledgeBase {
     private static final Log log = LogFactory.getLog(TestKnowledgeBase.class);
-    private String droolsFilePath = "../../products/autoscaler/modules/distribution/src/main/conf/autoscaler.drl";
+    private String droolsFilePath = "src/test/resources/test-minimum-autoscaler-rule.drl";
+    private KnowledgeBase kbase;
+    private StatefulKnowledgeSession ksession;
+    private StatelessKnowledgeSession ksession1;
 
-    @Test
-    public void testKnowledgeBase() {
+    @Before
+    public void setUp() {
         KnowledgeBuilder kbuilder = KnowledgeBuilderFactory.newKnowledgeBuilder();
         Resource resource = ResourceFactory.newFileResource(droolsFilePath);
         kbuilder.add(resource, ResourceType.DRL);
@@ -46,5 +61,46 @@ public class TestKnowledgeBase {
             }
             throw new IllegalArgumentException(String.format("Could not parse drools file: %s", droolsFilePath));
         }
+        
+        kbase = KnowledgeBaseFactory.newKnowledgeBase();
+        kbase.addKnowledgePackages(kbuilder.getKnowledgePackages());
+    }
+    
+    @Test
+    public void testMinimumRule() {
+        if(kbase == null) {
+            throw new IllegalArgumentException("Knowledge base is null.");
+        }
+        
+//        ksession1 = kbase.newStatelessKnowledgeSession();
+        ksession = kbase.newStatefulKnowledgeSession();
+        List<String> p = new ArrayList<String>();
+        p.add("aa");
+        p.add("bb");
+//        p.setId("pp");
+//        ksession.setGlobal("pa", p);
+//        ksession.setGlobal("log", log);
+//        ksession.setGlobal("$manager", PolicyManager.getInstance());
+//        ksession.setGlobal("$topology", TopologyManager.getTopology());
+//        ksession.setGlobal("$evaluator", this);
+//        ksession1.execute(p);
+//        FactHandle handle = ksession.insert(p);
+        ksession.insert(p);
+        ksession.fireAllRules();
+//        p = new Partition();
+//        p.setId("3");
+//        ksession.update(handle, p);
+//        ksession.fireAllRules();
+        try {
+            Thread.sleep(3000);
+        } catch (InterruptedException e) {
+            // TODO Auto-generated catch block
+            e.printStackTrace();
+        }
+//        System.err.println(p.getId());
+//        ksession1.execute(p);
+//        ksession.insert(p);
+//        ksession.execute(p);
+        
     }
 }
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/21aadd0a/components/org.apache.stratos.autoscaler/src/test/resources/autoscaler.drl
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/test/resources/autoscaler.drl b/components/org.apache.stratos.autoscaler/src/test/resources/autoscaler.drl
new file mode 100644
index 0000000..cf9982f
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/test/resources/autoscaler.drl
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
+ * KIND, either express or implied.  See the License for the 
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.autoscaler.rule;
+
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.autoscaler.AutoscalerContext;
+import org.apache.stratos.autoscaler.ClusterContext;
+import org.apache.stratos.autoscaler.Constants;
+import org.apache.stratos.autoscaler.policy.PolicyManager;
+import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
+import org.apache.stratos.messaging.domain.policy.Partition;
+import org.apache.stratos.messaging.domain.policy.PartitionGroup;
+import org.apache.stratos.messaging.domain.policy.DeploymentPolicy;
+import org.apache.stratos.autoscaler.policy.model.RequestsInFlight;
+import org.apache.stratos.autoscaler.policy.model.LoadThresholds;
+import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
+import org.apache.stratos.autoscaler.algorithm.AutoscaleAlgorithm;
+import org.apache.stratos.autoscaler.algorithm.OneAfterAnother;
+import org.apache.stratos.autoscaler.algorithm.RoundRobin;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.commons.logging.Log;
+
+global org.apache.stratos.autoscaler.policy.PolicyManager $manager;
+global org.apache.stratos.autoscaler.AutoscalerContext $context; 
+global org.apache.commons.logging.Log log;
+global org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator $evaluator;
+global org.apache.stratos.messaging.domain.topology.Topology $topology;
+
+
+rule "Minimum Rule"
+dialect "mvel"
+       when
+	       $service : Service () 
+	       $cluster : Cluster () from  $service.getClusters()
+	       $deploymentPolicy : DeploymentPolicy(id == $cluster.getDeploymentPolicyName() ) from $manager.getDeploymentPolicyList()
+	       $partitionGroup : PartitionGroup () from  $deploymentPolicy.getPartitionGroups()
+	       $partition : Partition () from $partitionGroup.getPartitions()
+	       $clusterContext : ClusterContext() from $context.getClusterContext($cluster.getClusterId())
+	       eval($clusterContext.getMemberCount($partition.getId()) < $partition.getPartitionMembersMin())
+
+
+       then
+	       int memberCountToBeIncreased = 1 ;
+	       if($evaluator.delegateSpawn($partition,$cluster.getClusterId(), memberCountToBeIncreased)){
+		   $clusterContext.increaseMemberCountInPartitionBy($partition.getId(), memberCountToBeIncreased);
+	       }
+end
+
+rule "Scaler-up Rule"
+dialect "mvel"
+	when
+        $service : Service ()
+        $cluster : Cluster () from  $service.getClusters()
+        $autoScalingPolicy : AutoscalePolicy(id == $cluster.getAutoscalePolicyName() ) from $manager.getAutoscalePolicyList()
+        $deploymentPolicy : DeploymentPolicy(id == $cluster.getDeploymentPolicyName() ) from $manager.getDeploymentPolicyList()
+        $partitionGroup : PartitionGroup () from  $deploymentPolicy.getPartitionGroups()
+        $clusterContext : ClusterContext() from $context.getClusterContext($cluster.getClusterId())
+        $loadThresholds :LoadThresholds() from  $autoScalingPolicy.getLoadThresholds()
+
+        autoscaleAlgorithm : AutoscaleAlgorithm() from  $evaluator.getAutoscaleAlgorithm($partitionGroup.getPartitionAlgo())
+        lbStatAverage : Float() from  $clusterContext.getAverageRequestsInFlight()
+        lbStatGradient : Float() from  $clusterContext.getRequestsInFlightGradient()
+        lbStatSecondDerivative : Float() from  $clusterContext.getRequestsInFlightSecondDerivative()
+        averageLimit : Float() from  $loadThresholds.getRequestsInFlight().getAverage()
+        gradientLimit : Float() from  $loadThresholds.getRequestsInFlight().getGradient()
+        secondDerivative  : Float() from  $loadThresholds.getRequestsInFlight().getSecondDerivative()
+        partition :  Partition() from autoscaleAlgorithm.getNextScaleUpPartition($partitionGroup, $cluster.getClusterId())
+        eval (lbStatAverage > averageLimit && lbStatGradient > gradientLimit)
+	then
+        int numberOfInstancesToBeSpawned = (lbStatSecondDerivative > secondDerivative) ? 2 : 1; //  take from a config
+        $evaluator.delegateSpawn(partition,$cluster.getClusterId(), numberOfInstancesToBeSpawned);
+        $clusterContext.setRequestsInFlightGradient(gradientLimit);
+        $clusterContext.setRequestsInFlightSecondDerivative(secondDerivative);
+        $clusterContext.setAverageRequestsInFlight(averageLimit);
+end
+
+rule "Scaler-down Rule"
+dialect "mvel"
+	when
+	    $service : Service ()
+	    $cluster : Cluster () from  $service.getClusters()
+	    $autoScalingPolicy : AutoscalePolicy(id == $cluster.getAutoscalePolicyName() ) from $manager.getAutoscalePolicyList()
+        $deploymentPolicy : DeploymentPolicy(id == $cluster.getDeploymentPolicyName() ) from $manager.getDeploymentPolicyList()
+        $partitionGroup : PartitionGroup () from  $deploymentPolicy.getPartitionGroups()
+	    $clusterContext : ClusterContext() from $context.getClusterContext($cluster.getClusterId())
+        $loadThresholds :LoadThresholds() from  $autoScalingPolicy.getLoadThresholds()
+
+        autoscaleAlgorithm : AutoscaleAlgorithm() from  $evaluator.getAutoscaleAlgorithm($partitionGroup.getPartitionAlgo())
+        lbStatAverage : Float() from  $clusterContext.getAverageRequestsInFlight()
+        lbStatGradient : Float() from  $clusterContext.getRequestsInFlightGradient()
+        lbStatSecondDerivative : Float() from  $clusterContext.getRequestsInFlightSecondDerivative()
+        averageLimit : Float() from  $loadThresholds.getRequestsInFlight().getAverage()
+        gradientLimit : Float() from  $loadThresholds.getRequestsInFlight().getGradient()
+        secondDerivative  : Float() from  $loadThresholds.getRequestsInFlight().getSecondDerivative()
+        scaleDownSlowerMarginOfGradient : Float() from  $loadThresholds.getRequestsInFlight().getScaleDownMarginOfGradient()
+        scaleDownSlowerMarginOfSecondDerivative : Float() from  $loadThresholds.getRequestsInFlight().getScaleDownMarginOfSecondDerivative()
+        partition : Partition() from autoscaleAlgorithm.getNextScaleDownPartition($partitionGroup, $cluster.getClusterId())
+        eval(lbStatAverage < averageLimit  && lbStatGradient < gradientLimit - scaleDownSlowerMarginOfSecondDerivative
+                         && lbStatSecondDerivative < secondDerivative - scaleDownSlowerMarginOfSecondDerivative)
+	then
+        $evaluator.delegateTerminate(partition,$cluster.getClusterId());
+        $clusterContext.setRequestsInFlightGradient(gradientLimit);
+        $clusterContext.setRequestsInFlightSecondDerivative(secondDerivative);
+        $clusterContext.setAverageRequestsInFlight(averageLimit);
+end