You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by is...@apache.org on 2013/12/10 21:12:41 UTC

[1/8] git commit: retrieving data cluster from topology manager - initial stuff

Updated Branches:
  refs/heads/master ba3b50c32 -> 85abff74d


retrieving data cluster from topology manager - initial stuff


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/2d201668
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/2d201668
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/2d201668

Branch: refs/heads/master
Commit: 2d201668e4c91788992e58ea72a1bb10fb6ecf65
Parents: 2dbf559
Author: Isuru <is...@wso2.com>
Authored: Sun Dec 8 10:15:44 2013 +0530
Committer: Isuru <is...@wso2.com>
Committed: Sun Dec 8 10:15:44 2013 +0530

----------------------------------------------------------------------
 .../internal/ADCManagementServerComponent.java  |  6 +++++
 .../SubscriptionMultiTenantBehaviour.java       | 25 ++++++++++++++++----
 2 files changed, 27 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2d201668/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java
index 13d75fe..a77cd48 100644
--- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java
@@ -28,6 +28,7 @@ import org.apache.stratos.adc.mgt.utils.StratosDBUtils;
 import org.apache.stratos.adc.topology.mgt.service.TopologyManagementService;
 import org.apache.stratos.messaging.broker.publish.EventPublisher;
 import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyReceiver;
 import org.apache.stratos.messaging.util.Constants;
 import org.osgi.service.component.ComponentContext;
 import org.wso2.carbon.ntask.core.service.TaskService;
@@ -96,6 +97,11 @@ public class ADCManagementServerComponent {
             Thread tsubscriber = new Thread(subscriber);
 			tsubscriber.start();
 
+            //Starting Topology Receiver
+            TopologyReceiver topologyReceiver = new TopologyReceiver();
+            Thread topologyReceiverThread = new Thread(topologyReceiver);
+            topologyReceiverThread.start();
+
             if (log.isInfoEnabled()) {
                 log.info("ADC management server component is activated");
             }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2d201668/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/subscription/tenancy/SubscriptionMultiTenantBehaviour.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/subscription/tenancy/SubscriptionMultiTenantBehaviour.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/subscription/tenancy/SubscriptionMultiTenantBehaviour.java
index 7d7122c..a250912 100644
--- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/subscription/tenancy/SubscriptionMultiTenantBehaviour.java
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/subscription/tenancy/SubscriptionMultiTenantBehaviour.java
@@ -25,13 +25,12 @@ import org.apache.stratos.adc.mgt.exception.ADCException;
 import org.apache.stratos.adc.mgt.exception.AlreadySubscribedException;
 import org.apache.stratos.adc.mgt.exception.NotSubscribedException;
 import org.apache.stratos.adc.mgt.exception.UnregisteredCartridgeException;
-import org.apache.stratos.adc.mgt.internal.DataHolder;
 import org.apache.stratos.adc.mgt.payload.PayloadArg;
 import org.apache.stratos.adc.mgt.subscription.CartridgeSubscription;
 import org.apache.stratos.adc.mgt.utils.CartridgeConstants;
 import org.apache.stratos.adc.mgt.utils.PersistenceManager;
-import org.apache.stratos.adc.topology.mgt.service.TopologyManagementService;
-import org.apache.stratos.adc.topology.mgt.serviceobjects.DomainContext;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 
 import java.util.Properties;
 
@@ -71,7 +70,7 @@ public class SubscriptionMultiTenantBehaviour extends SubscriptionTenancyBehavio
             }
         }
 
-        TopologyManagementService topologyService = DataHolder.getTopologyMgtService();
+        /*TopologyManagementService topologyService = DataHolder.getTopologyMgtService();
         DomainContext[] domainContexts = topologyService.getDomainsAndSubdomains(cartridgeSubscription.getType(),
                 cartridgeSubscription.getSubscriber().getTenantId());
         log.info("Retrieved " + domainContexts.length + " domain and corresponding subdomain pairs");
@@ -96,6 +95,24 @@ public class SubscriptionMultiTenantBehaviour extends SubscriptionTenancyBehavio
                     cartridgeSubscription.getSubscriber().getTenantId();
             log.warn(msg);
             throw new ADCException(msg);
+        }*/
+        TopologyManager.acquireReadLock();
+
+        try {
+            Service service = TopologyManager.getTopology().getService(cartridgeSubscription.getType());
+            if(service == null) {
+                TopologyManager.releaseReadLock();
+                String errorMsg = "Error in subscribing, no service found with the name " + cartridgeSubscription.getType();
+                log.error(errorMsg);
+                throw new ADCException(errorMsg);
+            }
+
+            //TODO: fix properly
+            //cartridgeSubscription.getCluster().setClusterDomain(service.getCluster().);
+            //cartridgeSubscription.getCluster().setClusterSubDomain(domainContext.getSubDomain());
+
+        } finally {
+            TopologyManager.releaseReadLock();
         }
     }
 


[4/8] git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-stratos into topology_mgr

Posted by is...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-stratos into topology_mgr


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/a0323f5a
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/a0323f5a
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/a0323f5a

Branch: refs/heads/master
Commit: a0323f5ab9cf852a0862bf28539b543307c17acb
Parents: c07f459 de73990
Author: Isuru <is...@wso2.com>
Authored: Tue Dec 10 15:38:49 2013 +0530
Committer: Isuru <is...@wso2.com>
Committed: Tue Dec 10 15:38:49 2013 +0530

----------------------------------------------------------------------
 service-stubs/org.apache.stratos.autoscaler.service.stub/pom.xml | 3 ++-
 1 file changed, 2 insertions(+), 1 deletion(-)
----------------------------------------------------------------------



[6/8] git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-stratos into topology_mgr

Posted by is...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-stratos into topology_mgr


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/c00e48f9
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/c00e48f9
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/c00e48f9

Branch: refs/heads/master
Commit: c00e48f9fb83b8156ad91885aad493af48db3cf3
Parents: cbc482a f573c20
Author: Isuru <is...@wso2.com>
Authored: Tue Dec 10 20:48:15 2013 +0530
Committer: Isuru <is...@wso2.com>
Committed: Tue Dec 10 20:48:15 2013 +0530

----------------------------------------------------------------------
 .../adc/mgt/client/AutoscalerServiceClient.java |  36 ++-
 .../mgt/subscription/CartridgeSubscription.java |   3 +-
 .../apache/stratos/autoscaler/Constants.java    |   3 +
 .../autoscaler/NetworkPartitionContext.java     | 171 +++++++++++++++
 .../autoscaler/api/AutoScalerServiceImpl.java   |  54 ++++-
 .../exception/NonExistingLBException.java       |  43 ++++
 .../interfaces/AutoScalerServiceInterface.java  |   5 +-
 .../autoscaler/partition/PartitionManager.java  |  65 +++++-
 .../cloud/controller/pojo/Cartridge.java        |  20 ++
 .../cloud/controller/pojo/CartridgeConfig.java  |  20 ++
 .../cloud/controller/pojo/CartridgeInfo.java    |  10 +
 .../controller/pojo/LoadbalancerConfig.java     |  70 ++++++
 .../topology/TopologyEventSender.java           |   6 +-
 .../controller/util/CloudControllerUtil.java    |   8 +
 .../WSO2CEPInFlightRequestPublisher.java        |  13 +-
 .../extension/api/LoadBalancerStatsReader.java  |   3 +-
 .../TenantAwareLoadBalanceEndpoint.java         |  31 ++-
 .../balancer/mediators/ResponseInterceptor.java |  26 ++-
 ...adBalancerInFlightRequestCountCollector.java |  71 +++---
 .../WSO2CEPInFlightRequestCountObserver.java    |  36 +--
 .../stratos/load/balancer/util/Constants.java   |   3 +
 .../event/topology/ClusterCreatedEvent.java     |   2 +-
 .../apache/stratos/rest/endpoint/Constants.java |   6 +-
 .../definition/CartridgeDefinitionBean.java     |   3 +
 .../cartridge/definition/LoadBalancerBean.java  |   4 +
 .../bean/util/converter/PojoConverter.java      |  12 +-
 .../rest/endpoint/services/ServiceUtils.java    | 218 ++++++++++++++++++-
 .../rest/endpoint/services/StratosAdmin.java    |  16 +-
 .../LoadBalancerStatisticsEventBuilder.xml      |   4 +-
 .../AverageRequestsInflightFinder.xml           |   4 +-
 .../GradientOfRequestsInFlightFinder.xml        |   6 +-
 ...SecondDerivativeOfRequestsInFlightFinder.xml |   6 +-
 .../stream-manager-config.xml                   |   9 +-
 .../haproxy/extension/HAProxyStatsReader.java   |  34 +--
 .../src/main/resources/AutoScalerService.wsdl   | 122 ++++++++++-
 .../main/resources/CloudControllerService.wsdl  |  21 +-
 36 files changed, 1027 insertions(+), 137 deletions(-)
----------------------------------------------------------------------



[2/8] git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-stratos into topology_mgr

Posted by is...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-stratos into topology_mgr


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/c66b090f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/c66b090f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/c66b090f

Branch: refs/heads/master
Commit: c66b090f97882b83b9fc24f3709e4a204016af98
Parents: 2d20166 2fbd0fd
Author: Isuru <is...@wso2.com>
Authored: Sun Dec 8 10:19:58 2013 +0530
Committer: Isuru <is...@wso2.com>
Committed: Sun Dec 8 10:19:58 2013 +0530

----------------------------------------------------------------------
 .../mgt/publisher/TenantSynzhronizerTask.java   |  26 ++++-
 .../balancer/LoadBalancerTenantReceiver.java    | 105 +++++++++++--------
 .../balancer/LoadBalancerTopologyReceiver.java  |  64 ++---------
 .../stratos/load/balancer/RequestDelegator.java |  14 +--
 .../conf/LoadBalancerConfiguration.java         |  17 +--
 .../context/LoadBalancerContextUtil.java        |  73 +++++++++++++
 .../sample/configuration/loadbalancer2.conf     |   4 +-
 .../stratos/messaging/domain/tenant/Tenant.java |   5 +
 .../distribution/src/main/conf/log4j.properties |   2 +-
 9 files changed, 192 insertions(+), 118 deletions(-)
----------------------------------------------------------------------



[7/8] git commit: committing changes related to SM information model and relevant Rest APIs

Posted by is...@apache.org.
committing changes related to SM information model and relevant Rest APIs


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/749d1d1e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/749d1d1e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/749d1d1e

Branch: refs/heads/master
Commit: 749d1d1e42045697da037ec35160bd3eb030f925
Parents: c00e48f
Author: Isuru <is...@wso2.com>
Authored: Wed Dec 11 01:38:52 2013 +0530
Committer: Isuru <is...@wso2.com>
Committed: Wed Dec 11 01:38:52 2013 +0530

----------------------------------------------------------------------
 .../topology/model/TopologyClusterModel.java    | 312 +++++++++++++++++--
 .../rest/endpoint/services/ServiceUtils.java    |  40 ++-
 .../rest/endpoint/services/StratosAdmin.java    |  32 ++
 3 files changed, 356 insertions(+), 28 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/749d1d1e/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/model/TopologyClusterModel.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/model/TopologyClusterModel.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/model/TopologyClusterModel.java
index f70c0d2..52e75cc 100644
--- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/model/TopologyClusterModel.java
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/model/TopologyClusterModel.java
@@ -23,18 +23,17 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.messaging.domain.topology.Cluster;
 
-import java.util.ArrayList;
-import java.util.HashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
 import java.util.concurrent.locks.ReentrantReadWriteLock;
 
 public class TopologyClusterModel {
 
     private static final Log log = LogFactory.getLog(TopologyClusterModel.class);
-    private Map<TenantIdAndAliasTopologyKey, Cluster> tenantIdAndAliasTopologyKeyToClusterMap;
-    private Map<Integer, List<Cluster>> tenantIdToClusterMap;
-    private Map<TenantIdAndTypeTopologyKey , List<Cluster>> tenantIdAndTypeTopologyKeyToClusterMap;
+
+    private Map<Integer, Set<CartridgeTypeContext>> tenantIdToCartridgeTypeContextMap;
+    //private Map<TenantIdAndAliasTopologyKey, Cluster> tenantIdAndAliasTopologyKeyToClusterMap;
+    //private Map<Integer, List<Cluster>> tenantIdToClusterMap;
+    //private Map<TenantIdAndTypeTopologyKey , List<Cluster>> tenantIdAndTypeTopologyKeyToClusterMap;
     private static TopologyClusterModel topologyClusterModel;
 
     //locks
@@ -43,9 +42,10 @@ public class TopologyClusterModel {
     private static volatile ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
 
     private TopologyClusterModel() {
-        tenantIdAndAliasTopologyKeyToClusterMap = new HashMap<TenantIdAndAliasTopologyKey, Cluster>();
-        tenantIdAndTypeTopologyKeyToClusterMap = new HashMap<TenantIdAndTypeTopologyKey, List<Cluster>>();
-        tenantIdToClusterMap = new HashMap<Integer, List<Cluster>>();
+        //tenantIdAndAliasTopologyKeyToClusterMap = new HashMap<TenantIdAndAliasTopologyKey, Cluster>();
+        //tenantIdAndTypeTopologyKeyToClusterMap = new HashMap<TenantIdAndTypeTopologyKey, List<Cluster>>();
+        //tenantIdToClusterMap = new HashMap<Integer, List<Cluster>>();
+        tenantIdToCartridgeTypeContextMap = new HashMap<Integer, Set<CartridgeTypeContext>>();
     }
 
     public static TopologyClusterModel getInstance () {
@@ -60,7 +60,7 @@ public class TopologyClusterModel {
         return topologyClusterModel;
     }
 
-    public void addCluster (int tenantId, String cartridgeType, String subscriptionAlias, Cluster cluster) {
+    /*public void addCluster (int tenantId, String cartridgeType, String subscriptionAlias, Cluster cluster) {
 
         List<Cluster> clusters;
         writeLock.lock();
@@ -92,43 +92,317 @@ public class TopologyClusterModel {
         } finally {
             writeLock.unlock();
         }
+    } */
+
+    public void addCluster (int tenantId, String cartridgeType, String subscriptionAlias, Cluster cluster) {
+
+        Set<CartridgeTypeContext> cartridgeTypeContextSet = null;
+        Set<SubscriptionAliasContext> subscriptionAliasContextSet = null;
+
+        writeLock.lock();
+        try {
+            //check if a set of CartridgeTypeContext instances already exist for given tenant Id
+            cartridgeTypeContextSet = tenantIdToCartridgeTypeContextMap.get(tenantId);
+            if(cartridgeTypeContextSet != null) {
+                CartridgeTypeContext cartridgeTypeContext = null;
+                //iterate through the set
+                Iterator<CartridgeTypeContext> typeCtxIterator = cartridgeTypeContextSet.iterator();
+                while (typeCtxIterator.hasNext()) {
+                    //see if the set contains a CartridgeTypeContext instance with the given cartridge type
+                    cartridgeTypeContext = typeCtxIterator.next();
+                    if (cartridgeTypeContext.getType().equals(cartridgeType)){
+                        //if so, get the SubscriptionAliasContext set
+                        subscriptionAliasContextSet = cartridgeTypeContext.getSubscriptionAliasContextSet();
+                        break;
+                    }
+                }
+                //check if a SubscriptionAliasContext set is not found
+                if(subscriptionAliasContextSet == null) {
+                    //no SubscriptionAliasContext instance
+                    //create a new SubscriptionAliasContext instance
+                    SubscriptionAliasContext subscriptionAliasContext = new SubscriptionAliasContext(subscriptionAlias,
+                            cluster);
+                    //create a SubscriptionAliasContext set
+                    subscriptionAliasContextSet = new HashSet<SubscriptionAliasContext>();
+                    //add the created SubscriptionAliasContext instance to SubscriptionAliasContext set
+                    subscriptionAliasContextSet.add(subscriptionAliasContext);
+                    //set it to the CartridgeTypeContext instance
+                    cartridgeTypeContext = new CartridgeTypeContext(cartridgeType);
+                    cartridgeTypeContext.setSubscriptionAliasContextSet(subscriptionAliasContextSet);
+                    //add to the cartridgeTypeContextSet
+                    cartridgeTypeContextSet.add(cartridgeTypeContext);
+
+                } else {
+                    //iterate through the set
+                    Iterator<SubscriptionAliasContext> aliasIterator = subscriptionAliasContextSet.iterator();
+                    while (aliasIterator.hasNext()) {
+                        //see if the set contains a SubscriptionAliasContext instance with the given alias
+                        SubscriptionAliasContext subscriptionAliasContext = aliasIterator.next();
+                        if (subscriptionAliasContext.getSubscriptionAlias().equals(subscriptionAlias)) {
+                            //remove the existing one
+                            aliasIterator.remove();
+                            break;
+                        }
+                    }
+                    //now, add the new cluster object
+                    subscriptionAliasContextSet.add(new SubscriptionAliasContext(subscriptionAlias, cluster));
+                }
+
+            } else {
+                //no entries for this tenant, go from down to top creating relevant objects and populating them
+                //create a new SubscriptionAliasContext instance
+                SubscriptionAliasContext subscriptionAliasContext = new SubscriptionAliasContext(subscriptionAlias,
+                        cluster);
+                //create a SubscriptionAliasContext set
+                subscriptionAliasContextSet = new HashSet<SubscriptionAliasContext>();
+                //add the created SubscriptionAliasContext instance to SubscriptionAliasContext set
+                subscriptionAliasContextSet.add(subscriptionAliasContext);
+
+                //create a new CartridgeTypeContext instance
+                CartridgeTypeContext cartridgeTypeContext = new CartridgeTypeContext(cartridgeType);
+                //link the SubscriptionAliasContextSet to it
+                cartridgeTypeContext.setSubscriptionAliasContextSet(subscriptionAliasContextSet);
+
+                //Create CartridgeTypeContext instance
+                cartridgeTypeContextSet = new HashSet<CartridgeTypeContext>();
+                //link the SubscriptionAliasContext set to CartridgeTypeContext instance
+                cartridgeTypeContext.setSubscriptionAliasContextSet(subscriptionAliasContextSet);
+
+                //link the CartridgeTypeContext set to the [tenant Id -> CartridgeTypeContext] map
+                tenantIdToCartridgeTypeContextMap.put(tenantId, cartridgeTypeContextSet);
+            }
+
+        } finally {
+            writeLock.unlock();
+        }
     }
 
-    public Cluster getCluster (int tenantId, String subscriptionAlias) {
+    public Cluster getCluster (int tenantId, String cartridgeType, String subscriptionAlias) {
+
+        Set<CartridgeTypeContext> cartridgeTypeContextSet = null;
+        Set<SubscriptionAliasContext> subscriptionAliasContextSet = null;
 
         readLock.lock();
         try {
-            return tenantIdAndAliasTopologyKeyToClusterMap.get(new TenantIdAndAliasTopologyKey(tenantId, subscriptionAlias));
+            //check if a set of CartridgeTypeContext instances already exist for given tenant Id
+            cartridgeTypeContextSet = tenantIdToCartridgeTypeContextMap.get(tenantId);
+            if(cartridgeTypeContextSet != null) {
+                CartridgeTypeContext cartridgeTypeContext = null;
+                //iterate through the set
+                Iterator<CartridgeTypeContext> typeCtxIterator = cartridgeTypeContextSet.iterator();
+                while (typeCtxIterator.hasNext()) {
+                    //see if the set contains a CartridgeTypeContext instance with the given cartridge type
+                    cartridgeTypeContext = typeCtxIterator.next();
+                    if (cartridgeTypeContext.getType().equals(cartridgeType)){
+                        //if so, get the SubscriptionAliasContext set
+                        subscriptionAliasContextSet = cartridgeTypeContext.getSubscriptionAliasContextSet();
+                        break;
+                    }
+                }
+                if(subscriptionAliasContextSet != null) {
+                    //iterate through the set
+                    Iterator<SubscriptionAliasContext> aliasIterator = subscriptionAliasContextSet.iterator();
+                    while (aliasIterator.hasNext()) {
+                        //see if the set contains a SubscriptionAliasContext instance with the given alias
+                        SubscriptionAliasContext subscriptionAliasContext = aliasIterator.next();
+                        if (subscriptionAliasContext.getSubscriptionAlias().equals(subscriptionAlias)) {
+                            return subscriptionAliasContext.getCluster();
+                        }
+                    }
+                }
+            }
 
         } finally {
             readLock.unlock();
         }
+
+        return null;
     }
 
-    public List<Cluster> getClusters (int tenantId, String cartridgeType) {
+    public Set<Cluster> getClusters (int tenantId, String cartridgeType) {
+
+        Set<CartridgeTypeContext> cartridgeTypeContextSet = null;
+        Set<SubscriptionAliasContext> subscriptionAliasContextSet = null;
+        Set<Cluster> clusterSet = null;
 
         readLock.lock();
         try {
-            return tenantIdAndTypeTopologyKeyToClusterMap.get(new TenantIdAndTypeTopologyKey(tenantId, cartridgeType));
+            cartridgeTypeContextSet = tenantIdToCartridgeTypeContextMap.get(tenantId);
+            if(cartridgeTypeContextSet != null) {
+                //iterate through the set
+                Iterator<CartridgeTypeContext> typeCtxIterator = cartridgeTypeContextSet.iterator();
+                while (typeCtxIterator.hasNext()) {
+                    //iterate and get each of SubscriptionAliasContext sets
+                    CartridgeTypeContext cartridgeTypeContext = typeCtxIterator.next();
+                    subscriptionAliasContextSet = cartridgeTypeContext.getSubscriptionAliasContextSet();
+
+                    if (subscriptionAliasContextSet != null) {
+                        //iterate and convert to Cluster set
+                        Iterator<SubscriptionAliasContext> aliasCtxIterator = subscriptionAliasContextSet.iterator();
+                        clusterSet = new HashSet<Cluster>();
+                        while (aliasCtxIterator.hasNext()) {
+                            clusterSet.add(aliasCtxIterator.next().getCluster());
+                        }
+                    }
+                }
+            }
 
         } finally {
             readLock.unlock();
         }
+
+        return clusterSet;
     }
 
-    public List<Cluster> getClusters (int tenantId) {
+    public Set<Cluster> getClusters (int tenantId) {
+
+        Set<CartridgeTypeContext> cartridgeTypeContextSet = null;
+        Set<SubscriptionAliasContext> subscriptionAliasContextSet = null;
+        Set<Cluster> clusterSet = null;
 
         readLock.lock();
         try {
-            return tenantIdToClusterMap.get(tenantId);
+            cartridgeTypeContextSet = tenantIdToCartridgeTypeContextMap.get(tenantId);
+            if(cartridgeTypeContextSet != null) {
+                CartridgeTypeContext cartridgeTypeContext = null;
+                //iterate through the set
+                Iterator<CartridgeTypeContext> typeCtxIterator = cartridgeTypeContextSet.iterator();
+                while (typeCtxIterator.hasNext()) {
+                    //see if the set contains a CartridgeTypeContext instance with the given cartridge type
+                }
+
+                if (subscriptionAliasContextSet != null) {
+                    //iterate and convert to Cluster set
+                    Iterator<SubscriptionAliasContext> aliasCtxIterator = subscriptionAliasContextSet.iterator();
+                    clusterSet = new HashSet<Cluster>();
+                    while (aliasCtxIterator.hasNext()) {
+                        clusterSet.add(aliasCtxIterator.next().getCluster());
+                    }
+                }
+            }
 
         } finally {
             readLock.unlock();
         }
+
+        return clusterSet;
     }
 
-    public void removeCluster (int tenantId, String subscriptionAlias) {
-        tenantIdAndAliasTopologyKeyToClusterMap.remove(new TenantIdAndAliasTopologyKey(tenantId, subscriptionAlias));
+    public void removeCluster (int tenantId, String cartridgeType, String subscriptionAlias) {
+
+        Set<CartridgeTypeContext> cartridgeTypeContextSet = null;
+        Set<SubscriptionAliasContext> subscriptionAliasContextSet = null;
+
+        writeLock.lock();
+        try {
+            //check if a set of CartridgeTypeContext instances already exist for given tenant Id
+            cartridgeTypeContextSet = tenantIdToCartridgeTypeContextMap.get(tenantId);
+            if(cartridgeTypeContextSet != null) {
+                CartridgeTypeContext cartridgeTypeContext = null;
+                //iterate through the set
+                Iterator<CartridgeTypeContext> typeCtxIterator = cartridgeTypeContextSet.iterator();
+                while (typeCtxIterator.hasNext()) {
+                    //see if the set contains a CartridgeTypeContext instance with the given cartridge type
+                    cartridgeTypeContext = typeCtxIterator.next();
+                    if (cartridgeTypeContext.getType().equals(cartridgeType)){
+                        //if so, get the SubscriptionAliasContext set
+                        subscriptionAliasContextSet = cartridgeTypeContext.getSubscriptionAliasContextSet();
+                        break;
+                    }
+                }
+                if(subscriptionAliasContextSet != null) {
+                    //iterate through the set
+                    Iterator<SubscriptionAliasContext> aliasIterator = subscriptionAliasContextSet.iterator();
+                    while (aliasIterator.hasNext()) {
+                        //see if the set contains a SubscriptionAliasContext instance with the given alias
+                        SubscriptionAliasContext subscriptionAliasContext = aliasIterator.next();
+                        if (subscriptionAliasContext.getSubscriptionAlias().equals(subscriptionAlias)) {
+                            //remove the existing one
+                            aliasIterator.remove();
+                            break;
+                        }
+                    }
+                }
+            }
+
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    private class CartridgeTypeContext {
+
+        private String type;
+        private Set<SubscriptionAliasContext> subscriptionAliasContextSet;
+
+        public CartridgeTypeContext (String type) {
+            this.type = type;
+        }
+
+        public void setSubscriptionAliasContextSet (Set<SubscriptionAliasContext> subscriptionAliasContextSet) {
+            this.subscriptionAliasContextSet = subscriptionAliasContextSet;
+        }
+
+        public String getType () {
+            return type;
+        }
+
+        public Set<SubscriptionAliasContext> getSubscriptionAliasContextSet () {
+            return subscriptionAliasContextSet;
+        }
+
+        public boolean equals(Object other) {
+
+            if(this == other) {
+                return true;
+            }
+            if(!(other instanceof CartridgeTypeContext)) {
+                return false;
+            }
+
+            CartridgeTypeContext that = (CartridgeTypeContext)other;
+            return this.type.equals(that.type);
+        }
+
+        public int hashCode () {
+            return type.hashCode();
+        }
+    }
+
+    private class SubscriptionAliasContext {
+
+        private String subscriptionAlias;
+        private Cluster cluster;
+
+        public SubscriptionAliasContext(String subscriptionAlias, Cluster cluster) {
+            this.subscriptionAlias = subscriptionAlias;
+            this.cluster = cluster;
+        }
+
+        public String getSubscriptionAlias () {
+            return subscriptionAlias;
+        }
+
+        public Cluster getCluster () {
+            return cluster;
+        }
+
+        public boolean equals(Object other) {
+
+            if(this == other) {
+                return true;
+            }
+            if(!(other instanceof SubscriptionAliasContext)) {
+                return false;
+            }
+
+            SubscriptionAliasContext that = (SubscriptionAliasContext)other;
+            return this.subscriptionAlias.equals(that.subscriptionAlias);
+        }
+
+        public int hashCode () {
+            return subscriptionAlias.hashCode();
+        }
     }
 
     private class TenantIdAndAliasTopologyKey {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/749d1d1e/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/services/ServiceUtils.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/services/ServiceUtils.java b/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/services/ServiceUtils.java
index a83a594..632684a 100644
--- a/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/services/ServiceUtils.java
+++ b/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/services/ServiceUtils.java
@@ -31,32 +31,27 @@ import org.apache.stratos.adc.mgt.exception.*;
 import org.apache.stratos.adc.mgt.internal.DataHolder;
 import org.apache.stratos.adc.mgt.manager.CartridgeSubscriptionManager;
 import org.apache.stratos.adc.mgt.subscription.CartridgeSubscription;
+import org.apache.stratos.adc.mgt.topology.model.TopologyClusterModel;
 import org.apache.stratos.adc.mgt.utils.ApplicationManagementUtil;
 import org.apache.stratos.adc.mgt.utils.CartridgeConstants;
 import org.apache.stratos.adc.mgt.utils.PersistenceManager;
 import org.apache.stratos.adc.topology.mgt.service.TopologyManagementService;
+import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
 import org.apache.stratos.cloud.controller.pojo.CartridgeConfig;
 import org.apache.stratos.cloud.controller.pojo.CartridgeInfo;
 import org.apache.stratos.cloud.controller.pojo.LoadbalancerConfig;
 import org.apache.stratos.cloud.controller.pojo.Properties;
+import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.rest.endpoint.Constants;
-import org.apache.stratos.rest.endpoint.bean.CartridgeInfoBean;
 import org.apache.stratos.rest.endpoint.bean.autoscaler.partition.Partition;
 import org.apache.stratos.rest.endpoint.bean.autoscaler.partition.PartitionGroup;
 import org.apache.stratos.rest.endpoint.bean.autoscaler.policy.autoscale.AutoscalePolicy;
-import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
 import org.apache.stratos.rest.endpoint.bean.cartridge.definition.CartridgeDefinitionBean;
-import org.apache.stratos.rest.endpoint.bean.cartridge.definition.LoadBalancerBean;
 import org.apache.stratos.rest.endpoint.bean.util.converter.PojoConverter;
 import org.apache.stratos.rest.endpoint.exception.RestAPIException;
 import org.wso2.carbon.context.PrivilegedCarbonContext;
 
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Random;
-import java.util.Set;
+import java.util.*;
 import java.util.regex.Pattern;
 
 public class ServiceUtils {
@@ -787,6 +782,33 @@ public class ServiceUtils {
 
     }
 
+
+    public static Cluster getCluster (String cartridgeType, String subscriptionAlias, ConfigurationContext configurationContext) {
+
+        return TopologyClusterModel.getInstance().getCluster(ApplicationManagementUtil.getTenantId(configurationContext)
+                ,cartridgeType , subscriptionAlias);
+    }
+
+    /**
+     * Returns the clusters the TopologyClusterModel holds for the tenant resolved
+     * from the given configuration context.
+     *
+     * @param configurationContext context from which the tenant id is resolved
+     * @return the clusters as an array; an empty array when the model returns
+     *         null or an empty set
+     */
+    public static Cluster[] getClustersForTenant (ConfigurationContext configurationContext) {
+
+        Set<Cluster> tenantClusters = TopologyClusterModel.getInstance().getClusters(
+                ApplicationManagementUtil.getTenantId(configurationContext));
+
+        if (tenantClusters == null || tenantClusters.isEmpty()) {
+            return new Cluster[0];
+        }
+        return tenantClusters.toArray(new Cluster[tenantClusters.size()]);
+    }
+
+    /**
+     * Returns the clusters the TopologyClusterModel holds for the tenant resolved
+     * from the given configuration context, restricted to one cartridge type.
+     *
+     * @param configurationContext context from which the tenant id is resolved
+     * @param cartridgeType cartridge type passed to the model lookup
+     * @return the clusters as an array; an empty array when the model returns
+     *         null or an empty set
+     */
+    public static Cluster[] getClustersForTenantAndCartridgeType (ConfigurationContext configurationContext,
+                                                                  String cartridgeType) {
+
+        Set<Cluster> matchingClusters = TopologyClusterModel.getInstance().getClusters(
+                ApplicationManagementUtil.getTenantId(configurationContext), cartridgeType);
+
+        if (matchingClusters == null || matchingClusters.isEmpty()) {
+            return new Cluster[0];
+        }
+        return matchingClusters.toArray(new Cluster[matchingClusters.size()]);
+    }
+
     static void unsubscribe(String alias, String tenantDomain) throws ADCException, NotSubscribedException {
 
         cartridgeSubsciptionManager.unsubscribeFromCartridge(tenantDomain, alias);

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/749d1d1e/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/services/StratosAdmin.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/services/StratosAdmin.java b/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/services/StratosAdmin.java
index 0ca7ebf..22029fa 100644
--- a/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/services/StratosAdmin.java
+++ b/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/services/StratosAdmin.java
@@ -27,6 +27,7 @@ import org.apache.stratos.common.beans.TenantInfoBean;
 import org.apache.stratos.common.exception.StratosException;
 import org.apache.stratos.common.util.ClaimsMgtUtil;
 import org.apache.stratos.common.util.CommonUtil;
+import org.apache.stratos.messaging.domain.topology.Cluster;
 import org.apache.stratos.rest.endpoint.ServiceHolder;
 import org.apache.stratos.rest.endpoint.annotation.AuthorizationAction;
 import org.apache.stratos.rest.endpoint.annotation.SuperTenantService;
@@ -291,6 +292,37 @@ public class StratosAdmin extends AbstractAdmin {
         }
     }
 
+    /**
+     * GET /cluster: returns the clusters that ServiceUtils resolves from the
+     * current configuration context.
+     *
+     * @return the array handed back by ServiceUtils.getClustersForTenant
+     * @throws ADCException declared in the signature; not thrown directly here
+     */
+    @GET
+    @Path("/cluster")
+    @Produces("application/json")
+    @Consumes("application/json")
+    @AuthorizationAction("/permission/protected/manage/monitor/tenants")
+    public Cluster[] getClustersForTenant() throws ADCException {
+        final Cluster[] tenantClusters = ServiceUtils.getClustersForTenant(getConfigContext());
+        return tenantClusters;
+    }
+
+    /**
+     * GET /cluster/{cartridgeType}: returns the clusters that ServiceUtils resolves
+     * from the current configuration context for the given cartridge type.
+     *
+     * @param cartridgeType cartridge type taken from the request path
+     * @return the array handed back by ServiceUtils.getClustersForTenantAndCartridgeType
+     * @throws ADCException declared in the signature; not thrown directly here
+     */
+    @GET
+    @Path("/cluster/{cartridgeType}")
+    @Produces("application/json")
+    @Consumes("application/json")
+    @AuthorizationAction("/permission/protected/manage/monitor/tenants")
+    public Cluster[] getClusters(@PathParam("cartridgeType") String cartridgeType) throws ADCException {
+        final Cluster[] matchingClusters =
+                ServiceUtils.getClustersForTenantAndCartridgeType(getConfigContext(), cartridgeType);
+        return matchingClusters;
+    }
+
+    /**
+     * GET /cluster/{cartridgeType}/{subscriptionAlias}: returns the cluster that
+     * ServiceUtils resolves for the given cartridge type and subscription alias
+     * using the current configuration context.
+     *
+     * @param cartridgeType cartridge type taken from the request path
+     * @param subscriptionAlias subscription alias taken from the request path
+     * @return the cluster handed back by ServiceUtils.getCluster
+     * @throws ADCException declared in the signature; not thrown directly here
+     */
+    @GET
+    @Path("/cluster/{cartridgeType}/{subscriptionAlias}")
+    @Produces("application/json")
+    @Consumes("application/json")
+    @AuthorizationAction("/permission/protected/manage/monitor/tenants")
+    public Cluster getCluster(@PathParam("cartridgeType") String cartridgeType,
+                              @PathParam("subscriptionAlias") String subscriptionAlias) throws ADCException {
+        final Cluster resolvedCluster =
+                ServiceUtils.getCluster(cartridgeType, subscriptionAlias, getConfigContext());
+        return resolvedCluster;
+    }
+
     @POST
     @Path("/cartridge/unsubscribe")
     @Consumes("application/json")


[8/8] git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-stratos into topology_mgr

Posted by is...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-stratos into topology_mgr


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/85abff74
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/85abff74
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/85abff74

Branch: refs/heads/master
Commit: 85abff74dae74db85b6d4c69f7d71bb99fe2ae13
Parents: 749d1d1 ba3b50c
Author: Isuru <is...@wso2.com>
Authored: Wed Dec 11 01:39:17 2013 +0530
Committer: Isuru <is...@wso2.com>
Committed: Wed Dec 11 01:39:17 2013 +0530

----------------------------------------------------------------------
 .../cloud/controller/iaases/AWSEC2Iaas.java     | 10 --------
 .../controller/iaases/OpenstackNovaIaas.java    | 26 +++-----------------
 .../messaging/domain/topology/Cluster.java      | 10 +++++++-
 .../rest/endpoint/services/ServiceUtils.java    |  3 +--
 4 files changed, 13 insertions(+), 36 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/85abff74/components/org.apache.stratos.rest.endpoint/src/main/java/org/apache/stratos/rest/endpoint/services/ServiceUtils.java
----------------------------------------------------------------------


[3/8] git commit: Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-stratos into topology_mgr

Posted by is...@apache.org.
Merge branch 'master' of https://git-wip-us.apache.org/repos/asf/incubator-stratos into topology_mgr


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/c07f4595
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/c07f4595
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/c07f4595

Branch: refs/heads/master
Commit: c07f459558fd17c8e6c5ab0d16e3d23528a65920
Parents: c66b090 93ee236
Author: Isuru <is...@wso2.com>
Authored: Tue Dec 10 15:18:01 2013 +0530
Committer: Isuru <is...@wso2.com>
Committed: Tue Dec 10 15:18:01 2013 +0530

----------------------------------------------------------------------
 .../adc/mgt/client/AutoscalerServiceClient.java |   67 +-
 .../client/CloudControllerServiceClient.java    |    3 +-
 .../org/apache/stratos/adc/mgt/dao/Cluster.java |    4 +-
 .../adc/mgt/listener/TenantStatusListner.java   |   42 +
 .../ClusterIdToCartridgeSubscriptionMap.java    |   70 +
 .../adc/mgt/lookup/LookupDataHolder.java        |  254 ++
 ...criptionAliasToCartridgeSubscriptionMap.java |   71 +
 .../manager/CartridgeSubscriptionManager.java   |   19 +-
 .../adc/mgt/payload/NonCarbonPayload.java       |   13 +-
 .../apache/stratos/adc/mgt/payload/Payload.java |    7 +-
 .../DatabaseBasedPersistenceManager.java        | 2817 +++++++++---------
 .../adc/mgt/persistence/PersistenceManager.java |   28 +-
 .../RegistryBasedPersistenceManager.java        |  125 +
 .../adc/mgt/registry/RegistryManager.java       |  138 +
 .../stratos/adc/mgt/repository/Repository.java  |    4 +-
 .../adc/mgt/retriever/DataRetrievalManager.java |  184 ++
 .../service/ApplicationManagementService.java   |    2 +-
 .../stratos/adc/mgt/subscriber/Subscriber.java  |    4 +-
 .../ApplicationCartridgeSubscription.java       |    9 +-
 .../mgt/subscription/CartridgeSubscription.java |   41 +-
 .../subscription/CartridgeSubscription_old.java |    8 +-
 .../subscription/DataCartridgeSubscription.java |   13 +-
 .../DataCartridgeSubscription_old.java          |    4 +-
 .../FrameworkCartridgeSubscription.java         |    9 +-
 .../MultiTenantCartridgeSubscription.java       |    2 +-
 .../SingleTenantCartridgeSubscription.java      |    2 +-
 .../SubscriptionSingleTenantBehaviour.java      |    2 +
 .../tenancy/SubscriptionTenancyBehaviour.java   |    3 +-
 .../mgt/utils/ApplicationManagementUtil.java    |   10 +-
 .../stratos/adc/mgt/utils/Deserializer.java     |   52 +
 .../stratos/adc/mgt/utils/Serializer.java       |   83 +
 .../autoscaler/policy/PolicyManager.java        |   10 +-
 .../autoscaler/registry/RegistryManager.java    |    2 +-
 .../rule/AutoscalerRuleEvaluator.java           |   36 +-
 .../cloud/controller/pojo/Registrant.java       |   10 +-
 .../stratos/load/balancer/EndpointDeployer.java |    8 +-
 .../balancer/LoadBalancerTenantReceiver.java    |   45 +-
 .../balancer/LoadBalancerTopologyReceiver.java  |    4 +-
 .../stratos/load/balancer/RequestDelegator.java |   12 +-
 .../conf/LoadBalancerConfiguration.java         |   37 +-
 .../conf/configurator/JndiConfigurator.java     |    5 +-
 .../conf/configurator/SynapseConfigurator.java  |   20 +-
 .../load/balancer/conf/structure/Node.java      |  181 +-
 .../balancer/conf/structure/NodeBuilder.java    |   34 +-
 .../load/balancer/conf/util/Constants.java      |    2 +
 .../balancer/context/LoadBalancerContext.java   |  177 +-
 .../context/LoadBalancerContextUtil.java        |   56 +-
 .../context/map/ClusterIdClusterContextMap.java |   61 +
 .../context/map/ClusterIdClusterMap.java        |   57 +
 .../context/map/HostNameClusterMap.java         |   63 +
 .../context/map/MultiTenantClusterMap.java      |   66 +
 .../map/ServiceNameServiceContextMap.java       |   58 +
 .../TenantIdSynapseEnvironmentServiceMap.java   |   63 +
 .../TenantAwareLoadBalanceEndpoint.java         |   18 +-
 .../internal/LoadBalancerServiceComponent.java  |   24 +-
 .../balancer/mediators/ResponseInterceptor.java |    7 +-
 ...adBalancerInFlightRequestCountCollector.java |  138 +-
 .../stratos/load/balancer/util/Constants.java   |    3 +-
 .../test/LoadBalancerConfigurationTest.java     |    5 +-
 .../sample/configuration/loadbalancer1.conf     |    2 +-
 .../sample/configuration/loadbalancer2.conf     |    4 +-
 .../sample/configuration/loadbalancer3.conf     |    2 +-
 .../messaging/domain/topology/Cloud.java        |   84 -
 .../messaging/domain/topology/Cluster.java      |   32 +-
 .../messaging/domain/topology/Region.java       |   79 -
 .../stratos/messaging/domain/topology/Zone.java |   55 -
 .../tenant/TenantMessageProcessorChain.java     |   13 +-
 .../rest/endpoint/bean/CartridgeInfoBean.java   |   19 +-
 .../autoscaler/partition/PartitionGroup.java    |    2 +-
 .../policy/deployment/DeploymentPolicy.java     |    4 +
 .../bean/util/converter/PojoConverter.java      |  153 +-
 .../rest/endpoint/services/ServiceUtils.java    |   90 +-
 .../rest/endpoint/services/StratosAdmin.java    |   60 +-
 .../ec2/load-balancer/start-load-balancer.sh    |   70 +
 .../templates/loadbalancer.conf.template        |  126 +
 .../src/main/conf/loadbalancer.conf             |    4 +-
 .../pom.xml                                     |    5 +-
 .../src/main/resources/AutoScalerService.wsdl   |  455 ++-
 .../main/resources/CloudControllerService.wsdl  |   13 +-
 79 files changed, 4298 insertions(+), 2266 deletions(-)
----------------------------------------------------------------------



[5/8] git commit: Topology model initial implementation at SM backend

Posted by is...@apache.org.
Topology model initial implementation at SM backend


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/cbc482a2
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/cbc482a2
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/cbc482a2

Branch: refs/heads/master
Commit: cbc482a249e8a6cae2175639dd2f2f9a15289814
Parents: a0323f5
Author: Isuru <is...@wso2.com>
Authored: Tue Dec 10 20:48:09 2013 +0530
Committer: Isuru <is...@wso2.com>
Committed: Tue Dec 10 20:48:09 2013 +0530

----------------------------------------------------------------------
 .../internal/ADCManagementServerComponent.java  |   7 +
 .../adc/mgt/listener/TopologyEventListner.java  |  40 ++
 .../processor/InstanceStatusProcessor.java      | 409 +++++++++++++++++++
 .../event/processor/TopologyEventProcessor.java |  33 ++
 .../processor/TopologyEventProcessorChain.java  |  59 +++
 .../topology/model/TopologyClusterModel.java    | 203 +++++++++
 6 files changed, 751 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/cbc482a2/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java
index a77cd48..0b9c7b8 100644
--- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/internal/ADCManagementServerComponent.java
@@ -21,6 +21,7 @@ package org.apache.stratos.adc.mgt.internal;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.adc.mgt.listener.InstanceStatusListener;
+import org.apache.stratos.adc.mgt.listener.TopologyEventListner;
 import org.apache.stratos.adc.mgt.publisher.TenantEventPublisher;
 import org.apache.stratos.adc.mgt.publisher.TenantSynchronizerTaskScheduler;
 import org.apache.stratos.adc.mgt.utils.CartridgeConfigFileReader;
@@ -97,6 +98,12 @@ public class ADCManagementServerComponent {
             Thread tsubscriber = new Thread(subscriber);
 			tsubscriber.start();
 
+            //initializing the topology event subscriber
+            TopicSubscriber topologyTopicSubscriber = new TopicSubscriber(Constants.TOPOLOGY_TOPIC);
+            topologyTopicSubscriber.setMessageListener(new TopologyEventListner());
+            Thread topologyTopicSubscriberThread = new Thread(topologyTopicSubscriber);
+            topologyTopicSubscriberThread.start();
+
             //Starting Topology Receiver
             TopologyReceiver topologyReceiver = new TopologyReceiver();
             Thread topologyReceiverThread = new Thread(topologyReceiver);

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/cbc482a2/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/listener/TopologyEventListner.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/listener/TopologyEventListner.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/listener/TopologyEventListner.java
new file mode 100644
index 0000000..0ec88f2
--- /dev/null
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/listener/TopologyEventListner.java
@@ -0,0 +1,40 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.adc.mgt.listener;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.adc.mgt.topology.event.processor.TopologyEventProcessorChain;
+
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
+public class TopologyEventListner implements MessageListener {
+
+    private static final Log log = LogFactory.getLog(TopologyEventListner.class);
+
+    public TopologyEventListner() {
+    }
+
+    public void onMessage(Message message) {
+
+        TopologyEventProcessorChain.getInstance().startProcessing(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/cbc482a2/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/InstanceStatusProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/InstanceStatusProcessor.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/InstanceStatusProcessor.java
new file mode 100644
index 0000000..0f274de
--- /dev/null
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/InstanceStatusProcessor.java
@@ -0,0 +1,409 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.adc.mgt.topology.event.processor;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.adc.mgt.dao.CartridgeSubscriptionInfo;
+import org.apache.stratos.adc.mgt.topology.model.TopologyClusterModel;
+import org.apache.stratos.adc.mgt.utils.PersistenceManager;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.event.topology.MemberStartedEvent;
+import org.apache.stratos.messaging.event.topology.MemberSuspendedEvent;
+import org.apache.stratos.messaging.event.topology.MemberTerminatedEvent;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import org.apache.stratos.messaging.util.Constants;
+import org.apache.stratos.messaging.util.Util;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+import java.util.HashMap;
+import java.util.Map;
+
+public class InstanceStatusProcessor extends TopologyEventProcessor {
+
+    private static final Log log = LogFactory.getLog(InstanceStatusProcessor.class);
+
+    private Map<String, Integer> clusterIdToActiveInstanceCountMap;
+
+    /**
+     * Creates the processor with an empty clusterIdToActiveInstanceCountMap.
+     */
+    public InstanceStatusProcessor () {
+        this.clusterIdToActiveInstanceCountMap = new HashMap<String, Integer>();
+    }
+
+    /**
+     * Forwards the message to the next processor in the chain, when one is set.
+     * This method itself does not inspect or handle the message.
+     *
+     * @param message the message to pass along
+     */
+    @Override
+    public void process(Message message) {
+
+        //new InstanceStatusListenerThread(message).start();
+        if(nextTopologyEventProcessor == null) {
+            // end of the chain; nothing further to invoke
+            return;
+        }
+        //go to next processor in the chain
+        nextTopologyEventProcessor.process(message);
+    }
+
+    /**
+     * Reads the event class name from the message, deserializes the matching
+     * member event and refreshes the TopologyClusterModel entry of the cluster
+     * the event refers to. Messages of any other type are ignored.
+     *
+     * @param message the received message; its body is read as a TextMessage
+     */
+    private void doProcessing (Message message) {
+
+        String messageType;
+
+        try {
+            messageType = message.getStringProperty(Constants.EVENT_CLASS_NAME);
+
+        } catch (JMSException e) {
+            log.error("Error in getting message type from received Message " + message.getClass().toString(), e);
+            return;
+        }
+
+        // each branch uses the getter matching its own event type; a getter
+        // returns null when the message body could not be read
+        String clusterDomain;
+
+        if (MemberStartedEvent.class.getName().equals(messageType)) {
+
+            log.info("Received message: " + messageType);
+            MemberStartedEvent event = getMemberStartedEvent(message);
+            clusterDomain = (event != null) ? event.getClusterId() : null;
+
+        } else if (MemberActivatedEvent.class.getName().equals(messageType)) {
+
+            log.info("Received message: " + messageType);
+            MemberActivatedEvent event = getMemberActivetedEvent(message);
+            clusterDomain = (event != null) ? event.getClusterId() : null;
+
+        } else if (MemberSuspendedEvent.class.getName().equals(messageType)) {
+
+            log.info("Received message: " + messageType);
+            MemberSuspendedEvent event = getMemberSuspendedEvent(message);
+            clusterDomain = (event != null) ? event.getClusterId() : null;
+
+        } else if (MemberTerminatedEvent.class.getName().equals(messageType)) {
+
+            log.info("Received message: " + messageType);
+            MemberTerminatedEvent event = getMemberTerminatedEvebt(message);
+            clusterDomain = (event != null) ? event.getClusterId() : null;
+
+        } else {
+            // not one of the member events handled here
+            return;
+        }
+
+        if (clusterDomain == null) {
+            // event unreadable (already logged by the getter) or without a cluster id
+            return;
+        }
+
+        updateTopologyClusterModel(clusterDomain);
+    }
+
+    /**
+     * Looks up the subscription for the cluster and, when one exists, adds the
+     * cluster currently held by the TopologyManager to the TopologyClusterModel.
+     *
+     * @param clusterDomain cluster id taken from the received event
+     */
+    private void updateTopologyClusterModel (String clusterDomain) {
+
+        CartridgeSubscriptionInfo cartridgeSubscriptionInfo = getCartridgeSubscriptionInfo(clusterDomain);
+        if(cartridgeSubscriptionInfo == null) {
+            return;
+        }
+
+        Cluster cluster = TopologyManager.getTopology().
+                getService(cartridgeSubscriptionInfo.getCartridge()).getCluster(clusterDomain);
+        TopologyClusterModel.getInstance().addCluster(cartridgeSubscriptionInfo.getTenantId(),
+                cartridgeSubscriptionInfo.getCartridge(),
+                cartridgeSubscriptionInfo.getAlias(), cluster);
+    }
+
+    /**
+     * Extracts the text body of the message.
+     *
+     * @return the text, or null when reading it fails (the failure is logged)
+     */
+    private String getJsonText (Message message) {
+
+        try {
+            return ((TextMessage)message).getText();
+
+        } catch (JMSException e) {
+            log.error("Error in getting Json message type from received Message ", e);
+            return null;
+        }
+    }
+
+    /**
+     * Deserializes the message body into a MemberStartedEvent.
+     *
+     * @return the event, or null when the message text could not be read
+     */
+    private MemberStartedEvent getMemberStartedEvent (Message message) {
+
+        String json = getJsonText(message);
+        if (json == null) {
+            return null;
+        }
+        MemberStartedEvent event = (MemberStartedEvent) Util.jsonToObject(json, MemberStartedEvent.class);
+
+        if(event != null && log.isDebugEnabled()) {
+            log.debug("Received message details: [ " +
+                    "Cluster Id: " + event.getClusterId() +
+                    "\nMember Id: " + event.getMemberId() +
+                    "\nService name: " + event.getServiceName() +
+                    "\nStatus: " + event.getStatus().name() + " ]");
+        }
+
+        return event;
+    }
+
+    /**
+     * Deserializes the message body into a MemberActivatedEvent.
+     *
+     * @return the event, or null when the message text could not be read
+     */
+    private MemberActivatedEvent getMemberActivetedEvent (Message message) {
+
+        String json = getJsonText(message);
+        if (json == null) {
+            return null;
+        }
+        // deserialize with the event's own class so the cast below is valid
+        MemberActivatedEvent event = (MemberActivatedEvent) Util.jsonToObject(json, MemberActivatedEvent.class);
+
+        if(event != null && log.isDebugEnabled()) {
+            log.debug("Received message details: [ " +
+                    "Cluster Id: " + event.getClusterId() +
+                    "\nMember Id: " + event.getMemberId() +
+                    "\nService name: " + event.getServiceName() +
+                    "\nIp: " + event.getMemberIp() + " ]");
+        }
+
+        return event;
+    }
+
+    /**
+     * Deserializes the message body into a MemberSuspendedEvent.
+     *
+     * @return the event, or null when the message text could not be read
+     */
+    private MemberSuspendedEvent getMemberSuspendedEvent (Message message) {
+
+        String json = getJsonText(message);
+        if (json == null) {
+            return null;
+        }
+        // deserialize with the event's own class so the cast below is valid
+        MemberSuspendedEvent event = (MemberSuspendedEvent) Util.jsonToObject(json, MemberSuspendedEvent.class);
+
+        if(event != null && log.isDebugEnabled()) {
+            log.debug("Received message details: [ " +
+                    "Cluster Id: " + event.getClusterId() +
+                    "\nMember Id: " + event.getMemberId() +
+                    "\nService name: " + event.getServiceName() + " ]");
+        }
+
+        return event;
+    }
+
+    /**
+     * Deserializes the message body into a MemberTerminatedEvent.
+     *
+     * @return the event, or null when the message text could not be read
+     */
+    private MemberTerminatedEvent getMemberTerminatedEvebt (Message message) {
+
+        String json = getJsonText(message);
+        if (json == null) {
+            return null;
+        }
+        // deserialize with the event's own class so the cast below is valid
+        MemberTerminatedEvent event = (MemberTerminatedEvent) Util.jsonToObject(json, MemberTerminatedEvent.class);
+
+        if(event != null && log.isDebugEnabled()) {
+            log.debug("Received message details: [ " +
+                    "Cluster Id: " + event.getClusterId() +
+                    "\nMember Id: " + event.getMemberId() +
+                    "\nService name: " + event.getServiceName() + " ]");
+        }
+
+        return event;
+    }
+
+    /**
+     * Fetches the subscription registered for a cluster id through the
+     * PersistenceManager.
+     *
+     * @param clusterDomain cluster id to look up
+     * @return the subscription information, or null when the lookup throws
+     *         (the failure is logged)
+     */
+    private CartridgeSubscriptionInfo getCartridgeSubscriptionInfo (String clusterDomain) {
+
+        CartridgeSubscriptionInfo subscriptionInfo = null;
+        try {
+            subscriptionInfo = PersistenceManager.getSubscriptionFromClusterId(clusterDomain);
+
+        } catch (Exception e) {
+            log.error("Error getting subscription information for cluster " + clusterDomain, e);
+        }
+        return subscriptionInfo;
+    }
+
+    /**
+     * Message Processing Thread class for InstanceStatusProcessor
+     */
+    /*private class InstanceStatusListenerThread extends Thread {
+
+        Message message;
+
+        public InstanceStatusListenerThread (Message message) {
+            this.message = message;
+        }
+
+        public void run () {
+
+            String messageType = null;
+
+            try {
+                messageType = message.getStringProperty(Constants.EVENT_CLASS_NAME);
+
+            } catch (JMSException e) {
+                log.error("Error in getting message type from received Message " + message.getClass().toString(), e);
+                return;
+            }
+
+            if (MemberStartedEvent.class.getName().equals(messageType)) {
+
+                log.info("Received message: " + messageType);
+
+                MemberStartedEvent event = getMemberStartedEvent();
+                String clusterDomain = event.getClusterId();
+                CartridgeSubscriptionInfo cartridgeSubscriptionInfo = getCartridgeSubscriptionInfo(clusterDomain);
+                if(cartridgeSubscriptionInfo != null) {
+
+                }
+
+            }
+            else if (MemberActivatedEvent.class.getName().equals(messageType)) {
+
+                log.info("Received message: " + messageType);
+
+                MemberActivatedEvent event = getMemberActivetedEvent();
+                String clusterDomain = event.getClusterId();
+
+
+            } else if (MemberSuspendedEvent.class.getName().equals(messageType)) {
+
+                log.info("Received message: " + messageType);
+
+                MemberStartedEvent event = getMemberStartedEvent();
+                String clusterDomain = event.getClusterId();
+
+            } else if (MemberTerminatedEvent.class.getName().equals(messageType)) {
+
+                log.info("Received message: " + messageType);
+
+                MemberStartedEvent event = getMemberStartedEvent();
+                String clusterDomain = event.getClusterId();
+
+            } else {
+                //cannot happen
+            }
+        }
+
+        private MemberStartedEvent getMemberStartedEvent () {
+
+            String json = null;
+            try {
+                json = ((TextMessage)message).getText();
+
+            } catch (JMSException e) {
+                log.error("Error in getting Json message type from received Message ", e);
+                return null;
+            }
+            MemberStartedEvent event = (MemberStartedEvent) Util.jsonToObject(json, MemberStartedEvent.class);
+
+            if(log.isDebugEnabled()) {
+                log.debug("Received message details: [ " +
+                        "Cluster Id: " + event.getClusterId() +
+                        "\nMember Id: " + event.getMemberId() +
+                        "\nService name: " + event.getServiceName() +
+                        "\nStatus: " + event.getStatus().name() + " ]");
+            }
+
+            return event;
+        }
+
+        private MemberActivatedEvent getMemberActivetedEvent () {
+
+            String json = null;
+            try {
+                json = ((TextMessage)message).getText();
+
+            } catch (JMSException e) {
+                log.error("Error in getting Json message type from received Message ", e);
+                return null;
+            }
+            MemberActivatedEvent event = (MemberActivatedEvent) Util.jsonToObject(json, MemberStartedEvent.class);
+
+            if(log.isDebugEnabled()) {
+                log.debug("Received message details: [ " +
+                        "Cluster Id: " + event.getClusterId() +
+                        "\nMember Id: " + event.getMemberId() +
+                        "\nService name: " + event.getServiceName() +
+                        "\nIp: " + event.getMemberIp() + " ]");
+            }
+
+            return event;
+        }
+
+        private MemberSuspendedEvent getMemberSuspendedEvent () {
+
+            String json = null;
+            try {
+                json = ((TextMessage)message).getText();
+
+            } catch (JMSException e) {
+                log.error("Error in getting Json message type from received Message ", e);
+                return null;
+            }
+            MemberSuspendedEvent event = (MemberSuspendedEvent) Util.jsonToObject(json, MemberStartedEvent.class);
+
+            if(log.isDebugEnabled()) {
+                log.debug("Received message details: [ " +
+                        "Cluster Id: " + event.getClusterId() +
+                        "\nMember Id: " + event.getMemberId() +
+                        "\nService name: " + event.getServiceName() + " ]");
+            }
+
+            return event;
+        }
+
+        private MemberTerminatedEvent getMemberTerminatedEvebt () {
+
+            String json = null;
+            try {
+                json = ((TextMessage)message).getText();
+
+            } catch (JMSException e) {
+                log.error("Error in getting Json message type from received Message ", e);
+                return null;
+            }
+            MemberTerminatedEvent event = (MemberTerminatedEvent) Util.jsonToObject(json, MemberStartedEvent.class);
+
+            if(log.isDebugEnabled()) {
+                log.debug("Received message details: [ " +
+                        "Cluster Id: " + event.getClusterId() +
+                        "\nMember Id: " + event.getMemberId() +
+                        "\nService name: " + event.getServiceName() + " ]");
+            }
+
+            return event;
+        }
+
+        private CartridgeSubscriptionInfo getCartridgeSubscriptionInfo (String clusterDomain) {
+
+            try {
+                return PersistenceManager.getSubscriptionFromClusterId(clusterDomain);
+
+            } catch (Exception e) {
+                log.error("Error getting subscription information for cluster " + clusterDomain, e);
+                return null;
+            }
+        }
+    }*/
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/cbc482a2/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessor.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessor.java
new file mode 100644
index 0000000..f582d57
--- /dev/null
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessor.java
@@ -0,0 +1,33 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.adc.mgt.topology.event.processor;
+
+import javax.jms.Message;
+
+/** Base class for handlers of topology JMS messages; each handler may hold a successor handler. */
+public abstract class TopologyEventProcessor {
+    // successor handler; null until setNext() supplies one
+    protected TopologyEventProcessor nextTopologyEventProcessor;
+    /** Handles the given message; the behaviour is defined by each subclass. */
+    public abstract void process (Message message);
+    public void setNext (TopologyEventProcessor nextTopologyEventProcessor) {
+        this.nextTopologyEventProcessor = nextTopologyEventProcessor;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/cbc482a2/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessorChain.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessorChain.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessorChain.java
new file mode 100644
index 0000000..5c25c59
--- /dev/null
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/event/processor/TopologyEventProcessorChain.java
@@ -0,0 +1,59 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.adc.mgt.topology.event.processor;
+
+import javax.jms.Message;
+
+/** Singleton that hands incoming topology JMS messages to the first TopologyEventProcessor. */
+public class TopologyEventProcessorChain {
+
+    // volatile: without it the double-checked locking in getInstance() is not safe under the Java memory model
+    private static volatile TopologyEventProcessorChain topologyEventProcessorChain;
+    private final TopologyEventProcessor firstTopologyEventProcessor;
+
+    private TopologyEventProcessorChain () {
+        firstTopologyEventProcessor = new InstanceStatusProcessor();
+    }
+
+    /** Returns the single shared instance, creating it on the first call. */
+    public static TopologyEventProcessorChain getInstance () {
+        if(topologyEventProcessorChain == null) {
+            synchronized (TopologyEventProcessorChain.class) {
+                if(topologyEventProcessorChain == null) {
+                    topologyEventProcessorChain = new TopologyEventProcessorChain();
+                }
+            }
+        }
+        return topologyEventProcessorChain;
+    }
+
+    /** Links the processors; only one exists today, so its successor is set to null. */
+    public void initProcessorChain () {
+        //if any other topology event processors are added, link them as follows
+        //firstTopologyEventProcessor.setNext(secondTopologyeventProcessor);
+        //secondTopologyeventProcessor.setNext(null);
+        firstTopologyEventProcessor.setNext(null);
+    }
+
+    /** Passes the message to the first processor of the chain. */
+    public void startProcessing (Message message) {
+        firstTopologyEventProcessor.process(message);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/cbc482a2/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/model/TopologyClusterModel.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/model/TopologyClusterModel.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/model/TopologyClusterModel.java
new file mode 100644
index 0000000..f70c0d2
--- /dev/null
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/topology/model/TopologyClusterModel.java
@@ -0,0 +1,203 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.adc.mgt.topology.model;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.locks.ReentrantReadWriteLock;
+
+public class TopologyClusterModel {
+
+    private static final Log log = LogFactory.getLog(TopologyClusterModel.class);
+    private Map<TenantIdAndAliasTopologyKey, Cluster> tenantIdAndAliasTopologyKeyToClusterMap;
+    private Map<Integer, List<Cluster>> tenantIdToClusterMap;
+    private Map<TenantIdAndTypeTopologyKey , List<Cluster>> tenantIdAndTypeTopologyKeyToClusterMap;
+    private static TopologyClusterModel topologyClusterModel;
+
+    //locks
+    private static volatile ReentrantReadWriteLock lock = new ReentrantReadWriteLock();
+    private static volatile ReentrantReadWriteLock.ReadLock readLock = lock.readLock();
+    private static volatile ReentrantReadWriteLock.WriteLock writeLock = lock.writeLock();
+
+    private TopologyClusterModel() {
+        tenantIdAndAliasTopologyKeyToClusterMap = new HashMap<TenantIdAndAliasTopologyKey, Cluster>();
+        tenantIdAndTypeTopologyKeyToClusterMap = new HashMap<TenantIdAndTypeTopologyKey, List<Cluster>>();
+        tenantIdToClusterMap = new HashMap<Integer, List<Cluster>>();
+    }
+
+    public static TopologyClusterModel getInstance () {
+        if(topologyClusterModel == null) {
+            synchronized (TopologyClusterModel.class) {
+                if (topologyClusterModel == null) {
+                    topologyClusterModel = new TopologyClusterModel();
+                }
+            }
+        }
+
+        return topologyClusterModel;
+    }
+
+    public void addCluster (int tenantId, String cartridgeType, String subscriptionAlias, Cluster cluster) {
+
+        List<Cluster> clusters;
+        writeLock.lock();
+
+        try {
+            //[Tenant Id + Subscription Alias] -> Cluster map
+            tenantIdAndAliasTopologyKeyToClusterMap.put(new TenantIdAndAliasTopologyKey(tenantId, subscriptionAlias), cluster);
+
+            //Tenant Id -> Cluster map
+            clusters = tenantIdToClusterMap.get(tenantId);
+            if(clusters == null) {
+                clusters = new ArrayList<Cluster>();
+                clusters.add(cluster);
+                tenantIdToClusterMap.put(tenantId, clusters);
+            } else {
+                clusters.add(cluster);
+            }
+
+            //[Tenant Id + Cartridge Type] -> Cluster map
+            clusters = tenantIdAndTypeTopologyKeyToClusterMap.get(new TenantIdAndTypeTopologyKey(tenantId, cartridgeType));
+            if(clusters == null) {
+                clusters = new ArrayList<Cluster>();
+                clusters.add(cluster);
+                tenantIdAndTypeTopologyKeyToClusterMap.put(new TenantIdAndTypeTopologyKey(tenantId, cartridgeType), clusters);
+            } else {
+                clusters.add(cluster);
+            }
+
+        } finally {
+            writeLock.unlock();
+        }
+    }
+
+    public Cluster getCluster (int tenantId, String subscriptionAlias) {
+
+        readLock.lock();
+        try {
+            return tenantIdAndAliasTopologyKeyToClusterMap.get(new TenantIdAndAliasTopologyKey(tenantId, subscriptionAlias));
+
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    public List<Cluster> getClusters (int tenantId, String cartridgeType) {
+
+        readLock.lock();
+        try {
+            return tenantIdAndTypeTopologyKeyToClusterMap.get(new TenantIdAndTypeTopologyKey(tenantId, cartridgeType));
+
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    public List<Cluster> getClusters (int tenantId) {
+
+        readLock.lock();
+        try {
+            return tenantIdToClusterMap.get(tenantId);
+
+        } finally {
+            readLock.unlock();
+        }
+    }
+
+    public void removeCluster (int tenantId, String subscriptionAlias) {
+        tenantIdAndAliasTopologyKeyToClusterMap.remove(new TenantIdAndAliasTopologyKey(tenantId, subscriptionAlias));
+    }
+
+    private class TenantIdAndAliasTopologyKey {
+
+        private int tenantId;
+        private String subscriptionAlias;
+
+        public TenantIdAndAliasTopologyKey (int tenantId, String subscriptionAlias) {
+
+            this.tenantId = tenantId;
+            this.subscriptionAlias = subscriptionAlias;
+        }
+
+        public boolean equals(Object other) {
+
+            if(this == other) {
+                return true;
+            }
+            if(!(other instanceof TenantIdAndAliasTopologyKey)) {
+                return false;
+            }
+
+            TenantIdAndAliasTopologyKey that = (TenantIdAndAliasTopologyKey)other;
+            return ((this.tenantId == that.tenantId) && (this.subscriptionAlias == that.subscriptionAlias));
+        }
+
+        public int hashCode () {
+
+            int subscriptionAliasHashCode = 0;
+            if(subscriptionAlias != null) {
+                subscriptionAliasHashCode = subscriptionAlias.hashCode();
+            }
+
+            return (tenantId * 3 + subscriptionAliasHashCode * 5);
+        }
+    }
+
+    public class TenantIdAndTypeTopologyKey {
+
+        private int tenantId;
+        private String subscriptionAlias;
+
+        public TenantIdAndTypeTopologyKey (int tenantId, String subscriptionAlias) {
+
+            this.tenantId = tenantId;
+            this.subscriptionAlias = subscriptionAlias;
+        }
+
+        public boolean equals(Object other) {
+
+            if(this == other) {
+                return true;
+            }
+            if(!(other instanceof TenantIdAndTypeTopologyKey)) {
+                return false;
+            }
+
+            TenantIdAndTypeTopologyKey that = (TenantIdAndTypeTopologyKey)other;
+            return ((this.tenantId == that.tenantId) && (this.subscriptionAlias == that.subscriptionAlias));
+        }
+
+        public int hashCode () {
+
+            int subscriptionAliasHashCode = 0;
+            if(subscriptionAlias != null) {
+                subscriptionAliasHashCode = subscriptionAlias.hashCode();
+            }
+
+            return (tenantId * 3 + subscriptionAliasHashCode * 5);
+        }
+    }
+}