You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2015/03/08 05:38:51 UTC
stratos git commit: Updating load balancer statistics reader to user
topology provider
Repository: stratos
Updated Branches:
refs/heads/master 5c502b266 -> de00e1fa3
Updating load balancer statistics reader to user topology provider
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/de00e1fa
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/de00e1fa
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/de00e1fa
Branch: refs/heads/master
Commit: de00e1fa3b76505a469f6536c8d0fac2bfe57b87
Parents: 5c502b2
Author: Imesh Gunaratne <im...@apache.org>
Authored: Sun Mar 8 10:08:23 2015 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Sun Mar 8 10:08:45 2015 +0530
----------------------------------------------------------------------
.../impl/CloudControllerServiceImpl.java | 12 +----
.../load/balancer/common/domain/Service.java | 8 ++-
...LoadBalancerCommonTopologyEventReceiver.java | 14 +++--
.../LoadBalancerStatisticsReader.java | 2 +-
.../LoadBalancerStatisticsNotifier.java | 56 +++++++++-----------
.../extension/api/LoadBalancerExtension.java | 46 ++++++++++------
.../internal/LoadBalancerServiceComponent.java | 7 +--
.../LoadBalancerStatisticsCollector.java | 3 +-
.../stratos/haproxy/extension/HAProxy.java | 3 +-
.../extension/HAProxyStatisticsReader.java | 49 ++++++++---------
.../apache/stratos/haproxy/extension/Main.java | 29 ++++++++--
11 files changed, 130 insertions(+), 99 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
index 31eca08..d351134 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/services/impl/CloudControllerServiceImpl.java
@@ -118,7 +118,7 @@ public class CloudControllerServiceImpl implements CloudControllerService {
handleNullObject(cartridgeConfig, "Cartridge definition is null");
if (log.isInfoEnabled()) {
- log.info("Adding cartridge: [cartridge-type] " + cartridgeConfig.getType());
+ log.info("Updating cartridge: [cartridge-type] " + cartridgeConfig.getType());
}
if (log.isDebugEnabled()) {
log.debug("Cartridge definition: " + cartridgeConfig.toString());
@@ -133,12 +133,9 @@ public class CloudControllerServiceImpl implements CloudControllerService {
throw new InvalidCartridgeDefinitionException(msg, e);
}
- // TODO transaction begins
String cartridgeType = cartridge.getType();
- // Undeploy if already deployed
if (cloudControllerContext.getCartridge(cartridgeType) != null) {
Cartridge cartridgeToBeRemoved = cloudControllerContext.getCartridge(cartridgeType);
- // undeploy
try {
removeCartridge(cartridgeToBeRemoved.getType());
} catch (InvalidCartridgeTypeException ignore) {
@@ -152,15 +149,10 @@ public class CloudControllerServiceImpl implements CloudControllerService {
// Add cartridge to the cloud controller context and persist
CloudControllerContext.getInstance().addCartridge(cartridge);
CloudControllerContext.getInstance().persist();
-
- List<Cartridge> cartridgeList = new ArrayList<Cartridge>();
- cartridgeList.add(cartridge);
-
- TopologyBuilder.handleServiceCreated(cartridgeList);
// transaction ends
if (log.isInfoEnabled()) {
- log.info("Successfully added cartridge: [cartridge-type] " + cartridgeType);
+ log.info("Successfully updated cartridge: [cartridge-type] " + cartridgeType);
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Service.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Service.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Service.java
index 116078e..f94e7f8 100644
--- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Service.java
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/domain/Service.java
@@ -89,12 +89,16 @@ public class Service {
public void addPorts(Collection<Port> ports) {
for(Port port : ports) {
- addPort(port);
+ if(!portExists(port)) {
+ addPort(port);
+ }
}
}
public void removePort(Port port) {
- this.portMap.remove(port.getProxy());
+ if(portExists(port)) {
+ this.portMap.remove(port.getProxy());
+ }
}
public boolean portExists(Port port) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
index 8582f3c..88be323 100644
--- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/event/receivers/LoadBalancerCommonTopologyEventReceiver.java
@@ -293,7 +293,16 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv
}
return;
}
- topologyProvider.addMember(transformMember(member));
+
+ org.apache.stratos.load.balancer.common.domain.Member lbMember = transformMember(member);
+ org.apache.stratos.load.balancer.common.domain.Service lbService = topologyProvider.getTopology().
+ getService(serviceName);
+ if(lbService == null) {
+ log.warn(String.format("Service not found: %s", serviceName));
+ return;
+ }
+ lbService.addPorts(lbMember.getPorts());
+ topologyProvider.addMember(lbMember);
}
/**
@@ -347,9 +356,6 @@ public class LoadBalancerCommonTopologyEventReceiver extends TopologyEventReceiv
private org.apache.stratos.load.balancer.common.domain.Service transformService(Service messagingService) {
org.apache.stratos.load.balancer.common.domain.Service service =
new org.apache.stratos.load.balancer.common.domain.Service(messagingService.getServiceName());
- for(Port port : messagingService.getPorts()) {
- service.addPort(transformPort(port));
- }
return service;
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java
index c0016fe..c9d2556 100644
--- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/LoadBalancerStatisticsReader.java
@@ -19,7 +19,7 @@
package org.apache.stratos.load.balancer.common.statistics;
-import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.load.balancer.common.domain.Cluster;
/**
* Load balancer statistics reader interface.
http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
index 9c4cb88..52f98be 100644
--- a/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
+++ b/components/org.apache.stratos.load.balancer.common/src/main/java/org/apache/stratos/load/balancer/common/statistics/notifier/LoadBalancerStatisticsNotifier.java
@@ -25,10 +25,10 @@ import org.apache.commons.logging.LogFactory;
import org.apache.stratos.common.statistics.publisher.InFlightRequestPublisher;
import org.apache.stratos.common.statistics.publisher.InFlightRequestPublisherFactory;
import org.apache.stratos.common.statistics.publisher.StatisticsPublisherType;
+import org.apache.stratos.load.balancer.common.domain.Cluster;
+import org.apache.stratos.load.balancer.common.domain.Service;
import org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatisticsReader;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import org.apache.stratos.load.balancer.common.topology.TopologyProvider;
/**
* Load balancer statistics notifier thread for publishing statistics periodically to CEP.
@@ -37,13 +37,15 @@ public class LoadBalancerStatisticsNotifier implements Runnable {
private static final Log log = LogFactory.getLog(LoadBalancerStatisticsNotifier.class);
private final LoadBalancerStatisticsReader statsReader;
+ private final TopologyProvider topologyProvider;
private final InFlightRequestPublisher inFlightRequestPublisher;
private long statsPublisherInterval = 15000;
private String networkPartitionId;
private boolean terminated;
- public LoadBalancerStatisticsNotifier(LoadBalancerStatisticsReader statsReader) {
+ public LoadBalancerStatisticsNotifier(LoadBalancerStatisticsReader statsReader, TopologyProvider topologyProvider) {
this.statsReader = statsReader;
+ this.topologyProvider = topologyProvider;
this.inFlightRequestPublisher = InFlightRequestPublisherFactory.createInFlightRequestPublisher(
StatisticsPublisherType.WSO2CEP);
@@ -74,36 +76,26 @@ public class LoadBalancerStatisticsNotifier implements Runnable {
log.debug("Publishing load balancer statistics");
}
if (inFlightRequestPublisher.isEnabled()) {
- try {
- TopologyManager.acquireReadLock();
- int requestCount;
- int servedRequestCount;
- int activeInstancesCount;
- for (Service service : TopologyManager.getTopology().getServices()) {
- for (Cluster cluster : service.getClusters()) {
- if (!cluster.isLbCluster()) {
- // Publish in-flight request count of load balancer's network partition
- requestCount = statsReader.getInFlightRequestCount(cluster.getClusterId());
- servedRequestCount = statsReader.getServedRequestCount(cluster.getClusterId());
- if(requestCount == 0) {
- servedRequestCount = 0;
- }
- activeInstancesCount = statsReader.getActiveInstancesCount(cluster);
- inFlightRequestPublisher.publish(cluster.getClusterId(), networkPartitionId,activeInstancesCount, requestCount, servedRequestCount);
- log.info(String.format("In-flight request count published to cep: [cluster-id] %s [network-partition] %s [value] %d [active instances] %d [RIF] %d ",
- cluster.getClusterId(), networkPartitionId, servedRequestCount , activeInstancesCount ,requestCount ));
- if (log.isDebugEnabled()) {
- log.debug(String.format("In-flight request count published to cep: [cluster-id] %s [network-partition] %s [value] %d",
- cluster.getClusterId(), networkPartitionId, requestCount));
- }
- }
- else {
- // Load balancer cluster found in topology; we do not need to publish request counts for them.
- }
+ int requestCount;
+ int servedRequestCount;
+ int activeInstancesCount;
+ for (Service service : topologyProvider.getTopology().getServices()) {
+ for (Cluster cluster : service.getClusters()) {
+ // Publish in-flight request count of load balancer's network partition
+ requestCount = statsReader.getInFlightRequestCount(cluster.getClusterId());
+ servedRequestCount = statsReader.getServedRequestCount(cluster.getClusterId());
+ if (requestCount == 0) {
+ servedRequestCount = 0;
+ }
+ activeInstancesCount = statsReader.getActiveInstancesCount(cluster);
+ inFlightRequestPublisher.publish(cluster.getClusterId(), networkPartitionId, activeInstancesCount, requestCount, servedRequestCount);
+ log.info(String.format("In-flight request count published to cep: [cluster-id] %s [network-partition] %s [value] %d [active instances] %d [RIF] %d ",
+ cluster.getClusterId(), networkPartitionId, servedRequestCount, activeInstancesCount, requestCount));
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("In-flight request count published to cep: [cluster-id] %s [network-partition] %s [value] %d",
+ cluster.getClusterId(), networkPartitionId, requestCount));
}
}
- } finally {
- TopologyManager.releaseReadLock();
}
} else if (log.isWarnEnabled()) {
log.warn("In-flight request count publisher is disabled");
http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
index 2459b31..ef2dc5b 100644
--- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
+++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
@@ -62,10 +62,12 @@ public class LoadBalancerExtension {
* @param loadBalancer Load balancer instance: Mandatory.
* @param statsReader Statistics reader: If null statistics notifier thread will not be started.
*/
- public LoadBalancerExtension(LoadBalancer loadBalancer, LoadBalancerStatisticsReader statsReader) {
- this.loadBalancer = loadBalancer;
+ public LoadBalancerExtension(LoadBalancer loadBalancer, LoadBalancerStatisticsReader statsReader,
+ TopologyProvider topologyProvider) {
+
+ this.loadBalancer = loadBalancer;
this.statsReader = statsReader;
- this.topologyProvider = new TopologyProvider();
+ this.topologyProvider = topologyProvider;
}
@@ -82,7 +84,7 @@ public class LoadBalancerExtension {
if (statsReader != null) {
// Start stats notifier thread
- statisticsNotifier = new LoadBalancerStatisticsNotifier(statsReader);
+ statisticsNotifier = new LoadBalancerStatisticsNotifier(statsReader, topologyProvider);
Thread statsNotifierThread = new Thread(statisticsNotifier);
statsNotifierThread.start();
} else {
@@ -158,7 +160,7 @@ public class LoadBalancerExtension {
// Configure load balancer
Topology topology = topologyProvider.getTopology();
- if(topologyInitialized(topology) && loadBalancer.configure(topology)) {
+ if(topologyPopulated(topology) && loadBalancer.configure(topology)) {
// Start load balancer
loadBalancer.start();
loadBalancerStarted = true;
@@ -168,7 +170,7 @@ public class LoadBalancerExtension {
if (log.isErrorEnabled()) {
log.error("Could not start load balancer", e);
}
- terminate();
+ stop();
}
}
});
@@ -205,11 +207,11 @@ public class LoadBalancerExtension {
}
/**
- * Returns true if topology has initialized
+ * Returns true if topology has populated
* @param topology
* @return
*/
- private boolean topologyInitialized(Topology topology) {
+ private boolean topologyPopulated(Topology topology) {
for(Service service : topology.getServices()) {
for(Cluster cluster : service.getClusters()) {
if(cluster.getMembers().size() > 0) {
@@ -240,15 +242,27 @@ public class LoadBalancerExtension {
}
/**
- * Terminate event receivers and publishers.
+ * Stop load balancer instance.
*/
- public void terminate() {
- if (topologyEventReceiver != null) {
- topologyEventReceiver.terminate();
- }
- if (statisticsNotifier != null) {
- statisticsNotifier.terminate();
- }
+ public void stop() {
+ try {
+ if (topologyEventReceiver != null) {
+ topologyEventReceiver.terminate();
+ }
+ } catch (Exception ignore) {
+ }
+
+ try {
+ if (statisticsNotifier != null) {
+ statisticsNotifier.terminate();
+ }
+ } catch (Exception ignore) {
+ }
+
+ try {
+ loadBalancer.stop();
+ } catch (Exception ignore) {
+ }
}
public ExecutorService getExecutorService() {
http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
index fed6e84..33735dc 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
@@ -147,7 +147,7 @@ public class LoadBalancerServiceComponent {
if(configuration.isCepStatsPublisherEnabled()) {
// Start statistics notifier
- startStatisticsNotifier();
+ startStatisticsNotifier(topologyProvider);
}
activated = true;
@@ -217,9 +217,10 @@ public class LoadBalancerServiceComponent {
}
}
- private void startStatisticsNotifier() {
+ private void startStatisticsNotifier(TopologyProvider topologyProvider) {
// Start stats notifier thread
- statisticsNotifier = new LoadBalancerStatisticsNotifier(LoadBalancerStatisticsCollector.getInstance());
+ statisticsNotifier = new LoadBalancerStatisticsNotifier(LoadBalancerStatisticsCollector.getInstance(),
+ topologyProvider);
Thread statsNotifierThread = new Thread(statisticsNotifier);
statsNotifierThread.start();
if (log.isInfoEnabled()) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java
index a5239b4..2e5723f 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerStatisticsCollector.java
@@ -21,6 +21,7 @@ package org.apache.stratos.load.balancer.statistics;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.load.balancer.common.domain.Cluster;
import org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatisticsReader;
import java.util.Map;
@@ -93,7 +94,7 @@ public class LoadBalancerStatisticsCollector implements LoadBalancerStatisticsRe
}
}
- public int getActiveInstancesCount(org.apache.stratos.messaging.domain.topology.Cluster cluster) {
+ public int getActiveInstancesCount(Cluster cluster) {
return cluster.getMembers().size();
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxy.java
----------------------------------------------------------------------
diff --git a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxy.java b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxy.java
index 8c0fe6a..18f005d 100644
--- a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxy.java
+++ b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxy.java
@@ -151,9 +151,8 @@ public class HAProxy implements LoadBalancer {
}
} catch (Exception e) {
if (log.isErrorEnabled()) {
- log.error("Could not stop haproxy instance");
+ log.error("Could not stop haproxy instance", e);
}
- throw new LoadBalancerExtensionException(e);
}
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java
----------------------------------------------------------------------
diff --git a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java
index 5dec4d3..3c2cf36 100644
--- a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java
+++ b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/HAProxyStatisticsReader.java
@@ -22,12 +22,12 @@ package org.apache.stratos.haproxy.extension;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.common.util.CommandUtils;
+import org.apache.stratos.load.balancer.common.domain.Cluster;
+import org.apache.stratos.load.balancer.common.domain.Member;
+import org.apache.stratos.load.balancer.common.domain.Port;
+import org.apache.stratos.load.balancer.common.domain.Service;
import org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatisticsReader;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.Port;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import org.apache.stratos.load.balancer.common.topology.TopologyProvider;
import java.io.IOException;
@@ -35,14 +35,17 @@ import java.io.IOException;
* HAProxy statistics reader.
*/
public class HAProxyStatisticsReader implements LoadBalancerStatisticsReader {
+
private static final Log log = LogFactory.getLog(HAProxyStatisticsReader.class);
private String scriptsPath;
private String statsSocketFilePath;
+ private TopologyProvider topologyProvider;
- public HAProxyStatisticsReader() {
+ public HAProxyStatisticsReader(TopologyProvider topologyProvider) {
this.scriptsPath = HAProxyContext.getInstance().getScriptsPath();
this.statsSocketFilePath = HAProxyContext.getInstance().getStatsSocketFilePath();
+ this.topologyProvider = topologyProvider;
}
@Override
@@ -51,7 +54,7 @@ public class HAProxyStatisticsReader implements LoadBalancerStatisticsReader {
String[] array;
int totalWeight, weight;
- for (Service service : TopologyManager.getTopology().getServices()) {
+ for (Service service : topologyProvider.getTopology().getServices()) {
for (Cluster cluster : service.getClusters()) {
if (cluster.getClusterId().equals(clusterId)) {
totalWeight = 0;
@@ -63,25 +66,23 @@ public class HAProxyStatisticsReader implements LoadBalancerStatisticsReader {
for(String hostname : cluster.getHostNames()) {
backendId = hostname+"-http-members";
for (Member member : cluster.getMembers()) {
- if(member.getNetworkPartitionId().equals(HAProxyContext.getInstance().getNetworkPartitionId())) {
- // echo "get weight <backend>/<server>" | socat stdio <stats-socket>
- command = String.format("%s/get-weight.sh %s %s %s", scriptsPath, backendId, member.getMemberId(), statsSocketFilePath);
- try {
- output = CommandUtils.executeCommand(command);
- if ((output != null) && (output.length() > 0)) {
- array = output.split(" ");
- if ((array != null) && (array.length > 0)) {
- weight = Integer.parseInt(array[0]);
- if (log.isDebugEnabled()) {
- log.debug(String.format("Member weight found: [cluster] %s [member] %s [weight] %d", member.getClusterId(), member.getMemberId(), weight));
- }
- totalWeight += weight;
+ // echo "get weight <backend>/<server>" | socat stdio <stats-socket>
+ command = String.format("%s/get-weight.sh %s %s %s", scriptsPath, backendId, member.getMemberId(), statsSocketFilePath);
+ try {
+ output = CommandUtils.executeCommand(command);
+ if ((output != null) && (output.length() > 0)) {
+ array = output.split(" ");
+ if ((array != null) && (array.length > 0)) {
+ weight = Integer.parseInt(array[0]);
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member weight found: [cluster] %s [member] %s [weight] %d", member.getClusterId(), member.getMemberId(), weight));
}
+ totalWeight += weight;
}
- } catch (IOException e) {
- if (log.isErrorEnabled()) {
- log.error(e);
- }
+ }
+ } catch (IOException e) {
+ if (log.isErrorEnabled()) {
+ log.error(e);
}
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/de00e1fa/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
----------------------------------------------------------------------
diff --git a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
index c6ee22b..f56541d 100644
--- a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
+++ b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.log4j.PropertyConfigurator;
import org.apache.stratos.common.threading.StratosThreadPool;
+import org.apache.stratos.load.balancer.common.topology.TopologyProvider;
import org.apache.stratos.load.balancer.extension.api.LoadBalancerExtension;
import java.util.concurrent.ExecutorService;
@@ -44,12 +45,31 @@ public class Main {
if (log.isInfoEnabled()) {
log.info("HAProxy extension started");
}
+
+ // Add shutdown hook
+ final Thread mainThread = Thread.currentThread();
+ final LoadBalancerExtension finalExtension = extension;
+ Runtime.getRuntime().addShutdownHook(new Thread() {
+ public void run() {
+ try {
+ if(finalExtension != null) {
+ log.info("Shutting haproxy instance...");
+ finalExtension.stop();
+ }
+ mainThread.join();
+ } catch (Exception e) {
+ log.error(e);
+ }
+ }
+ });
+
executorService = StratosThreadPool.getExecutorService("haproxy.extension.thread.pool", 10);
// Validate runtime parameters
HAProxyContext.getInstance().validate();
- extension = new LoadBalancerExtension(new HAProxy(),
- (HAProxyContext.getInstance().isCEPStatsPublisherEnabled() ?
- new HAProxyStatisticsReader() : null));
+ TopologyProvider topologyProvider = new TopologyProvider();
+ HAProxyStatisticsReader statisticsReader = HAProxyContext.getInstance().isCEPStatsPublisherEnabled() ?
+ new HAProxyStatisticsReader(topologyProvider) : null;
+ extension = new LoadBalancerExtension(new HAProxy(), statisticsReader, topologyProvider);
extension.setExecutorService(executorService);
extension.execute();
} catch (Exception e) {
@@ -57,7 +77,8 @@ public class Main {
log.error(e);
}
if (extension != null) {
- extension.terminate();
+ log.info("Shutting haproxy instance...");
+ extension.stop();
}
}
}