You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2013/12/27 11:02:50 UTC

git commit: Fixed load balancer statistics publisher issue of not publishing statistics periodically when there are no incoming requests found

Updated Branches:
  refs/heads/master 5f9188065 -> 6db5968e4


Fixed load balancer statistics publisher issue of not publishing statistics periodically when there are no incoming requests found


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/6db5968e
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/6db5968e
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/6db5968e

Branch: refs/heads/master
Commit: 6db5968e4d0c9c58f3ebd7f03261f61da48f3b86
Parents: 5f91880
Author: Imesh Gunaratne <im...@apache.org>
Authored: Fri Dec 27 15:32:38 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Fri Dec 27 15:32:38 2013 +0530

----------------------------------------------------------------------
 .../LoadBalancerStatisticsReader.java           |  34 +++++
 .../LoadBalancerStatisticsNotifier.java         | 109 +++++++++++++++
 .../extension/api/LoadBalancerExtension.java    |  14 +-
 ...oadBalancerInFlightRequestCountNotifier.java | 101 --------------
 .../extension/api/LoadBalancerStatsReader.java  |  34 -----
 .../TenantAwareLoadBalanceEndpoint.java         |   4 +-
 .../internal/LoadBalancerServiceComponent.java  |  19 +++
 .../balancer/mediators/ResponseInterceptor.java |   4 +-
 ...adBalancerInFlightRequestCountCollector.java | 136 -------------------
 .../LoadBalancerStatisticsCollector.java        | 101 ++++++++++++++
 .../WSO2CEPInFlightRequestCountObserver.java    |  66 ---------
 .../extension/HAProxyStatisticsReader.java      |  96 +++++++++++++
 .../haproxy/extension/HAProxyStatsReader.java   |  96 -------------
 .../apache/stratos/haproxy/extension/Main.java  |   2 +-
 14 files changed, 372 insertions(+), 444 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6db5968e/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java
new file mode 100644
index 0000000..41e81e8
--- /dev/null
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.load.balancer.common.statistics;
+
+import java.util.HashMap;
+
+/**
+ * Load balancer statistics reader interface.
+ */
+public interface LoadBalancerStatisticsReader {
+
+    /**
+     * Get in-flight request count of a given cluster.
+     * @param clusterId
+     */
+    int getInFlightRequestCount(String clusterId);
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6db5968e/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
new file mode 100644
index 0000000..88cfd4d
--- /dev/null
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
@@ -0,0 +1,109 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.load.balancer.common.statistics.notifier;
+
+import org.apache.commons.lang3.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatisticsReader;
+import org.apache.stratos.load.balancer.common.statistics.publisher.WSO2CEPInFlightRequestPublisher;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+/**
+ * Load balancer statistics notifier thread for publishing statistics periodically to CEP.
+ */
+public class LoadBalancerStatisticsNotifier implements Runnable {
+    private static final Log log = LogFactory.getLog(LoadBalancerStatisticsNotifier.class);
+
+    private final LoadBalancerStatisticsReader statsReader;
+    private final WSO2CEPInFlightRequestPublisher inFlightRequestPublisher;
+    private long statsPublisherInterval = 15000;
+    private String networkPartitionId;
+    private boolean terminated;
+
+    public LoadBalancerStatisticsNotifier(LoadBalancerStatisticsReader statsReader) {
+        this.statsReader = statsReader;
+        this.inFlightRequestPublisher = new WSO2CEPInFlightRequestPublisher();
+
+        String interval = System.getProperty("stats.notifier.interval");
+        if (interval != null) {
+            statsPublisherInterval = Long.getLong(interval);
+        }
+        if(log.isDebugEnabled()) {
+            log.debug(String.format("stats.notifier.interval: %dms", statsPublisherInterval));
+        }
+
+        networkPartitionId = System.getProperty("network.partition.id");
+        if (StringUtils.isBlank(networkPartitionId)) {
+            throw new RuntimeException("network.partition.id system property was not found.");
+        }
+    }
+
+    @Override
+    public void run() {
+        while (!terminated) {
+            try {
+                try {
+                    Thread.sleep(statsPublisherInterval);
+                } catch (InterruptedException ignore) {
+                }
+
+                if (log.isDebugEnabled()) {
+                    log.debug("Publishing load balancer statistics");
+                }
+                if (inFlightRequestPublisher.isEnabled()) {
+                    try {
+                        TopologyManager.acquireReadLock();
+                        int requestCount;
+                        for (Service service : TopologyManager.getTopology().getServices()) {
+                            for (Cluster cluster : service.getClusters()) {
+                                // Publish in-flight request count of load balancer's network partition
+                                requestCount = statsReader.getInFlightRequestCount(cluster.getClusterId());
+                                inFlightRequestPublisher.publish(cluster.getClusterId(), networkPartitionId, requestCount);
+                                if (log.isDebugEnabled()) {
+                                    log.debug(String.format("In-flight request count published to cep: [cluster-id] %s [network-partition] %s [value] %d",
+                                            cluster.getClusterId(), networkPartitionId, requestCount));
+                                }
+                            }
+
+                        }
+                    } finally {
+                        TopologyManager.releaseReadLock();
+                    }
+                } else if (log.isWarnEnabled()) {
+                    log.warn("In-flight request count publisher is disabled");
+                }
+            } catch (Exception e) {
+                if (log.isErrorEnabled()) {
+                    log.error("Could not publish load balancer statistics", e);
+                }
+            }
+        }
+    }
+
+    /**
+     * Terminate load balancer statistics notifier thread.
+     */
+    public void terminate() {
+        terminated = true;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6db5968e/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
index 0e602c7..9fe5107 100644
--- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
+++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
@@ -21,6 +21,8 @@ package org.apache.stratos.load.balancer.extension.api;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatisticsReader;
+import org.apache.stratos.load.balancer.common.statistics.notifier.LoadBalancerStatisticsNotifier;
 import org.apache.stratos.messaging.event.Event;
 import org.apache.stratos.messaging.listener.topology.*;
 import org.apache.stratos.messaging.message.processor.topology.TopologyMessageProcessorChain;
@@ -36,13 +38,13 @@ public class LoadBalancerExtension implements Runnable {
     private static final Log log = LogFactory.getLog(LoadBalancerExtension.class);
 
     private LoadBalancer loadBalancer;
-    private LoadBalancerStatsReader statsReader;
+    private LoadBalancerStatisticsReader statsReader;
     private boolean loadBalancerStarted;
     private TopologyReceiver topologyReceiver;
-    private LoadBalancerInFlightRequestCountNotifier inFlightRequestCountNotifier;
+    private LoadBalancerStatisticsNotifier statisticsNotifier;
     private boolean terminated;
 
-    public LoadBalancerExtension(LoadBalancer loadBalancer, LoadBalancerStatsReader statsReader) {
+    public LoadBalancerExtension(LoadBalancer loadBalancer, LoadBalancerStatisticsReader statsReader) {
         this.loadBalancer = loadBalancer;
         this.statsReader = statsReader;
     }
@@ -60,8 +62,8 @@ public class LoadBalancerExtension implements Runnable {
             topologyReceiverThread.start();
 
             // Start stats notifier thread
-            inFlightRequestCountNotifier = new LoadBalancerInFlightRequestCountNotifier(statsReader);
-            Thread statsNotifierThread = new Thread(inFlightRequestCountNotifier);
+            statisticsNotifier = new LoadBalancerStatisticsNotifier(statsReader);
+            Thread statsNotifierThread = new Thread(statisticsNotifier);
             statsNotifierThread.start();
 
             // Keep the thread live until terminated
@@ -146,7 +148,7 @@ public class LoadBalancerExtension implements Runnable {
 
     public void terminate() {
         topologyReceiver.terminate();
-        inFlightRequestCountNotifier.terminate();
+        statisticsNotifier.terminate();
         terminated = true;
     }
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6db5968e/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java
deleted file mode 100644
index c5bf52b..0000000
--- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerInFlightRequestCountNotifier.java
+++ /dev/null
@@ -1,101 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.load.balancer.extension.api;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.load.balancer.common.statistics.publisher.WSO2CEPInFlightRequestPublisher;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-
-/**
- * Load balancer statistics notifier thread for publishing statistics periodically to CEP.
- */
-public class LoadBalancerInFlightRequestCountNotifier implements Runnable {
-    private static final Log log = LogFactory.getLog(LoadBalancerInFlightRequestCountNotifier.class);
-
-    private LoadBalancerStatsReader statsReader;
-    private final WSO2CEPInFlightRequestPublisher statsPublisher;
-    private long statsPublisherInterval = 15000;
-    private String networkPartitionId;
-    private boolean terminated;
-
-    public LoadBalancerInFlightRequestCountNotifier(LoadBalancerStatsReader statsReader) {
-        this.statsReader = statsReader;
-        this.statsPublisher = new WSO2CEPInFlightRequestPublisher();
-
-        String interval = System.getProperty("stats.notifier.interval");
-        if (interval != null) {
-            statsPublisherInterval = Long.getLong(interval);
-        }
-        networkPartitionId = System.getProperty("network.partition.id");
-        if (StringUtils.isBlank(networkPartitionId)) {
-            throw new RuntimeException("network.partition.id system property was not found.");
-        }
-    }
-
-    @Override
-    public void run() {
-        while (!terminated) {
-            try {
-                try {
-                    Thread.sleep(statsPublisherInterval);
-                } catch (InterruptedException ignore) {
-                }
-
-                if (statsPublisher.isEnabled()) {
-                    try {
-                        TopologyManager.acquireReadLock();
-                        int requestCount;
-                        for (Service service : TopologyManager.getTopology().getServices()) {
-                            for (Cluster cluster : service.getClusters()) {
-                                // Publish in-flight request count of load balancer's network partition
-                                requestCount = statsReader.getInFlightRequestCount(cluster.getClusterId());
-                                statsPublisher.publish(cluster.getClusterId(), networkPartitionId, requestCount);
-                                if (log.isDebugEnabled()) {
-                                    log.debug(String.format("In-flight request count published to cep: [cluster-id] %s [network-partition] %s [value] %d",
-                                            cluster.getClusterId(), networkPartitionId, requestCount));
-                                }
-                            }
-
-                        }
-                    } finally {
-                        TopologyManager.releaseReadLock();
-                    }
-                } else if (log.isWarnEnabled()) {
-                    log.warn("CEP statistics publisher is disabled");
-                }
-            } catch (Exception e) {
-                if (log.isErrorEnabled()) {
-                    log.error("Could not publish in-flight request count", e);
-                }
-            }
-        }
-    }
-
-    /**
-     * Terminate load balancer statistics notifier thread.
-     */
-    public void terminate() {
-        terminated = true;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6db5968e/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java
deleted file mode 100644
index 2c6f324..0000000
--- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerStatsReader.java
+++ /dev/null
@@ -1,34 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.load.balancer.extension.api;
-
-import java.util.HashMap;
-
-/**
- * Load balancer statistics reader interface.
- */
-public interface LoadBalancerStatsReader {
-
-    /**
-     * Get in-flight request count of a given cluster.
-     * @param clusterId
-     */
-    int getInFlightRequestCount(String clusterId);
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6db5968e/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
index b6fe08b..efc88c9 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
@@ -27,7 +27,7 @@ import org.apache.stratos.load.balancer.RequestDelegator;
 import org.apache.stratos.load.balancer.algorithm.LoadBalanceAlgorithmFactory;
 import org.apache.stratos.load.balancer.conf.LoadBalancerConfiguration;
 import org.apache.stratos.load.balancer.conf.domain.TenantIdentifier;
-import org.apache.stratos.load.balancer.statistics.LoadBalancerInFlightRequestCountCollector;
+import org.apache.stratos.load.balancer.statistics.LoadBalancerStatisticsCollector;
 import org.apache.stratos.load.balancer.util.Constants;
 import org.apache.stratos.messaging.domain.tenant.Tenant;
 import org.apache.stratos.messaging.domain.topology.Member;
@@ -526,7 +526,7 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
             if(StringUtils.isBlank(clusterId)) {
                 throw new RuntimeException("Cluster id not found in message context");
             }
-            LoadBalancerInFlightRequestCountCollector.getInstance().incrementInFlightRequestCount(clusterId);
+            LoadBalancerStatisticsCollector.getInstance().incrementInFlightRequestCount(clusterId);
         }
         catch (Exception e) {
             if(log.isDebugEnabled()) {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6db5968e/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
index 2d11458..da7f3de 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
@@ -27,12 +27,15 @@ import org.apache.stratos.load.balancer.EndpointDeployer;
 import org.apache.stratos.load.balancer.LoadBalancerTenantReceiver;
 import org.apache.stratos.load.balancer.LoadBalancerTopologyReceiver;
 import org.apache.stratos.load.balancer.TenantAwareLoadBalanceEndpointException;
+import org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatisticsReader;
+import org.apache.stratos.load.balancer.common.statistics.notifier.LoadBalancerStatisticsNotifier;
 import org.apache.stratos.load.balancer.conf.LoadBalancerConfiguration;
 import org.apache.stratos.load.balancer.conf.configurator.CEPConfigurator;
 import org.apache.stratos.load.balancer.conf.configurator.JndiConfigurator;
 import org.apache.stratos.load.balancer.conf.configurator.SynapseConfigurator;
 import org.apache.stratos.load.balancer.conf.configurator.TopologyFilterConfigurator;
 import org.apache.stratos.load.balancer.context.LoadBalancerContext;
+import org.apache.stratos.load.balancer.statistics.LoadBalancerStatisticsCollector;
 import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
@@ -100,6 +103,7 @@ public class LoadBalancerServiceComponent {
     private boolean activated = false;
     private LoadBalancerTopologyReceiver topologyReceiver;
     private LoadBalancerTenantReceiver tenantReceiver;
+    private LoadBalancerStatisticsNotifier statisticsNotifier;
 
     protected void activate(ComponentContext ctxt) {
         try {
@@ -179,6 +183,19 @@ public class LoadBalancerServiceComponent {
                 }
             }
 
+            if(configuration.isCepStatsPublisherEnabled()) {
+                // Get stats reader
+                LoadBalancerStatisticsReader statsReader = LoadBalancerStatisticsCollector.getInstance();
+
+                // Start stats notifier thread
+                statisticsNotifier = new LoadBalancerStatisticsNotifier(statsReader);
+                Thread statsNotifierThread = new Thread(statisticsNotifier);
+                statsNotifierThread.start();
+                if (log.isInfoEnabled()) {
+                    log.info("Load balancer statistics notifier thread started");
+                }
+            }
+
             activated = true;
             if (log.isInfoEnabled()) {
                 log.info("Load balancer service component is activated ");
@@ -206,6 +223,8 @@ public class LoadBalancerServiceComponent {
         tenantReceiver.terminate();
         // Terminate topology receiver
         topologyReceiver.terminate();
+        // Terminate statistics notifier
+        statisticsNotifier.terminate();
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6db5968e/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
index fef3fea..cf3e768 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/mediators/ResponseInterceptor.java
@@ -19,7 +19,7 @@
 package org.apache.stratos.load.balancer.mediators;
 
 import org.apache.commons.lang3.StringUtils;
-import org.apache.stratos.load.balancer.statistics.LoadBalancerInFlightRequestCountCollector;
+import org.apache.stratos.load.balancer.statistics.LoadBalancerStatisticsCollector;
 import org.apache.stratos.load.balancer.util.Constants;
 import org.apache.synapse.ManagedLifecycle;
 import org.apache.synapse.MessageContext;
@@ -40,7 +40,7 @@ public class ResponseInterceptor extends AbstractMediator implements ManagedLife
             if (StringUtils.isBlank(clusterId)) {
                 throw new RuntimeException("Cluster id not found in message context");
             }
-            LoadBalancerInFlightRequestCountCollector.getInstance().decrementInFlightRequestCount(clusterId);
+            LoadBalancerStatisticsCollector.getInstance().decrementInFlightRequestCount(clusterId);
         } catch (Exception e) {
             if(log.isErrorEnabled()) {
                 log.error("Could not decrement in-flight request count", e);

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6db5968e/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java
deleted file mode 100644
index 3c8927a..0000000
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java
+++ /dev/null
@@ -1,136 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one 
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
- * KIND, either express or implied.  See the License for the 
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.load.balancer.statistics;
-
-import org.apache.commons.lang.StringUtils;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.load.balancer.statistics.observers.WSO2CEPInFlightRequestCountObserver;
-
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Observable;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * This is the load balancing in-flight request count collector and any observer can get registered here
- * and receive notifications periodically.
- * This is a Singleton object.
- *
- * @author nirmal
- */
-public class LoadBalancerInFlightRequestCountCollector extends Observable {
-    private static final Log log = LogFactory.getLog(LoadBalancerInFlightRequestCountCollector.class);
-
-    private static LoadBalancerInFlightRequestCountCollector collector;
-    // Map<ClusterId, Map<PartitionId, InFlightRequestCount>
-    private Map<String, Integer> inFlightRequestCountMap;
-    private Thread notifier;
-
-    private LoadBalancerInFlightRequestCountCollector() {
-        inFlightRequestCountMap = new ConcurrentHashMap<String, Integer>();
-        if (notifier == null || (notifier != null && !notifier.isAlive())) {
-            notifier = new Thread(new ObserverNotifier());
-            notifier.start();
-        }
-    }
-
-    public static LoadBalancerInFlightRequestCountCollector getInstance() {
-        if (collector == null) {
-            synchronized (LoadBalancerInFlightRequestCountCollector.class) {
-                if (collector == null) {
-                    collector = new LoadBalancerInFlightRequestCountCollector();
-                    // add observers
-                    collector.addObserver(new WSO2CEPInFlightRequestCountObserver());
-                }
-            }
-        }
-        return collector;
-    }
-
-    public int getInFlightRequestCount(String clusterId) {
-        if (inFlightRequestCountMap.containsKey(clusterId)) {
-            return inFlightRequestCountMap.get(clusterId);
-        }
-        return 0;
-    }
-
-    public void setInFlightRequestCount(String clusterId, int value) {
-        if (StringUtils.isBlank(clusterId)) {
-            return;
-        }
-
-        inFlightRequestCountMap.put(clusterId, value);
-        if(log.isDebugEnabled()) {
-            log.debug(String.format("In-flight request count updated: [cluster] %s [value] %d", clusterId, value));
-        }
-        setChanged();
-    }
-
-    public void incrementInFlightRequestCount(String clusterId) {
-        incrementInFlightRequestCount(clusterId, 1);
-    }
-
-    public void incrementInFlightRequestCount(String clusterId, int value) {
-        if (StringUtils.isBlank(clusterId)) {
-            return;
-        }
-
-        int count = getInFlightRequestCount(clusterId);
-        setInFlightRequestCount(clusterId, (count + value));
-    }
-
-    public void decrementInFlightRequestCount(String clusterId) {
-        decrementInFlightRequestCount(clusterId, 1);
-    }
-
-    public void decrementInFlightRequestCount(String clusterId, int value) {
-        if (StringUtils.isBlank(clusterId)) {
-            return;
-        }
-
-        int count = getInFlightRequestCount(clusterId);
-        int newValue = (count - value) < 0 ? 0 : (count - value);
-        setInFlightRequestCount(clusterId, newValue);
-    }
-
-
-    /**
-     * This thread will notify all the observers of this subject.
-     *
-     * @author nirmal
-     */
-    private class ObserverNotifier implements Runnable {
-
-        @Override
-        public void run() {
-            if (log.isInfoEnabled()) {
-                log.info("Load balancing statistics notifier thread started");
-            }
-            while (true) {
-                try {
-                    Thread.sleep(15000);
-                } catch (InterruptedException ignore) {
-                }
-                LoadBalancerInFlightRequestCountCollector.getInstance().notifyObservers(new HashMap<String, Integer>(inFlightRequestCountMap));
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6db5968e/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java
new file mode 100644
index 0000000..72186fe
--- /dev/null
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java
@@ -0,0 +1,101 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one 
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *  http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
+ * KIND, either express or implied.  See the License for the 
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.load.balancer.statistics;
+
+import org.apache.commons.lang.StringUtils;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatisticsReader;
+
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * This is the load balancer statistics collector.
+ */
+public class LoadBalancerStatisticsCollector implements LoadBalancerStatisticsReader {
+    private static final Log log = LogFactory.getLog(LoadBalancerStatisticsCollector.class);
+
+    private static volatile LoadBalancerStatisticsCollector instance;
+    // Map<ClusterId, Map<PartitionId, InFlightRequestCount>
+    private Map<String, Integer> inFlightRequestCountMap;
+
+    private LoadBalancerStatisticsCollector() {
+        inFlightRequestCountMap = new ConcurrentHashMap<String, Integer>();
+    }
+
+    public static LoadBalancerStatisticsCollector getInstance() {
+        if (instance == null) {
+            synchronized (LoadBalancerStatisticsCollector.class) {
+                if (instance == null) {
+                    if(log.isDebugEnabled()) {
+                        log.debug("Load balancer in-flight request count collector instance created");
+                    }
+                    instance = new LoadBalancerStatisticsCollector();
+                }
+            }
+        }
+        return instance;
+    }
+
+    public int getInFlightRequestCount(String clusterId) {
+        if (inFlightRequestCountMap.containsKey(clusterId)) {
+            return inFlightRequestCountMap.get(clusterId);
+        }
+        return 0;
+    }
+
+    public void setInFlightRequestCount(String clusterId, int value) {
+        if (StringUtils.isBlank(clusterId)) {
+            return;
+        }
+
+        inFlightRequestCountMap.put(clusterId, value);
+        if (log.isDebugEnabled()) {
+            log.debug(String.format("In-flight request count updated: [cluster] %s [value] %d", clusterId, value));
+        }
+    }
+
+    public void incrementInFlightRequestCount(String clusterId) {
+        incrementInFlightRequestCount(clusterId, 1);
+    }
+
+    public void incrementInFlightRequestCount(String clusterId, int value) {
+        if (StringUtils.isBlank(clusterId)) {
+            return;
+        }
+
+        int count = getInFlightRequestCount(clusterId);
+        setInFlightRequestCount(clusterId, (count + value));
+    }
+
+    public void decrementInFlightRequestCount(String clusterId) {
+        decrementInFlightRequestCount(clusterId, 1);
+    }
+
+    public void decrementInFlightRequestCount(String clusterId, int value) {
+        if (StringUtils.isBlank(clusterId)) {
+            return;
+        }
+
+        int count = getInFlightRequestCount(clusterId);
+        int newValue = (count - value) < 0 ? 0 : (count - value);
+        setInFlightRequestCount(clusterId, newValue);
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6db5968e/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java
deleted file mode 100644
index 47a2602..0000000
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java
+++ /dev/null
@@ -1,66 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one 
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- * 
- *  http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY 
- * KIND, either express or implied.  See the License for the 
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.load.balancer.statistics.observers;
-
-import org.apache.commons.lang3.StringUtils;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.load.balancer.common.statistics.publisher.WSO2CEPInFlightRequestPublisher;
-
-import java.util.Map;
-import java.util.Observable;
-import java.util.Observer;
-
-public class WSO2CEPInFlightRequestCountObserver implements Observer {
-    private static final Log log = LogFactory.getLog(WSO2CEPInFlightRequestCountObserver.class);
-    private WSO2CEPInFlightRequestPublisher publisher;
-    private String networkPartitionId;
-
-    public WSO2CEPInFlightRequestCountObserver() {
-        this.publisher = new WSO2CEPInFlightRequestPublisher();
-        networkPartitionId = System.getProperty("network.partition.id");
-        if (StringUtils.isBlank(networkPartitionId)) {
-            throw new RuntimeException("network.partition.id system property was not found.");
-        }
-    }
-
-    public void update(Observable observable, Object object) {
-        try {
-            if (publisher.isEnabled()) {
-                // Map<ClusterId, Count>
-                Map<String, Integer> inFlightRequestCountMap = (Map<String, Integer>) object;
-                // Publish event per cluster id
-                for (String clusterId : inFlightRequestCountMap.keySet()) {
-                    // Publish event
-                    publisher.publish(clusterId, networkPartitionId, inFlightRequestCountMap.get(clusterId));
-                    if (log.isDebugEnabled()) {
-                        log.debug(String.format("In-flight request count published to cep: [cluster-id] %s [network-partition] %s [value] %d",
-                                clusterId, networkPartitionId, inFlightRequestCountMap.get(clusterId)));
-                    }
-                }
-            } else if (log.isWarnEnabled()) {
-                log.warn("CEP statistics publisher is disabled");
-            }
-        } catch (Exception e) {
-            if (log.isErrorEnabled()) {
-                log.error("Could not publish in-flight request count to cep", e);
-            }
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6db5968e/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java
----------------------------------------------------------------------
diff --git a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java
new file mode 100644
index 0000000..24fc423
--- /dev/null
+++ b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java
@@ -0,0 +1,96 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.haproxy.extension;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatisticsReader;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.Port;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+import java.io.IOException;
+
+/**
+ * HAProxy statistics reader.
+ */
+public class HAProxyStatisticsReader implements LoadBalancerStatisticsReader {
+    private static final Log log = LogFactory.getLog(HAProxyStatisticsReader.class);
+
+    private String scriptsPath;
+    private String statsSocketFilePath;
+
+    public HAProxyStatisticsReader() {
+        this.scriptsPath = HAProxyContext.getInstance().getScriptsPath();
+        this.statsSocketFilePath = HAProxyContext.getInstance().getStatsSocketFilePath();
+    }
+
+    @Override
+    public int getInFlightRequestCount(String clusterId) {
+        String frontendId, backendId, command, output;
+        String[] array;
+        int totalWeight, weight;
+
+        for (Service service : TopologyManager.getTopology().getServices()) {
+            for (Cluster cluster : service.getClusters()) {
+                if (cluster.getClusterId().equals(clusterId)) {
+                    totalWeight = 0;
+                    if ((service.getPorts() == null) || (service.getPorts().size() == 0)) {
+                        throw new RuntimeException(String.format("No ports found in service: %s", service.getServiceName()));
+                    }
+
+                    for (Port port : service.getPorts()) {
+                        frontendId = cluster.getClusterId() + "-proxy-" + port.getProxy();
+                        backendId = frontendId + "-members";
+
+                        for (Member member : cluster.getMembers()) {
+                            // echo "get weight <backend>/<server>" | socat stdio <stats-socket>
+                            command = String.format("%s/get-weight.sh %s %s %s", scriptsPath, backendId, member.getMemberId(), statsSocketFilePath);
+                            try {
+                                output = CommandUtil.executeCommand(command);
+                                if ((output != null) && (output.length() > 0)) {
+                                    array = output.split(" ");
+                                    if ((array != null) && (array.length > 0)) {
+                                        weight = Integer.parseInt(array[0]);
+                                        if (log.isDebugEnabled()) {
+                                            log.debug(String.format("Member weight found: [cluster] %s [member] %s [weight] %d", member.getClusterId(), member.getMemberId(), weight));
+                                        }
+                                        totalWeight += weight;
+                                    }
+                                }
+                            } catch (IOException e) {
+                                if (log.isErrorEnabled()) {
+                                    log.error(e);
+                                }
+                            }
+                        }
+                    }
+                    if (log.isInfoEnabled()) {
+                        log.info(String.format("Cluster weight found: [cluster] %s [weight] %d", cluster.getClusterId(), totalWeight));
+                    }
+                    return totalWeight;
+                }
+            }
+        }
+        return 0;
+    }
+}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6db5968e/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java
----------------------------------------------------------------------
diff --git a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java
deleted file mode 100644
index 57c6bc0..0000000
--- a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatsReader.java
+++ /dev/null
@@ -1,96 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied.  See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.haproxy.extension;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.load.balancer.extension.api.LoadBalancerStatsReader;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.Port;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-
-import java.io.IOException;
-
-/**
- * HAProxy statistics reader.
- */
-public class HAProxyStatsReader implements LoadBalancerStatsReader {
-    private static final Log log = LogFactory.getLog(HAProxyStatsReader.class);
-
-    private String scriptsPath;
-    private String statsSocketFilePath;
-
-    public HAProxyStatsReader() {
-        this.scriptsPath = HAProxyContext.getInstance().getScriptsPath();
-        this.statsSocketFilePath = HAProxyContext.getInstance().getStatsSocketFilePath();
-    }
-
-    @Override
-    public int getInFlightRequestCount(String clusterId) {
-        String frontendId, backendId, command, output;
-        String[] array;
-        int totalWeight, weight;
-
-        for (Service service : TopologyManager.getTopology().getServices()) {
-            for (Cluster cluster : service.getClusters()) {
-                if (cluster.getClusterId().equals(clusterId)) {
-                    totalWeight = 0;
-                    if ((service.getPorts() == null) || (service.getPorts().size() == 0)) {
-                        throw new RuntimeException(String.format("No ports found in service: %s", service.getServiceName()));
-                    }
-
-                    for (Port port : service.getPorts()) {
-                        frontendId = cluster.getClusterId() + "-proxy-" + port.getProxy();
-                        backendId = frontendId + "-members";
-
-                        for (Member member : cluster.getMembers()) {
-                            // echo "get weight <backend>/<server>" | socat stdio <stats-socket>
-                            command = String.format("%s/get-weight.sh %s %s %s", scriptsPath, backendId, member.getMemberId(), statsSocketFilePath);
-                            try {
-                                output = CommandUtil.executeCommand(command);
-                                if ((output != null) && (output.length() > 0)) {
-                                    array = output.split(" ");
-                                    if ((array != null) && (array.length > 0)) {
-                                        weight = Integer.parseInt(array[0]);
-                                        if (log.isDebugEnabled()) {
-                                            log.debug(String.format("Member weight found: [cluster] %s [member] %s [weight] %d", member.getClusterId(), member.getMemberId(), weight));
-                                        }
-                                        totalWeight += weight;
-                                    }
-                                }
-                            } catch (IOException e) {
-                                if (log.isErrorEnabled()) {
-                                    log.error(e);
-                                }
-                            }
-                        }
-                    }
-                    if (log.isInfoEnabled()) {
-                        log.info(String.format("Cluster weight found: [cluster] %s [weight] %d", cluster.getClusterId(), totalWeight));
-                    }
-                    return totalWeight;
-                }
-            }
-        }
-        return 0;
-    }
-}

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/6db5968e/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
----------------------------------------------------------------------
diff --git a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
index 176da20..d2a8731 100644
--- a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
+++ b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
@@ -43,7 +43,7 @@ public class Main {
 
             // Validate runtime parameters
             HAProxyContext.getInstance().validate();
-            extension = new LoadBalancerExtension(new HAProxy(), new HAProxyStatsReader());
+            extension = new LoadBalancerExtension(new HAProxy(), new HAProxyStatisticsReader());
             Thread thread = new Thread(extension);
             thread.start();
         } catch (Exception e) {