You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/10/30 10:01:05 UTC
git commit: Fixing autoscaler component merge issues
Repository: stratos
Updated Branches:
refs/heads/docker-grouping-merge 25340242f -> be8885862
Fixing autoscaler component merge issues
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/be888586
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/be888586
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/be888586
Branch: refs/heads/docker-grouping-merge
Commit: be8885862eb7c18230b7c1a066f06a35fee42c53
Parents: 2534024
Author: Imesh Gunaratne <im...@apache.org>
Authored: Thu Oct 30 14:30:55 2014 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Thu Oct 30 14:30:55 2014 +0530
----------------------------------------------------------------------
.../org.apache.stratos.autoscaler/pom.xml | 1 -
.../internal/AutoscalerServerComponent.java | 22 +-
.../monitor/AbstractClusterMonitor.java | 46 ++-
.../monitor/ApplicationMonitorFactory.java | 127 +-------
.../KubernetesServiceClusterMonitor.java | 8 +-
.../autoscaler/monitor/VMLbClusterMonitor.java | 30 +-
.../monitor/VMServiceClusterMonitor.java | 40 +--
.../monitor/cluster/ClusterMonitor.java | 293 -------------------
.../monitor/cluster/LbClusterMonitor.java | 129 --------
.../status/checker/StatusChecker.java | 15 +-
10 files changed, 108 insertions(+), 603 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/be888586/components/org.apache.stratos.autoscaler/pom.xml
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/pom.xml b/components/org.apache.stratos.autoscaler/pom.xml
index c188021..6fcd1f0 100644
--- a/components/org.apache.stratos.autoscaler/pom.xml
+++ b/components/org.apache.stratos.autoscaler/pom.xml
@@ -149,7 +149,6 @@
<groupId>org.apache.stratos</groupId>
<artifactId>org.apache.stratos.messaging</artifactId>
<version>${project.version}</version>
- <scope>provided</scope>
</dependency>
<dependency>
<groupId>org.apache.stratos</groupId>
http://git-wip-us.apache.org/repos/asf/stratos/blob/be888586/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
index 5296635..4a8b269 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
@@ -53,9 +53,9 @@ import java.util.List;
public class AutoscalerServerComponent {
private static final Log log = LogFactory.getLog(AutoscalerServerComponent.class);
- AutoscalerTopologyEventReceiver asTopologyReceiver;
-// TopicSubscriber healthStatTopicSubscriber;
- AutoscalerHealthStatEventReceiver autoscalerHealthStatEventReceiver;
+
+ private AutoscalerTopologyEventReceiver asTopologyReceiver;
+ private AutoscalerHealthStatEventReceiver autoscalerHealthStatEventReceiver;
protected void activate(ComponentContext componentContext) throws Exception {
try {
@@ -66,21 +66,13 @@ public class AutoscalerServerComponent {
if (log.isDebugEnabled()) {
log.debug("Topology receiver thread started");
}
-// healthStatTopicSubscriber = new TopicSubscriber(Constants.HEALTH_STAT_TOPIC);
-// healthStatTopicSubscriber.setMessageListener(new HealthEventMessageReceiver());
-// Thread healthStatTopicSubscriberThread = new Thread(healthStatTopicSubscriber);
-// healthStatTopicSubscriberThread.start();
-// if (log.isDebugEnabled()) {
-// log.debug("Health event message receiver thread started");
-// }
-
// Start health stat receiver
autoscalerHealthStatEventReceiver = new AutoscalerHealthStatEventReceiver();
Thread healthDelegatorThread = new Thread(autoscalerHealthStatEventReceiver);
healthDelegatorThread.start();
if (log.isDebugEnabled()) {
- log.debug("Health message processor thread started");
+ log.debug("Health statistics receiver thread started");
}
// Adding the registry stored partitions to the information model
@@ -122,10 +114,10 @@ public class AutoscalerServerComponent {
}
if (log.isInfoEnabled()) {
- log.info("Autoscaler Server Component activated");
+ log.info("Autoscaler server Component activated");
}
} catch (Throwable e) {
- log.error("Error in activating the autoscaler component ", e);
+ log.error("Error in activating autoscaler component", e);
}
}
@@ -149,7 +141,7 @@ public class AutoscalerServerComponent {
protected void unsetRegistryService(RegistryService registryService) {
if (log.isDebugEnabled()) {
- log.debug("Unsetting the Registry Service");
+ log.debug("Un-setting the Registry Service");
}
ServiceReferenceHolder.getInstance().setRegistry(null);
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/be888586/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
index 972ddad..030bc53 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/AbstractClusterMonitor.java
@@ -23,9 +23,15 @@ import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
import org.apache.stratos.autoscaler.exception.InvalidArgumentException;
+import org.apache.stratos.autoscaler.grouping.topic.StatusEventPublisher;
+import org.apache.stratos.autoscaler.monitor.events.MonitorScalingEvent;
+import org.apache.stratos.autoscaler.monitor.events.MonitorStatusEvent;
+import org.apache.stratos.autoscaler.monitor.events.MonitorTerminateAllEvent;
import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
import org.apache.stratos.cloud.controller.stub.pojo.Properties;
+import org.apache.stratos.messaging.domain.topology.ApplicationStatus;
import org.apache.stratos.messaging.domain.topology.ClusterStatus;
+import org.apache.stratos.messaging.domain.topology.GroupStatus;
import org.apache.stratos.messaging.event.health.stat.AverageLoadAverageEvent;
import org.apache.stratos.messaging.event.health.stat.AverageMemoryConsumptionEvent;
import org.apache.stratos.messaging.event.health.stat.AverageRequestsInFlightEvent;
@@ -54,11 +60,11 @@ import org.drools.runtime.rule.FactHandle;
/*
* Every cluster monitor, which are monitoring a cluster, should extend this class.
*/
-public abstract class AbstractClusterMonitor implements Runnable {
+public abstract class AbstractClusterMonitor extends Monitor implements Runnable {
private String clusterId;
private String serviceId;
- private ClusterStatus status;
+ protected ClusterStatus status;
private int monitoringIntervalMilliseconds;
protected FactHandle minCheckFactHandle;
@@ -68,6 +74,7 @@ public abstract class AbstractClusterMonitor implements Runnable {
private boolean isDestroyed;
private AutoscalerRuleEvaluator autoscalerRuleEvaluator;
+ protected boolean hasFaultyMember = false;
private final ScheduledExecutorService scheduler = Executors.newScheduledThreadPool(1);
@@ -159,7 +166,7 @@ public abstract class AbstractClusterMonitor implements Runnable {
public abstract void handleMemberTerminatedEvent(MemberTerminatedEvent memberTerminatedEvent);
public abstract void handleClusterRemovedEvent(ClusterRemovedEvent clusterRemovedEvent);
-
+
public abstract void handleDynamicUpdates(Properties properties) throws InvalidArgumentException;
public String getClusterId() {
@@ -244,4 +251,37 @@ public abstract class AbstractClusterMonitor implements Runnable {
AutoscalerRuleEvaluator autoscalerRuleEvaluator) {
this.autoscalerRuleEvaluator = autoscalerRuleEvaluator;
}
+
+
+ @Override
+ public void onParentEvent(MonitorStatusEvent statusEvent) {
+ // send the ClusterTerminating event
+ if (statusEvent.getStatus() == GroupStatus.Terminating || statusEvent.getStatus() ==
+ ApplicationStatus.Terminating) {
+ StatusEventPublisher.sendClusterTerminatingEvent(appId, serviceId, clusterId);
+ }
+ }
+
+ @Override
+ public void onChildEvent(MonitorStatusEvent statusEvent) {
+
+ }
+
+ @Override
+ public void onEvent(MonitorTerminateAllEvent terminateAllEvent) {
+
+ }
+
+ @Override
+ public void onEvent(MonitorScalingEvent scalingEvent) {
+
+ }
+
+ public void setHasFaultyMember(boolean hasFaultyMember) {
+ this.hasFaultyMember = hasFaultyMember;
+ }
+
+ public boolean isHasFaultyMember() {
+ return hasFaultyMember;
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/be888586/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ApplicationMonitorFactory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ApplicationMonitorFactory.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ApplicationMonitorFactory.java
index 2ead896..9cf3709 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ApplicationMonitorFactory.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/ApplicationMonitorFactory.java
@@ -34,7 +34,7 @@ import org.apache.stratos.autoscaler.grouping.dependency.context.ApplicationCont
import org.apache.stratos.autoscaler.grouping.dependency.context.ClusterContext;
import org.apache.stratos.autoscaler.grouping.dependency.context.GroupContext;
import org.apache.stratos.autoscaler.monitor.application.ApplicationMonitor;
-import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.VMClusterMonitor;
import org.apache.stratos.autoscaler.monitor.group.GroupMonitor;
import org.apache.stratos.autoscaler.partition.PartitionGroup;
import org.apache.stratos.autoscaler.policy.PolicyManager;
@@ -172,7 +172,7 @@ public class ApplicationMonitorFactory {
* @throws org.apache.stratos.autoscaler.exception.PolicyValidationException
* @throws org.apache.stratos.autoscaler.exception.PartitionValidationException
*/
- public static ClusterMonitor getClusterMonitor(ParentComponentMonitor parentMonitor,
+ public static VMClusterMonitor getClusterMonitor(ParentComponentMonitor parentMonitor,
ClusterContext context, String appId)
throws PolicyValidationException,
PartitionValidationException,
@@ -182,7 +182,7 @@ public class ApplicationMonitorFactory {
String serviceName = context.getServiceName();
Cluster cluster;
- ClusterMonitor clusterMonitor;
+ AbstractClusterMonitor clusterMonitor;
//acquire read lock for the service and cluster
TopologyManager.acquireReadLockForCluster(serviceName, clusterId);
try {
@@ -207,126 +207,17 @@ public class ApplicationMonitorFactory {
}
- String autoscalePolicyName = cluster.getAutoscalePolicyName();
- String deploymentPolicyName = cluster.getDeploymentPolicyName();
- if (log.isDebugEnabled()) {
- log.debug("Deployment policy name: " + deploymentPolicyName);
- log.debug("Autoscaler policy name: " + autoscalePolicyName);
- }
-
- AutoscalePolicy policy =
- PolicyManager.getInstance()
- .getAutoscalePolicy(autoscalePolicyName);
- DeploymentPolicy deploymentPolicy =
- PolicyManager.getInstance()
- .getDeploymentPolicy(deploymentPolicyName);
-
- if (deploymentPolicy == null) {
- String msg = "Deployment Policy is null. Policy name: " + deploymentPolicyName;
- log.error(msg);
- throw new PolicyValidationException(msg);
- }
-
- Partition[] allPartitions = deploymentPolicy.getAllPartitions();
- if (allPartitions == null) {
- String msg =
- "Deployment Policy's Partitions are null. Policy name: " +
- deploymentPolicyName;
- log.error(msg);
- throw new PolicyValidationException(msg);
- }
-
- CloudControllerClient.getInstance().validateDeploymentPolicy(cluster.getServiceName(), deploymentPolicy);
-
- clusterMonitor = new ClusterMonitor(cluster.getClusterId(), cluster.getServiceName(),
- deploymentPolicy, policy);
-
- for (PartitionGroup partitionGroup : deploymentPolicy.getPartitionGroups()) {
-
- NetworkPartitionContext networkPartitionContext = new NetworkPartitionContext(partitionGroup.getId(),
- partitionGroup.getPartitionAlgo(), partitionGroup.getPartitions());
-
- for (Partition partition : partitionGroup.getPartitions()) {
- PartitionContext partitionContext = new PartitionContext(partition);
- partitionContext.setServiceName(cluster.getServiceName());
- partitionContext.setProperties(cluster.getProperties());
- partitionContext.setNetworkPartitionId(partitionGroup.getId());
-
- for (Member member : cluster.getMembers()) {
- String memberId = member.getMemberId();
- if (member.getPartitionId().equalsIgnoreCase(partition.getId())) {
- MemberContext memberContext = new MemberContext();
- memberContext.setClusterId(member.getClusterId());
- memberContext.setMemberId(memberId);
- memberContext.setPartition(partition);
- memberContext.setProperties(convertMemberPropsToMemberContextProps(member.getProperties()));
-
- if (MemberStatus.Activated.equals(member.getStatus())) {
- partitionContext.addActiveMember(memberContext);
- //triggering the status checker
-// networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
-// partitionContext.incrementCurrentActiveMemberCount(1);
-
- } else if (MemberStatus.Created.equals(member.getStatus()) || MemberStatus.Starting.equals(member.getStatus())) {
- partitionContext.addPendingMember(memberContext);
-
-// networkPartitionContext.increaseMemberCountOfPartition(partition.getNetworkPartitionId(), 1);
- } else if (MemberStatus.Suspended.equals(member.getStatus())) {
-// partitionContext.addFaultyMember(memberId);
- }
- partitionContext.addMemberStatsContext(new MemberStatsContext(memberId));
- if (log.isInfoEnabled()) {
- log.info(String.format("Member stat context has been added: [member] %s", memberId));
- }
- }
-
- }
- networkPartitionContext.addPartitionContext(partitionContext);
- if (log.isInfoEnabled()) {
- log.info(String.format("Partition context has been added: [partition] %s",
- partitionContext.getPartitionId()));
- }
- }
-
- clusterMonitor.addNetworkPartitionCtxt(networkPartitionContext);
- clusterMonitor.setParent(parentMonitor);
- if(parentMonitor.isDependent() || (context.isDependent() && context.hasChild())) {
- clusterMonitor.setHasDependent(true);
- } else {
- clusterMonitor.setHasDependent(false);
- }
- AutoscalerContext.getInstance().addMonitor(clusterMonitor);
- if (log.isInfoEnabled()) {
- log.info(String.format("Network partition context has been added: [network partition] %s",
- networkPartitionContext.getId()));
- }
- }
- //TODO to make sure when group monitor is async
- //if cluster is not in created state then notify the parent monitor
- if (cluster.getStatus() != clusterMonitor.getStatus()) {
- //updating the status, so that it will notify the parent
- clusterMonitor.setStatus(cluster.getStatus());
- }
-
- if (!cluster.hasMembers()) {
- //triggering the status checker if cluster has members to decide
- // on the current status of the cluster
- StatusChecker.getInstance().onMemberStatusChange(clusterId);
+ clusterMonitor = ClusterMonitorFactory.getMonitor(cluster);
+ if (clusterMonitor instanceof VMClusterMonitor) {
+ return (VMClusterMonitor) clusterMonitor;
+ } else if (clusterMonitor != null) {
+ log.warn("Unknown cluster monitor found: " + clusterMonitor.getClass().toString());
}
+ return null;
} finally {
- //release read lock for the service and cluster
TopologyManager.releaseReadLockForCluster(serviceName, clusterId);
}
-
- // set hasPrimary property
- // hasPrimary is true if there are primary members available in that cluster
- if (cluster.getProperties() != null) {
- clusterMonitor.setHasPrimary(Boolean.parseBoolean(cluster.getProperties().getProperty(Constants.IS_PRIMARY)));
- }
-
- log.info("Cluster monitor created: " + clusterMonitor.toString());
- return clusterMonitor;
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/be888586/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
index 67850ba..15b14b6 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/KubernetesServiceClusterMonitor.java
@@ -64,19 +64,19 @@ public final class KubernetesServiceClusterMonitor extends KubernetesClusterMoni
public void run() {
if (log.isDebugEnabled()) {
- log.debug("KubernetesServiceClusterMonitor is running.. " + this.toString());
+ log.debug("KubernetesServiceClusterMonitor is running..." + this.toString());
}
try {
- if (!ClusterStatus.In_Maintenance.equals(getStatus())) {
+ if (!ClusterStatus.Active.getNextStates().contains(getStatus())) {
monitor();
} else {
if (log.isDebugEnabled()) {
log.debug("KubernetesServiceClusterMonitor is suspended as the cluster is in "
- + ClusterStatus.In_Maintenance + " mode......");
+ + getStatus() + "state");
}
}
} catch (Exception e) {
- log.error("KubernetesServiceClusterMonitor : Monitor failed." + this.toString(),
+ log.error("KubernetesServiceClusterMonitor: Monitor failed." + this.toString(),
e);
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/be888586/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
index 2ed78f1..3e6cddc 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMLbClusterMonitor.java
@@ -63,20 +63,26 @@ public class VMLbClusterMonitor extends VMClusterMonitor {
@Override
public void run() {
- if (log.isDebugEnabled()) {
- log.debug("VMLbClusterMonitor is running.. " + this.toString());
- }
- try {
- if (!ClusterStatus.In_Maintenance.equals(getStatus())) {
- monitor();
- } else {
- if (log.isDebugEnabled()) {
- log.debug("VMLbClusterMonitor is suspended as the cluster is in " +
- ClusterStatus.In_Maintenance + " mode......");
+ while (!isDestroyed()) {
+ if (log.isDebugEnabled()) {
+ log.debug("Cluster monitor is running.. " + this.toString());
+ }
+ try {
+ if (!ClusterStatus.Inactive.equals(status)) {
+ monitor();
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("LB Cluster monitor is suspended as the cluster is in " +
+ ClusterStatus.Inactive + " mode......");
+ }
}
+ } catch (Exception e) {
+ log.error("Cluster monitor: Monitor failed. " + this.toString(), e);
+ }
+ try {
+ Thread.sleep(getMonitorIntervalMilliseconds());
+ } catch (InterruptedException ignore) {
}
- } catch (Exception e) {
- log.error("VMLbClusterMonitor : Monitor failed. " + this.toString(), e);
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/be888586/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
index 9aec279..9a26b42 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/VMServiceClusterMonitor.java
@@ -63,28 +63,28 @@ public class VMServiceClusterMonitor extends VMClusterMonitor {
@Override
public void run() {
-
- try {
- // TODO make this configurable,
- // this is the delay the min check of normal cluster monitor to wait until LB monitor is added
- Thread.sleep(60000);
- } catch (InterruptedException ignore) {
- }
-
- if (log.isDebugEnabled()) {
- log.debug("VMServiceClusterMonitor is running.. " + this.toString());
- }
- try {
- if (!ClusterStatus.In_Maintenance.equals(getStatus())) {
- monitor();
- } else {
- if (log.isDebugEnabled()) {
- log.debug("VMServiceClusterMonitor is suspended as the cluster is in " +
- ClusterStatus.In_Maintenance + " mode......");
+ while (!isDestroyed()) {
+ try {
+ if ((this.status.getCode() <= ClusterStatus.Active.getCode()) ||
+ (this.status == ClusterStatus.Inactive && !hasDependent) ||
+ !this.hasFaultyMember) {
+ if (log.isDebugEnabled()) {
+ log.debug("Cluster monitor is running.. " + this.toString());
+ }
+ monitor();
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("Cluster monitor is suspended as the cluster is in " +
+ ClusterStatus.Inactive + " mode......");
+ }
}
+ } catch (Exception e) {
+ log.error("Cluster monitor: Monitor failed." + this.toString(), e);
+ }
+ try {
+ Thread.sleep(getMonitorIntervalMilliseconds());
+ } catch (InterruptedException ignore) {
}
- } catch (Exception e) {
- log.error("VMServiceClusterMonitor : Monitor failed." + this.toString(), e);
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/be888586/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
deleted file mode 100644
index 6d7e8ca..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/ClusterMonitor.java
+++ /dev/null
@@ -1,293 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor.cluster;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.PartitionContext;
-import org.apache.stratos.autoscaler.client.cloud.controller.CloudControllerClient;
-import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
-import org.apache.stratos.autoscaler.exception.TerminationException;
-import org.apache.stratos.autoscaler.grouping.topic.StatusEventPublisher;
-import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor;
-import org.apache.stratos.autoscaler.monitor.events.MonitorStatusEvent;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.cloud.controller.stub.pojo.MemberContext;
-import org.apache.stratos.cloud.controller.stub.pojo.Properties;
-import org.apache.stratos.cloud.controller.stub.pojo.Property;
-import org.apache.stratos.messaging.domain.topology.ApplicationStatus;
-import org.apache.stratos.messaging.domain.topology.ClusterStatus;
-import org.apache.stratos.messaging.domain.topology.GroupStatus;
-
-import java.util.ArrayList;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Is responsible for monitoring a service cluster. This runs periodically
- * and perform minimum instance check and scaling check using the underlying
- * rules engine.
- */
-public class ClusterMonitor extends AbstractClusterMonitor {
-
- private static final Log log = LogFactory.getLog(ClusterMonitor.class);
- private String lbReferenceType;
- private boolean hasPrimary;
-
-
- public ClusterMonitor(String clusterId, String serviceId, DeploymentPolicy deploymentPolicy,
- AutoscalePolicy autoscalePolicy) {
- this.clusterId = clusterId;
- this.id = clusterId;
- this.serviceId = serviceId;
-
- this.autoscalerRuleEvaluator = new AutoscalerRuleEvaluator();
- this.scaleCheckKnowledgeSession = autoscalerRuleEvaluator.getScaleCheckStatefulSession();
- this.minCheckKnowledgeSession = autoscalerRuleEvaluator.getMinCheckStatefulSession();
- this.terminateAllKnowledgeSession = autoscalerRuleEvaluator.getTerminateAllStatefulSession();
-
- this.deploymentPolicy = deploymentPolicy;
- this.autoscalePolicy = autoscalePolicy;
- if (log.isDebugEnabled()) {
- log.debug("ClusterMonitor:autoScalePolicy:" + autoscalePolicy);
- }
- networkPartitionCtxts = new ConcurrentHashMap<String, NetworkPartitionContext>();
- //status = ClusterStatus.Created;
- }
-
-
- @Override
- public void run() {
- while (!isDestroyed()) {
- try {
- if ((this.status.getCode() <= ClusterStatus.Active.getCode()) ||
- (this.status == ClusterStatus.Inactive && !hasDependent) ||
- !this.hasFaultyMember) {
- if (log.isDebugEnabled()) {
- log.debug("Cluster monitor is running.. " + this.toString());
- }
- monitor();
- } else {
- if (log.isDebugEnabled()) {
- log.debug("Cluster monitor is suspended as the cluster is in " +
- ClusterStatus.Inactive + " mode......");
- }
- }
- } catch (Exception e) {
- log.error("Cluster monitor: Monitor failed." + this.toString(), e);
- }
- try {
- Thread.sleep(monitorInterval);
- } catch (InterruptedException ignore) {
- }
- }
-
-
- }
-
- @Override
- public void terminateAllMembers() {
-
- Thread memberTerminator = new Thread(new Runnable(){
- public void run(){
-
- for (NetworkPartitionContext networkPartitionContext : networkPartitionCtxts.values()) {
- for (PartitionContext partitionContext : networkPartitionContext.getPartitionCtxts().values()) {
- //if (log.isDebugEnabled()) {
- log.info("Starting to terminate all members in Network Partition [ " +
- networkPartitionContext.getId() + " ], Partition [ " +
- partitionContext.getPartitionId() + " ]");
- // }
- // need to terminate active, pending and obsolete members
-
- // active members
- for (MemberContext activeMemberCtxt : partitionContext.getActiveMembers()) {
- log.info("Terminating active member [member id] " + activeMemberCtxt.getMemberId());
- terminateMember(activeMemberCtxt.getMemberId());
- }
-
- // pending members
- for (MemberContext pendingMemberCtxt : partitionContext.getPendingMembers()) {
- log.info("Terminating pending member [member id] " + pendingMemberCtxt.getMemberId());
- terminateMember(pendingMemberCtxt.getMemberId());
- }
-
- // obsolete members
- for (String obsoleteMemberId : partitionContext.getObsoletedMembers()) {
- log.info("Terminating obsolete member [member id] " + obsoleteMemberId);
- terminateMember(obsoleteMemberId);
- }
-
-// terminateAllFactHandle = AutoscalerRuleEvaluator.evaluateTerminateAll
-// (terminateAllKnowledgeSession, terminateAllFactHandle, partitionContext);
- }
- }
- }
- }, "Member Terminator - [cluster id] " + this.clusterId);
-
- memberTerminator.start();
- }
-
- private static void terminateMember (String memberId) {
- try {
- CloudControllerClient.getInstance().terminate(memberId);
-
- } catch (TerminationException e) {
- log.error("Unable to terminate member [member id ] " + memberId, e);
- }
- }
-
- private boolean isPrimaryMember(MemberContext memberContext) {
- Properties props = memberContext.getProperties();
- if (log.isDebugEnabled()) {
- log.debug(" Properties [" + props + "] ");
- }
- if (props != null && props.getProperties() != null) {
- for (Property prop : props.getProperties()) {
- if (prop.getName().equals("PRIMARY")) {
- if (Boolean.parseBoolean(prop.getValue())) {
- log.debug("Adding member id [" + memberContext.getMemberId() + "] " +
- "member instance id [" + memberContext.getInstanceId() + "] as a primary member");
- return true;
- }
- }
- }
- }
- return false;
- }
-
- public void monitor() {
- //TODO make this concurrent
-
- for (NetworkPartitionContext networkPartitionContext : networkPartitionCtxts.values()) {
- // store primary members in the network partition context
- List<String> primaryMemberListInNetworkPartition = new ArrayList<String>();
- //minimum check per partition
- for (PartitionContext partitionContext : networkPartitionContext.getPartitionCtxts().values()) {
- // store primary members in the partition context
- List<String> primaryMemberListInPartition = new ArrayList<String>();
- // get active primary members in this partition context
- for (MemberContext memberContext : partitionContext.getActiveMembers()) {
- if (isPrimaryMember(memberContext)) {
- primaryMemberListInPartition.add(memberContext.getMemberId());
- }
- }
-
- // get pending primary members in this partition context
- for (MemberContext memberContext : partitionContext.getPendingMembers()) {
- if (isPrimaryMember(memberContext)) {
- primaryMemberListInPartition.add(memberContext.getMemberId());
- }
- }
- primaryMemberListInNetworkPartition.addAll(primaryMemberListInPartition);
- minCheckKnowledgeSession.setGlobal("clusterId", clusterId);
- minCheckKnowledgeSession.setGlobal("lbRef", lbReferenceType);
- minCheckKnowledgeSession.setGlobal("isPrimary", hasPrimary);
-
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Running minimum check for partition %s ", partitionContext.getPartitionId()));
- }
-
- minCheckFactHandle = AutoscalerRuleEvaluator.evaluateMinCheck(minCheckKnowledgeSession
- , minCheckFactHandle, partitionContext);
-
- //checking the status of the cluster
-
-
- }
-
- /*boolean rifReset = networkPartitionContext.isRifReset();
- boolean memoryConsumptionReset = networkPartitionContext.isMemoryConsumptionReset();
- boolean loadAverageReset = networkPartitionContext.isLoadAverageReset();
-
- if (log.isDebugEnabled()) {
- log.debug("flag of rifReset: " + rifReset + " flag of memoryConsumptionReset" + memoryConsumptionReset
- + " flag of loadAverageReset" + loadAverageReset);
- }
- if (rifReset || memoryConsumptionReset || loadAverageReset) {
-
- scaleCheckKnowledgeSession.setGlobal("clusterId", clusterId);
- //scaleCheckKnowledgeSession.setGlobal("deploymentPolicy", deploymentPolicy);
- scaleCheckKnowledgeSession.setGlobal("autoscalePolicy", autoscalePolicy);
- scaleCheckKnowledgeSession.setGlobal("rifReset", rifReset);
- scaleCheckKnowledgeSession.setGlobal("mcReset", memoryConsumptionReset);
- scaleCheckKnowledgeSession.setGlobal("laReset", loadAverageReset);
- scaleCheckKnowledgeSession.setGlobal("lbRef", lbReferenceType);
- scaleCheckKnowledgeSession.setGlobal("isPrimary", false);
- scaleCheckKnowledgeSession.setGlobal("primaryMembers", primaryMemberListInNetworkPartition);
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Running scale check for network partition %s ", networkPartitionContext.getId()));
- log.debug(" Primary members : " + primaryMemberListInNetworkPartition);
- }
-
- scaleCheckFactHandle = AutoscalerRuleEvaluator.evaluateScaleCheck(scaleCheckKnowledgeSession
- , scaleCheckFactHandle, networkPartitionContext);
-
- networkPartitionContext.setRifReset(false);
- networkPartitionContext.setMemoryConsumptionReset(false);
- networkPartitionContext.setLoadAverageReset(false);
- } else if (log.isDebugEnabled()) {
- log.debug(String.format("Scale rule will not run since the LB statistics have not received before this " +
- "cycle for network partition %s", networkPartitionContext.getId()));
- }*/
- }
- }
-
- @Override
- public String toString() {
- return "ClusterMonitor [clusterId=" + clusterId + ", serviceId=" + serviceId +
- ", deploymentPolicy=" + deploymentPolicy + ", autoscalePolicy=" + autoscalePolicy +
- ", lbReferenceType=" + lbReferenceType +
- ", hasPrimary=" + hasPrimary + " ]";
- }
-
- public String getLbReferenceType() {
- return lbReferenceType;
- }
-
- public void setLbReferenceType(String lbReferenceType) {
- this.lbReferenceType = lbReferenceType;
- }
-
- public boolean isHasPrimary() {
- return hasPrimary;
- }
-
- public void setHasPrimary(boolean hasPrimary) {
- this.hasPrimary = hasPrimary;
- }
-
- @Override
- public void onChildEvent(MonitorStatusEvent statusEvent) {
-
- }
-
- @Override
- public void onParentEvent(MonitorStatusEvent statusEvent) {
- // send the ClusterTerminating event
- if (statusEvent.getStatus() == GroupStatus.Terminating || statusEvent.getStatus() ==
- ApplicationStatus.Terminating) {
- StatusEventPublisher.sendClusterTerminatingEvent(appId, serviceId, clusterId);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/be888586/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/LbClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/LbClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/LbClusterMonitor.java
deleted file mode 100644
index 6697d73..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/LbClusterMonitor.java
+++ /dev/null
@@ -1,129 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.monitor.cluster;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.NetworkPartitionContext;
-import org.apache.stratos.autoscaler.PartitionContext;
-import org.apache.stratos.autoscaler.deployment.policy.DeploymentPolicy;
-import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor;
-import org.apache.stratos.autoscaler.monitor.events.MonitorStatusEvent;
-import org.apache.stratos.autoscaler.policy.model.AutoscalePolicy;
-import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
-import org.apache.stratos.messaging.domain.topology.ClusterStatus;
-
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Is responsible for monitoring a service cluster. This runs periodically
- * and perform minimum instance check and scaling check using the underlying
- * rules engine.
- */
-public class LbClusterMonitor extends AbstractClusterMonitor {
-
- private static final Log log = LogFactory.getLog(LbClusterMonitor.class);
-
- public LbClusterMonitor(String clusterId, String serviceId, DeploymentPolicy deploymentPolicy,
- AutoscalePolicy autoscalePolicy) {
- this.clusterId = clusterId;
- this.serviceId = serviceId;
-
- this.autoscalerRuleEvaluator = new AutoscalerRuleEvaluator();
- this.scaleCheckKnowledgeSession = autoscalerRuleEvaluator.getScaleCheckStatefulSession();
- this.minCheckKnowledgeSession = autoscalerRuleEvaluator.getMinCheckStatefulSession();
- this.terminateAllKnowledgeSession = autoscalerRuleEvaluator.getTerminateAllStatefulSession();
-
- this.deploymentPolicy = deploymentPolicy;
- this.deploymentPolicy = deploymentPolicy;
- networkPartitionCtxts = new ConcurrentHashMap<String, NetworkPartitionContext>();
- }
-
- @Override
- public void run() {
-
- while (!isDestroyed()) {
- if (log.isDebugEnabled()) {
- log.debug("Cluster monitor is running.. " + this.toString());
- }
- try {
- if (!ClusterStatus.Inactive.equals(status)) {
- monitor();
- } else {
- if (log.isDebugEnabled()) {
- log.debug("LB Cluster monitor is suspended as the cluster is in " +
- ClusterStatus.Inactive + " mode......");
- }
- }
- } catch (Exception e) {
- log.error("Cluster monitor: Monitor failed. " + this.toString(), e);
- }
- try {
- Thread.sleep(monitorInterval);
- } catch (InterruptedException ignore) {
- }
- }
- }
-
- @Override
- public void terminateAllMembers() {
- //To change body of implemented methods use File | Settings | File Templates.
- }
-
- private void monitor() {
- // TODO make this concurrent
- for (NetworkPartitionContext networkPartitionContext : networkPartitionCtxts.values()) {
-
- // minimum check per partition
- for (PartitionContext partitionContext : networkPartitionContext.getPartitionCtxts()
- .values()) {
-
- if (partitionContext != null) {
- minCheckKnowledgeSession.setGlobal("clusterId", clusterId);
- minCheckKnowledgeSession.setGlobal("isPrimary", false);
-
- if (log.isDebugEnabled()) {
- log.debug(String.format("Running minimum check for partition %s ",
- partitionContext.getPartitionId()));
- }
-
- minCheckFactHandle =
- AutoscalerRuleEvaluator.evaluateMinCheck(minCheckKnowledgeSession,
- minCheckFactHandle,
- partitionContext);
- // start only in the first partition context
- break;
- }
-
- }
-
- }
- }
-
- @Override
- public String toString() {
- return "LbClusterMonitor [clusterId=" + clusterId + ", serviceId=" + serviceId + "]";
- }
-
-
- @Override
- public void onParentEvent(MonitorStatusEvent statusEvent) {
-
- }
-}
http://git-wip-us.apache.org/repos/asf/stratos/blob/be888586/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java
index 3a925d1..5f3b590 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/checker/StatusChecker.java
@@ -24,8 +24,7 @@ import org.apache.stratos.autoscaler.AutoscalerContext;
import org.apache.stratos.autoscaler.NetworkPartitionContext;
import org.apache.stratos.autoscaler.PartitionContext;
import org.apache.stratos.autoscaler.grouping.topic.StatusEventPublisher;
-import org.apache.stratos.autoscaler.monitor.AbstractClusterMonitor;
-import org.apache.stratos.autoscaler.monitor.cluster.ClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.VMClusterMonitor;
import org.apache.stratos.messaging.domain.topology.*;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
@@ -57,7 +56,7 @@ public class StatusChecker {
public void onMemberStatusChange(final String clusterId) {
Runnable group = new Runnable() {
public void run() {
- ClusterMonitor monitor = (ClusterMonitor) AutoscalerContext.getInstance().getMonitor(clusterId);
+ VMClusterMonitor monitor = (VMClusterMonitor) AutoscalerContext.getInstance().getClusterMonitor(clusterId);
boolean clusterActive = false;
if (monitor != null) {
clusterActive = clusterActive(monitor);
@@ -80,7 +79,7 @@ public class StatusChecker {
public void onMemberTermination(final String clusterId) {
Runnable group = new Runnable() {
public void run() {
- ClusterMonitor monitor = (ClusterMonitor) AutoscalerContext.getInstance().getMonitor(clusterId);
+ VMClusterMonitor monitor = (VMClusterMonitor) AutoscalerContext.getInstance().getClusterMonitor(clusterId);
boolean clusterMonitorHasMembers = clusterMonitorHasMembers(monitor);
boolean clusterActive = clusterActive(monitor);
@@ -121,7 +120,7 @@ public class StatusChecker {
}
- private boolean clusterActive(AbstractClusterMonitor monitor) {
+ private boolean clusterActive(VMClusterMonitor monitor) {
boolean clusterActive = false;
for (NetworkPartitionContext networkPartitionContext : monitor.getNetworkPartitionCtxts().values()) {
//minimum check per partition
@@ -140,7 +139,7 @@ public class StatusChecker {
return clusterActive;
}
- private boolean clusterMonitorHasMembers(AbstractClusterMonitor monitor) {
+ private boolean clusterMonitorHasMembers(VMClusterMonitor monitor) {
boolean hasMember = false;
for (NetworkPartitionContext networkPartitionContext : monitor.getNetworkPartitionCtxts().values()) {
//minimum check per partition
@@ -161,7 +160,7 @@ public class StatusChecker {
public void onMemberFaultEvent(final String clusterId, final String partitionId) {
Runnable group = new Runnable() {
public void run() {
- ClusterMonitor monitor = (ClusterMonitor) AutoscalerContext.getInstance().getMonitor(clusterId);
+ VMClusterMonitor monitor = (VMClusterMonitor) AutoscalerContext.getInstance().getClusterMonitor(clusterId);
boolean clusterInActive = getClusterInActive(monitor, partitionId);
String appId = monitor.getAppId();
if (clusterInActive) {
@@ -185,7 +184,7 @@ public class StatusChecker {
groupThread.start();
}
- private boolean getClusterInActive(AbstractClusterMonitor monitor, String partitionId) {
+ private boolean getClusterInActive(VMClusterMonitor monitor, String partitionId) {
boolean clusterInActive = false;
for (NetworkPartitionContext networkPartitionContext : monitor.getNetworkPartitionCtxts().values()) {
for (PartitionContext partition : networkPartitionContext.getPartitionCtxts().values()) {