You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by la...@apache.org on 2014/01/04 16:09:50 UTC
[1/2] Moving the autoscaler health stat receiver to message processor
model gc: STRATOS-332
Updated Branches:
refs/heads/master 41687d86d -> 253bbb08a
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/253bbb08/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java
deleted file mode 100644
index d1bcb1c..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageDelegator.java
+++ /dev/null
@@ -1,639 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.message.receiver.health;
-
-import java.io.BufferedReader;
-import java.io.IOException;
-import java.io.StringReader;
-import java.util.HashMap;
-import java.util.Map;
-
-import javax.jms.TextMessage;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.*;
-import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
-import org.apache.stratos.autoscaler.exception.SpawningException;
-import org.apache.stratos.autoscaler.exception.TerminationException;
-import org.apache.stratos.autoscaler.monitor.AbstractMonitor;
-import org.apache.stratos.autoscaler.policy.model.LoadAverage;
-import org.apache.stratos.autoscaler.policy.model.MemoryConsumption;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.cloud.controller.deployment.partition.Partition;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-
-import com.google.gson.stream.JsonReader;
-
-
-/**
- * A thread for processing topology messages and updating the topology data structure.
- */
-public class HealthEventMessageDelegator implements Runnable {
-
- private static final Log log = LogFactory.getLog(HealthEventMessageDelegator.class);
- private boolean terminate = false;
-
- @Override
- public void run() {
- if(log.isInfoEnabled()) {
- log.info("Health event message delegator started");
- }
-
- if(log.isDebugEnabled()) {
- log.debug("Waiting for topology to be initialized");
- }
- while(!TopologyManager.getTopology().isInitialized());
-
- while (!terminate) {
- try {
- TextMessage message = HealthEventQueue.getInstance().take();
-
- String messageText = message.getText();
- if (log.isDebugEnabled()) {
- log.debug("Health event message received: [message] " + messageText);
- }
- Event event = jsonToEvent(messageText);
- String eventName = event.getEventName();
-
- if (log.isInfoEnabled()) {
- log.info(String.format("Received event: [event-name] %s", eventName));
- }
-
- if (Constants.AVERAGE_REQUESTS_IN_FLIGHT.equals(eventName)) {
- String clusterId = event.getProperties().get("cluster_id");
- String networkPartitionId = event.getProperties().get("network_partition_id");
- String value = event.getProperties().get("value");
- Float floatValue = Float.parseFloat(value);
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName,
- clusterId, networkPartitionId, value));
- }
-
- AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
- if(null != monitor){
- NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
- if(null != networkPartitionContext){
- networkPartitionContext.setAverageRequestsInFlight(floatValue);
- } else {
- if(log.isErrorEnabled()) {
- log.error(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- } else {
-
- if(log.isErrorEnabled()) {
- log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
- }
- }
-
- } else if (Constants.GRADIENT_OF_REQUESTS_IN_FLIGHT.equals(eventName)) {
- String clusterId = event.getProperties().get("cluster_id");
- String networkPartitionId = event.getProperties().get("network_partition_id");
- String value = event.getProperties().get("value");
- Float floatValue = Float.parseFloat(value);
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName,
- clusterId, networkPartitionId, value));
- }
- AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
- if(null != monitor){
- NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
- if(null != networkPartitionContext){
- networkPartitionContext.setRequestsInFlightGradient(floatValue);
- } else {
- if(log.isErrorEnabled()) {
- log.error(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- } else {
-
- if(log.isErrorEnabled()) {
- log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
- }
- }
-
- } else if (Constants.SECOND_DERIVATIVE_OF_REQUESTS_IN_FLIGHT.equals(eventName)) {
- String clusterId = event.getProperties().get("cluster_id");
- String networkPartitionId = event.getProperties().get("network_partition_id");
- String value = event.getProperties().get("value");
- Float floatValue = Float.parseFloat(value);
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName,
- clusterId, networkPartitionId, value));
- }
- AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
- if(null != monitor){
- NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
- if(null != networkPartitionContext){
- networkPartitionContext.setRequestsInFlightSecondDerivative(floatValue);
- } else {
- if(log.isErrorEnabled()) {
- log.error(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- } else {
-
- if(log.isErrorEnabled()) {
- log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
- }
- }
- } else if (Constants.MEMBER_FAULT_EVENT_NAME.equals(eventName)) {
- String clusterId = event.getProperties().get("cluster_id");
- String memberId = event.getProperties().get("member_id");
-
- if (memberId == null || memberId.isEmpty()) {
- if(log.isErrorEnabled()) {
- log.error("Member id not found in received message");
- }
- } else {
- handleMemberFaultEvent(clusterId, memberId);
- }
- } else if(Constants.MEMBER_AVERAGE_LOAD_AVERAGE.equals(eventName)) {
- LoadAverage loadAverage = findLoadAverage(event);
- if(loadAverage != null) {
- String value = event.getProperties().get("value");
- Float floatValue = Float.parseFloat(value);
- loadAverage.setAverage(floatValue);
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("%s event: [member] %s [value] %s", event, event.getProperties().get("member_id"), value));
- }
- }
- } else if(Constants.MEMBER_SECOND_DERIVATIVE_OF_LOAD_AVERAGE.equals(eventName)) {
- LoadAverage loadAverage = findLoadAverage(event);
- if(loadAverage != null) {
- String value = event.getProperties().get("value");
- Float floatValue = Float.parseFloat(value);
- loadAverage.setSecondDerivative(floatValue);
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("%s event: [member] %s [value] %s", event, event.getProperties().get("member_id"), value));
- }
- }
- } else if(Constants.MEMBER_GRADIENT_LOAD_AVERAGE.equals(eventName)) {
- LoadAverage loadAverage = findLoadAverage(event);
- if(loadAverage != null) {
- String value = event.getProperties().get("value");
- Float floatValue = Float.parseFloat(value);
- loadAverage.setGradient(floatValue);
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("%s event: [member] %s [value] %s", event, event.getProperties().get("member_id"), value));
- }
- }
- } else if(Constants.MEMBER_AVERAGE_MEMORY_CONSUMPTION.equals(eventName)) {
- MemoryConsumption memoryConsumption = findMemoryConsumption(event);
- if(memoryConsumption != null) {
- String value = event.getProperties().get("value");
- Float floatValue = Float.parseFloat(value);
- memoryConsumption.setAverage(floatValue);
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("%s event: [member] %s [value] %s", event, event.getProperties().get("member_id"), value));
- }
- }
- } else if(Constants.MEMBER_SECOND_DERIVATIVE_OF_MEMORY_CONSUMPTION.equals(eventName)) {
- MemoryConsumption memoryConsumption = findMemoryConsumption(event);
- if(memoryConsumption != null) {
- String value = event.getProperties().get("value");
- Float floatValue = Float.parseFloat(value);
- memoryConsumption.setSecondDerivative(floatValue);
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("%s event: [member] %s [value] %s", event, event.getProperties().get("member_id"), value));
- }
- }
- } else if(Constants.MEMBER_GRADIENT_MEMORY_CONSUMPTION.equals(eventName)) {
- MemoryConsumption memoryConsumption = findMemoryConsumption(event);
- if(memoryConsumption != null) {
- String value = event.getProperties().get("value");
- Float floatValue = Float.parseFloat(value);
- memoryConsumption.setGradient(floatValue);
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("%s event: [member] %s [value] %s",event, event.getProperties().get("member_id"), value));
- }
- }
-
- } else if(Constants.AVERAGE_LOAD_AVERAGE.equals(eventName)) {
-
- String clusterId = event.getProperties().get("cluster_id");
- String networkPartitionId = event.getProperties().get("network_partition_id");
- String value = event.getProperties().get("value");
- Float floatValue = Float.parseFloat(value);
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName,
- clusterId, networkPartitionId, value));
- }
- AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
- if(null != monitor){
- NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
- if(null != networkPartitionContext){
- networkPartitionContext.setAverageLoadAverage(floatValue);
- } else {
- if(log.isErrorEnabled()) {
- log.error(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- } else {
-
- if(log.isErrorEnabled()) {
- log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
- }
- }
- } else if(Constants.SECOND_DERIVATIVE_OF_LOAD_AVERAGE.equals(eventName)) {
-
- String clusterId = event.getProperties().get("cluster_id");
- String networkPartitionId = event.getProperties().get("network_partition_id");
- String value = event.getProperties().get("value");
- Float floatValue = Float.parseFloat(value);
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName,
- clusterId, networkPartitionId, value));
- }
- AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
- if(null != monitor){
- NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
- if(null != networkPartitionContext){
- networkPartitionContext.setLoadAverageSecondDerivative(floatValue);
- } else {
- if(log.isErrorEnabled()) {
- log.error(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- } else {
-
- if(log.isErrorEnabled()) {
- log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
- }
- }
- } else if(Constants.GRADIENT_LOAD_AVERAGE.equals(eventName)) {
-
- String clusterId = event.getProperties().get("cluster_id");
- String networkPartitionId = event.getProperties().get("network_partition_id");
- String value = event.getProperties().get("value");
- Float floatValue = Float.parseFloat(value);
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName,
- clusterId, networkPartitionId, value));
- }
- AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
- if(null != monitor){
- NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
- if(null != networkPartitionContext){
- networkPartitionContext.setLoadAverageGradient(floatValue);
- } else {
- if(log.isErrorEnabled()) {
- log.error(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- } else {
-
- if(log.isErrorEnabled()) {
- log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
- }
- }
- } else if(Constants.AVERAGE_MEMORY_CONSUMPTION.equals(eventName)) {
-
- String clusterId = event.getProperties().get("cluster_id");
- String networkPartitionId = event.getProperties().get("network_partition_id");
- String value = event.getProperties().get("value");
- Float floatValue = Float.parseFloat(value);
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName,
- clusterId, networkPartitionId, value));
- }
- AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
- if(null != monitor){
- NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
- if(null != networkPartitionContext){
- networkPartitionContext.setAverageMemoryConsumption(floatValue);
- } else {
- if(log.isErrorEnabled()) {
- log.error(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- } else {
-
- if(log.isErrorEnabled()) {
- log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
- }
- }
- } else if(Constants.SECOND_DERIVATIVE_OF_MEMORY_CONSUMPTION.equals(eventName)) {
-
- String clusterId = event.getProperties().get("cluster_id");
- String networkPartitionId = event.getProperties().get("network_partition_id");
- String value = event.getProperties().get("value");
- Float floatValue = Float.parseFloat(value);
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName,
- clusterId, networkPartitionId, value));
- }
- AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
- if(null != monitor){
- NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
- if(null != networkPartitionContext){
- networkPartitionContext.setMemoryConsumptionSecondDerivative(floatValue);
- } else {
- if(log.isErrorEnabled()) {
- log.error(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- } else {
-
- if(log.isErrorEnabled()) {
- log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
- }
- }
- } else if(Constants.GRADIENT_MEMORY_CONSUMPTION.equals(eventName)) {
-
- String clusterId = event.getProperties().get("cluster_id");
- String networkPartitionId = event.getProperties().get("network_partition_id");
- String value = event.getProperties().get("value");
- Float floatValue = Float.parseFloat(value);
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName,
- clusterId, networkPartitionId, value));
- }
- AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
- if(null != monitor){
- NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
- if(null != networkPartitionContext){
- networkPartitionContext.setMemoryConsumptionGradient(floatValue);
- } else {
- if(log.isErrorEnabled()) {
- log.error(String.format("Network partition context is not available for :" +
- " [network partition] %s", networkPartitionId));
- }
- }
- } else {
-
- if(log.isErrorEnabled()) {
- log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
- }
- }
- }
- } catch (Exception e) {
- log.error("Failed to retrieve the health stat event message.", e);
- }
- }
- log.warn("Health event Message delegater is terminated");
- }
-
- private LoadAverage findLoadAverage(Event event) {
- String memberId = event.getProperties().get("member_id");
- Member member = findMember(memberId);
-
- if(null == member){
- if(log.isErrorEnabled()) {
- log.error(String.format("Member not found: [member] %s", memberId));
- }
- return null;
- }
- AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(member.getClusterId());
- if(null == monitor){
- if(log.isErrorEnabled()) {
- log.error(String.format("Cluster monitor is not available for : [member] %s", memberId));
- }
- return null;
- }
- String networkPartitionId = findNetworkPartitionId(memberId);
- MemberStatsContext memberStatsContext = monitor.getNetworkPartitionCtxt(networkPartitionId)
- .getPartitionCtxt(member.getPartitionId())
- .getMemberStatsContext(memberId);
- if(null == memberStatsContext){
- if(log.isErrorEnabled()) {
- log.error(String.format("Member context is not available for : [member] %s", memberId));
- }
- return null;
- }
- else if(!member.isActive()){
- if(log.isDebugEnabled()){
- log.debug(String.format("Member activated event has not received for the member %s. Therefore ignoring" +
- " the health stat", memberId));
- }
- return null;
- }
-
- LoadAverage loadAverage = memberStatsContext.getLoadAverage();
- return loadAverage;
- }
-
- private MemoryConsumption findMemoryConsumption(Event event) {
- String memberId = event.getProperties().get("member_id");
- Member member = findMember(memberId);
-
- if(null == member){
- if(log.isErrorEnabled()) {
- log.error(String.format("Member not found: [member] %s", memberId));
- }
- return null;
- }
-
- AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(member.getClusterId());
- if(null == monitor){
- if(log.isErrorEnabled()) {
- log.error(String.format("Cluster monitor is not available for : [member] %s", memberId));
- }
- return null;
- }
-
-
- String networkPartitionId = findNetworkPartitionId(memberId);
- MemberStatsContext memberStatsContext = monitor.getNetworkPartitionCtxt(networkPartitionId)
- .getPartitionCtxt(member.getPartitionId())
- .getMemberStatsContext(memberId);
- if(null == memberStatsContext){
- if(log.isErrorEnabled()) {
- log.error(String.format("Member context is not available for : [member] %s", memberId));
- }
- return null;
- }else if(!member.isActive()){
- if(log.isDebugEnabled()){
- log.debug(String.format("Member activated event has not received for the member %s. Therefore ignoring" +
- " the health stat", memberId));
- }
- return null;
- }
- MemoryConsumption memoryConsumption = memberStatsContext.getMemoryConsumption();
-
- return memoryConsumption;
- }
-
- private String findNetworkPartitionId(String memberId) {
- for(Service service: TopologyManager.getTopology().getServices()){
- for(Cluster cluster: service.getClusters()){
- if(cluster.memberExists(memberId)){
- return cluster.getMember(memberId).getNetworkPartitionId();
- }
- }
- }
- return null;
- }
-
- private Member findMember(String memberId) {
- try {
- TopologyManager.acquireReadLock();
- for(Service service : TopologyManager.getTopology().getServices()) {
- for(Cluster cluster : service.getClusters()) {
- if(cluster.memberExists(memberId)) {
- return cluster.getMember(memberId);
- }
- }
- }
- return null;
- }
- finally {
- TopologyManager.releaseReadLock();
- }
- }
-
- private void handleMemberFaultEvent(String clusterId, String memberId) {
- try {
- AutoscalerContext asCtx = AutoscalerContext.getInstance();
- AbstractMonitor monitor;
-
- if(asCtx.moniterExist(clusterId)){
- monitor = asCtx.getMonitor(clusterId);
- }else if(asCtx.lbMoniterExist(clusterId)){
- monitor = asCtx.getLBMonitor(clusterId);
- }else{
- String errMsg = "A monitor is not found for this custer";
- log.error(errMsg);
- throw new RuntimeException(errMsg);
- }
-
- NetworkPartitionContext nwPartitionCtxt;
- try{
- TopologyManager.acquireReadLock();
- Member member = findMember(memberId);
-
- if(null == member){
- return;
- }
- if(!member.isActive()){
- if(log.isDebugEnabled()){
- log.debug(String.format("Member activated event has not received for the member %s. Therefore ignoring" +
- " the member fault health stat", memberId));
- }
- return;
- }
-
- nwPartitionCtxt = monitor.getNetworkPartitionCtxt(member);
-
- }finally{
- TopologyManager.releaseReadLock();
- }
- // terminate the faulty member
- CloudControllerClient ccClient = CloudControllerClient.getInstance();
- ccClient.terminate(memberId);
-
- // start a new member in the same Partition
- String partitionId = monitor.getPartitionOfMember(memberId);
- Partition partition = monitor.getDeploymentPolicy().getPartitionById(partitionId);
- PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
-
- String lbClusterId = AutoscalerRuleEvaluator.getLbClusterId(partitionCtxt, nwPartitionCtxt);
- ccClient.spawnAnInstance(partition, clusterId, lbClusterId, nwPartitionCtxt.getId());
- if (log.isInfoEnabled()) {
- log.info(String.format("Instance spawned for fault member: [partition] %s [cluster] %s [lb cluster] %s ",
- partitionId, clusterId, lbClusterId));
- }
-
- } catch (TerminationException e) {
- log.error(e);
- } catch (SpawningException e) {
- log.error(e);
- }
- }
-
- public Event jsonToEvent(String json) {
-
- Event event = new Event();
- BufferedReader bufferedReader = new BufferedReader(new StringReader(json));
- JsonReader reader = new JsonReader(bufferedReader);
- try {
- reader.beginObject();
-
- if (reader.hasNext()) {
- event.setEventName(reader.nextName());
-
- reader.beginObject();
- Map<String, String> properties = new HashMap<String, String>();
- while (reader.hasNext()) {
- String name = reader.nextName();
- String value = reader.nextString();
- properties.put(name, value);
- }
- event.setProperties(properties);
- }
- reader.close();
- return event;
- } catch (IOException e) {
- log.error("Could not extract event");
- }
- return null;
- }
-
- private class Event {
- private String eventName;
- private Map<String, String> properties;
-
- private String getEventName() {
- return eventName;
- }
-
- private void setEventName(String eventName) {
- this.eventName = eventName;
- }
-
- private Map<String, String> getProperties() {
- return properties;
- }
-
- private void setProperties(Map<String, String> properties) {
- this.properties = properties;
- }
- }
-
- public void terminate(){
- this.terminate = true;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/253bbb08/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageReceiver.java
deleted file mode 100644
index 866cd87..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventMessageReceiver.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.message.receiver.health;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.TextMessage;
-
-public class HealthEventMessageReceiver implements MessageListener {
-
- private static final Log log = LogFactory.getLog(HealthEventMessageReceiver.class);
-
- @Override
- public void onMessage(Message message) {
- if (message instanceof TextMessage) {
- TextMessage receivedMessage = (TextMessage) message;
- try {
- if (log.isDebugEnabled()) {
- log.debug("Message received: " + ((TextMessage) message).getText());
- }
- // Add received message to the queue
- HealthEventQueue.getInstance().add(receivedMessage);
-
- } catch (JMSException e) {
- log.error(e.getMessage(), e);
- }
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/253bbb08/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventQueue.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventQueue.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventQueue.java
deleted file mode 100644
index 731f25b..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/HealthEventQueue.java
+++ /dev/null
@@ -1,46 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.autoscaler.message.receiver.health;
-
-import javax.jms.TextMessage;
-import java.util.concurrent.LinkedBlockingQueue;
-
-/**
- * Implements topology event queue.
- */
-public class HealthEventQueue extends LinkedBlockingQueue<TextMessage>{
-
- private static final long serialVersionUID = 2556240855574421561L;
- private static volatile HealthEventQueue instance;
-
- private HealthEventQueue(){
- }
-
- public static HealthEventQueue getInstance() {
- if (instance == null) {
- synchronized (HealthEventQueue.class){
- if (instance == null) {
- instance = new HealthEventQueue();
- }
- }
- }
- return instance;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/253bbb08/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java
index 74d9770..9b7965e 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/topology/AutoscalerTopologyReceiver.java
@@ -82,6 +82,12 @@ public class AutoscalerTopologyReceiver implements Runnable {
private TopologyEventMessageDelegator createMessageDelegator() {
TopologyMessageProcessorChain processorChain = createEventProcessorChain();
+ return new TopologyEventMessageDelegator(processorChain);
+ }
+
+ private TopologyMessageProcessorChain createEventProcessorChain() {
+ // Listen to topology events that affect clusters
+ TopologyMessageProcessorChain processorChain = new TopologyMessageProcessorChain();
processorChain.addEventListener(new CompleteTopologyEventListener() {
@Override
protected void onEvent(Event event) {
@@ -107,12 +113,7 @@ public class AutoscalerTopologyReceiver implements Runnable {
}
});
- return new TopologyEventMessageDelegator(processorChain);
- }
- private TopologyMessageProcessorChain createEventProcessorChain() {
- // Listen to topology events that affect clusters
- TopologyMessageProcessorChain processorChain = new TopologyMessageProcessorChain();
processorChain.addEventListener(new ClusterCreatedEventListener() {
@Override
protected void onEvent(Event event) {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/253bbb08/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/SecondDerivativeOfMemoryConsumptionEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/SecondDerivativeOfMemoryConsumptionEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/SecondDerivativeOfMemoryConsumptionEvent.java
index 5d9e9af..bd36b76 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/SecondDerivativeOfMemoryConsumptionEvent.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/health/stat/SecondDerivativeOfMemoryConsumptionEvent.java
@@ -25,10 +25,13 @@ import org.apache.stratos.messaging.event.Event;
* This event is fired by Event processing engine to send second derivative of memory consumption
*/
public class SecondDerivativeOfMemoryConsumptionEvent extends Event {
+
+ private final String networkPartitionId;
private final String clusterId;
private final float value;
- public SecondDerivativeOfMemoryConsumptionEvent(String clusterId, float value) {
+ public SecondDerivativeOfMemoryConsumptionEvent(String networkPartitionId, String clusterId, float value) {
+ this.networkPartitionId = networkPartitionId;
this.clusterId = clusterId;
this.value = value;
}
@@ -41,4 +44,8 @@ public class SecondDerivativeOfMemoryConsumptionEvent extends Event {
public float getValue() {
return value;
}
+
+ public String getNetworkPartitionId() {
+ return networkPartitionId;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/253bbb08/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
index f2a9bc8..7786c39 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatEventMessageDelegator.java
@@ -96,7 +96,7 @@ public class HealthStatEventMessageDelegator implements Runnable {
}
- public EventMessage jsonToEventMessage(String json) {
+ private EventMessage jsonToEventMessage(String json) {
EventMessage event = new EventMessage();
String message;
@@ -107,6 +107,9 @@ public class HealthStatEventMessageDelegator implements Runnable {
String eventType = MessageParts[0].trim();
eventType = eventType.substring(eventType.indexOf("\"") + 1, eventType.lastIndexOf("\""));
+ if(log.isDebugEnabled()){
+ log.debug(String.format("Extracted [event type] %s", eventType));
+ }
event.setEventName(eventType);
String messageTag = MessageParts[1];
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/253bbb08/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatReceiver.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatReceiver.java
new file mode 100644
index 0000000..728cd8a
--- /dev/null
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/message/receiver/health/stat/HealthStatReceiver.java
@@ -0,0 +1,77 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.messaging.message.receiver.health.stat;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
+import org.apache.stratos.messaging.util.Constants;
+
+/**
+ * A thread for receiving health stat information from message broker
+ */
+public class HealthStatReceiver implements Runnable {
+ private static final Log log = LogFactory.getLog(HealthStatReceiver.class);
+ private HealthStatEventMessageDelegator messageDelegator;
+ private TopicSubscriber topicSubscriber;
+ private boolean terminated;
+
+ public HealthStatReceiver() {
+ this.messageDelegator = new HealthStatEventMessageDelegator();
+ }
+
+ public HealthStatReceiver(HealthStatEventMessageDelegator messageDelegator) {
+ this.messageDelegator = messageDelegator;
+ }
+
+ @Override
+ public void run() {
+ try {
+ // Start topic subscriber thread
+ topicSubscriber = new TopicSubscriber(Constants.HEALTH_STAT_TOPIC);
+ topicSubscriber.setMessageListener(new HealthStatEventMessageListener());
+ Thread subscriberThread = new Thread(topicSubscriber);
+ subscriberThread.start();
+ if (log.isDebugEnabled()) {
+ log.debug("Health stst event message receiver thread started");
+ }
+
+ // Start health stat event message delegator thread
+ Thread receiverThread = new Thread(messageDelegator);
+ receiverThread.start();
+ if (log.isDebugEnabled()) {
+ log.debug("Health stst event message delegator thread started");
+ }
+
+ // Keep the thread live until terminated
+ while (!terminated);
+ } catch (Exception e) {
+ if (log.isErrorEnabled()) {
+ log.error("Topology receiver failed", e);
+ }
+ }
+ }
+
+ public void terminate() {
+ topicSubscriber.terminate();
+ messageDelegator.terminate();
+ terminated = true;
+ }
+}
[2/2] git commit: Moving the autoscaler health stat receiver to
message processor model gc: STRATOS-332
Posted by la...@apache.org.
Moving the autoscaler health stat receiver to message processor model gc: STRATOS-332
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/253bbb08
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/253bbb08
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/253bbb08
Branch: refs/heads/master
Commit: 253bbb08a9fab307570a2832cc965757384e0422
Parents: 41687d8
Author: Lahiru Sandaruwan <la...@apache.org>
Authored: Sat Jan 4 20:44:19 2014 +0530
Committer: Lahiru Sandaruwan <la...@apache.org>
Committed: Sat Jan 4 20:44:19 2014 +0530
----------------------------------------------------------------------
.../event/AverageRequestsInFlightEvent.java | 49 -
.../event/GradientOfRequestsInFlightEvent.java | 49 -
...SecondDerivativeOfRequestsInFlightEvent.java | 49 -
.../internal/AutoscalerServerComponent.java | 40 +-
.../health/AutoscalerHealthStatReceiver.java | 1045 ++++++++++++++++++
.../health/HealthEventMessageDelegator.java | 639 -----------
.../health/HealthEventMessageReceiver.java | 49 -
.../receiver/health/HealthEventQueue.java | 46 -
.../topology/AutoscalerTopologyReceiver.java | 11 +-
...econdDerivativeOfMemoryConsumptionEvent.java | 9 +-
.../stat/HealthStatEventMessageDelegator.java | 5 +-
.../health/stat/HealthStatReceiver.java | 77 ++
12 files changed, 1158 insertions(+), 910 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/253bbb08/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/AverageRequestsInFlightEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/AverageRequestsInFlightEvent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/AverageRequestsInFlightEvent.java
deleted file mode 100644
index 37edcf6..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/AverageRequestsInFlightEvent.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.autoscaler.event;
-
-import java.io.Serializable;
-
-/**
- * This event is fired by Event processing engine to send average of requests in flight
- */
-public class AverageRequestsInFlightEvent implements Serializable {
-
- private static final long serialVersionUID = 7178667274015434275L;
- private String clusterId;
- private float value;
-
-
- public String getClusterId() {
- return clusterId;
- }
-
- public void setClusterId(String clusterId) {
- this.clusterId = clusterId;
- }
-
- public float getValue() {
- return value;
- }
-
- public void setValue(float value) {
- this.value = value;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/253bbb08/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/GradientOfRequestsInFlightEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/GradientOfRequestsInFlightEvent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/GradientOfRequestsInFlightEvent.java
deleted file mode 100644
index ad55619..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/GradientOfRequestsInFlightEvent.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.autoscaler.event;
-
-import java.io.Serializable;
-
-/**
- * This event is fired by Event processing engine to send gradient of requests in flight
- */
-public class GradientOfRequestsInFlightEvent implements Serializable {
-
- private static final long serialVersionUID = 6140723469565274572L;
- private String clusterId;
- private float value;
-
-
- public String getClusterId() {
- return clusterId;
- }
-
- public void setClusterId(String clusterId) {
- this.clusterId = clusterId;
- }
-
- public float getValue() {
- return value;
- }
-
- public void setValue(float value) {
- this.value = value;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/253bbb08/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/SecondDerivativeOfRequestsInFlightEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/SecondDerivativeOfRequestsInFlightEvent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/SecondDerivativeOfRequestsInFlightEvent.java
deleted file mode 100644
index ae4ca7a..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/SecondDerivativeOfRequestsInFlightEvent.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.autoscaler.event;
-
-import java.io.Serializable;
-
-/**
- * This event is fired by Event processing engine to send second derivative of requests in flight
- */
-public class SecondDerivativeOfRequestsInFlightEvent implements Serializable {
-
- private static final long serialVersionUID = 8857808689466762084L;
- private String clusterId;
- private float value;
-
-
- public String getClusterId() {
- return clusterId;
- }
-
- public void setClusterId(String clusterId) {
- this.clusterId = clusterId;
- }
-
- public float getValue() {
- return value;
- }
-
- public void setValue(float value) {
- this.value = value;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/253bbb08/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
index 257cf6c..2a1e5df 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
@@ -18,29 +18,26 @@
*/
package org.apache.stratos.autoscaler.internal;
-import java.util.Iterator;
-import java.util.List;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.autoscaler.NetworkPartitionContext;
import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
import org.apache.stratos.autoscaler.exception.AutoScalerException;
-import org.apache.stratos.autoscaler.message.receiver.health.HealthEventMessageDelegator;
-import org.apache.stratos.autoscaler.message.receiver.health.HealthEventMessageReceiver;
+import org.apache.stratos.autoscaler.message.receiver.health.AutoscalerHealthStatReceiver;
+import org.apache.stratos.autoscaler.message.receiver.topology.AutoscalerTopologyReceiver;
import org.apache.stratos.autoscaler.partition.PartitionManager;
import org.apache.stratos.autoscaler.policy.PolicyManager;
import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
import org.apache.stratos.autoscaler.registry.RegistryManager;
-import org.apache.stratos.autoscaler.message.receiver.topology.AutoscalerTopologyReceiver;
import org.apache.stratos.autoscaler.util.ServiceReferenceHolder;
import org.apache.stratos.cloud.controller.deployment.partition.Partition;
-import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
-import org.apache.stratos.messaging.util.Constants;
import org.osgi.service.component.ComponentContext;
import org.wso2.carbon.registry.api.RegistryException;
import org.wso2.carbon.registry.core.service.RegistryService;
+import java.util.Iterator;
+import java.util.List;
+
/**
* @scr.component name=org.apache.stratos.autoscaler.internal.AutoscalerServerComponent"
* immediate="true"
@@ -55,8 +52,8 @@ public class AutoscalerServerComponent {
private static final Log log = LogFactory.getLog(AutoscalerServerComponent.class);
AutoscalerTopologyReceiver asTopologyReceiver;
- TopicSubscriber healthStatTopicSubscriber;
- HealthEventMessageDelegator healthEventMessageDelegator;
+// TopicSubscriber healthStatTopicSubscriber;
+ AutoscalerHealthStatReceiver autoscalerHealthStatReceiver;
protected void activate(ComponentContext componentContext) throws Exception {
try {
@@ -67,18 +64,18 @@ public class AutoscalerServerComponent {
if (log.isDebugEnabled()) {
log.debug("Topology receiver thread started");
}
+// healthStatTopicSubscriber = new TopicSubscriber(Constants.HEALTH_STAT_TOPIC);
+// healthStatTopicSubscriber.setMessageListener(new HealthEventMessageReceiver());
+// Thread healthStatTopicSubscriberThread = new Thread(healthStatTopicSubscriber);
+// healthStatTopicSubscriberThread.start();
+// if (log.isDebugEnabled()) {
+// log.debug("Health event message receiver thread started");
+// }
- // Start health stat receiver
- healthStatTopicSubscriber = new TopicSubscriber(Constants.HEALTH_STAT_TOPIC);
- healthStatTopicSubscriber.setMessageListener(new HealthEventMessageReceiver());
- Thread healthStatTopicSubscriberThread = new Thread(healthStatTopicSubscriber);
- healthStatTopicSubscriberThread.start();
- if (log.isDebugEnabled()) {
- log.debug("Health event message receiver thread started");
- }
- healthEventMessageDelegator = new HealthEventMessageDelegator();
- Thread healthDelegatorThread = new Thread(healthEventMessageDelegator);
+ // Start health stat receiver
+ autoscalerHealthStatReceiver = new AutoscalerHealthStatReceiver();
+ Thread healthDelegatorThread = new Thread(autoscalerHealthStatReceiver);
healthDelegatorThread.start();
if (log.isDebugEnabled()) {
log.debug("Health message processor thread started");
@@ -124,8 +121,7 @@ public class AutoscalerServerComponent {
protected void deactivate(ComponentContext context) {
asTopologyReceiver.terminate();
- healthStatTopicSubscriber.terminate();
- healthEventMessageDelegator.terminate();
+ autoscalerHealthStatReceiver.terminate();
}
protected void setRegistryService(RegistryService registryService) {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/253bbb08/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatReceiver.java
new file mode 100644
index 0000000..3954610
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatReceiver.java
@@ -0,0 +1,1045 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler.message.receiver.health;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.AutoscalerContext;
+import org.apache.stratos.autoscaler.MemberStatsContext;
+import org.apache.stratos.autoscaler.NetworkPartitionContext;
+import org.apache.stratos.autoscaler.PartitionContext;
+import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
+import org.apache.stratos.autoscaler.exception.SpawningException;
+import org.apache.stratos.autoscaler.exception.TerminationException;
+import org.apache.stratos.autoscaler.monitor.AbstractMonitor;
+import org.apache.stratos.autoscaler.policy.model.LoadAverage;
+import org.apache.stratos.autoscaler.policy.model.MemoryConsumption;
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.stratos.cloud.controller.deployment.partition.Partition;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.event.health.stat.*;
+import org.apache.stratos.messaging.listener.health.stat.*;
+import org.apache.stratos.messaging.message.processor.health.stat.HealthStatMessageProcessorChain;
+import org.apache.stratos.messaging.message.receiver.health.stat.HealthStatEventMessageDelegator;
+import org.apache.stratos.messaging.message.receiver.health.stat.HealthStatReceiver;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+
+/**
+ * A thread for processing topology messages and updating the topology data structure.
+ */
+public class AutoscalerHealthStatReceiver implements Runnable {
+
+ private static final Log log = LogFactory.getLog(AutoscalerHealthStatReceiver.class);
+ private boolean terminated = false;
+
+ private HealthStatReceiver healthStatReceiver;
+// @Override
+// public void run() {
+// if(log.isInfoEnabled()) {
+// log.info("Health event message delegator started");
+// }
+//
+// if(log.isDebugEnabled()) {
+// log.debug("Waiting for topology to be initialized");
+// }
+// while(!TopologyManager.getTopology().isInitialized());
+//
+// while (!terminate) {
+// try {
+// TextMessage message = HealthStatEventMessageQueue.getInstance().take();
+//
+// String messageText = message.getText();
+// if (log.isDebugEnabled()) {
+// log.debug("Health event message received: [message] " + messageText);
+// }
+// Event event = jsonToEvent(messageText);
+// String eventName = event.getEventName();
+//
+// if (log.isInfoEnabled()) {
+// log.info(String.format("Received event: [event-name] %s", eventName));
+// }
+//
+// if (Constants.AVERAGE_REQUESTS_IN_FLIGHT.equals(eventName)) {
+// String clusterId = event.getProperties().get("cluster_id");
+// String networkPartitionId = event.getProperties().get("network_partition_id");
+// String value = event.getProperties().get("value");
+// Float floatValue = Float.parseFloat(value);
+//
+// if (log.isDebugEnabled()) {
+// log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName,
+// clusterId, networkPartitionId, value));
+// }
+//
+// AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+// if(null != monitor){
+// NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+// if(null != networkPartitionContext){
+// networkPartitionContext.setAverageRequestsInFlight(floatValue);
+// } else {
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Network partition context is not available for :" +
+// " [network partition] %s", networkPartitionId));
+// }
+// }
+// } else {
+//
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
+// }
+// }
+//
+// } else if (Constants.GRADIENT_OF_REQUESTS_IN_FLIGHT.equals(eventName)) {
+// String clusterId = event.getProperties().get("cluster_id");
+// String networkPartitionId = event.getProperties().get("network_partition_id");
+// String value = event.getProperties().get("value");
+// Float floatValue = Float.parseFloat(value);
+//
+// if (log.isDebugEnabled()) {
+// log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName,
+// clusterId, networkPartitionId, value));
+// }
+// AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+// if(null != monitor){
+// NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+// if(null != networkPartitionContext){
+// networkPartitionContext.setRequestsInFlightGradient(floatValue);
+// } else {
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Network partition context is not available for :" +
+// " [network partition] %s", networkPartitionId));
+// }
+// }
+// } else {
+//
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
+// }
+// }
+//
+// } else if (Constants.SECOND_DERIVATIVE_OF_REQUESTS_IN_FLIGHT.equals(eventName)) {
+// String clusterId = event.getProperties().get("cluster_id");
+// String networkPartitionId = event.getProperties().get("network_partition_id");
+// String value = event.getProperties().get("value");
+// Float floatValue = Float.parseFloat(value);
+//
+// if (log.isDebugEnabled()) {
+// log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName,
+// clusterId, networkPartitionId, value));
+// }
+// AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+// if(null != monitor){
+// NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+// if(null != networkPartitionContext){
+// networkPartitionContext.setRequestsInFlightSecondDerivative(floatValue);
+// } else {
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Network partition context is not available for :" +
+// " [network partition] %s", networkPartitionId));
+// }
+// }
+// } else {
+//
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
+// }
+// }
+// } else if (Constants.MEMBER_FAULT_EVENT_NAME.equals(eventName)) {
+// String clusterId = event.getProperties().get("cluster_id");
+// String memberId = event.getProperties().get("member_id");
+//
+// if (memberId == null || memberId.isEmpty()) {
+// if(log.isErrorEnabled()) {
+// log.error("Member id not found in received message");
+// }
+// } else {
+// handleMemberFaultEvent(clusterId, memberId);
+// }
+// } else if(Constants.MEMBER_AVERAGE_LOAD_AVERAGE.equals(eventName)) {
+// LoadAverage loadAverage = findLoadAverage(event);
+// if(loadAverage != null) {
+// String value = event.getProperties().get("value");
+// Float floatValue = Float.parseFloat(value);
+// loadAverage.setAverage(floatValue);
+//
+// if (log.isDebugEnabled()) {
+// log.debug(String.format("%s event: [member] %s [value] %s", event, event.getProperties().get("member_id"), value));
+// }
+// }
+// } else if(Constants.MEMBER_SECOND_DERIVATIVE_OF_LOAD_AVERAGE.equals(eventName)) {
+// LoadAverage loadAverage = findLoadAverage(event);
+// if(loadAverage != null) {
+// String value = event.getProperties().get("value");
+// Float floatValue = Float.parseFloat(value);
+// loadAverage.setSecondDerivative(floatValue);
+//
+// if (log.isDebugEnabled()) {
+// log.debug(String.format("%s event: [member] %s [value] %s", event, event.getProperties().get("member_id"), value));
+// }
+// }
+// } else if(Constants.MEMBER_GRADIENT_LOAD_AVERAGE.equals(eventName)) {
+// LoadAverage loadAverage = findLoadAverage(event);
+// if(loadAverage != null) {
+// String value = event.getProperties().get("value");
+// Float floatValue = Float.parseFloat(value);
+// loadAverage.setGradient(floatValue);
+//
+// if (log.isDebugEnabled()) {
+// log.debug(String.format("%s event: [member] %s [value] %s", event, event.getProperties().get("member_id"), value));
+// }
+// }
+// } else if(Constants.MEMBER_AVERAGE_MEMORY_CONSUMPTION.equals(eventName)) {
+// MemoryConsumption memoryConsumption = findMemoryConsumption(event);
+// if(memoryConsumption != null) {
+// String value = event.getProperties().get("value");
+// Float floatValue = Float.parseFloat(value);
+// memoryConsumption.setAverage(floatValue);
+//
+// if (log.isDebugEnabled()) {
+// log.debug(String.format("%s event: [member] %s [value] %s", event, event.getProperties().get("member_id"), value));
+// }
+// }
+// } else if(Constants.MEMBER_SECOND_DERIVATIVE_OF_MEMORY_CONSUMPTION.equals(eventName)) {
+// MemoryConsumption memoryConsumption = findMemoryConsumption(event);
+// if(memoryConsumption != null) {
+// String value = event.getProperties().get("value");
+// Float floatValue = Float.parseFloat(value);
+// memoryConsumption.setSecondDerivative(floatValue);
+//
+// if (log.isDebugEnabled()) {
+// log.debug(String.format("%s event: [member] %s [value] %s", event, event.getProperties().get("member_id"), value));
+// }
+// }
+// } else if(Constants.MEMBER_GRADIENT_MEMORY_CONSUMPTION.equals(eventName)) {
+// MemoryConsumption memoryConsumption = findMemoryConsumption(event);
+// if(memoryConsumption != null) {
+// String value = event.getProperties().get("value");
+// Float floatValue = Float.parseFloat(value);
+// memoryConsumption.setGradient(floatValue);
+//
+// if (log.isDebugEnabled()) {
+// log.debug(String.format("%s event: [member] %s [value] %s",event, event.getProperties().get("member_id"), value));
+// }
+// }
+//
+// } else if(Constants.AVERAGE_LOAD_AVERAGE.equals(eventName)) {
+//
+// String clusterId = event.getProperties().get("cluster_id");
+// String networkPartitionId = event.getProperties().get("network_partition_id");
+// String value = event.getProperties().get("value");
+// Float floatValue = Float.parseFloat(value);
+//
+// if (log.isDebugEnabled()) {
+// log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName,
+// clusterId, networkPartitionId, value));
+// }
+// AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+// if(null != monitor){
+// NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+// if(null != networkPartitionContext){
+// networkPartitionContext.setAverageLoadAverage(floatValue);
+// } else {
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Network partition context is not available for :" +
+// " [network partition] %s", networkPartitionId));
+// }
+// }
+// } else {
+//
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
+// }
+// }
+// } else if(Constants.SECOND_DERIVATIVE_OF_LOAD_AVERAGE.equals(eventName)) {
+//
+// String clusterId = event.getProperties().get("cluster_id");
+// String networkPartitionId = event.getProperties().get("network_partition_id");
+// String value = event.getProperties().get("value");
+// Float floatValue = Float.parseFloat(value);
+//
+// if (log.isDebugEnabled()) {
+// log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName,
+// clusterId, networkPartitionId, value));
+// }
+// AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+// if(null != monitor){
+// NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+// if(null != networkPartitionContext){
+// networkPartitionContext.setLoadAverageSecondDerivative(floatValue);
+// } else {
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Network partition context is not available for :" +
+// " [network partition] %s", networkPartitionId));
+// }
+// }
+// } else {
+//
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
+// }
+// }
+// } else if(Constants.GRADIENT_LOAD_AVERAGE.equals(eventName)) {
+//
+// String clusterId = event.getProperties().get("cluster_id");
+// String networkPartitionId = event.getProperties().get("network_partition_id");
+// String value = event.getProperties().get("value");
+// Float floatValue = Float.parseFloat(value);
+//
+// if (log.isDebugEnabled()) {
+// log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName,
+// clusterId, networkPartitionId, value));
+// }
+// AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+// if(null != monitor){
+// NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+// if(null != networkPartitionContext){
+// networkPartitionContext.setLoadAverageGradient(floatValue);
+// } else {
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Network partition context is not available for :" +
+// " [network partition] %s", networkPartitionId));
+// }
+// }
+// } else {
+//
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
+// }
+// }
+// } else if(Constants.AVERAGE_MEMORY_CONSUMPTION.equals(eventName)) {
+//
+// String clusterId = event.getProperties().get("cluster_id");
+// String networkPartitionId = event.getProperties().get("network_partition_id");
+// String value = event.getProperties().get("value");
+// Float floatValue = Float.parseFloat(value);
+//
+// if (log.isDebugEnabled()) {
+// log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName,
+// clusterId, networkPartitionId, value));
+// }
+// AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+// if(null != monitor){
+// NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+// if(null != networkPartitionContext){
+// networkPartitionContext.setAverageMemoryConsumption(floatValue);
+// } else {
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Network partition context is not available for :" +
+// " [network partition] %s", networkPartitionId));
+// }
+// }
+// } else {
+//
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
+// }
+// }
+// } else if(Constants.SECOND_DERIVATIVE_OF_MEMORY_CONSUMPTION.equals(eventName)) {
+//
+// String clusterId = event.getProperties().get("cluster_id");
+// String networkPartitionId = event.getProperties().get("network_partition_id");
+// String value = event.getProperties().get("value");
+// Float floatValue = Float.parseFloat(value);
+//
+// if (log.isDebugEnabled()) {
+// log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName,
+// clusterId, networkPartitionId, value));
+// }
+// AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+// if(null != monitor){
+// NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+// if(null != networkPartitionContext){
+// networkPartitionContext.setMemoryConsumptionSecondDerivative(floatValue);
+// } else {
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Network partition context is not available for :" +
+// " [network partition] %s", networkPartitionId));
+// }
+// }
+// } else {
+//
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
+// }
+// }
+// } else if(Constants.GRADIENT_MEMORY_CONSUMPTION.equals(eventName)) {
+//
+// String clusterId = event.getProperties().get("cluster_id");
+// String networkPartitionId = event.getProperties().get("network_partition_id");
+// String value = event.getProperties().get("value");
+// Float floatValue = Float.parseFloat(value);
+//
+// if (log.isDebugEnabled()) {
+// log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName,
+// clusterId, networkPartitionId, value));
+// }
+// AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+// if(null != monitor){
+// NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+// if(null != networkPartitionContext){
+// networkPartitionContext.setMemoryConsumptionGradient(floatValue);
+// } else {
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Network partition context is not available for :" +
+// " [network partition] %s", networkPartitionId));
+// }
+// }
+// } else {
+//
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
+// }
+// }
+// }
+// } catch (Exception e) {
+// log.error("Failed to retrieve the health stat event message.", e);
+// }
+// }
+// log.warn("Health event Message delegater is terminated");
+// }
+//
+
+
+ @Override
+ public void run() {
+ //FIXME this activated before autoscaler deployer activated.
+ try {
+ Thread.sleep(15000);
+ } catch (InterruptedException ignore) {
+ }
+ Thread thread = new Thread(healthStatReceiver);
+ thread.start();
+ if(log.isInfoEnabled()) {
+ log.info("Autoscaler topology receiver thread started");
+ }
+
+ // Keep the thread live until terminated
+ while (!terminated);
+ if(log.isInfoEnabled()) {
+ log.info("Autoscaler topology receiver thread terminated");
+ }
+ }
+
+ private HealthStatEventMessageDelegator createMessageDelegator() {
+ HealthStatMessageProcessorChain processorChain = createEventProcessorChain();
+ return new HealthStatEventMessageDelegator(processorChain);
+ }
+
+ private HealthStatMessageProcessorChain createEventProcessorChain() {
+ // Listen to health stat events that affect clusters
+ HealthStatMessageProcessorChain processorChain = new HealthStatMessageProcessorChain();
+ processorChain.addEventListener(new AverageLoadAverageEventListener() {
+ @Override
+ protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+ AverageLoadAverageEvent e = (AverageLoadAverageEvent) event;
+ String clusterId = e.getClusterId();
+ String networkPartitionId = e.getNetworkPartitionId();
+
+ Float floatValue = e.getValue();
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Avg load avg event: [cluster] %s [network-partition] %s [value] %s",
+ clusterId, networkPartitionId, floatValue));
+ }
+ AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+ if(null != monitor){
+ NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+ if(null != networkPartitionContext){
+ networkPartitionContext.setAverageLoadAverage(floatValue);
+ } else {
+ if(log.isErrorEnabled()) {
+ log.error(String.format("Network partition context is not available for :" +
+ " [network partition] %s", networkPartitionId));
+ }
+ }
+ } else {
+
+ if(log.isErrorEnabled()) {
+ log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
+ }
+ }
+
+ }
+
+ });
+ processorChain.addEventListener(new AverageMemoryConsumptionEventListener() {
+ @Override
+ protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+
+ AverageMemoryConsumptionEvent e = (AverageMemoryConsumptionEvent) event;
+ String clusterId = e.getClusterId();
+ String networkPartitionId = e.getNetworkPartitionId();
+
+ Float floatValue = e.getValue();
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Avg MC event: [cluster] %s [network-partition] %s [value] %s",
+ clusterId, networkPartitionId, floatValue));
+ }
+ AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+ if(null != monitor){
+ NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+ if(null != networkPartitionContext){
+ networkPartitionContext.setAverageMemoryConsumption(floatValue);
+ } else {
+ if(log.isErrorEnabled()) {
+ log.error(String.format("Network partition context is not available for :" +
+ " [network partition] %s", networkPartitionId));
+ }
+ }
+ } else {
+
+ if(log.isErrorEnabled()) {
+ log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
+ }
+ }
+ }
+
+ });
+ processorChain.addEventListener(new AverageRequestsInFlightEventListener() {
+ @Override
+ protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+
+ AverageRequestsInFlightEvent e = (AverageRequestsInFlightEvent) event;
+ String clusterId = e.getClusterId();
+ String networkPartitionId = e.getNetworkPartitionId();
+ Float floatValue = e.getValue();
+
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Average Rif event: [cluster] %s [network-partition] %s [value] %s",
+ clusterId, networkPartitionId, floatValue));
+ }
+
+ AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+ if(null != monitor){
+ NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+ if(null != networkPartitionContext){
+ networkPartitionContext.setAverageRequestsInFlight(floatValue);
+ } else {
+ if(log.isErrorEnabled()) {
+ log.error(String.format("Network partition context is not available for :" +
+ " [network partition] %s", networkPartitionId));
+ }
+ }
+ } else {
+
+ if(log.isErrorEnabled()) {
+ log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
+ }
+ }
+ }
+
+ });
+ processorChain.addEventListener(new GradientOfLoadAverageEventListener() {
+ @Override
+ protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+ GradientOfLoadAverageEvent e = (GradientOfLoadAverageEvent) event;
+ String clusterId = e.getClusterId();
+ String networkPartitionId = e.getNetworkPartitionId();
+
+ Float floatValue = e.getValue();
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Grad of load avg event: [cluster] %s [network-partition] %s [value] %s",
+ clusterId, networkPartitionId, floatValue));
+ }
+ AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+ if(null != monitor){
+ NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+ if(null != networkPartitionContext){
+ networkPartitionContext.setLoadAverageGradient(floatValue);
+ } else {
+ if(log.isErrorEnabled()) {
+ log.error(String.format("Network partition context is not available for :" +
+ " [network partition] %s", networkPartitionId));
+ }
+ }
+ } else {
+
+ if(log.isErrorEnabled()) {
+ log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
+ }
+ }
+ }
+
+ });
+ processorChain.addEventListener(new GradientOfMemoryConsumptionEventListener() {
+ @Override
+ protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+
+ GradientOfMemoryConsumptionEvent e = (GradientOfMemoryConsumptionEvent) event;
+ String clusterId = e.getClusterId();
+ String networkPartitionId = e.getNetworkPartitionId();
+
+ Float floatValue = e.getValue();
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Grad of MC event: [cluster] %s [network-partition] %s [value] %s",
+ clusterId, networkPartitionId, floatValue));
+ }
+ AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+ if(null != monitor){
+ NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+ if(null != networkPartitionContext){
+ networkPartitionContext.setMemoryConsumptionGradient(floatValue);
+ } else {
+ if(log.isErrorEnabled()) {
+ log.error(String.format("Network partition context is not available for :" +
+ " [network partition] %s", networkPartitionId));
+ }
+ }
+ } else {
+
+ if(log.isErrorEnabled()) {
+ log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
+ }
+ }
+ }
+
+ });
+ processorChain.addEventListener(new GradientOfRequestsInFlightEventListener() {
+ @Override
+ protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+ GradientOfRequestsInFlightEvent e = (GradientOfRequestsInFlightEvent) event;
+ String clusterId = e.getClusterId();
+ String networkPartitionId = e.getNetworkPartitionId();
+
+ Float floatValue = e.getValue();
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Gradient of Rif event: [cluster] %s [network-partition] %s [value] %s",
+ clusterId, networkPartitionId, floatValue));
+ }
+ AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+ if(null != monitor){
+ NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+ if(null != networkPartitionContext){
+ networkPartitionContext.setRequestsInFlightGradient(floatValue);
+ } else {
+ if(log.isErrorEnabled()) {
+ log.error(String.format("Network partition context is not available for :" +
+ " [network partition] %s", networkPartitionId));
+ }
+ }
+ } else {
+
+ if(log.isErrorEnabled()) {
+ log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
+ }
+ }
+ }
+
+ });
+ processorChain.addEventListener(new MemberAverageLoadAverageEventListener() {
+ @Override
+ protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+ MemberAverageLoadAverageEvent e = (MemberAverageLoadAverageEvent) event;
+ LoadAverage loadAverage = findLoadAverage(e.getMemberId());
+ if(loadAverage != null) {
+
+ Float floatValue = e.getValue();
+ loadAverage.setAverage(floatValue);
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member avg of load avg event: [member] %s [value] %s", e.getMemberId()
+ , floatValue));
+ }
+ }
+
+ }
+
+ });
+ processorChain.addEventListener(new MemberAverageMemoryConsumptionEventListener() {
+ @Override
+ protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+ MemberAverageMemoryConsumptionEvent e = (MemberAverageMemoryConsumptionEvent) event;
+ MemoryConsumption memoryConsumption = findMemoryConsumption(e.getMemberId());
+ if(memoryConsumption != null) {
+
+ Float floatValue = e.getValue();
+ memoryConsumption.setAverage(floatValue);
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member avg MC event: [member] %s [value] %s", e.getMemberId(),
+ floatValue));
+ }
+ }
+
+ }
+
+ });
+ processorChain.addEventListener(new MemberFaultEventListener() {
+ @Override
+ protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+ MemberFaultEvent e = (MemberFaultEvent) event;
+ String clusterId = e.getClusterId();
+ String memberId = e.getMemberId();
+
+ if (memberId == null || memberId.isEmpty()) {
+ if(log.isErrorEnabled()) {
+ log.error("Member id not found in received message");
+ }
+ } else {
+ handleMemberFaultEvent(clusterId, memberId);
+ }
+ }
+
+ });
+ processorChain.addEventListener(new MemberGradientOfLoadAverageEventListener() {
+ @Override
+ protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+ MemberGradientOfLoadAverageEvent e = (MemberGradientOfLoadAverageEvent) event;
+ LoadAverage loadAverage = findLoadAverage(e.getMemberId());
+ if(loadAverage != null) {
+
+ Float floatValue = e.getValue();
+ loadAverage.setGradient(floatValue);
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member grad of load avg event: [member] %s [value] %s", e.getMemberId(),
+ floatValue));
+ }
+ }
+
+ }
+
+ });
+ processorChain.addEventListener(new MemberGradientOfMemoryConsumptionEventListener() {
+ @Override
+ protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+ MemberGradientOfMemoryConsumptionEvent e = (MemberGradientOfMemoryConsumptionEvent) event;
+ MemoryConsumption memoryConsumption = findMemoryConsumption(e.getMemberId());
+ if(memoryConsumption != null) {
+
+ Float floatValue = e.getValue();
+ memoryConsumption.setGradient(floatValue);
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Meber grad of MC event: [member] %s [value] %s", e.getMemberId(),
+ floatValue));
+ }
+ }
+
+ }
+
+ });
+ processorChain.addEventListener(new MemberSecondDerivativeOfLoadAverageEventListener() {
+ @Override
+ protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+ MemberSecondDerivativeOfLoadAverageEvent e = (MemberSecondDerivativeOfLoadAverageEvent) event;
+ LoadAverage loadAverage = findLoadAverage(e.getMemberId());
+ if(loadAverage != null) {
+
+ Float floatValue = e.getValue();
+ loadAverage.setSecondDerivative(floatValue);
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Member SD of load avg event: [member] %s [value] %s", e.getMemberId()
+ , floatValue));
+ }
+ }
+ }
+
+ });
+ processorChain.addEventListener(new MemberSecondDerivativeOfMemoryConsumptionEventListener() {
+ @Override
+ protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+ }
+
+ });
+ processorChain.addEventListener(new SecondDerivativeOfLoadAverageEventListener() {
+ @Override
+ protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+
+ SecondDerivativeOfLoadAverageEvent e = (SecondDerivativeOfLoadAverageEvent) event;
+ String clusterId = e.getClusterId();
+ String networkPartitionId = e.getNetworkPartitionId();
+
+ Float floatValue = e.getValue();
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("SD of load avg event: [cluster] %s [network-partition] %s [value] %s",
+ clusterId, networkPartitionId, floatValue));
+ }
+ AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+ if(null != monitor){
+ NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+ if(null != networkPartitionContext){
+ networkPartitionContext.setLoadAverageSecondDerivative(floatValue);
+ } else {
+ if(log.isErrorEnabled()) {
+ log.error(String.format("Network partition context is not available for :" +
+ " [network partition] %s", networkPartitionId));
+ }
+ }
+ } else {
+
+ if(log.isErrorEnabled()) {
+ log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
+ }
+ }
+ }
+
+ });
+ processorChain.addEventListener(new SecondDerivativeOfMemoryConsumptionEventListener() {
+ @Override
+ protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+
+ SecondDerivativeOfMemoryConsumptionEvent e = (SecondDerivativeOfMemoryConsumptionEvent) event;
+ String clusterId = e.getClusterId();
+ String networkPartitionId = e.getNetworkPartitionId();
+
+ Float floatValue = e.getValue();
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("SD of MC event: [cluster] %s [network-partition] %s [value] %s",
+ clusterId, networkPartitionId, floatValue));
+ }
+ AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+ if(null != monitor){
+ NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+ if(null != networkPartitionContext){
+ networkPartitionContext.setMemoryConsumptionSecondDerivative(floatValue);
+ } else {
+ if(log.isErrorEnabled()) {
+ log.error(String.format("Network partition context is not available for :" +
+ " [network partition] %s", networkPartitionId));
+ }
+ }
+ } else {
+
+ if(log.isErrorEnabled()) {
+ log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
+ }
+ }
+
+ }
+
+ });
+ processorChain.addEventListener(new SecondDerivativeOfRequestsInFlightEventListener() {
+ @Override
+ protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+ SecondDerivativeOfRequestsInFlightEvent e = (SecondDerivativeOfRequestsInFlightEvent) event;
+ String clusterId = e.getClusterId();
+ String networkPartitionId = e.getNetworkPartitionId();
+ Float floatValue = e.getValue();
+
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Second dericvative of Rif event: [cluster] %s [network-partition] %s [value] %s",
+ clusterId, networkPartitionId, floatValue));
+ }
+ AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+ if(null != monitor){
+ NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+ if(null != networkPartitionContext){
+ networkPartitionContext.setRequestsInFlightSecondDerivative(floatValue);
+ } else {
+ if(log.isErrorEnabled()) {
+ log.error(String.format("Network partition context is not available for :" +
+ " [network partition] %s", networkPartitionId));
+ }
+ }
+ } else {
+
+ if(log.isErrorEnabled()) {
+ log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
+ }
+ }
+ }
+
+ });
+
+ return processorChain;
+ }
+
+
+ private LoadAverage findLoadAverage(String memberId) {
+// String memberId = event.getProperties().get("member_id");
+ Member member = findMember(memberId);
+
+ if(null == member){
+ if(log.isErrorEnabled()) {
+ log.error(String.format("Member not found: [member] %s", memberId));
+ }
+ return null;
+ }
+ AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(member.getClusterId());
+ if(null == monitor){
+ if(log.isErrorEnabled()) {
+ log.error(String.format("Cluster monitor is not available for : [member] %s", memberId));
+ }
+ return null;
+ }
+ String networkPartitionId = findNetworkPartitionId(memberId);
+ MemberStatsContext memberStatsContext = monitor.getNetworkPartitionCtxt(networkPartitionId)
+ .getPartitionCtxt(member.getPartitionId())
+ .getMemberStatsContext(memberId);
+ if(null == memberStatsContext){
+ if(log.isErrorEnabled()) {
+ log.error(String.format("Member context is not available for : [member] %s", memberId));
+ }
+ return null;
+ }
+ else if(!member.isActive()){
+ if(log.isDebugEnabled()){
+ log.debug(String.format("Member activated event has not received for the member %s. Therefore ignoring" +
+ " the health stat", memberId));
+ }
+ return null;
+ }
+
+ LoadAverage loadAverage = memberStatsContext.getLoadAverage();
+ return loadAverage;
+ }
+
+ private MemoryConsumption findMemoryConsumption(String memberId) {
+// String memberId = event.getProperties().get("member_id");
+ Member member = findMember(memberId);
+
+ if(null == member){
+ if(log.isErrorEnabled()) {
+ log.error(String.format("Member not found: [member] %s", memberId));
+ }
+ return null;
+ }
+
+ AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(member.getClusterId());
+ if(null == monitor){
+ if(log.isErrorEnabled()) {
+ log.error(String.format("Cluster monitor is not available for : [member] %s", memberId));
+ }
+ return null;
+ }
+
+
+ String networkPartitionId = findNetworkPartitionId(memberId);
+ MemberStatsContext memberStatsContext = monitor.getNetworkPartitionCtxt(networkPartitionId)
+ .getPartitionCtxt(member.getPartitionId())
+ .getMemberStatsContext(memberId);
+ if(null == memberStatsContext){
+ if(log.isErrorEnabled()) {
+ log.error(String.format("Member context is not available for : [member] %s", memberId));
+ }
+ return null;
+ }else if(!member.isActive()){
+ if(log.isDebugEnabled()){
+ log.debug(String.format("Member activated event has not received for the member %s. Therefore ignoring" +
+ " the health stat", memberId));
+ }
+ return null;
+ }
+ MemoryConsumption memoryConsumption = memberStatsContext.getMemoryConsumption();
+
+ return memoryConsumption;
+ }
+
+ private String findNetworkPartitionId(String memberId) {
+ for(Service service: TopologyManager.getTopology().getServices()){
+ for(Cluster cluster: service.getClusters()){
+ if(cluster.memberExists(memberId)){
+ return cluster.getMember(memberId).getNetworkPartitionId();
+ }
+ }
+ }
+ return null;
+ }
+
+ private Member findMember(String memberId) {
+ try {
+ TopologyManager.acquireReadLock();
+ for(Service service : TopologyManager.getTopology().getServices()) {
+ for(Cluster cluster : service.getClusters()) {
+ if(cluster.memberExists(memberId)) {
+ return cluster.getMember(memberId);
+ }
+ }
+ }
+ return null;
+ }
+ finally {
+ TopologyManager.releaseReadLock();
+ }
+ }
+
+ private void handleMemberFaultEvent(String clusterId, String memberId) {
+ try {
+ AutoscalerContext asCtx = AutoscalerContext.getInstance();
+ AbstractMonitor monitor;
+
+ if(asCtx.moniterExist(clusterId)){
+ monitor = asCtx.getMonitor(clusterId);
+ }else if(asCtx.lbMoniterExist(clusterId)){
+ monitor = asCtx.getLBMonitor(clusterId);
+ }else{
+ String errMsg = "A monitor is not found for this custer";
+ log.error(errMsg);
+ throw new RuntimeException(errMsg);
+ }
+
+ NetworkPartitionContext nwPartitionCtxt;
+ try{
+ TopologyManager.acquireReadLock();
+ Member member = findMember(memberId);
+
+ if(null == member){
+ return;
+ }
+ if(!member.isActive()){
+ if(log.isDebugEnabled()){
+ log.debug(String.format("Member activated event has not received for the member %s. Therefore ignoring" +
+ " the member fault health stat", memberId));
+ }
+ return;
+ }
+
+ nwPartitionCtxt = monitor.getNetworkPartitionCtxt(member);
+
+ }finally{
+ TopologyManager.releaseReadLock();
+ }
+ // terminate the faulty member
+ CloudControllerClient ccClient = CloudControllerClient.getInstance();
+ ccClient.terminate(memberId);
+
+ // start a new member in the same Partition
+ String partitionId = monitor.getPartitionOfMember(memberId);
+ Partition partition = monitor.getDeploymentPolicy().getPartitionById(partitionId);
+ PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
+
+ String lbClusterId = AutoscalerRuleEvaluator.getLbClusterId(partitionCtxt, nwPartitionCtxt);
+ ccClient.spawnAnInstance(partition, clusterId, lbClusterId, nwPartitionCtxt.getId());
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Instance spawned for fault member: [partition] %s [cluster] %s [lb cluster] %s ",
+ partitionId, clusterId, lbClusterId));
+ }
+
+ } catch (TerminationException e) {
+ log.error(e);
+ } catch (SpawningException e) {
+ log.error(e);
+ }
+ }
+
+ public void terminate(){
+ this.terminated = true;
+ }
+}