You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2015/03/08 05:38:51 UTC

stratos git commit: Updating load balancer statistics reader to use topology provider

Repository: stratos
Updated Branches:
  refs/heads/master 5c502b266 -> de00e1fa3


Updating load balancer statistics reader to use topology provider


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/de00e1fa
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/de00e1fa
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/de00e1fa

Branch: refs/heads/master
Commit: de00e1fa3b76505a469f6536c8d0fac2bfe57b87
Parents: 5c502b2
Author: Imesh Gunaratne <im...@apache.org>
Authored: Sun Mar 8 10:08:23 2015 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Sun Mar 8 10:08:45 2015 +0530

----------------------------------------------------------------------
 .../impl/CloudControllerServiceImpl.java        | 12 +----
 .../load/balancer/common/domain/Service.java    |  8 ++-
 ...LoadBalancerCommonTopologyEventReceiver.java | 14 +++--
 .../LoadBalancerStatisticsReader.java           |  2 +-
 .../LoadBalancerStatisticsNotifier.java         | 56 +++++++++-----------
 .../extension/api/LoadBalancerExtension.java    | 46 ++++++++++------
 .../internal/LoadBalancerServiceComponent.java  |  7 +--
 .../LoadBalancerStatisticsCollector.java        |  3 +-
 .../stratos/haproxy/extension/HAProxy.java      |  3 +-
 .../extension/HAProxyStatisticsReader.java      | 49 ++++++++---------
 .../apache/stratos/haproxy/extension/Main.java  | 29 ++++++++--
 11 files changed, 130 insertions(+), 99 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
index 31eca08..d351134 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
@@ -118,7 +118,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
 		handleNullObject(cartridgeConfig, "Cartridge definition is null");
 
 		if (log.isInfoEnabled()) {
-			log.info("Adding cartridge: [cartridge-type] " + cartridgeConfig.getType());
+			log.info("Updating cartridge: [cartridge-type] " + cartridgeConfig.getType());
 		}
 		if (log.isDebugEnabled()) {
 			log.debug("Cartridge definition: " + cartridgeConfig.toString());
@@ -133,12 +133,9 @@ public class CloudControllerServiceImpl implements CloudControllerService {
 			throw new InvalidCartridgeDefinitionException(msg, e);
 		}
 
-		// TODO transaction begins
 		String cartridgeType = cartridge.getType();
-		// Undeploy if already deployed
 		if (cloudControllerContext.getCartridge(cartridgeType) != null) {
 			Cartridge cartridgeToBeRemoved = cloudControllerContext.getCartridge(cartridgeType);
-			// undeploy
 			try {
 				removeCartridge(cartridgeToBeRemoved.getType());
 			} catch (InvalidCartridgeTypeException ignore) {
@@ -152,15 +149,10 @@ public class CloudControllerServiceImpl implements CloudControllerService {
 		// Add cartridge to the cloud controller context and persist
 		CloudControllerContext.getInstance().addCartridge(cartridge);
 		CloudControllerContext.getInstance().persist();
-
-		List<Cartridge> cartridgeList = new ArrayList<Cartridge>();
-		cartridgeList.add(cartridge);
-
-		TopologyBuilder.handleServiceCreated(cartridgeList);
 		// transaction ends
 
 		if (log.isInfoEnabled()) {
-			log.info("Successfully added cartridge: [cartridge-type] " + cartridgeType);
+			log.info("Successfully updated cartridge: [cartridge-type] " + cartridgeType);
 		}
 	}
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Service.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Service.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Service.java
index 116078e..f94e7f8 100644
--- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Service.java
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Service.java
@@ -89,12 +89,16 @@ public class Service {
 
     public void addPorts(Collection<Port> ports) {
         for(Port port : ports) {
-            addPort(port);
+            if(!portExists(port)) {
+                addPort(port);
+            }
         }
     }
 
     public void removePort(Port port) {
-        this.portMap.remove(port.getProxy());
+        if(portExists(port)) {
+            this.portMap.remove(port.getProxy());
+        }
     }
 
     public boolean portExists(Port port) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
index 8582f3c..88be323 100644
--- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
@@ -293,7 +293,16 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv
             }
             return;
         }
-        topologyProvider.addMember(transformMember(member));
+
+        org.apache.stratos.load.balancer.common.domain.Member lbMember = transformMember(member);
+        org.apache.stratos.load.balancer.common.domain.Service lbService = topologyProvider.getTopology().
+                getService(serviceName);
+        if(lbService == null) {
+            log.warn(String.format("Service not found: %s", serviceName));
+            return;
+        }
+        lbService.addPorts(lbMember.getPorts());
+        topologyProvider.addMember(lbMember);
     }
 
     /**
@@ -347,9 +356,6 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv
     private org.apache.stratos.load.balancer.common.domain.Service transformService(Service messagingService) {
         org.apache.stratos.load.balancer.common.domain.Service service =
                 new org.apache.stratos.load.balancer.common.domain.Service(messagingService.getServiceName());
-        for(Port port : messagingService.getPorts()) {
-            service.addPort(transformPort(port));
-        }
         return service;
     }
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java
index c0016fe..c9d2556 100644
--- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java
@@ -19,7 +19,7 @@
 
 package org.apache.stratos.load.balancer.common.statistics;
 
-import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.load.balancer.common.domain.Cluster;
 
 /**
  * Load balancer statistics reader interface.

http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
index 9c4cb88..52f98be 100644
--- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
@@ -25,10 +25,10 @@ import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.common.statistics.publisher.InFlightRequestPublisher;
 import org.apache.stratos.common.statistics.publisher.InFlightRequestPublisherFactory;
 import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType;
+import org.apache.stratos.load.balancer.common.domain.Cluster;
+import org.apache.stratos.load.balancer.common.domain.Service;
 import org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatisticsReader;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import org.apache.stratos.load.balancer.common.topology.TopologyProvider;
 
 /**
  * Load balancer statistics notifier thread for publishing statistics periodically to CEP.
@@ -37,13 +37,15 @@ public class LoadBalancerStatisticsNotifier implements Runnable {
     private static final Log log = LogFactory.getLog(LoadBalancerStatisticsNotifier.class);
 
     private final LoadBalancerStatisticsReader statsReader;
+    private final TopologyProvider topologyProvider;
     private final InFlightRequestPublisher inFlightRequestPublisher;
     private long statsPublisherInterval = 15000;
     private String networkPartitionId;
     private boolean terminated;
 
-    public LoadBalancerStatisticsNotifier(LoadBalancerStatisticsReader statsReader) {
+    public LoadBalancerStatisticsNotifier(LoadBalancerStatisticsReader statsReader, TopologyProvider topologyProvider) {
         this.statsReader = statsReader;
+        this.topologyProvider = topologyProvider;
         this.inFlightRequestPublisher = InFlightRequestPublisherFactory.createInFlightRequestPublisher(
                 StatisticsPublisherType.WSO2CEP);
 
@@ -74,36 +76,26 @@ public class LoadBalancerStatisticsNotifier implements Runnable {
                     log.debug("Publishing load balancer statistics");
                 }
                 if (inFlightRequestPublisher.isEnabled()) {
-                    try {
-                        TopologyManager.acquireReadLock();
-                        int requestCount;
-                        int servedRequestCount;
-                        int activeInstancesCount;
-                        for (Service service : TopologyManager.getTopology().getServices()) {
-                            for (Cluster cluster : service.getClusters()) {
-                                if (!cluster.isLbCluster()) {
-                                    // Publish in-flight request count of load balancer's network partition
-                                    requestCount = statsReader.getInFlightRequestCount(cluster.getClusterId());
-                                    servedRequestCount = statsReader.getServedRequestCount(cluster.getClusterId());
-                                    if(requestCount == 0) {
-                                        servedRequestCount = 0;
-                                    }
-                                    activeInstancesCount = statsReader.getActiveInstancesCount(cluster);
-                                    inFlightRequestPublisher.publish(cluster.getClusterId(), networkPartitionId,activeInstancesCount, requestCount, servedRequestCount);
-                                    log.info(String.format("In-flight request count published to cep: [cluster-id] %s [network-partition] %s [value] %d [active instances] %d [RIF] %d ",
-                                            cluster.getClusterId(), networkPartitionId, servedRequestCount , activeInstancesCount ,requestCount ));
-                                    if (log.isDebugEnabled()) {
-                                        log.debug(String.format("In-flight request count published to cep: [cluster-id] %s [network-partition] %s [value] %d",
-                                                cluster.getClusterId(), networkPartitionId, requestCount));
-                                    }
-                                }
-                                else {
-                                    // Load balancer cluster found in topology; we do not need to publish request counts for them.
-                                }
+                    int requestCount;
+                    int servedRequestCount;
+                    int activeInstancesCount;
+                    for (Service service : topologyProvider.getTopology().getServices()) {
+                        for (Cluster cluster : service.getClusters()) {
+                            // Publish in-flight request count of load balancer's network partition
+                            requestCount = statsReader.getInFlightRequestCount(cluster.getClusterId());
+                            servedRequestCount = statsReader.getServedRequestCount(cluster.getClusterId());
+                            if (requestCount == 0) {
+                                servedRequestCount = 0;
+                            }
+                            activeInstancesCount = statsReader.getActiveInstancesCount(cluster);
+                            inFlightRequestPublisher.publish(cluster.getClusterId(), networkPartitionId, activeInstancesCount, requestCount, servedRequestCount);
+                            log.info(String.format("In-flight request count published to cep: [cluster-id] %s [network-partition] %s [value] %d [active instances] %d [RIF] %d ",
+                                    cluster.getClusterId(), networkPartitionId, servedRequestCount, activeInstancesCount, requestCount));
+                            if (log.isDebugEnabled()) {
+                                log.debug(String.format("In-flight request count published to cep: [cluster-id] %s [network-partition] %s [value] %d",
+                                        cluster.getClusterId(), networkPartitionId, requestCount));
                             }
                         }
-                    } finally {
-                        TopologyManager.releaseReadLock();
                     }
                 } else if (log.isWarnEnabled()) {
                     log.warn("In-flight request count publisher is disabled");

http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
index 2459b31..ef2dc5b 100644
--- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
+++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
@@ -62,10 +62,12 @@ public class LoadBalancerExtension {
 	 * @param loadBalancer Load balancer instance: Mandatory.
 	 * @param statsReader  Statistics reader: If null statistics notifier thread will not be started.
 	 */
-	public LoadBalancerExtension(LoadBalancer loadBalancer, LoadBalancerStatisticsReader statsReader) {
-		this.loadBalancer = loadBalancer;
+	public LoadBalancerExtension(LoadBalancer loadBalancer, LoadBalancerStatisticsReader statsReader,
+                                 TopologyProvider topologyProvider) {
+
+        this.loadBalancer = loadBalancer;
 		this.statsReader = statsReader;
-        this.topologyProvider = new TopologyProvider();
+        this.topologyProvider = topologyProvider;
 	}
 
 
@@ -82,7 +84,7 @@ public class LoadBalancerExtension {
 
 			if (statsReader != null) {
 				// Start stats notifier thread
-				statisticsNotifier = new LoadBalancerStatisticsNotifier(statsReader);
+				statisticsNotifier = new LoadBalancerStatisticsNotifier(statsReader, topologyProvider);
 				Thread statsNotifierThread = new Thread(statisticsNotifier);
 				statsNotifierThread.start();
 			} else {
@@ -158,7 +160,7 @@ public class LoadBalancerExtension {
 
                         // Configure load balancer
                         Topology topology = topologyProvider.getTopology();
-                        if(topologyInitialized(topology) && loadBalancer.configure(topology)) {
+                        if(topologyPopulated(topology) && loadBalancer.configure(topology)) {
                             // Start load balancer
                             loadBalancer.start();
                             loadBalancerStarted = true;
@@ -168,7 +170,7 @@ public class LoadBalancerExtension {
                     if (log.isErrorEnabled()) {
                         log.error("Could not start load balancer", e);
                     }
-                    terminate();
+                    stop();
                 }
             }
         });
@@ -205,11 +207,11 @@ public class LoadBalancerExtension {
 	}
 
     /**
-     * Returns true if topology has initialized
+     * Returns true if the topology has been populated
      * @param topology
      * @return
      */
-    private boolean topologyInitialized(Topology topology) {
+    private boolean topologyPopulated(Topology topology) {
         for(Service service : topology.getServices()) {
             for(Cluster cluster : service.getClusters()) {
                 if(cluster.getMembers().size() > 0) {
@@ -240,15 +242,27 @@ public class LoadBalancerExtension {
 	}
 
     /**
-     * Terminate event receivers and publishers.
+     * Stop load balancer instance.
      */
-	public void terminate() {
-		if (topologyEventReceiver != null) {
-			topologyEventReceiver.terminate();
-		}
-		if (statisticsNotifier != null) {
-			statisticsNotifier.terminate();
-		}
+	public void stop() {
+        try {
+            if (topologyEventReceiver != null) {
+                topologyEventReceiver.terminate();
+            }
+        } catch (Exception ignore) {
+        }
+
+        try {
+            if (statisticsNotifier != null) {
+                statisticsNotifier.terminate();
+            }
+        } catch (Exception ignore) {
+        }
+
+        try {
+            loadBalancer.stop();
+        } catch (Exception ignore) {
+        }
 	}
 
 	public ExecutorService getExecutorService() {

http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
index fed6e84..33735dc 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
@@ -147,7 +147,7 @@ public class LoadBalancerServiceComponent {
 
             if(configuration.isCepStatsPublisherEnabled()) {
                 // Start statistics notifier
-                startStatisticsNotifier();
+                startStatisticsNotifier(topologyProvider);
             }
 
             activated = true;
@@ -217,9 +217,10 @@ public class LoadBalancerServiceComponent {
         }
     }
 
-    private void startStatisticsNotifier() {
+    private void startStatisticsNotifier(TopologyProvider topologyProvider) {
         // Start stats notifier thread
-        statisticsNotifier = new LoadBalancerStatisticsNotifier(LoadBalancerStatisticsCollector.getInstance());
+        statisticsNotifier = new LoadBalancerStatisticsNotifier(LoadBalancerStatisticsCollector.getInstance(),
+                topologyProvider);
         Thread statsNotifierThread = new Thread(statisticsNotifier);
         statsNotifierThread.start();
         if (log.isInfoEnabled()) {

http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java
index a5239b4..2e5723f 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java
@@ -21,6 +21,7 @@ package org.apache.stratos.load.balancer.statistics;
 import org.apache.commons.lang.StringUtils;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.load.balancer.common.domain.Cluster;
 import org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatisticsReader;
 
 import java.util.Map;
@@ -93,7 +94,7 @@ public class LoadBalancerStatisticsCollector implements LoadBalancerStatisticsRe
         }
     }
 
-    public int getActiveInstancesCount(org.apache.stratos.messaging.domain.topology.Cluster cluster) {
+    public int getActiveInstancesCount(Cluster cluster) {
         return cluster.getMembers().size();
     }
 

http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxy.java
----------------------------------------------------------------------
diff --git a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxy.java b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxy.java
index 8c0fe6a..18f005d 100644
--- a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxy.java
+++ b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxy.java
@@ -151,9 +151,8 @@ public class HAProxy implements LoadBalancer {
             }
         } catch (Exception e) {
             if (log.isErrorEnabled()) {
-                log.error("Could not stop haproxy instance");
+                log.error("Could not stop haproxy instance", e);
             }
-            throw new LoadBalancerExtensionException(e);
         }
     }
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java
----------------------------------------------------------------------
diff --git a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java
index 5dec4d3..3c2cf36 100644
--- a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java
+++ b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java
@@ -22,12 +22,12 @@ package org.apache.stratos.haproxy.extension;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.common.util.CommandUtils;
+import org.apache.stratos.load.balancer.common.domain.Cluster;
+import org.apache.stratos.load.balancer.common.domain.Member;
+import org.apache.stratos.load.balancer.common.domain.Port;
+import org.apache.stratos.load.balancer.common.domain.Service;
 import org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatisticsReader;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.Port;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import org.apache.stratos.load.balancer.common.topology.TopologyProvider;
 
 import java.io.IOException;
 
@@ -35,14 +35,17 @@ import java.io.IOException;
  * HAProxy statistics reader.
  */
 public class HAProxyStatisticsReader implements LoadBalancerStatisticsReader {
+
     private static final Log log = LogFactory.getLog(HAProxyStatisticsReader.class);
 
     private String scriptsPath;
     private String statsSocketFilePath;
+    private TopologyProvider topologyProvider;
 
-    public HAProxyStatisticsReader() {
+    public HAProxyStatisticsReader(TopologyProvider topologyProvider) {
         this.scriptsPath = HAProxyContext.getInstance().getScriptsPath();
         this.statsSocketFilePath = HAProxyContext.getInstance().getStatsSocketFilePath();
+        this.topologyProvider = topologyProvider;
     }
 
     @Override
@@ -51,7 +54,7 @@ public class HAProxyStatisticsReader implements LoadBalancerStatisticsReader {
         String[] array;
         int totalWeight, weight;
 
-        for (Service service : TopologyManager.getTopology().getServices()) {
+        for (Service service : topologyProvider.getTopology().getServices()) {
             for (Cluster cluster : service.getClusters()) {
                 if (cluster.getClusterId().equals(clusterId)) {
                     totalWeight = 0;
@@ -63,25 +66,23 @@ public class HAProxyStatisticsReader implements LoadBalancerStatisticsReader {
                         for(String hostname : cluster.getHostNames()) {
                             backendId = hostname+"-http-members";
                             for (Member member : cluster.getMembers()) {
-                                if(member.getNetworkPartitionId().equals(HAProxyContext.getInstance().getNetworkPartitionId())) {
-                                    // echo "get weight <backend>/<server>" | socat stdio <stats-socket>
-                                    command = String.format("%s/get-weight.sh %s %s %s", scriptsPath, backendId, member.getMemberId(), statsSocketFilePath);
-                                    try {
-                                        output = CommandUtils.executeCommand(command);
-                                        if ((output != null) && (output.length() > 0)) {
-                                            array = output.split(" ");
-                                            if ((array != null) && (array.length > 0)) {
-                                                weight = Integer.parseInt(array[0]);
-                                                if (log.isDebugEnabled()) {
-                                                    log.debug(String.format("Member weight found: [cluster] %s [member] %s [weight] %d", member.getClusterId(), member.getMemberId(), weight));
-                                                }
-                                                totalWeight += weight;
+                                // echo "get weight <backend>/<server>" | socat stdio <stats-socket>
+                                command = String.format("%s/get-weight.sh %s %s %s", scriptsPath, backendId, member.getMemberId(), statsSocketFilePath);
+                                try {
+                                    output = CommandUtils.executeCommand(command);
+                                    if ((output != null) && (output.length() > 0)) {
+                                        array = output.split(" ");
+                                        if ((array != null) && (array.length > 0)) {
+                                            weight = Integer.parseInt(array[0]);
+                                            if (log.isDebugEnabled()) {
+                                                log.debug(String.format("Member weight found: [cluster] %s [member] %s [weight] %d", member.getClusterId(), member.getMemberId(), weight));
                                             }
+                                            totalWeight += weight;
                                         }
-                                    } catch (IOException e) {
-                                        if (log.isErrorEnabled()) {
-                                            log.error(e);
-                                        }
+                                    }
+                                } catch (IOException e) {
+                                    if (log.isErrorEnabled()) {
+                                        log.error(e);
                                     }
                                 }
                             }

http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
----------------------------------------------------------------------
diff --git a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
index c6ee22b..f56541d 100644
--- a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
+++ b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.log4j.PropertyConfigurator;
 import org.apache.stratos.common.threading.StratosThreadPool;
+import org.apache.stratos.load.balancer.common.topology.TopologyProvider;
 import org.apache.stratos.load.balancer.extension.api.LoadBalancerExtension;
 
 import java.util.concurrent.ExecutorService;
@@ -44,12 +45,31 @@ public class Main {
 			if (log.isInfoEnabled()) {
 				log.info("HAProxy extension started");
 			}
+
+            // Add shutdown hook
+            final Thread mainThread = Thread.currentThread();
+            final LoadBalancerExtension finalExtension = extension;
+            Runtime.getRuntime().addShutdownHook(new Thread() {
+                public void run() {
+                    try {
+                        if(finalExtension != null) {
+                            log.info("Shutting haproxy instance...");
+                            finalExtension.stop();
+                        }
+                        mainThread.join();
+                    } catch (Exception e) {
+                        log.error(e);
+                    }
+                }
+            });
+
 			executorService = StratosThreadPool.getExecutorService("haproxy.extension.thread.pool", 10);
 			// Validate runtime parameters
 			HAProxyContext.getInstance().validate();
-			extension = new LoadBalancerExtension(new HAProxy(),
-			                                      (HAProxyContext.getInstance().isCEPStatsPublisherEnabled() ?
-			                                       new HAProxyStatisticsReader() : null));
+            TopologyProvider topologyProvider = new TopologyProvider();
+            HAProxyStatisticsReader statisticsReader = HAProxyContext.getInstance().isCEPStatsPublisherEnabled() ?
+                    new HAProxyStatisticsReader(topologyProvider) : null;
+            extension = new LoadBalancerExtension(new HAProxy(), statisticsReader, topologyProvider);
 			extension.setExecutorService(executorService);
 			extension.execute();
 		} catch (Exception e) {
@@ -57,7 +77,8 @@ public class Main {
 				log.error(e);
 			}
 			if (extension != null) {
-				extension.terminate();
+                log.info("Shutting haproxy instance...");
+				extension.stop();
 			}
 		}
 	}