You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ga...@apache.org on 2015/10/09 16:35:39 UTC

[26/32] stratos git commit: Implemented In Flight request count logic.

Implemented In Flight request count logic.


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/b641e85f
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/b641e85f
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/b641e85f

Branch: refs/heads/gsoc-projects-2015
Commit: b641e85f28905ec14ec3db79472c7cf95d561268
Parents: cae92c5
Author: swapnilpatilRajaram <sw...@students.iiit.ac.in>
Authored: Thu Aug 13 15:58:05 2015 +0000
Committer: swapnilpatilRajaram <sw...@students.iiit.ac.in>
Committed: Thu Aug 13 15:58:05 2015 +0000

----------------------------------------------------------------------
 .../aws-extension/src/main/conf/aws.properties  |   3 +
 .../apache/stratos/aws/extension/AWSHelper.java | 165 +++++++++++++++++++
 .../stratos/aws/extension/AWSLoadBalancer.java  |   6 +-
 .../aws/extension/AWSStatisticsReader.java      |  35 +++-
 .../apache/stratos/aws/extension/Constants.java |  49 +++---
 5 files changed, 235 insertions(+), 23 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/b641e85f/extensions/load-balancer/aws-extension/src/main/conf/aws.properties
----------------------------------------------------------------------
diff --git a/extensions/load-balancer/aws-extension/src/main/conf/aws.properties b/extensions/load-balancer/aws-extension/src/main/conf/aws.properties
index d4cc18a..2bb2879 100644
--- a/extensions/load-balancer/aws-extension/src/main/conf/aws.properties
+++ b/extensions/load-balancer/aws-extension/src/main/conf/aws.properties
@@ -10,3 +10,6 @@ allowed-cidr-ip=0.0.0.0/0
 # Internet Protocol allowed for incoming requests for security group mentioned in 'load-balancer-security-group-name'. 
 # Comma separated e.g. tcp,udp
 allowed-protocols=tcp
+# statistics-interval denotes the interval, in seconds, over which statistics are gathered to calculate the in-flight request count.
+# This must be a multiple of 60.
+statistics-interval=60

http://git-wip-us.apache.org/repos/asf/stratos/blob/b641e85f/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSHelper.java
----------------------------------------------------------------------
diff --git a/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSHelper.java b/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSHelper.java
index b822939..7929099 100644
--- a/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSHelper.java
+++ b/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSHelper.java
@@ -24,6 +24,7 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.util.ArrayList;
 import java.util.Collection;
+import java.util.Date;
 import java.util.HashSet;
 import java.util.List;
 import java.util.Properties;
@@ -35,11 +36,18 @@ import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.load.balancer.common.domain.*;
 import org.apache.stratos.load.balancer.extension.api.exception.LoadBalancerExtensionException;
+import org.joda.time.DateTime;
+import org.joda.time.DateTimeZone;
 
 import com.amazonaws.AmazonClientException;
 import com.amazonaws.AmazonServiceException;
 import com.amazonaws.ClientConfiguration;
 import com.amazonaws.auth.BasicAWSCredentials;
+import com.amazonaws.services.cloudwatch.AmazonCloudWatchClient;
+import com.amazonaws.services.cloudwatch.model.Datapoint;
+import com.amazonaws.services.cloudwatch.model.Dimension;
+import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsRequest;
+import com.amazonaws.services.cloudwatch.model.GetMetricStatisticsResult;
 import com.amazonaws.services.ec2.AmazonEC2Client;
 import com.amazonaws.services.ec2.model.AuthorizeSecurityGroupIngressRequest;
 import com.amazonaws.services.ec2.model.CreateSecurityGroupRequest;
@@ -58,6 +66,7 @@ public class AWSHelper {
 	private String lbSecurityGroupName;
 	private String lbSecurityGroupDescription;
 	private String allowedCidrIpForLBSecurityGroup;
+	private int statisticsInterval;
 
 	private AtomicInteger lbSequence;
 
@@ -70,6 +79,7 @@ public class AWSHelper {
 
 	AmazonElasticLoadBalancingClient elbClient;
 	AmazonEC2Client ec2Client;
+	private AmazonCloudWatchClient cloudWatchClient;
 
 	private static final Log log = LogFactory.getLog(AWSHelper.class);
 
@@ -141,6 +151,25 @@ public class AWSHelper {
 				this.allowedProtocolsForLBSecurityGroup.add(protocol);
 			}
 
+			// The interval is later used as the CloudWatch period, so it has to
+			// be a positive multiple of STATISTICS_INTERVAL_MULTIPLE_OF seconds.
+			// A missing or invalid value falls back to that default.
+			String interval = properties
+					.getProperty(Constants.STATISTICS_INTERVAL);
+
+			this.statisticsInterval = Constants.STATISTICS_INTERVAL_MULTIPLE_OF;
+
+			if (interval != null && !interval.trim().isEmpty()) {
+				try {
+					int configuredInterval = Integer.parseInt(interval.trim());
+
+					if (configuredInterval > 0
+							&& configuredInterval
+									% Constants.STATISTICS_INTERVAL_MULTIPLE_OF == 0) {
+						this.statisticsInterval = configuredInterval;
+					} else {
+						log.warn("Statistics interval " + configuredInterval
+								+ " is not a positive multiple of "
+								+ Constants.STATISTICS_INTERVAL_MULTIPLE_OF
+								+ ". Setting it to "
+								+ Constants.STATISTICS_INTERVAL_MULTIPLE_OF + ".");
+					}
+				} catch (NumberFormatException e) {
+					log.warn("Invalid statistics interval '" + interval
+							+ "'. Setting it to "
+							+ Constants.STATISTICS_INTERVAL_MULTIPLE_OF + ".");
+				}
+			}
+
 			this.lbSecurityGroupDescription = Constants.LOAD_BALANCER_SECURITY_GROUP_DESCRIPTION;
 
 			regionToSecurityGroupIdMap = new ConcurrentHashMap<String, String>();
@@ -153,6 +182,9 @@ public class AWSHelper {
 
 			ec2Client = new AmazonEC2Client(awsCredentials, clientConfiguration);
 
+			cloudWatchClient = new AmazonCloudWatchClient(awsCredentials,
+					clientConfiguration);
+
 		} catch (IOException e) {
 			log.error("Error reading aws configuration file.");
 			throw new LoadBalancerExtensionException(
@@ -166,6 +198,10 @@ public class AWSHelper {
 		}
 	}
 
+	/**
+	 * Returns the statistics interval configured for this helper.
+	 * 
+	 * @return the interval, in seconds, over which request statistics are
+	 *         gathered
+	 */
+	public int getStatisticsInterval() {
+		return this.statisticsInterval;
+	}
+
 	public int getNextLBSequence() {
 		return lbSequence.getAndIncrement();
 	}
@@ -582,6 +618,135 @@ public class AWSHelper {
 	}
 
 	/**
+	 * Returns the number of requests received by the given load balancer
+	 * during the last <code>timeInterval</code> seconds, as reported by the
+	 * CloudWatch request count metric.
+	 * 
+	 * @param loadBalancerName
+	 *            name of the load balancer to query
+	 * @param region
+	 *            region used to build the CloudWatch endpoint
+	 * @param timeInterval
+	 *            in seconds; used both as the length of the query window and
+	 *            as the CloudWatch period
+	 * @return the sum over all returned data points, or 0 if there are none
+	 *         or the query fails
+	 */
+	public int getRequestCount(String loadBalancerName, String region,
+			int timeInterval) {
+		int count = 0;
+
+		try {
+			GetMetricStatisticsRequest request = new GetMetricStatisticsRequest();
+			request.setMetricName(Constants.REQUEST_COUNT_METRIC_NAME);
+			request.setNamespace(Constants.CLOUD_WATCH_NAMESPACE_NAME);
+
+			// Read the clock once so the window is exactly timeInterval long.
+			DateTime currentTime = new DateTime(DateTimeZone.UTC);
+
+			request.setStartTime(currentTime.minusSeconds(timeInterval)
+					.toDate());
+			request.setEndTime(currentTime.toDate());
+
+			request.setPeriod(timeInterval);
+
+			HashSet<String> statistics = new HashSet<String>();
+			statistics.add(Constants.SUM_STATISTICS_NAME);
+			request.setStatistics(statistics);
+
+			HashSet<Dimension> dimensions = new HashSet<Dimension>();
+			Dimension loadBalancerDimension = new Dimension();
+			loadBalancerDimension
+					.setName(Constants.LOAD_BALANCER_DIMENTION_NAME);
+			loadBalancerDimension.setValue(loadBalancerName);
+			dimensions.add(loadBalancerDimension);
+			request.setDimensions(dimensions);
+
+			cloudWatchClient.setEndpoint(String.format(
+					Constants.CLOUD_WATCH_ENDPOINT_URL_FORMAT, region));
+
+			GetMetricStatisticsResult result = cloudWatchClient
+					.getMetricStatistics(request);
+
+			List<Datapoint> dataPoints = result.getDatapoints();
+
+			// The window is not aligned to a period boundary, so more than
+			// one data point may come back. Add them all up instead of
+			// reading only the first one.
+			if (dataPoints != null) {
+				double sum = 0;
+
+				for (Datapoint dataPoint : dataPoints) {
+					if (dataPoint != null && dataPoint.getSum() != null) {
+						sum += dataPoint.getSum();
+					}
+				}
+
+				count = (int) sum;
+			}
+
+		} catch (AmazonClientException e) {
+			log.error(
+					"Could not get request count statistics of load balancer "
+							+ loadBalancerName, e);
+		}
+
+		return count;
+	}
+
+	/**
+	 * Adds up the 2XX, 3XX, 4XX and 5XX backend response counts of a load
+	 * balancer for the last <code>timeInterval</code> seconds.
+	 * 
+	 * @param loadBalancerName
+	 *            name of the load balancer to query
+	 * @param region
+	 *            region passed on to each metric query
+	 * @param timeInterval
+	 *            in seconds; length of the query window and period of each
+	 *            metric query
+	 * @return the sum of the four response counts
+	 */
+	public int getAllResponsesCount(String loadBalancerName, String region,
+			int timeInterval) {
+		// One query window, shared by all four metric lookups.
+		Date windowEnd = new DateTime(DateTimeZone.UTC).toDate();
+		Date windowStart = new DateTime(DateTimeZone.UTC).minusSeconds(
+				timeInterval).toDate();
+
+		String[] responseMetrics = { Constants.HTTP_RESPONSE_2XX,
+				Constants.HTTP_RESPONSE_3XX, Constants.HTTP_RESPONSE_4XX,
+				Constants.HTTP_RESPONSE_5XX };
+
+		int responses = 0;
+
+		for (String responseMetric : responseMetrics) {
+			responses += getResponseCountForMetric(loadBalancerName, region,
+					responseMetric, windowStart, windowEnd, timeInterval);
+		}
+
+		return responses;
+	}
+
+	/**
+	 * Returns the CloudWatch "Sum" statistic of one metric of a load balancer
+	 * between <code>startTime</code> and <code>endTime</code>.
+	 * 
+	 * @param loadBalancerName
+	 *            name of the load balancer to query
+	 * @param region
+	 *            region used to build the CloudWatch endpoint
+	 * @param metricName
+	 *            name of the CloudWatch metric to read
+	 * @param startTime
+	 *            start of the query window
+	 * @param endTime
+	 *            end of the query window
+	 * @param timeInterval
+	 *            in seconds; used as the CloudWatch period
+	 * @return the sum over all returned data points, or 0 if there are none
+	 *         or the query fails
+	 */
+	public int getResponseCountForMetric(String loadBalancerName,
+			String region, String metricName, Date startTime, Date endTime,
+			int timeInterval) {
+		int count = 0;
+
+		try {
+			GetMetricStatisticsRequest request = new GetMetricStatisticsRequest();
+			request.setMetricName(metricName);
+			request.setNamespace(Constants.CLOUD_WATCH_NAMESPACE_NAME);
+
+			request.setStartTime(startTime);
+			request.setEndTime(endTime);
+
+			request.setPeriod(timeInterval);
+
+			HashSet<String> statistics = new HashSet<String>();
+			statistics.add(Constants.SUM_STATISTICS_NAME);
+			request.setStatistics(statistics);
+
+			HashSet<Dimension> dimensions = new HashSet<Dimension>();
+			Dimension loadBalancerDimension = new Dimension();
+			loadBalancerDimension
+					.setName(Constants.LOAD_BALANCER_DIMENTION_NAME);
+			loadBalancerDimension.setValue(loadBalancerName);
+			dimensions.add(loadBalancerDimension);
+			request.setDimensions(dimensions);
+
+			cloudWatchClient.setEndpoint(String.format(
+					Constants.CLOUD_WATCH_ENDPOINT_URL_FORMAT, region));
+
+			GetMetricStatisticsResult result = cloudWatchClient
+					.getMetricStatistics(request);
+
+			List<Datapoint> dataPoints = result.getDatapoints();
+
+			// A window that is not aligned to a period boundary can yield
+			// more than one data point. Add them all up instead of reading
+			// only the first one.
+			if (dataPoints != null) {
+				double sum = 0;
+
+				for (Datapoint dataPoint : dataPoints) {
+					if (dataPoint != null && dataPoint.getSum() != null) {
+						sum += dataPoint.getSum();
+					}
+				}
+
+				count = (int) sum;
+			}
+
+		} catch (AmazonClientException e) {
+			log.error("Could not get the statistics for metric " + metricName
+					+ " of load balancer " + loadBalancerName, e);
+		}
+
+		return count;
+	}
+
+	/**
 	 * Returns the Listeners required for the service. Listeners are derived
 	 * from the proxy port, port and protocol values of the service.
 	 * 

http://git-wip-us.apache.org/repos/asf/stratos/blob/b641e85f/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSLoadBalancer.java
----------------------------------------------------------------------
diff --git a/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSLoadBalancer.java b/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSLoadBalancer.java
index c3b80c7..ba04e5c 100644
--- a/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSLoadBalancer.java
+++ b/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSLoadBalancer.java
@@ -42,7 +42,7 @@ public class AWSLoadBalancer implements LoadBalancer {
 	private static final Log log = LogFactory.getLog(AWSLoadBalancer.class);
 
 	// A map <clusterId, load balancer id>
-	private ConcurrentHashMap<String, LoadBalancerInfo> clusterIdToLoadBalancerMap;
+	private static ConcurrentHashMap<String, LoadBalancerInfo> clusterIdToLoadBalancerMap;
 
 	private AWSHelper awsHelper;
 
@@ -272,6 +272,10 @@ public class AWSLoadBalancer implements LoadBalancer {
 
 		// Remove domain mappings
 	}
+
+	/**
+	 * Returns the static map of cluster id to load balancer info. The map
+	 * itself is returned, not a copy.
+	 * 
+	 * NOTE(review): where the static map is initialised is not visible here;
+	 * confirm it cannot still be null when this is called.
+	 * 
+	 * @return the cluster id to load balancer info map
+	 */
+	public static ConcurrentHashMap<String, LoadBalancerInfo> getClusterIdToLoadBalancerMap() {
+		return AWSLoadBalancer.clusterIdToLoadBalancerMap;
+	}
 }
 
 /**

http://git-wip-us.apache.org/repos/asf/stratos/blob/b641e85f/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSStatisticsReader.java
----------------------------------------------------------------------
diff --git a/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSStatisticsReader.java b/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSStatisticsReader.java
index f7afc3a..40d51e9 100644
--- a/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSStatisticsReader.java
+++ b/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/AWSStatisticsReader.java
@@ -29,8 +29,10 @@ import org.apache.stratos.load.balancer.common.domain.Port;
 import org.apache.stratos.load.balancer.common.domain.Service;
 import org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatisticsReader;
 import org.apache.stratos.load.balancer.common.topology.TopologyProvider;
+import org.apache.stratos.load.balancer.extension.api.exception.LoadBalancerExtensionException;
 
 import java.io.IOException;
+import java.util.concurrent.ConcurrentHashMap;
 
 /**
  * AWS statistics reader.
@@ -42,11 +44,16 @@ public class AWSStatisticsReader implements LoadBalancerStatisticsReader {
 	private TopologyProvider topologyProvider;
 	private String clusterInstanceId;
 
-	public AWSStatisticsReader(TopologyProvider topologyProvider) {
+	private AWSHelper awsHelper;
+
+	/**
+	 * Creates a statistics reader that reads the cluster instance id from
+	 * the system properties and uses its own AWSHelper for queries.
+	 * 
+	 * @param topologyProvider
+	 *            topology provider kept by this reader
+	 * @throws LoadBalancerExtensionException
+	 *             if the AWSHelper cannot be created
+	 */
+	public AWSStatisticsReader(TopologyProvider topologyProvider)
+			throws LoadBalancerExtensionException {
 		this.topologyProvider = topologyProvider;
 		this.clusterInstanceId = System.getProperty(
 				StratosConstants.CLUSTER_INSTANCE_ID,
 				StratosConstants.NOT_DEFINED);
+
+		this.awsHelper = new AWSHelper();
 	}
 
 	@Override
@@ -56,7 +63,29 @@ public class AWSStatisticsReader implements LoadBalancerStatisticsReader {
 
 	@Override
 	public int getInFlightRequestCount(String clusterId) {
-		// Find out logic
-		return 0;
+		// In-flight requests are estimated as the requests received by the
+		// cluster's load balancer minus the responses counted for it, both
+		// taken over the configured statistics interval.
+		ConcurrentHashMap<String, LoadBalancerInfo> clusterIdToLoadBalancerMap = AWSLoadBalancer
+				.getClusterIdToLoadBalancerMap();
+
+		// ConcurrentHashMap rejects null keys, and the static map may be null
+		// if it has not been initialised yet (its initialisation is not
+		// visible here); report nothing in flight in both cases.
+		if (clusterId == null || clusterIdToLoadBalancerMap == null) {
+			return 0;
+		}
+
+		// Look the entry up once: a containsKey()/get() pair can race with a
+		// concurrent removal and hand back null.
+		LoadBalancerInfo loadBalancerInfo = clusterIdToLoadBalancerMap
+				.get(clusterId);
+
+		if (loadBalancerInfo == null) {
+			return 0;
+		}
+
+		String loadBalancerName = loadBalancerInfo.getName();
+		String region = loadBalancerInfo.getRegion();
+		int statisticsInterval = awsHelper.getStatisticsInterval();
+
+		int inFlightRequestCount = awsHelper.getRequestCount(loadBalancerName,
+				region, statisticsInterval)
+				- awsHelper.getAllResponsesCount(loadBalancerName, region,
+						statisticsInterval);
+
+		// The two counts come from separate queries, so the difference can
+		// be negative; never report a negative count.
+		return Math.max(inFlightRequestCount, 0);
 	}
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/b641e85f/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/Constants.java
----------------------------------------------------------------------
diff --git a/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/Constants.java b/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/Constants.java
index 0792e00..30ada5c 100644
--- a/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/Constants.java
+++ b/extensions/load-balancer/aws-extension/src/main/java/org/apache/stratos/aws/extension/Constants.java
@@ -23,23 +23,34 @@ package org.apache.stratos.aws.extension;
  * AWS proxy extension constants.
  */
 public class Constants {
-    public static final String CEP_STATS_PUBLISHER_ENABLED = "cep.stats.publisher.enabled";
-    public static final String THRIFT_RECEIVER_IP = "thrift.receiver.ip";
-    public static final String THRIFT_RECEIVER_PORT = "thrift.receiver.port";
-    public static final String NETWORK_PARTITION_ID = "network.partition.id";
-    public static final String CLUSTER_ID = "cluster.id";
-    public static final String SERVICE_NAME = "service.name";
-    public static final String AWS_PROPERTIES_FILE="aws.properties.file";
-    public static final String AWS_ACCESS_KEY = "access-key";
-    public static final String AWS_SECRET_KEY = "secret-key";
-    public static final String LB_PREFIX = "load-balancer-prefix";
-    public static final String LOAD_BALANCER_SECURITY_GROUP_NAME = "load-balancer-security-group-name";
-    public static final String LOAD_BALANCER_SECURITY_GROUP_DESCRIPTION = "Security group for load balancers created for Apache Stratos.";
-    public static final String ELB_ENDPOINT_URL_FORMAT = "elasticloadbalancing.%s.amazonaws.com";
-    public static final String EC2_ENDPOINT_URL_FORMAT = "ec2.%s.amazonaws.com";
-    public static final String ALLOWED_CIDR_IP_KEY = "allowed-cidr-ip";
-    public static final String ALLOWED_PROTOCOLS = "allowed-protocols";
-    public static final int LOAD_BALANCER_NAME_MAX_LENGTH = 32;
-    public static final int LOAD_BALANCER_PREFIX_MAX_LENGTH = 25;
-    public static final int SECURITY_GROUP_NAME_MAX_LENGTH = 255;
+	public static final String CEP_STATS_PUBLISHER_ENABLED = "cep.stats.publisher.enabled";
+	public static final String THRIFT_RECEIVER_IP = "thrift.receiver.ip";
+	public static final String THRIFT_RECEIVER_PORT = "thrift.receiver.port";
+	public static final String NETWORK_PARTITION_ID = "network.partition.id";
+	public static final String CLUSTER_ID = "cluster.id";
+	public static final String SERVICE_NAME = "service.name";
+	public static final String AWS_PROPERTIES_FILE = "aws.properties.file";
+	public static final String AWS_ACCESS_KEY = "access-key";
+	public static final String AWS_SECRET_KEY = "secret-key";
+	public static final String LB_PREFIX = "load-balancer-prefix";
+	public static final String LOAD_BALANCER_SECURITY_GROUP_NAME = "load-balancer-security-group-name";
+	public static final String LOAD_BALANCER_SECURITY_GROUP_DESCRIPTION = "Security group for load balancers created for Apache Stratos.";
+	public static final String ELB_ENDPOINT_URL_FORMAT = "elasticloadbalancing.%s.amazonaws.com";
+	public static final String EC2_ENDPOINT_URL_FORMAT = "ec2.%s.amazonaws.com";
+	public static final String CLOUD_WATCH_ENDPOINT_URL_FORMAT = "monitoring.%s.amazonaws.com"; // %s is filled with the region
+	public static final String ALLOWED_CIDR_IP_KEY = "allowed-cidr-ip";
+	public static final String ALLOWED_PROTOCOLS = "allowed-protocols";
+	public static final int LOAD_BALANCER_NAME_MAX_LENGTH = 32;
+	public static final int LOAD_BALANCER_PREFIX_MAX_LENGTH = 25;
+	public static final int SECURITY_GROUP_NAME_MAX_LENGTH = 255;
+	public static final String REQUEST_COUNT_METRIC_NAME = "RequestCount"; // metric read by AWSHelper.getRequestCount
+	public static final String CLOUD_WATCH_NAMESPACE_NAME = "AWS/ELB"; // namespace set on every metric query
+	public static final String SUM_STATISTICS_NAME = "Sum"; // the only statistic requested from CloudWatch
+	public static final String LOAD_BALANCER_DIMENTION_NAME = "LoadBalancerName"; // NOTE(review): "DIMENTION" is a misspelling of "DIMENSION"; renaming would break existing references
+	public static final String HTTP_RESPONSE_2XX = "HTTPCode_Backend_2XX"; // the four HTTP_RESPONSE_* metrics are summed by AWSHelper.getAllResponsesCount
+	public static final String HTTP_RESPONSE_3XX = "HTTPCode_Backend_3XX";
+	public static final String HTTP_RESPONSE_4XX = "HTTPCode_Backend_4XX";
+	public static final String HTTP_RESPONSE_5XX = "HTTPCode_Backend_5XX";
+	public static final String STATISTICS_INTERVAL = "statistics-interval"; // property key read from the aws properties file
+	public static final int STATISTICS_INTERVAL_MULTIPLE_OF = 60; // seconds; the configured interval must be a multiple of this, and it is also the default
 }