You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by is...@apache.org on 2015/12/24 14:04:34 UTC
[17/19] stratos git commit: Refactor the aws lb extension
Refactor the aws lb extension
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/9f7b4837
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/9f7b4837
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/9f7b4837
Branch: refs/heads/stratos-4.1.x
Commit: 9f7b4837f11dc43b534ec4e604312b8dbd886734
Parents: b5b6609
Author: gayangunarathne <ga...@wso2.com>
Authored: Tue Dec 8 10:41:57 2015 +0530
Committer: Isuru Haththotuwa <is...@apache.org>
Committed: Thu Dec 24 16:55:58 2015 +0530
----------------------------------------------------------------------
.../apache/stratos/aws/extension/AWSHelper.java | 9 +-
.../stratos/aws/extension/AWSLoadBalancer.java | 259 +++++++++----------
.../org/apache/stratos/aws/extension/Main.java | 24 +-
3 files changed, 136 insertions(+), 156 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/9f7b4837/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSHelper.java
----------------------------------------------------------------------
diff --git a/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSHelper.java b/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSHelper.java
index 8078252..a12fc96 100644
--- a/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSHelper.java
+++ b/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSHelper.java
@@ -471,20 +471,17 @@ public class AWSHelper {
public List<Instance> getAttachedInstances(String loadBalancerName,
String region) {
try {
- LoadBalancerDescription lbDescription = getLoadBalancerDescription(
- loadBalancerName, region);
+ LoadBalancerDescription lbDescription = getLoadBalancerDescription(loadBalancerName, region);
if (lbDescription == null) {
- log.warn("Could not find description of load balancer "
- + loadBalancerName);
+ log.warn("Could not find description of load balancer "+ loadBalancerName);
return null;
}
return lbDescription.getInstances();
} catch (AmazonClientException e) {
- log.error("Could not find instances attached load balancer "
- + loadBalancerName, e);
+ log.error("Could not find instances attached load balancer "+ loadBalancerName, e);
}
return null;
http://git-wip-us.apache.org/repos/asf/stratos/blob/9f7b4837/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSLoadBalancer.java
----------------------------------------------------------------------
diff --git a/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSLoadBalancer.java b/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSLoadBalancer.java
index 62f9882..5cd5556 100644
--- a/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSLoadBalancer.java
+++ b/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSLoadBalancer.java
@@ -80,63 +80,13 @@ public class AWSLoadBalancer implements LoadBalancer {
// Get the load balancer and update it.
if (log.isDebugEnabled()) {
- log.debug("Load balancer for cluster " + cluster.getClusterId() + " is already present.");
+ log.debug(String.format("Load balancer for cluster %s is already present.", cluster.getClusterId()));
}
- LoadBalancerInfo loadBalancerInfo = clusterIdToLoadBalancerMap.get(cluster.getClusterId());
+ if(updateExistingLoadBalancer(cluster)){
+ activeClusters.add(cluster.getClusterId());
+ }
- String loadBalancerName = loadBalancerInfo.getName();
- String region = loadBalancerInfo.getRegion();
-
- // Get all the instances attached - Attach newly added instances to load balancer
-
- // attachedInstances list is useful in finding out what are the new instances which
- // should be attached to this load balancer.
- List<Instance> attachedInstances = awsHelper.getAttachedInstances(loadBalancerName, region);
-
- // clusterMembers stores all the members of a cluster.
- Collection<Member> clusterMembers = cluster.getMembers();
-
- if (clusterMembers.size() > 0) {
- activeClusters.add(cluster.getClusterId());
-
- List<Instance> instancesToAddToLoadBalancer = new ArrayList<Instance>();
- List<String> availabilityZones = new ArrayList<String>();
-
- for (Member member : clusterMembers) {
- // if instance id of member is not in
- // attachedInstances
- // add this to instancesToAddToLoadBalancer
- Instance instance = new Instance(awsHelper.getAWSInstanceName(member.getInstanceId()));
-
- if (attachedInstances == null || !attachedInstances.contains(instance)) {
- instancesToAddToLoadBalancer.add(instance);
-
- if (log.isDebugEnabled()) {
- log.debug("Instance " + awsHelper.getAWSInstanceName(member.getInstanceId()) +
- " needs to be registered to load balancer " + loadBalancerName);
- }
-
- // LB Common Member has a property 'EC2_AVAILABILITY_ZONE' points to the ec2 availability zone
- // for this member. Use the property value to update the LB about the relevant zone
- String availabilityZone = getEC2AvaialbilityZoneOfMember(member);
- if (availabilityZone != null) {
- availabilityZones.add(availabilityZone);
- }
- }
- }
-
- if (instancesToAddToLoadBalancer.size() > 0) {
- awsHelper.registerInstancesToLoadBalancer(
- loadBalancerName,
- instancesToAddToLoadBalancer, region);
- }
-
- // update LB with the zones
- if (!availabilityZones.isEmpty() && !AWSExtensionContext.getInstance().isOperatingInVPC()) {
- awsHelper.addAvailabilityZonesForLoadBalancer(loadBalancerName, availabilityZones, region);
- }
- }
} else {
// Create a new load balancer for this cluster
@@ -161,8 +111,7 @@ public class AWSLoadBalancer implements LoadBalancer {
if (initialZones.isEmpty()) {
// initial availability zones not defined
// use the default (<region>a)
- initialAvailabilityZones.add(awsHelper.getAvailabilityZoneFromRegion
- (region));
+ initialAvailabilityZones.add(awsHelper.getAvailabilityZoneFromRegion(region));
} else {
// prepend the region and construct the availability zone list with
// full names (<region> + <zone>)
@@ -170,98 +119,29 @@ public class AWSLoadBalancer implements LoadBalancer {
initialAvailabilityZones.add(region + zone);
}
}
+ String loadBalancerDNSName =
+ createAWSLoadBalancer(loadBalancerName, region, listenersForThisCluster,initialAvailabilityZones);
- // Returns DNS name of load balancer which was created.
- // This is used in the domain mapping of this cluster.
- String loadBalancerDNSName = awsHelper.createLoadBalancer(loadBalancerName, listenersForThisCluster,
- region, initialAvailabilityZones, AWSExtensionContext.getInstance().isOperatingInVPC());
-
- // enable connection draining (default) and cross zone load balancing (if specified in aws-extension.sh)
- awsHelper.modifyLBAttributes(loadBalancerName, region, AWSExtensionContext.getInstance().
- isCrossZoneLoadBalancingEnabled(), true);
-
- // Add the inbound rule the security group of the load balancer
- // For each listener, add a new rule with load balancer port as allowed protocol in the security group.
- // if security group id is defined, directly use that
- for (Listener listener : listenersForThisCluster) {
- int port = listener.getLoadBalancerPort();
-
- if (awsHelper.getLbSecurityGroupIdDefinedInConfiguration() != null && !awsHelper.
- getLbSecurityGroupIdDefinedInConfiguration().isEmpty()) {
- for (String protocol : awsHelper.getAllowedProtocolsForLBSecurityGroup()) {
- awsHelper.addInboundRuleToSecurityGroup(awsHelper.getLbSecurityGroupIdDefinedInConfiguration(),
- region, protocol, port);
- }
- } else if (awsHelper.getLbSecurityGroupName() != null && !awsHelper
- .getLbSecurityGroupName().isEmpty()) {
- for (String protocol : awsHelper.getAllowedProtocolsForLBSecurityGroup()) {
- awsHelper.addInboundRuleToSecurityGroup(awsHelper.getSecurityGroupId(awsHelper
- .getLbSecurityGroupName(), region), region, protocol, port);
- }
- }
- }
-
- log.info("Load balancer '" + loadBalancerDNSName + "' created for cluster '" + cluster.getClusterId());
-
- // Register instances in the cluster to load balancer
- List<Instance> instances = new ArrayList<Instance>();
- List<String> availabilityZones = new ArrayList<String>();
-
- for (Member member : clusterMembers) {
- String instanceId = member.getInstanceId();
-
- if (log.isDebugEnabled()) {
- log.debug("Instance " + awsHelper.getAWSInstanceName(instanceId) + " needs to be registered to load balancer "
- + loadBalancerName);
- }
-
- Instance instance = new Instance();
- instance.setInstanceId(awsHelper.getAWSInstanceName(instanceId));
- instances.add(instance);
- // LB Common Member has a property 'EC2_AVAILABILITY_ZONE' which points to the ec2 availability
- // zone for this member. Use the property value to update the LB about the relevant zone
- String availabilityZone = getEC2AvaialbilityZoneOfMember(member);
- if (availabilityZone != null) {
- availabilityZones.add(availabilityZone);
- }
- }
+ log.info(String.format("Load balancer %s created for cluster %s " , loadBalancerDNSName, cluster.getClusterId()));
- awsHelper.registerInstancesToLoadBalancer(loadBalancerName, instances, region);
-
- // update LB with the zones
- if (!availabilityZones.isEmpty() && !AWSExtensionContext.getInstance().isOperatingInVPC()) {
- awsHelper.addAvailabilityZonesForLoadBalancer(loadBalancerName, availabilityZones, region);
- }
-
- // add stickiness policy
- if (awsHelper.getAppStickySessionCookie() != null && !awsHelper.getAppStickySessionCookie().isEmpty()) {
- CreateAppCookieStickinessPolicyResult result = awsHelper.createStickySessionPolicy(loadBalancerName,
- awsHelper.getAppStickySessionCookie(), Constants.STICKINESS_POLICY, region);
-
- if (result != null) {
- // Take a single port mapping from a member, and apply the policy for
- // the LB Listener port (Proxy port of the port mapping)
- awsHelper.applyPolicyToLBListenerPorts(aMember.getPorts(), loadBalancerName,
- Constants.STICKINESS_POLICY, region);
- }
- }
+ if(addClusterMembersInfo(clusterMembers, loadBalancerName, region)){
+ activeClusters.add(cluster.getClusterId());
+ }
// persist LB info
try {
persistenceManager.persist(new LBInfoDTO(loadBalancerName, cluster.getClusterId(), region));
} catch (PersistenceException e) {
- log.error("Unable to persist LB Information for " + loadBalancerName + ", cluster id " +
- cluster.getClusterId());
+ log.error(String.format(
+ "Unable to persist LB Information for %s , cluster id %s " + loadBalancerName,
+ cluster.getClusterId()));
}
- LoadBalancerInfo loadBalancerInfo = new LoadBalancerInfo(
- loadBalancerName, region);
+ LoadBalancerInfo loadBalancerInfo = new LoadBalancerInfo(loadBalancerName, region);
+ clusterIdToLoadBalancerMap.put(cluster.getClusterId(),loadBalancerInfo);
- clusterIdToLoadBalancerMap.put(cluster.getClusterId(),
- loadBalancerInfo);
- activeClusters.add(cluster.getClusterId());
}
pause(3000);
@@ -311,7 +191,111 @@ public class AWSLoadBalancer implements LoadBalancer {
return true;
}
- private String getEC2AvaialbilityZoneOfMember(Member member) {
+ private Boolean addClusterMembersInfo(Collection<Member> clusterMembers, String loadBalancerName, String region) {
+ Boolean isUpdated=false;
+ // Register instances in the cluster to load balancer
+ List<Instance> instances = new ArrayList<Instance>();
+ List<String> availabilityZones = new ArrayList<String>();
+
+ for (Member member : clusterMembers) {
+ isUpdated=true;
+ String instanceId = member.getInstanceId();
+
+ if (log.isDebugEnabled()) {
+ log.debug("Instance " + awsHelper.getAWSInstanceName(instanceId) + " needs to be registered to load balancer "
+ + loadBalancerName);
+ }
+
+ Instance instance = new Instance();
+ instance.setInstanceId(awsHelper.getAWSInstanceName(instanceId));
+
+ instances.add(instance);
+ // LB Common Member has a property 'EC2_AVAILABILITY_ZONE' which points to the ec2 availability
+ // zone for this member. Use the property value to update the LB about the relevant zone
+ String availabilityZone = getEC2AvaialbilityZoneOfMember(member);
+ if (availabilityZone != null) {
+ availabilityZones.add(availabilityZone);
+ }
+
+ // add stickiness policy
+ if (awsHelper.getAppStickySessionCookie() != null && !awsHelper.getAppStickySessionCookie().isEmpty()) {
+ CreateAppCookieStickinessPolicyResult result = awsHelper.createStickySessionPolicy(loadBalancerName, awsHelper.getAppStickySessionCookie(),
+ Constants.STICKINESS_POLICY,
+ region);
+
+ if (result != null) {
+ // Take a single port mapping from a member, and apply the policy for
+ // the LB Listener port (Proxy port of the port mapping)
+ awsHelper.applyPolicyToLBListenerPorts(member.getPorts(), loadBalancerName,
+ Constants.STICKINESS_POLICY, region);
+ }
+ }
+
+ }
+
+ awsHelper.registerInstancesToLoadBalancer(loadBalancerName, instances, region);
+
+ // update LB with the zones
+ if (!availabilityZones.isEmpty() && !AWSExtensionContext.getInstance().isOperatingInVPC()) {
+ awsHelper.addAvailabilityZonesForLoadBalancer(loadBalancerName, availabilityZones, region);
+ }
+ return isUpdated;
+ }
+
+ private String createAWSLoadBalancer(String loadBalancerName, String region, List<Listener> listenersForThisCluster,
+ Set<String> initialAvailabilityZones) throws LoadBalancerExtensionException {
+ // Returns DNS name of load balancer which was created.
+ // This is used in the domain mapping of this cluster.
+ String loadBalancerDNSName = awsHelper.createLoadBalancer(loadBalancerName, listenersForThisCluster,
+ region, initialAvailabilityZones, AWSExtensionContext.getInstance().isOperatingInVPC());
+
+ // enable connection draining (default) and cross zone load balancing (if specified in aws-extension.sh)
+ awsHelper.modifyLBAttributes(loadBalancerName, region, AWSExtensionContext.getInstance().
+ isCrossZoneLoadBalancingEnabled(), true);
+ // Add the inbound rule the security group of the load balancer
+ // For each listener, add a new rule with load balancer port as allowed protocol in the security group.
+ // if security group id is defined, directly use that
+ for (Listener listener : listenersForThisCluster) {
+ int port = listener.getLoadBalancerPort();
+
+ if (awsHelper.getLbSecurityGroupIdDefinedInConfiguration() != null && !awsHelper.getLbSecurityGroupIdDefinedInConfiguration().isEmpty()) {
+ for (String protocol : awsHelper.getAllowedProtocolsForLBSecurityGroup()) {
+ awsHelper.addInboundRuleToSecurityGroup(awsHelper.getLbSecurityGroupIdDefinedInConfiguration(),
+ region, protocol, port);
+ }
+ } else if (awsHelper.getLbSecurityGroupName() != null && !awsHelper
+ .getLbSecurityGroupName().isEmpty()) {
+ for (String protocol : awsHelper.getAllowedProtocolsForLBSecurityGroup()) {
+ awsHelper.addInboundRuleToSecurityGroup(awsHelper.getSecurityGroupId(awsHelper.getLbSecurityGroupName(),region), region, protocol,port);
+ }
+ }
+ }
+
+ return loadBalancerDNSName;
+ }
+
+ private Boolean updateExistingLoadBalancer(Cluster cluster) {
+ Boolean isUpdated=false;
+ LoadBalancerInfo loadBalancerInfo = clusterIdToLoadBalancerMap.get(cluster.getClusterId());
+
+ String loadBalancerName = loadBalancerInfo.getName();
+ String region = loadBalancerInfo.getRegion();
+
+ // Get all the instances attached - Attach newly added instances to load balancer
+
+ // attachedInstances list is useful in finding out what are the new instances which
+ // should be attached to this load balancer.
+ List<Instance> attachedInstances = awsHelper.getAttachedInstances(loadBalancerName, region);
+
+ // clusterMembers stores all the members of a cluster.
+ Collection<Member> clusterMembers = cluster.getMembers();
+
+ isUpdated= addClusterMembersInfo(clusterMembers, loadBalancerName, region);
+
+ return isUpdated;
+ }
+
+ private String getEC2AvaialbilityZoneOfMember(Member member) {
if (member.getProperties() != null) {
return member.getProperties().getProperty(Constants.EC2_AVAILABILITY_ZONE_PROPERTY);
}
@@ -324,7 +308,6 @@ public class AWSLoadBalancer implements LoadBalancer {
* nothing but logs the message.
*/
public void start() throws LoadBalancerExtensionException {
-
log.info("AWS load balancer extension started.");
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/9f7b4837/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/Main.java
----------------------------------------------------------------------
diff --git a/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/Main.java b/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/Main.java
index 73fa971..80b6481 100644
--- a/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/Main.java
+++ b/extensions/load-balancer/modules/aws-extension/src/main/java/org/apache/stratos/aws/extension/Main.java
@@ -25,6 +25,7 @@ import org.apache.log4j.PropertyConfigurator;
import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.load.balancer.common.topology.TopologyProvider;
import org.apache.stratos.load.balancer.extension.api.LoadBalancerExtension;
+import org.apache.stratos.load.balancer.extension.api.exception.LoadBalancerExtensionException;
import java.util.concurrent.ExecutorService;
@@ -34,6 +35,8 @@ import java.util.concurrent.ExecutorService;
public class Main {
private static final Log log = LogFactory.getLog(Main.class);
+ public static final String AWS_EXTENSION_THREAD_POOL = "aws.extension.thread.pool";
+ public static final int THREAD_POOL_SIZE = 10;
private static ExecutorService executorService;
public static void main(String[] args) {
@@ -41,23 +44,20 @@ public class Main {
LoadBalancerExtension extension = null;
try {
// Configure log4j properties
- PropertyConfigurator.configure(System
- .getProperty("log4j.properties.file.path"));
+ PropertyConfigurator.configure(System.getProperty("log4j.properties.file.path"));
if (log.isInfoEnabled()) {
log.info("AWS extension started");
}
- executorService = StratosThreadPool.getExecutorService(
- "aws.extension.thread.pool", 10);
+ executorService = StratosThreadPool.getExecutorService(AWS_EXTENSION_THREAD_POOL, THREAD_POOL_SIZE);
// Validate runtime parameters
AWSExtensionContext.getInstance().validate();
TopologyProvider topologyProvider = new TopologyProvider();
- AWSStatisticsReader statisticsReader = AWSExtensionContext
- .getInstance().isCEPStatsPublisherEnabled() ? new AWSStatisticsReader(
- topologyProvider) : null;
- extension = new LoadBalancerExtension(new AWSLoadBalancer(),
- statisticsReader, topologyProvider);
+ AWSStatisticsReader statisticsReader =
+ AWSExtensionContext.getInstance().isCEPStatsPublisherEnabled() ? new AWSStatisticsReader(
+ topologyProvider) : null;
+ extension = new LoadBalancerExtension(new AWSLoadBalancer(), statisticsReader, topologyProvider);
extension.setExecutorService(executorService);
extension.execute();
@@ -68,7 +68,7 @@ public class Main {
public void run() {
try {
if (finalExtension != null) {
- log.info("Shutting aws extension...");
+ log.info("Shutting aws LB extension...");
finalExtension.stop();
}
mainThread.join();
@@ -77,9 +77,9 @@ public class Main {
}
}
});
- } catch (Exception e) {
+ } catch (LoadBalancerExtensionException e) {
if (log.isErrorEnabled()) {
- log.error(e);
+ log.error("Error occurred while running the aws lb extension");
}
if (extension != null) {
log.info("Shutting aws extension...");