You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2013/12/11 19:02:51 UTC
[1/2] git commit: Re-factored autoscaler health event message delegator and added missing message processors
Updated Branches:
refs/heads/master 0b4ba2e48 -> 75d977eb3
Re-factored autoscaler health event message delegator and added missing message processors
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/bfb87dce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/bfb87dce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/bfb87dce
Branch: refs/heads/master
Commit: bfb87dce5f986928b266bb2c45f0bc67e2dc3932
Parents: d3c42bf
Author: Imesh Gunaratne <im...@apache.org>
Authored: Wed Dec 11 23:32:22 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Wed Dec 11 23:32:22 2013 +0530
----------------------------------------------------------------------
.../apache/stratos/autoscaler/Constants.java | 22 +-
.../health/HealthEventMessageDelegator.java | 356 ++++++++++++++-----
2 files changed, 287 insertions(+), 91 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bfb87dce/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/Constants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/Constants.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/Constants.java
index 2067466..f682260 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/Constants.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/Constants.java
@@ -5,14 +5,9 @@ package org.apache.stratos.autoscaler;
*/
public class Constants {
- public static String ROUND_ROBIN_ALGORITHM_ID = "round-robin";
- public static String ONE_AFTER_ANOTHER_ALGORITHM_ID = "one-after-another";
-
- public static String GRADIENT_OF_REQUESTS_IN_FLIGHT = "gradient_in_flight_requests";
- public static String AVERAGE_REQUESTS_IN_FLIGHT = "average_in_flight_requests";
- public static String SECOND_DERIVATIVE_OF_REQUESTS_IN_FLIGHT = "second_derivative_in_flight_requests";
-
- public static String MEMBER_FAULT_EVENT_NAME = "member_fault";
+ public static final String ROUND_ROBIN_ALGORITHM_ID = "round-robin";
+ public static final String ONE_AFTER_ANOTHER_ALGORITHM_ID = "one-after-another";
+ public static final String MEMBER_FAULT_EVENT_NAME = "member_fault";
//scheduler
public static final int SCHEDULE_DEFAULT_INITIAL_DELAY = 30;
@@ -26,4 +21,15 @@ public class Constants {
// partition properties
public static final String REGION_PROPERTY = "region";
+ public static final String AVERAGE_LOAD_AVERAGE = "average_load_average";
+ public static final String AVERAGE_MEMORY_CONSUMPTION = "average_memory_consumption";
+ public static final String AVERAGE_REQUESTS_IN_FLIGHT = "average_in_flight_requests";
+
+ public static final String GRADIENT_LOAD_AVERAGE = "gradient_load_average";
+ public static final String GRADIENT_MEMORY_CONSUMPTION = "gradient_memory_consumption";
+ public static final String GRADIENT_OF_REQUESTS_IN_FLIGHT = "gradient_in_flight_requests";
+
+ public static final String SECOND_DERIVATIVE_OF_REQUESTS_IN_FLIGHT = "second_derivative_in_flight_requests";
+ public static final String SECOND_DERIVATIVE_OF_MEMORY_CONSUMPTION = "second_derivative_memory_consumption";
+ public static final String SECOND_DERIVATIVE_OF_LOAD_AVERAGE = "second_derivative_load_average";
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bfb87dce/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java
index 7e6ef90..f7fc6b1 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java
@@ -27,7 +27,14 @@ import org.apache.stratos.autoscaler.Constants;
import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
import org.apache.stratos.autoscaler.exception.SpawningException;
import org.apache.stratos.autoscaler.exception.TerminationException;
+import org.apache.stratos.autoscaler.partition.PartitionManager;
+import org.apache.stratos.autoscaler.policy.model.LoadAverage;
+import org.apache.stratos.autoscaler.policy.model.MemoryConsumption;
import org.apache.stratos.cloud.controller.deployment.partition.Partition;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import javax.jms.TextMessage;
import java.io.BufferedReader;
@@ -43,113 +50,296 @@ import java.util.Map;
public class HealthEventMessageDelegator implements Runnable {
private static final Log log = LogFactory.getLog(HealthEventMessageDelegator.class);
- private String eventName;
- private String clusterId;
- private String networkPartitionId;
- private Map<String, String> messageProperties;
+
@Override
public void run() {
- log.info("Health stat event message processor started");
+ log.info("Health event message delegator started");
while (true) {
- try {
- TextMessage message = HealthEventQueue.getInstance().take();
+ try {
+ TextMessage message = HealthEventQueue.getInstance().take();
+
+ String messageText = message.getText();
+ if (log.isDebugEnabled())
+ log.debug("Health event message received: [message] " + messageText);
+
+ Event event = jsonToEvent(messageText);
+
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Received event: [event-name] %s", event.getEventName()));
+ }
- String messageText = message.getText();
- if(log.isDebugEnabled())
- log.debug("Health event message received. Message :" + messageText);
+ if (Constants.AVERAGE_REQUESTS_IN_FLIGHT.equals(event.getEventName())) {
+ String clusterId = event.getProperties().get("cluster_id");
+ String partitionId = event.getProperties().get("partition_id");
+ String value = event.getProperties().get("value");
+ String networkPartitionId = PartitionManager.getInstance().getNetworkPartitionOfPartition(partitionId).getId();
+ Float floatValue = Float.parseFloat(value);
- messageProperties = setEventValues(messageText);
- this.clusterId = messageProperties.get("cluster_id");
- log.info("Received event " + eventName + " for cluster " + this.clusterId);
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("%s event: [cluster] %s [partition] %s [value] %s", event.getEventName(), clusterId, partitionId, value));
+ }
- if(Constants.AVERAGE_REQUESTS_IN_FLIGHT.equals(eventName)){
- Float messageValue = Float.parseFloat(messageProperties.get("value"));
AutoscalerContext.getInstance().getMonitor(clusterId).getNetworkPartitionCtxt(networkPartitionId)
- .setAverageRequestsInFlight(messageValue);
+ .setAverageRequestsInFlight(floatValue);
+
+ } else if (Constants.GRADIENT_OF_REQUESTS_IN_FLIGHT.equals(event.getEventName())) {
+ String clusterId = event.getProperties().get("cluster_id");
+ String partitionId = event.getProperties().get("partition_id");
+ String value = event.getProperties().get("value");
+ String networkPartitionId = PartitionManager.getInstance().getNetworkPartitionOfPartition(partitionId).getId();
+ Float floatValue = Float.parseFloat(value);
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("%s event: [cluster] %s [partition] %s [value] %s", event.getEventName(), clusterId, partitionId, value));
+ }
- } else if(Constants.GRADIENT_OF_REQUESTS_IN_FLIGHT.equals(eventName)){
- Float messageValue = Float.parseFloat(messageProperties.get("value"));
AutoscalerContext.getInstance().getMonitor(clusterId).getNetworkPartitionCtxt(networkPartitionId)
- .setRequestsInFlightGradient(messageValue);
+ .setRequestsInFlightGradient(floatValue);
+
+ } else if (Constants.SECOND_DERIVATIVE_OF_REQUESTS_IN_FLIGHT.equals(event.getEventName())) {
+ String clusterId = event.getProperties().get("cluster_id");
+ String partitionId = event.getProperties().get("partition_id");
+ String value = event.getProperties().get("value");
+ String networkPartitionId = PartitionManager.getInstance().getNetworkPartitionOfPartition(partitionId).getId();
+ Float floatValue = Float.parseFloat(value);
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("%s event: [cluster] %s [partition] %s [value] %s", event.getEventName(), clusterId, partitionId, value));
+ }
- } else if(Constants.SECOND_DERIVATIVE_OF_REQUESTS_IN_FLIGHT.equals(eventName)){
- Float messageValue = Float.parseFloat(messageProperties.get("value"));
AutoscalerContext.getInstance().getMonitor(clusterId).getNetworkPartitionCtxt(networkPartitionId)
- .setRequestsInFlightSecondDerivative(messageValue);
-
- }else if (Constants.MEMBER_FAULT_EVENT_NAME.equals(eventName)){
-
- String memberId = messageProperties.get("member_id");
- if(memberId == null || memberId.isEmpty())
- log.error("MemberId is not included in the received message");
- handleMemberfaultEvent(memberId);
+ .setRequestsInFlightSecondDerivative(floatValue);
+
+ } else if (Constants.MEMBER_FAULT_EVENT_NAME.equals(event.getEventName())) {
+ String clusterId = event.getProperties().get("cluster_id");
+ String memberId = event.getProperties().get("member_id");
+
+ if (memberId == null || memberId.isEmpty()) {
+ if(log.isErrorEnabled()) {
+ log.error("Member id not found in received message");
+ }
+ } else {
+ handleMemberFaultEvent(clusterId, memberId);
+ }
+ } else if(Constants.AVERAGE_LOAD_AVERAGE.equals(event.getEventName())) {
+ LoadAverage loadAverage = findLoadAverage(event);
+ if(loadAverage != null) {
+ String value = event.getProperties().get("value");
+ Float floatValue = Float.parseFloat(value);
+ loadAverage.setAverage(floatValue);
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("%s event: [member] %s [value] %s", event.getProperties().get("member_id"), value));
+ }
+ }
+ } else if(Constants.SECOND_DERIVATIVE_OF_LOAD_AVERAGE.equals(event.getEventName())) {
+ LoadAverage loadAverage = findLoadAverage(event);
+ if(loadAverage != null) {
+ String value = event.getProperties().get("value");
+ Float floatValue = Float.parseFloat(value);
+ loadAverage.setSecondDerivative(floatValue);
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("%s event: [member] %s [value] %s", event.getProperties().get("member_id"), value));
+ }
+ }
+ } else if(Constants.GRADIENT_LOAD_AVERAGE.equals(event.getEventName())) {
+ LoadAverage loadAverage = findLoadAverage(event);
+ if(loadAverage != null) {
+ String value = event.getProperties().get("value");
+ Float floatValue = Float.parseFloat(value);
+ loadAverage.setGradient(floatValue);
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("%s event: [member] %s [value] %s", event.getProperties().get("member_id"), value));
+ }
+ }
+ } else if(Constants.AVERAGE_MEMORY_CONSUMPTION.equals(event.getEventName())) {
+ MemoryConsumption memoryConsumption = findMemoryConsumption(event);
+ if(memoryConsumption != null) {
+ String value = event.getProperties().get("value");
+ Float floatValue = Float.parseFloat(value);
+ memoryConsumption.setAverage(floatValue);
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("%s event: [member] %s [value] %s", event.getProperties().get("member_id"), value));
+ }
+ }
+ } else if(Constants.SECOND_DERIVATIVE_OF_MEMORY_CONSUMPTION.equals(event.getEventName())) {
+ MemoryConsumption memoryConsumption = findMemoryConsumption(event);
+ if(memoryConsumption != null) {
+ String value = event.getProperties().get("value");
+ Float floatValue = Float.parseFloat(value);
+ memoryConsumption.setSecondDerivative(floatValue);
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("%s event: [member] %s [value] %s", event.getProperties().get("member_id"), value));
+ }
+ }
+ } else if(Constants.GRADIENT_MEMORY_CONSUMPTION.equals(event.getEventName())) {
+ MemoryConsumption memoryConsumption = findMemoryConsumption(event);
+ if(memoryConsumption != null) {
+ String value = event.getProperties().get("value");
+ Float floatValue = Float.parseFloat(value);
+ memoryConsumption.setGradient(floatValue);
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("%s event: [member] %s [value] %s", event.getProperties().get("member_id"), value));
+ }
+ }
}
-
- // clear the message properties after handling the message.
- messageProperties.clear();
-
- } catch (Exception e) {
- String error = "Failed to retrieve the health stat event message." + e.getMessage();
- log.error(error );
+ } catch (Exception e) {
+ log.error("Failed to retrieve the health stat event message.", e);
}
}
}
- private void handleMemberfaultEvent(String memberId) {
- try {
-
- ClusterMonitor monitor = AutoscalerContext.getInstance().getMonitor(this.clusterId);
-// TopologyManager.getTopology().get
- if(!monitor.memberExist(memberId)){
- // member has already terminated. So no action required
- return;
- }
-
- // terminate the faulty member
- CloudControllerClient ccClient = CloudControllerClient.getInstance();
- ccClient.terminate(memberId);
-
- // start a new member in the same Partition
- //ClusterContext clsCtx = AutoscalerContext.getInstance().getClusterMonitor(clusterId);
- String partitionId = monitor.getPartitonOfMember(memberId);
- Partition partition = monitor.getDeploymentPolicy().getPartitionById(partitionId);
- ccClient.spawnAnInstance(partition, clusterId);
-
- } catch (TerminationException e) {
- log.error(e);
- }catch(SpawningException e){
- log.error(e);
- }
- }
-
- public Map<String, String> setEventValues(String json) {
-
- Map<String, String> properties = new HashMap<String, String>();
- BufferedReader bufferedReader = new BufferedReader(new StringReader(json));
- JsonReader reader = new JsonReader(bufferedReader);
+ private LoadAverage findLoadAverage(Event event) {
+ String memberId = event.getProperties().get("member_id");
+ Member member = findMember(memberId);
+ if(member != null) {
+ String networkPartitionId = PartitionManager.getInstance().getNetworkPartitionOfPartition(member.getPartitionId()).getId();
+ LoadAverage loadAverage = AutoscalerContext.getInstance().getMonitor(member.getClusterId())
+ .getNetworkPartitionCtxt(networkPartitionId)
+ .getPartitionCtxt(member.getPartitionId())
+ .getMemberStatsContext(memberId).getLoadAverage();
+
+ if(loadAverage == null) {
+ loadAverage = new LoadAverage();
+ AutoscalerContext.getInstance().getMonitor(member.getClusterId())
+ .getNetworkPartitionCtxt(networkPartitionId)
+ .getPartitionCtxt(member.getPartitionId())
+ .getMemberStatsContext(memberId).setLoadAverage(loadAverage);
+ }
+ return loadAverage;
+ }
+ else {
+ if(log.isErrorEnabled()) {
+ log.error(String.format("Member not found: [member] %s", memberId));
+ }
+ return null;
+ }
+ }
+
+ private MemoryConsumption findMemoryConsumption(Event event) {
+ String memberId = event.getProperties().get("member_id");
+ Member member = findMember(memberId);
+ if(member != null) {
+ String networkPartitionId = PartitionManager.getInstance().getNetworkPartitionOfPartition(member.getPartitionId()).getId();
+ MemoryConsumption memoryConsumption = AutoscalerContext.getInstance().getMonitor(member.getClusterId())
+ .getNetworkPartitionCtxt(networkPartitionId)
+ .getPartitionCtxt(member.getPartitionId())
+ .getMemberStatsContext(memberId).getMemoryConsumption();
+
+ if(memoryConsumption == null) {
+ memoryConsumption = new MemoryConsumption();
+ AutoscalerContext.getInstance().getMonitor(member.getClusterId())
+ .getNetworkPartitionCtxt(networkPartitionId)
+ .getPartitionCtxt(member.getPartitionId())
+ .getMemberStatsContext(memberId).setMemoryConsumption(memoryConsumption);
+ }
+ return memoryConsumption;
+ }
+ else {
+ if(log.isErrorEnabled()) {
+ log.error(String.format("Member not found: [member] %s", memberId));
+ }
+ return null;
+ }
+ }
+
+ private Member findMember(String memberId) {
+ try {
+ TopologyManager.acquireReadLock();
+ for(Service service : TopologyManager.getTopology().getServices()) {
+ for(Cluster cluster : service.getClusters()) {
+ if(cluster.memberExists(memberId)) {
+ return cluster.getMember(memberId);
+ }
+ }
+ }
+ return null;
+ }
+ finally {
+ TopologyManager.releaseReadLock();
+ }
+ }
+
+ private void handleMemberFaultEvent(String clusterId, String memberId) {
try {
+ ClusterMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+ if (!monitor.memberExist(memberId)) {
+ // member has already terminated. So no action required
+ return;
+ }
+
+ // terminate the faulty member
+ CloudControllerClient ccClient = CloudControllerClient.getInstance();
+ ccClient.terminate(memberId);
+
+ // start a new member in the same Partition
+ String partitionId = monitor.getPartitonOfMember(memberId);
+ Partition partition = monitor.getDeploymentPolicy().getPartitionById(partitionId);
+ ccClient.spawnAnInstance(partition, clusterId);
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Instance spawned for fault member: [partition] %s [cluster] %s", partitionId, clusterId));
+ }
+
+ } catch (TerminationException e) {
+ log.error(e);
+ } catch (SpawningException e) {
+ log.error(e);
+ }
+ }
+
+ public Event jsonToEvent(String json) {
+
+ Event event = new Event();
+ BufferedReader bufferedReader = new BufferedReader(new StringReader(json));
+ JsonReader reader = new JsonReader(bufferedReader);
+ try {
reader.beginObject();
- if(reader.hasNext()) {
- eventName = reader.nextName();
-
- reader.beginObject();
- while(reader.hasNext()){
- String name = reader.nextName();
- String value = reader.nextString();
- properties.put(name, value);
+ if (reader.hasNext()) {
+ event.setEventName(reader.nextName());
+
+ reader.beginObject();
+ Map<String, String> properties = new HashMap<String, String>();
+ while (reader.hasNext()) {
+ String name = reader.nextName();
+ String value = reader.nextString();
+ properties.put(name, value);
}
-
+ event.setProperties(properties);
}
- reader.close();
+ reader.close();
+ return event;
+ } catch (IOException e) {
+ log.error("Could not extract event");
+ }
+ return null;
+ }
+
+ private class Event {
+ private String eventName;
+ private Map<String, String> properties;
+
+ private String getEventName() {
+ return eventName;
+ }
+
+ private void setEventName(String eventName) {
+ this.eventName = eventName;
+ }
+
+ private Map<String, String> getProperties() {
return properties;
- }catch(IOException e) {
- log.error( "Could not extract message header");
-// throw new RuntimeException("Could not extract message header", e);
}
- return null;
+
+ private void setProperties(Map<String, String> properties) {
+ this.properties = properties;
+ }
}
-
}
[2/2] git commit: Merge remote-tracking branch 'origin/master'
Posted by im...@apache.org.
Merge remote-tracking branch 'origin/master'
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/75d977eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/75d977eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/75d977eb
Branch: refs/heads/master
Commit: 75d977eb3b44f4f1518028b7a711b97403577960
Parents: bfb87dc 0b4ba2e
Author: Imesh Gunaratne <im...@apache.org>
Authored: Wed Dec 11 23:32:45 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Wed Dec 11 23:32:45 2013 +0530
----------------------------------------------------------------------
.../stratos/autoscaler/AutoscalerContext.java | 4 +
.../autoscaler/NetworkPartitionContext.java | 19 ++
.../stratos/autoscaler/PartitionContext.java | 22 ++
.../cloud/controller/CloudControllerClient.java | 28 +--
.../autoscaler/partition/PartitionManager.java | 4 +
.../rule/AutoscalerRuleEvaluator.java | 22 +-
.../topology/AutoscalerTopologyReceiver.java | 109 +++++----
.../stratos/autoscaler/util/AutoscalerUtil.java | 232 ++++++++++++++-----
.../stratos/messaging/util/Constants.java | 5 +
.../apache/stratos/rest/endpoint/Constants.java | 5 +-
.../rest/endpoint/services/ServiceUtils.java | 53 +++--
.../main/resources/CloudControllerService.wsdl | 1 +
12 files changed, 364 insertions(+), 140 deletions(-)
----------------------------------------------------------------------