You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2013/12/11 19:02:51 UTC

[1/2] git commit: Re-factored autoscaler health event message delegator and added missing message processors

Updated Branches:
  refs/heads/master 0b4ba2e48 -> 75d977eb3


Re-factored autoscaler health event message delegator and added missing message processors


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/bfb87dce
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/bfb87dce
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/bfb87dce

Branch: refs/heads/master
Commit: bfb87dce5f986928b266bb2c45f0bc67e2dc3932
Parents: d3c42bf
Author: Imesh Gunaratne <im...@apache.org>
Authored: Wed Dec 11 23:32:22 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Wed Dec 11 23:32:22 2013 +0530

----------------------------------------------------------------------
 .../apache/stratos/autoscaler/Constants.java    |  22 +-
 .../health/HealthEventMessageDelegator.java     | 356 ++++++++++++++-----
 2 files changed, 287 insertions(+), 91 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bfb87dce/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/Constants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/Constants.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/Constants.java
index 2067466..f682260 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/Constants.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/Constants.java
@@ -5,14 +5,9 @@ package org.apache.stratos.autoscaler;
  */
 public class Constants {
 
-    public static String ROUND_ROBIN_ALGORITHM_ID = "round-robin";
-    public static String ONE_AFTER_ANOTHER_ALGORITHM_ID = "one-after-another";
-
-    public static String GRADIENT_OF_REQUESTS_IN_FLIGHT = "gradient_in_flight_requests";
-    public static String AVERAGE_REQUESTS_IN_FLIGHT = "average_in_flight_requests";
-    public static String SECOND_DERIVATIVE_OF_REQUESTS_IN_FLIGHT = "second_derivative_in_flight_requests";
-    
-    public static String MEMBER_FAULT_EVENT_NAME = "member_fault";
+    public static final String ROUND_ROBIN_ALGORITHM_ID = "round-robin";
+    public static final String ONE_AFTER_ANOTHER_ALGORITHM_ID = "one-after-another";
+    public static final String MEMBER_FAULT_EVENT_NAME = "member_fault";
 
     //scheduler
     public static final int SCHEDULE_DEFAULT_INITIAL_DELAY = 30;
@@ -26,4 +21,15 @@ public class Constants {
     // partition properties
     public static final String REGION_PROPERTY = "region";
 
+    public static final String AVERAGE_LOAD_AVERAGE = "average_load_average";
+    public static final String AVERAGE_MEMORY_CONSUMPTION = "average_memory_consumption";
+    public static final String AVERAGE_REQUESTS_IN_FLIGHT = "average_in_flight_requests";
+
+    public static final String GRADIENT_LOAD_AVERAGE = "gradient_load_average";
+    public static final String GRADIENT_MEMORY_CONSUMPTION = "gradient_memory_consumption";
+    public static final String GRADIENT_OF_REQUESTS_IN_FLIGHT = "gradient_in_flight_requests";
+
+    public static final String SECOND_DERIVATIVE_OF_REQUESTS_IN_FLIGHT = "second_derivative_in_flight_requests";
+    public static final String SECOND_DERIVATIVE_OF_MEMORY_CONSUMPTION = "second_derivative_memory_consumption";
+    public static final String SECOND_DERIVATIVE_OF_LOAD_AVERAGE = "second_derivative_load_average";
 }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/bfb87dce/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java
index 7e6ef90..f7fc6b1 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java
@@ -27,7 +27,14 @@ import org.apache.stratos.autoscaler.Constants;
 import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
 import org.apache.stratos.autoscaler.exception.SpawningException;
 import org.apache.stratos.autoscaler.exception.TerminationException;
+import org.apache.stratos.autoscaler.partition.PartitionManager;
+import org.apache.stratos.autoscaler.policy.model.LoadAverage;
+import org.apache.stratos.autoscaler.policy.model.MemoryConsumption;
 import org.apache.stratos.cloud.controller.deployment.partition.Partition;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
 
 import javax.jms.TextMessage;
 import java.io.BufferedReader;
@@ -43,113 +50,296 @@ import java.util.Map;
 public class HealthEventMessageDelegator implements Runnable {
 
     private static final Log log = LogFactory.getLog(HealthEventMessageDelegator.class);
-    private String eventName;
-    private String clusterId;
-    private String networkPartitionId;
-    private Map<String, String> messageProperties;
+
     @Override
     public void run() {
-		log.info("Health stat event message processor started");
+        log.info("Health event message delegator started");
 
         while (true) {
-			try {
-				TextMessage message = HealthEventQueue.getInstance().take();
+            try {
+                TextMessage message = HealthEventQueue.getInstance().take();
+
+                String messageText = message.getText();
+                if (log.isDebugEnabled())
+                    log.debug("Health event message received: [message] " + messageText);
+
+                Event event = jsonToEvent(messageText);
+
+                if (log.isInfoEnabled()) {
+                    log.info(String.format("Received event: [event-name] %s", event.getEventName()));
+                }
 
-				String messageText = message.getText();
-				if(log.isDebugEnabled())
-					log.debug("Health event message received. Message :" + messageText);
+                if (Constants.AVERAGE_REQUESTS_IN_FLIGHT.equals(event.getEventName())) {
+                    String clusterId = event.getProperties().get("cluster_id");
+                    String partitionId = event.getProperties().get("partition_id");
+                    String value = event.getProperties().get("value");
+                    String networkPartitionId = PartitionManager.getInstance().getNetworkPartitionOfPartition(partitionId).getId();
+                    Float floatValue = Float.parseFloat(value);
 
-                messageProperties = setEventValues(messageText);
-                this.clusterId = messageProperties.get("cluster_id");
-                log.info("Received event " + eventName + " for cluster " + this.clusterId);
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("%s event: [cluster] %s [partition] %s [value] %s", event.getEventName(), clusterId, partitionId, value));
+                    }
 
-                if(Constants.AVERAGE_REQUESTS_IN_FLIGHT.equals(eventName)){                	
-                	Float messageValue = Float.parseFloat(messageProperties.get("value"));
                     AutoscalerContext.getInstance().getMonitor(clusterId).getNetworkPartitionCtxt(networkPartitionId)
-                            .setAverageRequestsInFlight(messageValue);
+                            .setAverageRequestsInFlight(floatValue);
+
+                } else if (Constants.GRADIENT_OF_REQUESTS_IN_FLIGHT.equals(event.getEventName())) {
+                    String clusterId = event.getProperties().get("cluster_id");
+                    String partitionId = event.getProperties().get("partition_id");
+                    String value = event.getProperties().get("value");
+                    String networkPartitionId = PartitionManager.getInstance().getNetworkPartitionOfPartition(partitionId).getId();
+                    Float floatValue = Float.parseFloat(value);
+
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("%s event: [cluster] %s [partition] %s [value] %s", event.getEventName(), clusterId, partitionId, value));
+                    }
 
-                }  else if(Constants.GRADIENT_OF_REQUESTS_IN_FLIGHT.equals(eventName)){                	
-                	Float messageValue = Float.parseFloat(messageProperties.get("value"));
                     AutoscalerContext.getInstance().getMonitor(clusterId).getNetworkPartitionCtxt(networkPartitionId)
-                            .setRequestsInFlightGradient(messageValue);
+                            .setRequestsInFlightGradient(floatValue);
+
+                } else if (Constants.SECOND_DERIVATIVE_OF_REQUESTS_IN_FLIGHT.equals(event.getEventName())) {
+                    String clusterId = event.getProperties().get("cluster_id");
+                    String partitionId = event.getProperties().get("partition_id");
+                    String value = event.getProperties().get("value");
+                    String networkPartitionId = PartitionManager.getInstance().getNetworkPartitionOfPartition(partitionId).getId();
+                    Float floatValue = Float.parseFloat(value);
+
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("%s event: [cluster] %s [partition] %s [value] %s", event.getEventName(), clusterId, partitionId, value));
+                    }
 
-                }  else if(Constants.SECOND_DERIVATIVE_OF_REQUESTS_IN_FLIGHT.equals(eventName)){
-                	Float messageValue = Float.parseFloat(messageProperties.get("value"));
                     AutoscalerContext.getInstance().getMonitor(clusterId).getNetworkPartitionCtxt(networkPartitionId)
-                            .setRequestsInFlightSecondDerivative(messageValue);
-
-                }else if (Constants.MEMBER_FAULT_EVENT_NAME.equals(eventName)){
-                	
-                	String memberId = messageProperties.get("member_id");
-                	if(memberId == null || memberId.isEmpty())
-                		log.error("MemberId is not included in the received message");
-                	handleMemberfaultEvent(memberId);                	
+                            .setRequestsInFlightSecondDerivative(floatValue);
+
+                } else if (Constants.MEMBER_FAULT_EVENT_NAME.equals(event.getEventName())) {
+                    String clusterId = event.getProperties().get("cluster_id");
+                    String memberId = event.getProperties().get("member_id");
+
+                    if (memberId == null || memberId.isEmpty()) {
+                        if(log.isErrorEnabled()) {
+                            log.error("Member id not found in received message");
+                        }
+                    } else {
+                        handleMemberFaultEvent(clusterId, memberId);
+                    }
+                } else if(Constants.AVERAGE_LOAD_AVERAGE.equals(event.getEventName())) {
+                    LoadAverage loadAverage = findLoadAverage(event);
+                    if(loadAverage != null) {
+                        String value = event.getProperties().get("value");
+                        Float floatValue = Float.parseFloat(value);
+                        loadAverage.setAverage(floatValue);
+
+                        if (log.isDebugEnabled()) {
+                            log.debug(String.format("%s event: [member] %s [value] %s", event.getProperties().get("member_id"), value));
+                        }
+                    }
+                } else if(Constants.SECOND_DERIVATIVE_OF_LOAD_AVERAGE.equals(event.getEventName())) {
+                    LoadAverage loadAverage = findLoadAverage(event);
+                    if(loadAverage != null) {
+                        String value = event.getProperties().get("value");
+                        Float floatValue = Float.parseFloat(value);
+                        loadAverage.setSecondDerivative(floatValue);
+
+                        if (log.isDebugEnabled()) {
+                            log.debug(String.format("%s event: [member] %s [value] %s", event.getProperties().get("member_id"), value));
+                        }
+                    }
+                } else if(Constants.GRADIENT_LOAD_AVERAGE.equals(event.getEventName())) {
+                    LoadAverage loadAverage = findLoadAverage(event);
+                    if(loadAverage != null) {
+                        String value = event.getProperties().get("value");
+                        Float floatValue = Float.parseFloat(value);
+                        loadAverage.setGradient(floatValue);
+
+                        if (log.isDebugEnabled()) {
+                            log.debug(String.format("%s event: [member] %s [value] %s", event.getProperties().get("member_id"), value));
+                        }
+                    }
+                } else if(Constants.AVERAGE_MEMORY_CONSUMPTION.equals(event.getEventName())) {
+                    MemoryConsumption memoryConsumption = findMemoryConsumption(event);
+                    if(memoryConsumption != null) {
+                        String value = event.getProperties().get("value");
+                        Float floatValue = Float.parseFloat(value);
+                        memoryConsumption.setAverage(floatValue);
+
+                        if (log.isDebugEnabled()) {
+                            log.debug(String.format("%s event: [member] %s [value] %s", event.getProperties().get("member_id"), value));
+                        }
+                    }
+                } else if(Constants.SECOND_DERIVATIVE_OF_MEMORY_CONSUMPTION.equals(event.getEventName())) {
+                    MemoryConsumption memoryConsumption = findMemoryConsumption(event);
+                    if(memoryConsumption != null) {
+                        String value = event.getProperties().get("value");
+                        Float floatValue = Float.parseFloat(value);
+                        memoryConsumption.setSecondDerivative(floatValue);
+
+                        if (log.isDebugEnabled()) {
+                            log.debug(String.format("%s event: [member] %s [value] %s", event.getProperties().get("member_id"), value));
+                        }
+                    }
+                } else if(Constants.GRADIENT_MEMORY_CONSUMPTION.equals(event.getEventName())) {
+                    MemoryConsumption memoryConsumption = findMemoryConsumption(event);
+                    if(memoryConsumption != null) {
+                        String value = event.getProperties().get("value");
+                        Float floatValue = Float.parseFloat(value);
+                        memoryConsumption.setGradient(floatValue);
+
+                        if (log.isDebugEnabled()) {
+                            log.debug(String.format("%s event: [member] %s [value] %s", event.getProperties().get("member_id"), value));
+                        }
+                    }
                 }
-                
-                // clear the message properties after handling the message.
-                messageProperties.clear();
-                
-			} catch (Exception e) {
-                String error = "Failed to retrieve the health stat event message." + e.getMessage();
-            	log.error(error );
+            } catch (Exception e) {
+                log.error("Failed to retrieve the health stat event message.", e);
             }
         }
     }
 
-    private void handleMemberfaultEvent(String memberId) {
-		try {	
-			
-			ClusterMonitor monitor = AutoscalerContext.getInstance().getMonitor(this.clusterId);
-//            TopologyManager.getTopology().get
-			if(!monitor.memberExist(memberId)){
-				// member has already terminated. So no action required
-				return;
-			}
-				
-			// terminate the faulty member
-			CloudControllerClient ccClient = CloudControllerClient.getInstance();
-			ccClient.terminate(memberId);
-			
-			// start a new member in the same Partition
-			//ClusterContext clsCtx = AutoscalerContext.getInstance().getClusterMonitor(clusterId);
-			String partitionId = monitor.getPartitonOfMember(memberId);
-			Partition partition = monitor.getDeploymentPolicy().getPartitionById(partitionId);
-			ccClient.spawnAnInstance(partition, clusterId);
-			
-		} catch (TerminationException e) {
-			log.error(e);
-		}catch(SpawningException e){
-			log.error(e);
-		}		
-	}
-
-	public Map<String, String> setEventValues(String json) {
-    	
-    	Map<String, String> properties = new HashMap<String, String>();
-    	BufferedReader bufferedReader = new BufferedReader(new StringReader(json));
-        JsonReader reader = new JsonReader(bufferedReader);
+    private LoadAverage findLoadAverage(Event event) {
+        String memberId = event.getProperties().get("member_id");
+        Member member = findMember(memberId);
+        if(member != null) {
+            String networkPartitionId = PartitionManager.getInstance().getNetworkPartitionOfPartition(member.getPartitionId()).getId();
+            LoadAverage loadAverage = AutoscalerContext.getInstance().getMonitor(member.getClusterId())
+                    .getNetworkPartitionCtxt(networkPartitionId)
+                    .getPartitionCtxt(member.getPartitionId())
+                    .getMemberStatsContext(memberId).getLoadAverage();
+
+            if(loadAverage == null) {
+                loadAverage = new LoadAverage();
+                AutoscalerContext.getInstance().getMonitor(member.getClusterId())
+                        .getNetworkPartitionCtxt(networkPartitionId)
+                        .getPartitionCtxt(member.getPartitionId())
+                        .getMemberStatsContext(memberId).setLoadAverage(loadAverage);
+            }
+            return loadAverage;
+        }
+        else {
+            if(log.isErrorEnabled()) {
+                log.error(String.format("Member not found: [member] %s", memberId));
+            }
+            return null;
+        }
+    }
+
+    private MemoryConsumption findMemoryConsumption(Event event) {
+        String memberId = event.getProperties().get("member_id");
+        Member member = findMember(memberId);
+        if(member != null) {
+            String networkPartitionId = PartitionManager.getInstance().getNetworkPartitionOfPartition(member.getPartitionId()).getId();
+            MemoryConsumption memoryConsumption = AutoscalerContext.getInstance().getMonitor(member.getClusterId())
+                    .getNetworkPartitionCtxt(networkPartitionId)
+                    .getPartitionCtxt(member.getPartitionId())
+                    .getMemberStatsContext(memberId).getMemoryConsumption();
+
+            if(memoryConsumption == null) {
+                memoryConsumption = new MemoryConsumption();
+                AutoscalerContext.getInstance().getMonitor(member.getClusterId())
+                        .getNetworkPartitionCtxt(networkPartitionId)
+                        .getPartitionCtxt(member.getPartitionId())
+                        .getMemberStatsContext(memberId).setMemoryConsumption(memoryConsumption);
+            }
+            return memoryConsumption;
+        }
+        else {
+            if(log.isErrorEnabled()) {
+                log.error(String.format("Member not found: [member] %s", memberId));
+            }
+            return null;
+        }
+    }
+
+    private Member findMember(String memberId) {
+        try {
+            TopologyManager.acquireReadLock();
+            for(Service service : TopologyManager.getTopology().getServices()) {
+                for(Cluster cluster : service.getClusters()) {
+                    if(cluster.memberExists(memberId)) {
+                        return cluster.getMember(memberId);
+                    }
+                }
+            }
+            return null;
+        }
+        finally {
+            TopologyManager.releaseReadLock();
+        }
+    }
+
+    private void handleMemberFaultEvent(String clusterId, String memberId) {
         try {
 
+            ClusterMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+            if (!monitor.memberExist(memberId)) {
+                // member has already terminated. So no action required
+                return;
+            }
+
+            // terminate the faulty member
+            CloudControllerClient ccClient = CloudControllerClient.getInstance();
+            ccClient.terminate(memberId);
+
+            // start a new member in the same Partition
+            String partitionId = monitor.getPartitonOfMember(memberId);
+            Partition partition = monitor.getDeploymentPolicy().getPartitionById(partitionId);
+            ccClient.spawnAnInstance(partition, clusterId);
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Instance spawned for fault member: [partition] %s [cluster] %s", partitionId, clusterId));
+            }
+
+        } catch (TerminationException e) {
+            log.error(e);
+        } catch (SpawningException e) {
+            log.error(e);
+        }
+    }
+
+    public Event jsonToEvent(String json) {
+
+        Event event = new Event();
+        BufferedReader bufferedReader = new BufferedReader(new StringReader(json));
+        JsonReader reader = new JsonReader(bufferedReader);
+        try {
             reader.beginObject();
 
-            if(reader.hasNext()) {
-                eventName = reader.nextName();
-                
-                reader.beginObject();                
-                while(reader.hasNext()){
-                	String name = reader.nextName();
-                	String value = reader.nextString();
-                	properties.put(name, value);
+            if (reader.hasNext()) {
+                event.setEventName(reader.nextName());
+
+                reader.beginObject();
+                Map<String, String> properties = new HashMap<String, String>();
+                while (reader.hasNext()) {
+                    String name = reader.nextName();
+                    String value = reader.nextString();
+                    properties.put(name, value);
                 }
-                               
+                event.setProperties(properties);
             }
-            reader.close(); 
+            reader.close();
+            return event;
+        } catch (IOException e) {
+            log.error("Could not extract event");
+        }
+        return null;
+    }
+
+    private class Event {
+        private String eventName;
+        private Map<String, String> properties;
+
+        private String getEventName() {
+            return eventName;
+        }
+
+        private void setEventName(String eventName) {
+            this.eventName = eventName;
+        }
+
+        private Map<String, String> getProperties() {
             return properties;
-        }catch(IOException e) {
-            log.error( "Could not extract message header");
-//            throw new RuntimeException("Could not extract message header", e);
         }
-		return null;
+
+        private void setProperties(Map<String, String> properties) {
+            this.properties = properties;
+        }
     }
-    
 }


[2/2] git commit: Merge remote-tracking branch 'origin/master'

Posted by im...@apache.org.
Merge remote-tracking branch 'origin/master'


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/75d977eb
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/75d977eb
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/75d977eb

Branch: refs/heads/master
Commit: 75d977eb3b44f4f1518028b7a711b97403577960
Parents: bfb87dc 0b4ba2e
Author: Imesh Gunaratne <im...@apache.org>
Authored: Wed Dec 11 23:32:45 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Wed Dec 11 23:32:45 2013 +0530

----------------------------------------------------------------------
 .../stratos/autoscaler/AutoscalerContext.java   |   4 +
 .../autoscaler/NetworkPartitionContext.java     |  19 ++
 .../stratos/autoscaler/PartitionContext.java    |  22 ++
 .../cloud/controller/CloudControllerClient.java |  28 +--
 .../autoscaler/partition/PartitionManager.java  |   4 +
 .../rule/AutoscalerRuleEvaluator.java           |  22 +-
 .../topology/AutoscalerTopologyReceiver.java    | 109 +++++----
 .../stratos/autoscaler/util/AutoscalerUtil.java | 232 ++++++++++++++-----
 .../stratos/messaging/util/Constants.java       |   5 +
 .../apache/stratos/rest/endpoint/Constants.java |   5 +-
 .../rest/endpoint/services/ServiceUtils.java    |  53 +++--
 .../main/resources/CloudControllerService.wsdl  |   1 +
 12 files changed, 364 insertions(+), 140 deletions(-)
----------------------------------------------------------------------