You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by la...@apache.org on 2014/01/04 16:09:51 UTC
[2/2] git commit: Moving the autoscaler health stat receiver to
message processor model gc: STRATOS-332
Moving the autoscaler health stat receiver to message processor model gc: STRATOS-332
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/253bbb08
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/253bbb08
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/253bbb08
Branch: refs/heads/master
Commit: 253bbb08a9fab307570a2832cc965757384e0422
Parents: 41687d8
Author: Lahiru Sandaruwan <la...@apache.org>
Authored: Sat Jan 4 20:44:19 2014 +0530
Committer: Lahiru Sandaruwan <la...@apache.org>
Committed: Sat Jan 4 20:44:19 2014 +0530
----------------------------------------------------------------------
.../event/AverageRequestsInFlightEvent.java | 49 -
.../event/GradientOfRequestsInFlightEvent.java | 49 -
...SecondDerivativeOfRequestsInFlightEvent.java | 49 -
.../internal/AutoscalerServerComponent.java | 40 +-
.../health/AutoscalerHealthStatReceiver.java | 1045 ++++++++++++++++++
.../health/HealthEventMessageDelegator.java | 639 -----------
.../health/HealthEventMessageReceiver.java | 49 -
.../receiver/health/HealthEventQueue.java | 46 -
.../topology/AutoscalerTopologyReceiver.java | 11 +-
...econdDerivativeOfMemoryConsumptionEvent.java | 9 +-
.../stat/HealthStatEventMessageDelegator.java | 5 +-
.../health/stat/HealthStatReceiver.java | 77 ++
12 files changed, 1158 insertions(+), 910 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/253bbb08/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/AverageRequestsInFlightEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/AverageRequestsInFlightEvent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/AverageRequestsInFlightEvent.java
deleted file mode 100644
index 37edcf6..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/AverageRequestsInFlightEvent.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.autoscaler.event;
-
-import java.io.Serializable;
-
-/**
- * This event is fired by Event processing engine to send average of requests in flight
- */
-public class AverageRequestsInFlightEvent implements Serializable {
-
- private static final long serialVersionUID = 7178667274015434275L;
- private String clusterId;
- private float value;
-
-
- public String getClusterId() {
- return clusterId;
- }
-
- public void setClusterId(String clusterId) {
- this.clusterId = clusterId;
- }
-
- public float getValue() {
- return value;
- }
-
- public void setValue(float value) {
- this.value = value;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/253bbb08/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/GradientOfRequestsInFlightEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/GradientOfRequestsInFlightEvent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/GradientOfRequestsInFlightEvent.java
deleted file mode 100644
index ad55619..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/GradientOfRequestsInFlightEvent.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.autoscaler.event;
-
-import java.io.Serializable;
-
-/**
- * This event is fired by Event processing engine to send gradient of requests in flight
- */
-public class GradientOfRequestsInFlightEvent implements Serializable {
-
- private static final long serialVersionUID = 6140723469565274572L;
- private String clusterId;
- private float value;
-
-
- public String getClusterId() {
- return clusterId;
- }
-
- public void setClusterId(String clusterId) {
- this.clusterId = clusterId;
- }
-
- public float getValue() {
- return value;
- }
-
- public void setValue(float value) {
- this.value = value;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/253bbb08/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/SecondDerivativeOfRequestsInFlightEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/SecondDerivativeOfRequestsInFlightEvent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/SecondDerivativeOfRequestsInFlightEvent.java
deleted file mode 100644
index ae4ca7a..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/SecondDerivativeOfRequestsInFlightEvent.java
+++ /dev/null
@@ -1,49 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.autoscaler.event;
-
-import java.io.Serializable;
-
-/**
- * This event is fired by Event processing engine to send second derivative of requests in flight
- */
-public class SecondDerivativeOfRequestsInFlightEvent implements Serializable {
-
- private static final long serialVersionUID = 8857808689466762084L;
- private String clusterId;
- private float value;
-
-
- public String getClusterId() {
- return clusterId;
- }
-
- public void setClusterId(String clusterId) {
- this.clusterId = clusterId;
- }
-
- public float getValue() {
- return value;
- }
-
- public void setValue(float value) {
- this.value = value;
- }
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/253bbb08/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
index 257cf6c..2a1e5df 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
@@ -18,29 +18,26 @@
*/
package org.apache.stratos.autoscaler.internal;
-import java.util.Iterator;
-import java.util.List;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.autoscaler.NetworkPartitionContext;
import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
import org.apache.stratos.autoscaler.exception.AutoScalerException;
-import org.apache.stratos.autoscaler.message.receiver.health.HealthEventMessageDelegator;
-import org.apache.stratos.autoscaler.message.receiver.health.HealthEventMessageReceiver;
+import org.apache.stratos.autoscaler.message.receiver.health.AutoscalerHealthStatReceiver;
+import org.apache.stratos.autoscaler.message.receiver.topology.AutoscalerTopologyReceiver;
import org.apache.stratos.autoscaler.partition.PartitionManager;
import org.apache.stratos.autoscaler.policy.PolicyManager;
import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
import org.apache.stratos.autoscaler.registry.RegistryManager;
-import org.apache.stratos.autoscaler.message.receiver.topology.AutoscalerTopologyReceiver;
import org.apache.stratos.autoscaler.util.ServiceReferenceHolder;
import org.apache.stratos.cloud.controller.deployment.partition.Partition;
-import org.apache.stratos.messaging.broker.subscribe.TopicSubscriber;
-import org.apache.stratos.messaging.util.Constants;
import org.osgi.service.component.ComponentContext;
import org.wso2.carbon.registry.api.RegistryException;
import org.wso2.carbon.registry.core.service.RegistryService;
+import java.util.Iterator;
+import java.util.List;
+
/**
* @scr.component name=org.apache.stratos.autoscaler.internal.AutoscalerServerComponent"
* immediate="true"
@@ -55,8 +52,8 @@ public class AutoscalerServerComponent {
private static final Log log = LogFactory.getLog(AutoscalerServerComponent.class);
AutoscalerTopologyReceiver asTopologyReceiver;
- TopicSubscriber healthStatTopicSubscriber;
- HealthEventMessageDelegator healthEventMessageDelegator;
+// TopicSubscriber healthStatTopicSubscriber;
+ AutoscalerHealthStatReceiver autoscalerHealthStatReceiver;
protected void activate(ComponentContext componentContext) throws Exception {
try {
@@ -67,18 +64,18 @@ public class AutoscalerServerComponent {
if (log.isDebugEnabled()) {
log.debug("Topology receiver thread started");
}
+// healthStatTopicSubscriber = new TopicSubscriber(Constants.HEALTH_STAT_TOPIC);
+// healthStatTopicSubscriber.setMessageListener(new HealthEventMessageReceiver());
+// Thread healthStatTopicSubscriberThread = new Thread(healthStatTopicSubscriber);
+// healthStatTopicSubscriberThread.start();
+// if (log.isDebugEnabled()) {
+// log.debug("Health event message receiver thread started");
+// }
- // Start health stat receiver
- healthStatTopicSubscriber = new TopicSubscriber(Constants.HEALTH_STAT_TOPIC);
- healthStatTopicSubscriber.setMessageListener(new HealthEventMessageReceiver());
- Thread healthStatTopicSubscriberThread = new Thread(healthStatTopicSubscriber);
- healthStatTopicSubscriberThread.start();
- if (log.isDebugEnabled()) {
- log.debug("Health event message receiver thread started");
- }
- healthEventMessageDelegator = new HealthEventMessageDelegator();
- Thread healthDelegatorThread = new Thread(healthEventMessageDelegator);
+ // Start health stat receiver
+ autoscalerHealthStatReceiver = new AutoscalerHealthStatReceiver();
+ Thread healthDelegatorThread = new Thread(autoscalerHealthStatReceiver);
healthDelegatorThread.start();
if (log.isDebugEnabled()) {
log.debug("Health message processor thread started");
@@ -124,8 +121,7 @@ public class AutoscalerServerComponent {
protected void deactivate(ComponentContext context) {
asTopologyReceiver.terminate();
- healthStatTopicSubscriber.terminate();
- healthEventMessageDelegator.terminate();
+ autoscalerHealthStatReceiver.terminate();
}
protected void setRegistryService(RegistryService registryService) {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/253bbb08/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatReceiver.java
new file mode 100644
index 0000000..3954610
--- /dev/null
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/message/receiver/health/AutoscalerHealthStatReceiver.java
@@ -0,0 +1,1045 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.autoscaler.message.receiver.health;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.AutoscalerContext;
+import org.apache.stratos.autoscaler.MemberStatsContext;
+import org.apache.stratos.autoscaler.NetworkPartitionContext;
+import org.apache.stratos.autoscaler.PartitionContext;
+import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
+import org.apache.stratos.autoscaler.exception.SpawningException;
+import org.apache.stratos.autoscaler.exception.TerminationException;
+import org.apache.stratos.autoscaler.monitor.AbstractMonitor;
+import org.apache.stratos.autoscaler.policy.model.LoadAverage;
+import org.apache.stratos.autoscaler.policy.model.MemoryConsumption;
+import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
+import org.apache.stratos.cloud.controller.deployment.partition.Partition;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.event.health.stat.*;
+import org.apache.stratos.messaging.listener.health.stat.*;
+import org.apache.stratos.messaging.message.processor.health.stat.HealthStatMessageProcessorChain;
+import org.apache.stratos.messaging.message.receiver.health.stat.HealthStatEventMessageDelegator;
+import org.apache.stratos.messaging.message.receiver.health.stat.HealthStatReceiver;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+
+/**
+ * A thread for processing health stat event messages and updating the corresponding autoscaler contexts.
+ */
+public class AutoscalerHealthStatReceiver implements Runnable {
+
+ private static final Log log = LogFactory.getLog(AutoscalerHealthStatReceiver.class);
+ private boolean terminated = false;
+
+ private HealthStatReceiver healthStatReceiver;
+// @Override
+// public void run() {
+// if(log.isInfoEnabled()) {
+// log.info("Health event message delegator started");
+// }
+//
+// if(log.isDebugEnabled()) {
+// log.debug("Waiting for topology to be initialized");
+// }
+// while(!TopologyManager.getTopology().isInitialized());
+//
+// while (!terminate) {
+// try {
+// TextMessage message = HealthStatEventMessageQueue.getInstance().take();
+//
+// String messageText = message.getText();
+// if (log.isDebugEnabled()) {
+// log.debug("Health event message received: [message] " + messageText);
+// }
+// Event event = jsonToEvent(messageText);
+// String eventName = event.getEventName();
+//
+// if (log.isInfoEnabled()) {
+// log.info(String.format("Received event: [event-name] %s", eventName));
+// }
+//
+// if (Constants.AVERAGE_REQUESTS_IN_FLIGHT.equals(eventName)) {
+// String clusterId = event.getProperties().get("cluster_id");
+// String networkPartitionId = event.getProperties().get("network_partition_id");
+// String value = event.getProperties().get("value");
+// Float floatValue = Float.parseFloat(value);
+//
+// if (log.isDebugEnabled()) {
+// log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName,
+// clusterId, networkPartitionId, value));
+// }
+//
+// AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+// if(null != monitor){
+// NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+// if(null != networkPartitionContext){
+// networkPartitionContext.setAverageRequestsInFlight(floatValue);
+// } else {
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Network partition context is not available for :" +
+// " [network partition] %s", networkPartitionId));
+// }
+// }
+// } else {
+//
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
+// }
+// }
+//
+// } else if (Constants.GRADIENT_OF_REQUESTS_IN_FLIGHT.equals(eventName)) {
+// String clusterId = event.getProperties().get("cluster_id");
+// String networkPartitionId = event.getProperties().get("network_partition_id");
+// String value = event.getProperties().get("value");
+// Float floatValue = Float.parseFloat(value);
+//
+// if (log.isDebugEnabled()) {
+// log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName,
+// clusterId, networkPartitionId, value));
+// }
+// AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+// if(null != monitor){
+// NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+// if(null != networkPartitionContext){
+// networkPartitionContext.setRequestsInFlightGradient(floatValue);
+// } else {
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Network partition context is not available for :" +
+// " [network partition] %s", networkPartitionId));
+// }
+// }
+// } else {
+//
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
+// }
+// }
+//
+// } else if (Constants.SECOND_DERIVATIVE_OF_REQUESTS_IN_FLIGHT.equals(eventName)) {
+// String clusterId = event.getProperties().get("cluster_id");
+// String networkPartitionId = event.getProperties().get("network_partition_id");
+// String value = event.getProperties().get("value");
+// Float floatValue = Float.parseFloat(value);
+//
+// if (log.isDebugEnabled()) {
+// log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName,
+// clusterId, networkPartitionId, value));
+// }
+// AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+// if(null != monitor){
+// NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+// if(null != networkPartitionContext){
+// networkPartitionContext.setRequestsInFlightSecondDerivative(floatValue);
+// } else {
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Network partition context is not available for :" +
+// " [network partition] %s", networkPartitionId));
+// }
+// }
+// } else {
+//
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
+// }
+// }
+// } else if (Constants.MEMBER_FAULT_EVENT_NAME.equals(eventName)) {
+// String clusterId = event.getProperties().get("cluster_id");
+// String memberId = event.getProperties().get("member_id");
+//
+// if (memberId == null || memberId.isEmpty()) {
+// if(log.isErrorEnabled()) {
+// log.error("Member id not found in received message");
+// }
+// } else {
+// handleMemberFaultEvent(clusterId, memberId);
+// }
+// } else if(Constants.MEMBER_AVERAGE_LOAD_AVERAGE.equals(eventName)) {
+// LoadAverage loadAverage = findLoadAverage(event);
+// if(loadAverage != null) {
+// String value = event.getProperties().get("value");
+// Float floatValue = Float.parseFloat(value);
+// loadAverage.setAverage(floatValue);
+//
+// if (log.isDebugEnabled()) {
+// log.debug(String.format("%s event: [member] %s [value] %s", event, event.getProperties().get("member_id"), value));
+// }
+// }
+// } else if(Constants.MEMBER_SECOND_DERIVATIVE_OF_LOAD_AVERAGE.equals(eventName)) {
+// LoadAverage loadAverage = findLoadAverage(event);
+// if(loadAverage != null) {
+// String value = event.getProperties().get("value");
+// Float floatValue = Float.parseFloat(value);
+// loadAverage.setSecondDerivative(floatValue);
+//
+// if (log.isDebugEnabled()) {
+// log.debug(String.format("%s event: [member] %s [value] %s", event, event.getProperties().get("member_id"), value));
+// }
+// }
+// } else if(Constants.MEMBER_GRADIENT_LOAD_AVERAGE.equals(eventName)) {
+// LoadAverage loadAverage = findLoadAverage(event);
+// if(loadAverage != null) {
+// String value = event.getProperties().get("value");
+// Float floatValue = Float.parseFloat(value);
+// loadAverage.setGradient(floatValue);
+//
+// if (log.isDebugEnabled()) {
+// log.debug(String.format("%s event: [member] %s [value] %s", event, event.getProperties().get("member_id"), value));
+// }
+// }
+// } else if(Constants.MEMBER_AVERAGE_MEMORY_CONSUMPTION.equals(eventName)) {
+// MemoryConsumption memoryConsumption = findMemoryConsumption(event);
+// if(memoryConsumption != null) {
+// String value = event.getProperties().get("value");
+// Float floatValue = Float.parseFloat(value);
+// memoryConsumption.setAverage(floatValue);
+//
+// if (log.isDebugEnabled()) {
+// log.debug(String.format("%s event: [member] %s [value] %s", event, event.getProperties().get("member_id"), value));
+// }
+// }
+// } else if(Constants.MEMBER_SECOND_DERIVATIVE_OF_MEMORY_CONSUMPTION.equals(eventName)) {
+// MemoryConsumption memoryConsumption = findMemoryConsumption(event);
+// if(memoryConsumption != null) {
+// String value = event.getProperties().get("value");
+// Float floatValue = Float.parseFloat(value);
+// memoryConsumption.setSecondDerivative(floatValue);
+//
+// if (log.isDebugEnabled()) {
+// log.debug(String.format("%s event: [member] %s [value] %s", event, event.getProperties().get("member_id"), value));
+// }
+// }
+// } else if(Constants.MEMBER_GRADIENT_MEMORY_CONSUMPTION.equals(eventName)) {
+// MemoryConsumption memoryConsumption = findMemoryConsumption(event);
+// if(memoryConsumption != null) {
+// String value = event.getProperties().get("value");
+// Float floatValue = Float.parseFloat(value);
+// memoryConsumption.setGradient(floatValue);
+//
+// if (log.isDebugEnabled()) {
+// log.debug(String.format("%s event: [member] %s [value] %s",event, event.getProperties().get("member_id"), value));
+// }
+// }
+//
+// } else if(Constants.AVERAGE_LOAD_AVERAGE.equals(eventName)) {
+//
+// String clusterId = event.getProperties().get("cluster_id");
+// String networkPartitionId = event.getProperties().get("network_partition_id");
+// String value = event.getProperties().get("value");
+// Float floatValue = Float.parseFloat(value);
+//
+// if (log.isDebugEnabled()) {
+// log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName,
+// clusterId, networkPartitionId, value));
+// }
+// AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+// if(null != monitor){
+// NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+// if(null != networkPartitionContext){
+// networkPartitionContext.setAverageLoadAverage(floatValue);
+// } else {
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Network partition context is not available for :" +
+// " [network partition] %s", networkPartitionId));
+// }
+// }
+// } else {
+//
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
+// }
+// }
+// } else if(Constants.SECOND_DERIVATIVE_OF_LOAD_AVERAGE.equals(eventName)) {
+//
+// String clusterId = event.getProperties().get("cluster_id");
+// String networkPartitionId = event.getProperties().get("network_partition_id");
+// String value = event.getProperties().get("value");
+// Float floatValue = Float.parseFloat(value);
+//
+// if (log.isDebugEnabled()) {
+// log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName,
+// clusterId, networkPartitionId, value));
+// }
+// AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+// if(null != monitor){
+// NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+// if(null != networkPartitionContext){
+// networkPartitionContext.setLoadAverageSecondDerivative(floatValue);
+// } else {
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Network partition context is not available for :" +
+// " [network partition] %s", networkPartitionId));
+// }
+// }
+// } else {
+//
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
+// }
+// }
+// } else if(Constants.GRADIENT_LOAD_AVERAGE.equals(eventName)) {
+//
+// String clusterId = event.getProperties().get("cluster_id");
+// String networkPartitionId = event.getProperties().get("network_partition_id");
+// String value = event.getProperties().get("value");
+// Float floatValue = Float.parseFloat(value);
+//
+// if (log.isDebugEnabled()) {
+// log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName,
+// clusterId, networkPartitionId, value));
+// }
+// AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+// if(null != monitor){
+// NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+// if(null != networkPartitionContext){
+// networkPartitionContext.setLoadAverageGradient(floatValue);
+// } else {
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Network partition context is not available for :" +
+// " [network partition] %s", networkPartitionId));
+// }
+// }
+// } else {
+//
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
+// }
+// }
+// } else if(Constants.AVERAGE_MEMORY_CONSUMPTION.equals(eventName)) {
+//
+// String clusterId = event.getProperties().get("cluster_id");
+// String networkPartitionId = event.getProperties().get("network_partition_id");
+// String value = event.getProperties().get("value");
+// Float floatValue = Float.parseFloat(value);
+//
+// if (log.isDebugEnabled()) {
+// log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName,
+// clusterId, networkPartitionId, value));
+// }
+// AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+// if(null != monitor){
+// NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+// if(null != networkPartitionContext){
+// networkPartitionContext.setAverageMemoryConsumption(floatValue);
+// } else {
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Network partition context is not available for :" +
+// " [network partition] %s", networkPartitionId));
+// }
+// }
+// } else {
+//
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
+// }
+// }
+// } else if(Constants.SECOND_DERIVATIVE_OF_MEMORY_CONSUMPTION.equals(eventName)) {
+//
+// String clusterId = event.getProperties().get("cluster_id");
+// String networkPartitionId = event.getProperties().get("network_partition_id");
+// String value = event.getProperties().get("value");
+// Float floatValue = Float.parseFloat(value);
+//
+// if (log.isDebugEnabled()) {
+// log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName,
+// clusterId, networkPartitionId, value));
+// }
+// AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+// if(null != monitor){
+// NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+// if(null != networkPartitionContext){
+// networkPartitionContext.setMemoryConsumptionSecondDerivative(floatValue);
+// } else {
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Network partition context is not available for :" +
+// " [network partition] %s", networkPartitionId));
+// }
+// }
+// } else {
+//
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
+// }
+// }
+// } else if(Constants.GRADIENT_MEMORY_CONSUMPTION.equals(eventName)) {
+//
+// String clusterId = event.getProperties().get("cluster_id");
+// String networkPartitionId = event.getProperties().get("network_partition_id");
+// String value = event.getProperties().get("value");
+// Float floatValue = Float.parseFloat(value);
+//
+// if (log.isDebugEnabled()) {
+// log.debug(String.format("%s event: [cluster] %s [network-partition] %s [value] %s", eventName,
+// clusterId, networkPartitionId, value));
+// }
+// AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+// if(null != monitor){
+// NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+// if(null != networkPartitionContext){
+// networkPartitionContext.setMemoryConsumptionGradient(floatValue);
+// } else {
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Network partition context is not available for :" +
+// " [network partition] %s", networkPartitionId));
+// }
+// }
+// } else {
+//
+// if(log.isErrorEnabled()) {
+// log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
+// }
+// }
+// }
+// } catch (Exception e) {
+// log.error("Failed to retrieve the health stat event message.", e);
+// }
+// }
+// log.warn("Health event Message delegater is terminated");
+// }
+//
+
+
+ @Override
+ public void run() {
+ //FIXME this is activated before the autoscaler deployer is activated; the sleep below looks like a workaround for that.
+ try {
+ Thread.sleep(15000);
+ } catch (InterruptedException ignore) {
+ }
+ Thread thread = new Thread(healthStatReceiver);
+ thread.start();
+ if(log.isInfoEnabled()) {
+ log.info("Autoscaler topology receiver thread started");
+ }
+
+ // Keep the thread live until terminated
+ while (!terminated);
+ if(log.isInfoEnabled()) {
+ log.info("Autoscaler topology receiver thread terminated");
+ }
+ }
+
+    /**
+     * Creates the message delegator, handing it the processor chain built by
+     * {@code createEventProcessorChain()}.
+     *
+     * @return a new delegator wrapping a freshly created processor chain
+     */
+    private HealthStatEventMessageDelegator createMessageDelegator() {
+        return new HealthStatEventMessageDelegator(createEventProcessorChain());
+    }
+
+    /**
+     * Builds the processor chain and registers one listener per health stat event type.
+     * <p>
+     * Cluster level listeners (average, gradient and second derivative of load average,
+     * memory consumption and requests in flight) store the received value on the
+     * {@link NetworkPartitionContext} of the event's cluster and network partition.
+     * Member level listeners store the value on the member's load average or memory
+     * consumption holder, and member fault events are passed to
+     * {@code handleMemberFaultEvent}.
+     *
+     * @return the processor chain with all listeners registered
+     */
+    private HealthStatMessageProcessorChain createEventProcessorChain() {
+        // Listen to health stat events that affect clusters
+        HealthStatMessageProcessorChain processorChain = new HealthStatMessageProcessorChain();
+        processorChain.addEventListener(new AverageLoadAverageEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                AverageLoadAverageEvent e = (AverageLoadAverageEvent) event;
+                String clusterId = e.getClusterId();
+                String networkPartitionId = e.getNetworkPartitionId();
+                Float floatValue = e.getValue();
+
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Avg load avg event: [cluster] %s [network-partition] %s [value] %s",
+                            clusterId, networkPartitionId, floatValue));
+                }
+                NetworkPartitionContext networkPartitionContext =
+                        lookupNetworkPartitionContext(clusterId, networkPartitionId);
+                if(null != networkPartitionContext){
+                    networkPartitionContext.setAverageLoadAverage(floatValue);
+                }
+            }
+
+        });
+        processorChain.addEventListener(new AverageMemoryConsumptionEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                AverageMemoryConsumptionEvent e = (AverageMemoryConsumptionEvent) event;
+                String clusterId = e.getClusterId();
+                String networkPartitionId = e.getNetworkPartitionId();
+                Float floatValue = e.getValue();
+
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Avg MC event: [cluster] %s [network-partition] %s [value] %s",
+                            clusterId, networkPartitionId, floatValue));
+                }
+                NetworkPartitionContext networkPartitionContext =
+                        lookupNetworkPartitionContext(clusterId, networkPartitionId);
+                if(null != networkPartitionContext){
+                    networkPartitionContext.setAverageMemoryConsumption(floatValue);
+                }
+            }
+
+        });
+        processorChain.addEventListener(new AverageRequestsInFlightEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                AverageRequestsInFlightEvent e = (AverageRequestsInFlightEvent) event;
+                String clusterId = e.getClusterId();
+                String networkPartitionId = e.getNetworkPartitionId();
+                Float floatValue = e.getValue();
+
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Average Rif event: [cluster] %s [network-partition] %s [value] %s",
+                            clusterId, networkPartitionId, floatValue));
+                }
+                NetworkPartitionContext networkPartitionContext =
+                        lookupNetworkPartitionContext(clusterId, networkPartitionId);
+                if(null != networkPartitionContext){
+                    networkPartitionContext.setAverageRequestsInFlight(floatValue);
+                }
+            }
+
+        });
+        processorChain.addEventListener(new GradientOfLoadAverageEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                GradientOfLoadAverageEvent e = (GradientOfLoadAverageEvent) event;
+                String clusterId = e.getClusterId();
+                String networkPartitionId = e.getNetworkPartitionId();
+                Float floatValue = e.getValue();
+
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Grad of load avg event: [cluster] %s [network-partition] %s [value] %s",
+                            clusterId, networkPartitionId, floatValue));
+                }
+                NetworkPartitionContext networkPartitionContext =
+                        lookupNetworkPartitionContext(clusterId, networkPartitionId);
+                if(null != networkPartitionContext){
+                    networkPartitionContext.setLoadAverageGradient(floatValue);
+                }
+            }
+
+        });
+        processorChain.addEventListener(new GradientOfMemoryConsumptionEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                GradientOfMemoryConsumptionEvent e = (GradientOfMemoryConsumptionEvent) event;
+                String clusterId = e.getClusterId();
+                String networkPartitionId = e.getNetworkPartitionId();
+                Float floatValue = e.getValue();
+
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Grad of MC event: [cluster] %s [network-partition] %s [value] %s",
+                            clusterId, networkPartitionId, floatValue));
+                }
+                NetworkPartitionContext networkPartitionContext =
+                        lookupNetworkPartitionContext(clusterId, networkPartitionId);
+                if(null != networkPartitionContext){
+                    networkPartitionContext.setMemoryConsumptionGradient(floatValue);
+                }
+            }
+
+        });
+        processorChain.addEventListener(new GradientOfRequestsInFlightEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                GradientOfRequestsInFlightEvent e = (GradientOfRequestsInFlightEvent) event;
+                String clusterId = e.getClusterId();
+                String networkPartitionId = e.getNetworkPartitionId();
+                Float floatValue = e.getValue();
+
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Gradient of Rif event: [cluster] %s [network-partition] %s [value] %s",
+                            clusterId, networkPartitionId, floatValue));
+                }
+                NetworkPartitionContext networkPartitionContext =
+                        lookupNetworkPartitionContext(clusterId, networkPartitionId);
+                if(null != networkPartitionContext){
+                    networkPartitionContext.setRequestsInFlightGradient(floatValue);
+                }
+            }
+
+        });
+        processorChain.addEventListener(new MemberAverageLoadAverageEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                MemberAverageLoadAverageEvent e = (MemberAverageLoadAverageEvent) event;
+                LoadAverage loadAverage = findLoadAverage(e.getMemberId());
+                if(loadAverage != null) {
+                    Float floatValue = e.getValue();
+                    loadAverage.setAverage(floatValue);
+
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Member avg of load avg event: [member] %s [value] %s",
+                                e.getMemberId(), floatValue));
+                    }
+                }
+            }
+
+        });
+        processorChain.addEventListener(new MemberAverageMemoryConsumptionEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                MemberAverageMemoryConsumptionEvent e = (MemberAverageMemoryConsumptionEvent) event;
+                MemoryConsumption memoryConsumption = findMemoryConsumption(e.getMemberId());
+                if(memoryConsumption != null) {
+                    Float floatValue = e.getValue();
+                    memoryConsumption.setAverage(floatValue);
+
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Member avg MC event: [member] %s [value] %s",
+                                e.getMemberId(), floatValue));
+                    }
+                }
+            }
+
+        });
+        processorChain.addEventListener(new MemberFaultEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                MemberFaultEvent e = (MemberFaultEvent) event;
+                String clusterId = e.getClusterId();
+                String memberId = e.getMemberId();
+
+                // A fault event without a member id cannot be acted upon.
+                if (memberId == null || memberId.isEmpty()) {
+                    if(log.isErrorEnabled()) {
+                        log.error("Member id not found in received message");
+                    }
+                } else {
+                    handleMemberFaultEvent(clusterId, memberId);
+                }
+            }
+
+        });
+        processorChain.addEventListener(new MemberGradientOfLoadAverageEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                MemberGradientOfLoadAverageEvent e = (MemberGradientOfLoadAverageEvent) event;
+                LoadAverage loadAverage = findLoadAverage(e.getMemberId());
+                if(loadAverage != null) {
+                    Float floatValue = e.getValue();
+                    loadAverage.setGradient(floatValue);
+
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Member grad of load avg event: [member] %s [value] %s",
+                                e.getMemberId(), floatValue));
+                    }
+                }
+            }
+
+        });
+        processorChain.addEventListener(new MemberGradientOfMemoryConsumptionEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                MemberGradientOfMemoryConsumptionEvent e = (MemberGradientOfMemoryConsumptionEvent) event;
+                MemoryConsumption memoryConsumption = findMemoryConsumption(e.getMemberId());
+                if(memoryConsumption != null) {
+                    Float floatValue = e.getValue();
+                    memoryConsumption.setGradient(floatValue);
+
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Member grad of MC event: [member] %s [value] %s",
+                                e.getMemberId(), floatValue));
+                    }
+                }
+            }
+
+        });
+        processorChain.addEventListener(new MemberSecondDerivativeOfLoadAverageEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                MemberSecondDerivativeOfLoadAverageEvent e = (MemberSecondDerivativeOfLoadAverageEvent) event;
+                LoadAverage loadAverage = findLoadAverage(e.getMemberId());
+                if(loadAverage != null) {
+                    Float floatValue = e.getValue();
+                    loadAverage.setSecondDerivative(floatValue);
+
+                    if (log.isDebugEnabled()) {
+                        log.debug(String.format("Member SD of load avg event: [member] %s [value] %s",
+                                e.getMemberId(), floatValue));
+                    }
+                }
+            }
+
+        });
+        processorChain.addEventListener(new MemberSecondDerivativeOfMemoryConsumptionEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                // NOTE(review): this event is currently dropped. The sibling member listeners
+                // store their value on the member's stats holder; confirm whether the second
+                // derivative of memory consumption should be stored the same way.
+            }
+
+        });
+        processorChain.addEventListener(new SecondDerivativeOfLoadAverageEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                SecondDerivativeOfLoadAverageEvent e = (SecondDerivativeOfLoadAverageEvent) event;
+                String clusterId = e.getClusterId();
+                String networkPartitionId = e.getNetworkPartitionId();
+                Float floatValue = e.getValue();
+
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("SD of load avg event: [cluster] %s [network-partition] %s [value] %s",
+                            clusterId, networkPartitionId, floatValue));
+                }
+                NetworkPartitionContext networkPartitionContext =
+                        lookupNetworkPartitionContext(clusterId, networkPartitionId);
+                if(null != networkPartitionContext){
+                    networkPartitionContext.setLoadAverageSecondDerivative(floatValue);
+                }
+            }
+
+        });
+        processorChain.addEventListener(new SecondDerivativeOfMemoryConsumptionEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                SecondDerivativeOfMemoryConsumptionEvent e = (SecondDerivativeOfMemoryConsumptionEvent) event;
+                String clusterId = e.getClusterId();
+                String networkPartitionId = e.getNetworkPartitionId();
+                Float floatValue = e.getValue();
+
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("SD of MC event: [cluster] %s [network-partition] %s [value] %s",
+                            clusterId, networkPartitionId, floatValue));
+                }
+                NetworkPartitionContext networkPartitionContext =
+                        lookupNetworkPartitionContext(clusterId, networkPartitionId);
+                if(null != networkPartitionContext){
+                    networkPartitionContext.setMemoryConsumptionSecondDerivative(floatValue);
+                }
+            }
+
+        });
+        processorChain.addEventListener(new SecondDerivativeOfRequestsInFlightEventListener() {
+            @Override
+            protected void onEvent(org.apache.stratos.messaging.event.Event event) {
+                SecondDerivativeOfRequestsInFlightEvent e = (SecondDerivativeOfRequestsInFlightEvent) event;
+                String clusterId = e.getClusterId();
+                String networkPartitionId = e.getNetworkPartitionId();
+                Float floatValue = e.getValue();
+
+                if (log.isDebugEnabled()) {
+                    log.debug(String.format("Second derivative of Rif event: [cluster] %s [network-partition] %s [value] %s",
+                            clusterId, networkPartitionId, floatValue));
+                }
+                NetworkPartitionContext networkPartitionContext =
+                        lookupNetworkPartitionContext(clusterId, networkPartitionId);
+                if(null != networkPartitionContext){
+                    networkPartitionContext.setRequestsInFlightSecondDerivative(floatValue);
+                }
+            }
+
+        });
+
+        return processorChain;
+    }
+
+    /**
+     * Resolves the network partition context for a cluster level health stat event.
+     * Logs an error and returns {@code null} when either the cluster monitor or the
+     * network partition context is missing.
+     *
+     * @param clusterId          id of the cluster named in the event
+     * @param networkPartitionId id of the network partition named in the event
+     * @return the matching context, or {@code null} if it could not be resolved
+     */
+    private NetworkPartitionContext lookupNetworkPartitionContext(String clusterId, String networkPartitionId) {
+        AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(clusterId);
+        if(null == monitor){
+            if(log.isErrorEnabled()) {
+                log.error(String.format("Cluster monitor is not available for : [cluster] %s", clusterId));
+            }
+            return null;
+        }
+        NetworkPartitionContext networkPartitionContext = monitor.getNetworkPartitionCtxt(networkPartitionId);
+        if(null == networkPartitionContext && log.isErrorEnabled()) {
+            log.error(String.format("Network partition context is not available for :" +
+                    " [network partition] %s", networkPartitionId));
+        }
+        return networkPartitionContext;
+    }
+
+
+    /**
+     * Finds the load average holder of a member.
+     * <p>
+     * Every lookup step is null checked, so a missing monitor or context produces
+     * an error log and a {@code null} result instead of a NullPointerException.
+     *
+     * @param memberId id of the member the health stat belongs to
+     * @return the member's load average holder, or {@code null} when the member,
+     *         its cluster monitor or one of its contexts is unknown, or when the
+     *         member is not active
+     */
+    private LoadAverage findLoadAverage(String memberId) {
+        Member member = findMember(memberId);
+        if(null == member){
+            if(log.isErrorEnabled()) {
+                log.error(String.format("Member not found: [member] %s", memberId));
+            }
+            return null;
+        }
+        // Health stats of members that are not active are ignored.
+        if(!member.isActive()){
+            if(log.isDebugEnabled()){
+                log.debug(String.format("Member activated event has not received for the member %s. Therefore ignoring" +
+                        " the health stat", memberId));
+            }
+            return null;
+        }
+        AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(member.getClusterId());
+        if(null == monitor){
+            if(log.isErrorEnabled()) {
+                log.error(String.format("Cluster monitor is not available for : [member] %s", memberId));
+            }
+            return null;
+        }
+        // The member found above already carries its network partition id, so the
+        // topology does not have to be scanned a second time.
+        NetworkPartitionContext networkPartitionContext =
+                monitor.getNetworkPartitionCtxt(member.getNetworkPartitionId());
+        if(null == networkPartitionContext){
+            if(log.isErrorEnabled()) {
+                log.error(String.format("Network partition context is not available for : [member] %s", memberId));
+            }
+            return null;
+        }
+        PartitionContext partitionContext = networkPartitionContext.getPartitionCtxt(member.getPartitionId());
+        if(null == partitionContext){
+            if(log.isErrorEnabled()) {
+                log.error(String.format("Partition context is not available for : [member] %s", memberId));
+            }
+            return null;
+        }
+        MemberStatsContext memberStatsContext = partitionContext.getMemberStatsContext(memberId);
+        if(null == memberStatsContext){
+            if(log.isErrorEnabled()) {
+                log.error(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return null;
+        }
+        return memberStatsContext.getLoadAverage();
+    }
+
+    /**
+     * Finds the memory consumption holder of a member.
+     * <p>
+     * Every lookup step is null checked, so a missing monitor or context produces
+     * an error log and a {@code null} result instead of a NullPointerException.
+     *
+     * @param memberId id of the member the health stat belongs to
+     * @return the member's memory consumption holder, or {@code null} when the
+     *         member, its cluster monitor or one of its contexts is unknown, or
+     *         when the member is not active
+     */
+    private MemoryConsumption findMemoryConsumption(String memberId) {
+        Member member = findMember(memberId);
+        if(null == member){
+            if(log.isErrorEnabled()) {
+                log.error(String.format("Member not found: [member] %s", memberId));
+            }
+            return null;
+        }
+        // Health stats of members that are not active are ignored.
+        if(!member.isActive()){
+            if(log.isDebugEnabled()){
+                log.debug(String.format("Member activated event has not received for the member %s. Therefore ignoring" +
+                        " the health stat", memberId));
+            }
+            return null;
+        }
+        AbstractMonitor monitor = AutoscalerContext.getInstance().getMonitor(member.getClusterId());
+        if(null == monitor){
+            if(log.isErrorEnabled()) {
+                log.error(String.format("Cluster monitor is not available for : [member] %s", memberId));
+            }
+            return null;
+        }
+        // The member found above already carries its network partition id, so the
+        // topology does not have to be scanned a second time.
+        NetworkPartitionContext networkPartitionContext =
+                monitor.getNetworkPartitionCtxt(member.getNetworkPartitionId());
+        if(null == networkPartitionContext){
+            if(log.isErrorEnabled()) {
+                log.error(String.format("Network partition context is not available for : [member] %s", memberId));
+            }
+            return null;
+        }
+        PartitionContext partitionContext = networkPartitionContext.getPartitionCtxt(member.getPartitionId());
+        if(null == partitionContext){
+            if(log.isErrorEnabled()) {
+                log.error(String.format("Partition context is not available for : [member] %s", memberId));
+            }
+            return null;
+        }
+        MemberStatsContext memberStatsContext = partitionContext.getMemberStatsContext(memberId);
+        if(null == memberStatsContext){
+            if(log.isErrorEnabled()) {
+                log.error(String.format("Member context is not available for : [member] %s", memberId));
+            }
+            return null;
+        }
+        return memberStatsContext.getMemoryConsumption();
+    }
+
+    /**
+     * Looks up the network partition id of a member by scanning every cluster of
+     * every service in the topology.
+     * <p>
+     * The scan runs under the topology read lock, the same way {@code findMember}
+     * guards its traversal.
+     *
+     * @param memberId id of the member to look for
+     * @return the member's network partition id, or {@code null} if no cluster
+     *         contains the member
+     */
+    private String findNetworkPartitionId(String memberId) {
+        TopologyManager.acquireReadLock();
+        try {
+            for(Service service: TopologyManager.getTopology().getServices()){
+                for(Cluster cluster: service.getClusters()){
+                    if(cluster.memberExists(memberId)){
+                        return cluster.getMember(memberId).getNetworkPartitionId();
+                    }
+                }
+            }
+            return null;
+        } finally {
+            TopologyManager.releaseReadLock();
+        }
+    }
+
+    /**
+     * Searches every cluster of every service in the topology for a member,
+     * holding the topology read lock for the duration of the scan.
+     *
+     * @param memberId id of the member to look for
+     * @return the member, or {@code null} if no cluster contains it
+     */
+    private Member findMember(String memberId) {
+        // Acquire before entering the try block so the finally clause never
+        // releases a lock that was not actually obtained.
+        TopologyManager.acquireReadLock();
+        try {
+            for(Service service : TopologyManager.getTopology().getServices()) {
+                for(Cluster cluster : service.getClusters()) {
+                    if(cluster.memberExists(memberId)) {
+                        return cluster.getMember(memberId);
+                    }
+                }
+            }
+            return null;
+        }
+        finally {
+            TopologyManager.releaseReadLock();
+        }
+    }
+
+    /**
+     * Handles a member fault: terminates the faulty member and spawns a
+     * replacement instance in the same partition.
+     * <p>
+     * Faults for unknown or not yet active members are ignored. Termination and
+     * spawning failures are logged with their stack trace and not rethrown.
+     *
+     * @param clusterId id of the cluster the faulty member belongs to
+     * @param memberId  id of the faulty member
+     * @throws RuntimeException if neither a cluster monitor nor an LB cluster
+     *                          monitor exists for {@code clusterId}
+     */
+    private void handleMemberFaultEvent(String clusterId, String memberId) {
+        try {
+            AutoscalerContext asCtx = AutoscalerContext.getInstance();
+            AbstractMonitor monitor;
+
+            if(asCtx.moniterExist(clusterId)){
+                monitor = asCtx.getMonitor(clusterId);
+            }else if(asCtx.lbMoniterExist(clusterId)){
+                monitor = asCtx.getLBMonitor(clusterId);
+            }else{
+                String errMsg = String.format("A monitor is not found for this cluster: [cluster] %s", clusterId);
+                log.error(errMsg);
+                throw new RuntimeException(errMsg);
+            }
+
+            NetworkPartitionContext nwPartitionCtxt;
+            // Acquire before the try block so the finally clause only releases a held lock.
+            TopologyManager.acquireReadLock();
+            try{
+                Member member = findMember(memberId);
+
+                if(null == member){
+                    if(log.isErrorEnabled()) {
+                        log.error(String.format("Member not found: [member] %s", memberId));
+                    }
+                    return;
+                }
+                if(!member.isActive()){
+                    if(log.isDebugEnabled()){
+                        log.debug(String.format("Member activated event has not received for the member %s. Therefore ignoring" +
+                                " the member fault health stat", memberId));
+                    }
+                    return;
+                }
+
+                nwPartitionCtxt = monitor.getNetworkPartitionCtxt(member);
+
+            }finally{
+                TopologyManager.releaseReadLock();
+            }
+            // terminate the faulty member
+            CloudControllerClient ccClient = CloudControllerClient.getInstance();
+            ccClient.terminate(memberId);
+
+            // Without a network partition context the replacement cannot be placed;
+            // report it instead of failing with a NullPointerException below.
+            if(null == nwPartitionCtxt){
+                if(log.isErrorEnabled()) {
+                    log.error(String.format("Network partition context is not available for : [member] %s." +
+                            " A replacement instance was not spawned", memberId));
+                }
+                return;
+            }
+
+            // start a new member in the same Partition
+            String partitionId = monitor.getPartitionOfMember(memberId);
+            Partition partition = monitor.getDeploymentPolicy().getPartitionById(partitionId);
+            PartitionContext partitionCtxt = nwPartitionCtxt.getPartitionCtxt(partitionId);
+
+            String lbClusterId = AutoscalerRuleEvaluator.getLbClusterId(partitionCtxt, nwPartitionCtxt);
+            ccClient.spawnAnInstance(partition, clusterId, lbClusterId, nwPartitionCtxt.getId());
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Instance spawned for fault member: [partition] %s [cluster] %s [lb cluster] %s ",
+                        partitionId, clusterId, lbClusterId));
+            }
+
+        } catch (TerminationException e) {
+            // Pass the exception as the throwable argument so the stack trace is kept.
+            log.error(String.format("Could not terminate faulty member: [cluster] %s [member] %s",
+                    clusterId, memberId), e);
+        } catch (SpawningException e) {
+            log.error(String.format("Could not spawn a replacement for faulty member: [cluster] %s [member] %s",
+                    clusterId, memberId), e);
+        }
+    }
+
+    /**
+     * Requests termination by setting the {@code terminated} flag that
+     * {@code run()} polls in its keep-alive loop.
+     */
+    public void terminate(){
+        terminated = true;
+    }
+}