You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by re...@apache.org on 2014/12/04 16:40:44 UTC
stratos git commit: adding instanceID to cluster events
Repository: stratos
Updated Branches:
refs/heads/master e40d36d13 -> 39162a58d
adding instanceID to cluster events
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/39162a58
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/39162a58
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/39162a58
Branch: refs/heads/master
Commit: 39162a58d450ef9f2d61664061a50fdae923be71
Parents: e40d36d
Author: reka <rt...@gmail.com>
Authored: Thu Dec 4 21:10:17 2014 +0530
Committer: reka <rt...@gmail.com>
Committed: Thu Dec 4 21:10:31 2014 +0530
----------------------------------------------------------------------
.../context/cluster/ClusterInstanceContext.java | 4 +-
.../publisher/ClusterStatusEventPublisher.java | 24 +-
.../AutoscalerTopologyEventReceiver.java | 337 ++++++------
.../monitor/cluster/AbstractClusterMonitor.java | 16 +-
.../cluster/ClusterStatusActiveProcessor.java | 16 +-
.../cluster/ClusterStatusInActiveProcessor.java | 2 +-
.../ClusterStatusTerminatedProcessor.java | 12 +-
.../stratos/autoscaler/util/StatusChecker.java | 519 -------------------
8 files changed, 202 insertions(+), 728 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/39162a58/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/ClusterInstanceContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/ClusterInstanceContext.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/ClusterInstanceContext.java
index ab4bec3..ade8a4e 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/ClusterInstanceContext.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/context/cluster/ClusterInstanceContext.java
@@ -440,8 +440,8 @@ public class ClusterInstanceContext extends InstanceContext {
public int getActiveMembers() {
int activeMembers = 0;
- for(ClusterLevelPartitionContext partitionContext : this.partitionCtxts) {
- activeMembers += partitionContext.getActiveInstanceCount();
+ for (ClusterLevelPartitionContext partitionContext : this.partitionCtxts) {
+ activeMembers += partitionContext.getActiveInstanceCount();
}
return activeMembers;
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/39162a58/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/publisher/ClusterStatusEventPublisher.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/publisher/ClusterStatusEventPublisher.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/publisher/ClusterStatusEventPublisher.java
index c3bc678..5ce090d 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/publisher/ClusterStatusEventPublisher.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/publisher/ClusterStatusEventPublisher.java
@@ -22,6 +22,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.messaging.broker.publish.EventPublisher;
import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
+import org.apache.stratos.messaging.domain.instance.ClusterInstance;
import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.ClusterStatus;
import org.apache.stratos.messaging.domain.topology.Service;
@@ -110,14 +111,16 @@ public class ClusterStatusEventPublisher {
}
- public static void sendClusterActivatedEvent(String appId, String serviceName, String clusterId) {
+ public static void sendClusterActivatedEvent(String appId, String serviceName, String clusterId,
+ String instanceId) {
TopologyManager.acquireReadLockForCluster(serviceName, clusterId);
try {
Service service = TopologyManager.getTopology().getService(serviceName);
if (service != null) {
Cluster cluster = service.getCluster(clusterId);
- if (cluster.isStateTransitionValid(ClusterStatus.Active, null) &&
- cluster.getStatus(null) != ClusterStatus.Active) {
+ ClusterInstance clusterInstance = cluster.getInstanceContexts(instanceId);
+ if (clusterInstance.isStateTransitionValid(ClusterStatus.Active) &&
+ clusterInstance.getStatus() != ClusterStatus.Active) {
ClusterStatusClusterActivatedEvent clusterActivatedEvent =
new ClusterStatusClusterActivatedEvent(appId, serviceName, clusterId, "test***");
@@ -138,8 +141,9 @@ public class ClusterStatusEventPublisher {
Service service = TopologyManager.getTopology().getService(serviceName);
if (service != null) {
Cluster cluster = service.getCluster(clusterId);
- if (cluster.isStateTransitionValid(ClusterStatus.Inactive, null) &&
- cluster.getStatus(null) != ClusterStatus.Inactive) {
+ ClusterInstance clusterInstance = cluster.getInstanceContexts(instanceId);
+ if (clusterInstance.isStateTransitionValid(ClusterStatus.Active) &&
+ clusterInstance.getStatus() != ClusterStatus.Active) {
ClusterStatusClusterInactivateEvent clusterInActivateEvent =
new ClusterStatusClusterInactivateEvent(appId, serviceName, clusterId, instanceId);
@@ -162,8 +166,9 @@ public class ClusterStatusEventPublisher {
Service service = TopologyManager.getTopology().getService(serviceName);
if (service != null) {
Cluster cluster = service.getCluster(clusterId);
- if (cluster.isStateTransitionValid(ClusterStatus.Terminating, null) &&
- cluster.getStatus(null) != ClusterStatus.Terminating) {
+ ClusterInstance clusterInstance = cluster.getInstanceContexts(instanceId);
+ if (clusterInstance.isStateTransitionValid(ClusterStatus.Active) &&
+ clusterInstance.getStatus() != ClusterStatus.Active) {
ClusterStatusClusterTerminatingEvent appStatusClusterTerminatingEvent =
new ClusterStatusClusterTerminatingEvent(appId, serviceName, clusterId, instanceId);
@@ -186,8 +191,9 @@ public class ClusterStatusEventPublisher {
Service service = TopologyManager.getTopology().getService(serviceName);
if (service != null) {
Cluster cluster = service.getCluster(clusterId);
- if (cluster.isStateTransitionValid(ClusterStatus.Terminated, null) &&
- cluster.getStatus(null) != ClusterStatus.Terminated) {
+ ClusterInstance clusterInstance = cluster.getInstanceContexts(instanceId);
+ if (clusterInstance.isStateTransitionValid(ClusterStatus.Active) &&
+ clusterInstance.getStatus() != ClusterStatus.Active) {
ClusterStatusClusterTerminatedEvent appStatusClusterTerminatedEvent =
new ClusterStatusClusterTerminatedEvent(appId, serviceName, clusterId, instanceId);
http://git-wip-us.apache.org/repos/asf/stratos/blob/39162a58/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
index 3d5aa7f..729ae02 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -23,7 +23,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.autoscaler.applications.ApplicationHolder;
import org.apache.stratos.autoscaler.context.AutoscalerContext;
-import org.apache.stratos.autoscaler.context.cluster.AbstractClusterContext;
import org.apache.stratos.autoscaler.context.cluster.ClusterContextFactory;
import org.apache.stratos.autoscaler.context.cluster.VMClusterContext;
import org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher;
@@ -42,12 +41,10 @@ import org.apache.stratos.autoscaler.util.AutoscalerUtil;
import org.apache.stratos.autoscaler.util.ServiceReferenceHolder;
import org.apache.stratos.messaging.domain.applications.Application;
import org.apache.stratos.messaging.domain.applications.Applications;
-import org.apache.stratos.messaging.domain.applications.ClusterDataHolder;
import org.apache.stratos.messaging.domain.instance.ClusterInstance;
import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.ClusterStatus;
import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.domain.topology.Topology;
import org.apache.stratos.messaging.event.Event;
import org.apache.stratos.messaging.event.topology.*;
import org.apache.stratos.messaging.listener.topology.*;
@@ -61,29 +58,29 @@ import java.util.concurrent.ExecutorService;
*/
public class AutoscalerTopologyEventReceiver {
- private static final Log log = LogFactory.getLog(AutoscalerTopologyEventReceiver.class);
+ private static final Log log = LogFactory.getLog(AutoscalerTopologyEventReceiver.class);
- private TopologyEventReceiver topologyEventReceiver;
- private boolean terminated;
- private boolean topologyInitialized;
- private ExecutorService executorService;
+ private TopologyEventReceiver topologyEventReceiver;
+ private boolean terminated;
+ private boolean topologyInitialized;
+ private ExecutorService executorService;
- public AutoscalerTopologyEventReceiver() {
- this.topologyEventReceiver = new TopologyEventReceiver();
- addEventListeners();
- }
+ public AutoscalerTopologyEventReceiver() {
+ this.topologyEventReceiver = new TopologyEventReceiver();
+ addEventListeners();
+ }
- public void execute() {
- //FIXME this activated before autoscaler deployer activated.
+ public void execute() {
+ //FIXME this activated before autoscaler deployer activated.
- topologyEventReceiver.setExecutorService(getExecutorService());
- topologyEventReceiver.execute();
+ topologyEventReceiver.setExecutorService(getExecutorService());
+ topologyEventReceiver.execute();
- if (log.isInfoEnabled()) {
- log.info("Autoscaler topology receiver thread started");
- }
+ if (log.isInfoEnabled()) {
+ log.info("Autoscaler topology receiver thread started");
+ }
- }
+ }
private void addEventListeners() {
// Listen to topology events that affect clusters
@@ -142,8 +139,8 @@ public class AutoscalerTopologyEventReceiver {
//start the application monitor if the policy exists
DeploymentPolicy policy = PolicyManager.getInstance().
getDeploymentPolicyByApplication(appId);
- if(policy != null && !AutoscalerContext.getInstance().
- containsPendingMonitor(appId)) {
+ if (policy != null && !AutoscalerContext.getInstance().
+ containsPendingMonitor(appId)) {
AutoscalerUtil.getInstance().startApplicationMonitor(appId);
}
} catch (Exception e) {
@@ -292,14 +289,14 @@ public class AutoscalerTopologyEventReceiver {
if (appMonitor != null) {
appMonitor.onChildStatusEvent(
new ClusterStatusEvent(ClusterStatus.Terminated,
- clusterId, instanceId));
+ clusterId, instanceId));
}
return;
}
//changing the status in the monitor, will notify its parent monitor
monitor.setStatus(ClusterStatus.Terminated, instanceId);
monitor.removeInstance(instanceId);
- if(!monitor.hasInstance() && appMonitor.isTerminating()) {
+ if (!monitor.hasInstance() && appMonitor.isTerminating()) {
//Destroying and Removing the Cluster monitor
monitor.destroy();
AutoscalerContext.getInstance().removeClusterMonitor(clusterId);
@@ -413,87 +410,87 @@ public class AutoscalerTopologyEventReceiver {
});
topologyEventReceiver.addEventListener(new ClusterInstanceCreatedEventListener() {
- @Override
- protected void onEvent(Event event) {
-
- ClusterInstanceCreatedEvent clusterInstanceCreatedEvent =
- (ClusterInstanceCreatedEvent) event;
- AbstractClusterMonitor clusterMonitor = AutoscalerContext.getInstance().
- getClusterMonitor(clusterInstanceCreatedEvent.getClusterId());
- String instanceId = ((ClusterInstanceCreatedEvent) event).getInstanceId();
- //FIXME to take lock when clusterMonitor is running
- if (clusterMonitor != null) {
- TopologyManager.acquireReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(),
- clusterInstanceCreatedEvent.getClusterId());
-
- try {
- Service service = TopologyManager.getTopology().
- getService(clusterInstanceCreatedEvent.getServiceName());
-
- if (service != null) {
- Cluster cluster = service.getCluster(clusterInstanceCreatedEvent.getClusterId());
- if (cluster != null) {
- try {
- if (cluster.isKubernetesCluster()) {
- clusterMonitor.setClusterContext(
- ClusterContextFactory.getKubernetesClusterContext(
- instanceId,
- cluster));
- } else {
- VMClusterContext clusterContext =
- (VMClusterContext) clusterMonitor.getClusterContext();
- if (clusterContext == null) {
- clusterContext = ClusterContextFactory.
- getVMClusterContext(instanceId,
- cluster);
- clusterMonitor.setClusterContext(clusterContext);
-
- }
- clusterContext.addInstanceContext(instanceId, cluster);
- if(clusterMonitor.getInstance(instanceId) == null) {
- ClusterInstance clusterInstance = cluster.
- getInstanceContexts(instanceId);
- ClusterInstance instance = new ClusterInstance(clusterInstance.getAlias(),
- cluster.getClusterId(),
- clusterInstance.getInstanceId());
- instance.setParentId(clusterInstance.getParentId());
- instance.setNetworkPartitionId(clusterInstance.getNetworkPartitionId());
- instance.setPartitionId(clusterInstance.getPartitionId());
- instance.setStatus(clusterInstance.getStatus());
- clusterMonitor.addInstance(instance);
- }
-
-
- }
- if (clusterMonitor.hasMonitoringStarted().compareAndSet(false, true)) {
- clusterMonitor.startScheduler();
- log.info("Monitoring task for Cluster Monitor with cluster id " +
- clusterInstanceCreatedEvent.getClusterId() + " started successfully");
- }
- } catch (PolicyValidationException e) {
- log.error(e.getMessage(), e);
- } catch (PartitionValidationException e) {
- log.error(e.getMessage(), e);
- }
- }
-
- } else {
- log.error("Service " + clusterInstanceCreatedEvent.getServiceName() +
- " not found, no cluster instance added to ClusterMonitor " +
- clusterInstanceCreatedEvent.getClusterId());
- }
-
- } finally {
- TopologyManager.releaseReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(),
- clusterInstanceCreatedEvent.getClusterId());
- }
-
- } else {
- log.error("No Cluster Monitor found for cluster id " +
- clusterInstanceCreatedEvent.getClusterId());
- }
- }
- }
+ @Override
+ protected void onEvent(Event event) {
+
+ ClusterInstanceCreatedEvent clusterInstanceCreatedEvent =
+ (ClusterInstanceCreatedEvent) event;
+ AbstractClusterMonitor clusterMonitor = AutoscalerContext.getInstance().
+ getClusterMonitor(clusterInstanceCreatedEvent.getClusterId());
+ String instanceId = ((ClusterInstanceCreatedEvent) event).getInstanceId();
+ //FIXME to take lock when clusterMonitor is running
+ if (clusterMonitor != null) {
+ TopologyManager.acquireReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(),
+ clusterInstanceCreatedEvent.getClusterId());
+
+ try {
+ Service service = TopologyManager.getTopology().
+ getService(clusterInstanceCreatedEvent.getServiceName());
+
+ if (service != null) {
+ Cluster cluster = service.getCluster(clusterInstanceCreatedEvent.getClusterId());
+ if (cluster != null) {
+ try {
+ if (cluster.isKubernetesCluster()) {
+ clusterMonitor.setClusterContext(
+ ClusterContextFactory.getKubernetesClusterContext(
+ instanceId,
+ cluster));
+ } else {
+ VMClusterContext clusterContext =
+ (VMClusterContext) clusterMonitor.getClusterContext();
+ if (clusterContext == null) {
+ clusterContext = ClusterContextFactory.
+ getVMClusterContext(instanceId,
+ cluster);
+ clusterMonitor.setClusterContext(clusterContext);
+
+ }
+ clusterContext.addInstanceContext(instanceId, cluster);
+ if (clusterMonitor.getInstance(instanceId) == null) {
+ ClusterInstance clusterInstance = cluster.
+ getInstanceContexts(instanceId);
+ ClusterInstance instance = new ClusterInstance(clusterInstance.getAlias(),
+ cluster.getClusterId(),
+ clusterInstance.getInstanceId());
+ instance.setParentId(clusterInstance.getParentId());
+ instance.setNetworkPartitionId(clusterInstance.getNetworkPartitionId());
+ instance.setPartitionId(clusterInstance.getPartitionId());
+ instance.setStatus(clusterInstance.getStatus());
+ clusterMonitor.addInstance(instance);
+ }
+
+
+ }
+ if (clusterMonitor.hasMonitoringStarted().compareAndSet(false, true)) {
+ clusterMonitor.startScheduler();
+ log.info("Monitoring task for Cluster Monitor with cluster id " +
+ clusterInstanceCreatedEvent.getClusterId() + " started successfully");
+ }
+ } catch (PolicyValidationException e) {
+ log.error(e.getMessage(), e);
+ } catch (PartitionValidationException e) {
+ log.error(e.getMessage(), e);
+ }
+ }
+
+ } else {
+ log.error("Service " + clusterInstanceCreatedEvent.getServiceName() +
+ " not found, no cluster instance added to ClusterMonitor " +
+ clusterInstanceCreatedEvent.getClusterId());
+ }
+
+ } finally {
+ TopologyManager.releaseReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(),
+ clusterInstanceCreatedEvent.getClusterId());
+ }
+
+ } else {
+ log.error("No Cluster Monitor found for cluster id " +
+ clusterInstanceCreatedEvent.getClusterId());
+ }
+ }
+ }
);
}
@@ -507,70 +504,70 @@ public class AutoscalerTopologyEventReceiver {
terminated = true;
}
- public ExecutorService getExecutorService() {
- return executorService;
- }
-
- public void setExecutorService(ExecutorService executorService) {
- this.executorService = executorService;
- }
-
- private class ApplicationMonitorAdder implements Runnable {
- private String appId;
-
- public ApplicationMonitorAdder(String appId) {
- this.appId = appId;
- }
-
- public void run() {
- ApplicationMonitor applicationMonitor = null;
- int retries = 5;
- boolean success = false;
- do {
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e1) {
- }
- try {
- long start = System.currentTimeMillis();
- if (log.isDebugEnabled()) {
- log.debug("application monitor is going to be started for [application] " +
- appId);
- }
- try {
- applicationMonitor = MonitorFactory.getApplicationMonitor(appId);
- } catch (PolicyValidationException e) {
- String msg = "Application monitor creation failed for Application: ";
- log.warn(msg, e);
- retries--;
- }
- long end = System.currentTimeMillis();
- log.info("Time taken to start app monitor: " + (end - start) / 1000);
- success = true;
- } catch (DependencyBuilderException e) {
- String msg = "Application monitor creation failed for Application: ";
- log.warn(msg, e);
- retries--;
- } catch (TopologyInConsistentException e) {
- String msg = "Application monitor creation failed for Application: ";
- log.warn(msg, e);
- retries--;
- }
- } while (!success && retries != 0);
-
- if (applicationMonitor == null) {
- String msg = "Application monitor creation failed, even after retrying for 5 times, "
- + "for Application: " + appId;
- log.error(msg);
- throw new RuntimeException(msg);
- }
-
- AutoscalerContext.getInstance().addAppMonitor(applicationMonitor);
- if (log.isInfoEnabled()) {
- log.info(String.format("Application monitor has been added successfully: " +
- "[application] %s", applicationMonitor.getId()));
- }
- }
- }
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ public void setExecutorService(ExecutorService executorService) {
+ this.executorService = executorService;
+ }
+
+ private class ApplicationMonitorAdder implements Runnable {
+ private String appId;
+
+ public ApplicationMonitorAdder(String appId) {
+ this.appId = appId;
+ }
+
+ public void run() {
+ ApplicationMonitor applicationMonitor = null;
+ int retries = 5;
+ boolean success = false;
+ do {
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e1) {
+ }
+ try {
+ long start = System.currentTimeMillis();
+ if (log.isDebugEnabled()) {
+ log.debug("application monitor is going to be started for [application] " +
+ appId);
+ }
+ try {
+ applicationMonitor = MonitorFactory.getApplicationMonitor(appId);
+ } catch (PolicyValidationException e) {
+ String msg = "Application monitor creation failed for Application: ";
+ log.warn(msg, e);
+ retries--;
+ }
+ long end = System.currentTimeMillis();
+ log.info("Time taken to start app monitor: " + (end - start) / 1000);
+ success = true;
+ } catch (DependencyBuilderException e) {
+ String msg = "Application monitor creation failed for Application: ";
+ log.warn(msg, e);
+ retries--;
+ } catch (TopologyInConsistentException e) {
+ String msg = "Application monitor creation failed for Application: ";
+ log.warn(msg, e);
+ retries--;
+ }
+ } while (!success && retries != 0);
+
+ if (applicationMonitor == null) {
+ String msg = "Application monitor creation failed, even after retrying for 5 times, "
+ + "for Application: " + appId;
+ log.error(msg);
+ throw new RuntimeException(msg);
+ }
+
+ AutoscalerContext.getInstance().addAppMonitor(applicationMonitor);
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Application monitor has been added successfully: " +
+ "[application] %s", applicationMonitor.getId()));
+ }
+ }
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/39162a58/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java
index 77fdbc9..4c7498b 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/cluster/AbstractClusterMonitor.java
@@ -20,30 +20,26 @@ package org.apache.stratos.autoscaler.monitor.cluster;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.context.cluster.AbstractClusterContext;
import org.apache.stratos.autoscaler.applications.ApplicationHolder;
+import org.apache.stratos.autoscaler.context.cluster.AbstractClusterContext;
import org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher;
import org.apache.stratos.autoscaler.exception.InvalidArgumentException;
import org.apache.stratos.autoscaler.monitor.Monitor;
-import org.apache.stratos.autoscaler.monitor.events.builder.MonitorStatusEventBuilder;
import org.apache.stratos.autoscaler.monitor.events.MonitorScalingEvent;
import org.apache.stratos.autoscaler.monitor.events.MonitorStatusEvent;
+import org.apache.stratos.autoscaler.monitor.events.builder.MonitorStatusEventBuilder;
import org.apache.stratos.autoscaler.rule.AutoscalerRuleEvaluator;
import org.apache.stratos.common.Properties;
-import org.apache.stratos.common.constants.StratosConstants;
import org.apache.stratos.messaging.domain.applications.Application;
import org.apache.stratos.messaging.domain.applications.ApplicationStatus;
import org.apache.stratos.messaging.domain.applications.Group;
import org.apache.stratos.messaging.domain.applications.GroupStatus;
-import org.apache.stratos.messaging.domain.instance.ClusterInstance;
import org.apache.stratos.messaging.domain.topology.ClusterStatus;
import org.apache.stratos.messaging.event.health.stat.*;
import org.apache.stratos.messaging.event.topology.*;
import org.drools.runtime.StatefulKnowledgeSession;
import org.drools.runtime.rule.FactHandle;
-import java.util.HashMap;
-import java.util.Map;
import java.util.concurrent.Executors;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -62,20 +58,18 @@ public abstract class AbstractClusterMonitor extends Monitor implements Runnable
protected FactHandle dependentScaleCheckFactHandle;
protected boolean hasFaultyMember = false;
protected boolean stop = false;
- private AtomicBoolean monitoringStarted;
protected AbstractClusterContext clusterContext;
-
protected StatefulKnowledgeSession minCheckKnowledgeSession;
protected StatefulKnowledgeSession obsoleteCheckKnowledgeSession;
protected StatefulKnowledgeSession scaleCheckKnowledgeSession;
protected StatefulKnowledgeSession dependentScaleCheckKnowledgeSession;
-
+ protected AutoscalerRuleEvaluator autoscalerRuleEvaluator;
+ protected String serviceType;
+ private AtomicBoolean monitoringStarted;
private String clusterId;
private ClusterStatus status;
private int monitoringIntervalMilliseconds;
private boolean isDestroyed;
- protected AutoscalerRuleEvaluator autoscalerRuleEvaluator;
- protected String serviceType;
protected AbstractClusterMonitor(String serviceType, String clusterId) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/39162a58/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/processor/cluster/ClusterStatusActiveProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/processor/cluster/ClusterStatusActiveProcessor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/processor/cluster/ClusterStatusActiveProcessor.java
index 8e9d99c..b80cbdf 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/processor/cluster/ClusterStatusActiveProcessor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/processor/cluster/ClusterStatusActiveProcessor.java
@@ -22,8 +22,6 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.autoscaler.context.AutoscalerContext;
import org.apache.stratos.autoscaler.context.cluster.ClusterInstanceContext;
-import org.apache.stratos.autoscaler.context.cluster.VMClusterContext;
-import org.apache.stratos.autoscaler.context.partition.ClusterLevelPartitionContext;
import org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetworkPartitionContext;
import org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher;
import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor;
@@ -57,21 +55,21 @@ public class ClusterStatusActiveProcessor extends ClusterStatusProcessor {
} else {
throw new RuntimeException(String.format("Failed to process message using " +
"available message processors: [type] %s [cluster] %s [instance]",
- type, clusterId, instanceId));
+ type, clusterId, instanceId));
}
}
- return false;
+ return false;
}
private boolean doProcess(String clusterId, String instanceId) {
VMClusterMonitor monitor = (VMClusterMonitor) AutoscalerContext.getInstance().
- getClusterMonitor(clusterId);
+ getClusterMonitor(clusterId);
boolean clusterActive = false;
for (ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext : monitor.getNetworkPartitionCtxts()) {
//minimum check per partition
ClusterInstanceContext instanceContext = clusterLevelNetworkPartitionContext.getClusterInstanceContext(instanceId);
- if(instanceContext != null) {
- if(instanceContext.getActiveMembers() >= instanceContext.getMaxInstanceCount()) {
+ if (instanceContext != null) {
+ if (instanceContext.getActiveMembers() >= instanceContext.getMaxInstanceCount()) {
clusterActive = true;
} else {
clusterActive = false;
@@ -79,13 +77,13 @@ public class ClusterStatusActiveProcessor extends ClusterStatusProcessor {
}
}
- if(clusterActive) {
+ if (clusterActive) {
if (log.isInfoEnabled()) {
log.info("Publishing Cluster activated event for [application]: "
+ monitor.getAppId() + " [cluster]: " + clusterId);
}
ClusterStatusEventPublisher.sendClusterActivatedEvent(monitor.getAppId(),
- monitor.getServiceId(), monitor.getClusterId());
+ monitor.getServiceId(), monitor.getClusterId(), instanceId);
}
return clusterActive;
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/39162a58/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/processor/cluster/ClusterStatusInActiveProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/processor/cluster/ClusterStatusInActiveProcessor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/processor/cluster/ClusterStatusInActiveProcessor.java
index 490f914..83cb2cf 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/processor/cluster/ClusterStatusInActiveProcessor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/processor/cluster/ClusterStatusInActiveProcessor.java
@@ -85,7 +85,7 @@ public class ClusterStatusInActiveProcessor extends ClusterStatusProcessor {
+ monitor.getAppId() + " [cluster]: " + clusterId);
}
ClusterStatusEventPublisher.sendClusterActivatedEvent(monitor.getAppId(),
- monitor.getServiceId(), clusterId);
+ monitor.getServiceId(), clusterId, instanceId);
}
return clusterInActive;
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/39162a58/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/processor/cluster/ClusterStatusTerminatedProcessor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/processor/cluster/ClusterStatusTerminatedProcessor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/processor/cluster/ClusterStatusTerminatedProcessor.java
index 52b5f36..0a4fdd2 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/processor/cluster/ClusterStatusTerminatedProcessor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/status/processor/cluster/ClusterStatusTerminatedProcessor.java
@@ -20,17 +20,15 @@ package org.apache.stratos.autoscaler.status.processor.cluster;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.applications.ApplicationHolder;
import org.apache.stratos.autoscaler.context.AutoscalerContext;
import org.apache.stratos.autoscaler.context.cluster.ClusterInstanceContext;
import org.apache.stratos.autoscaler.context.partition.ClusterLevelPartitionContext;
import org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetworkPartitionContext;
-import org.apache.stratos.autoscaler.applications.ApplicationHolder;
import org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher;
import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor;
import org.apache.stratos.autoscaler.status.processor.StatusProcessor;
-import org.apache.stratos.messaging.domain.applications.Application;
import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.ClusterStatus;
import org.apache.stratos.messaging.domain.topology.Service;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
@@ -123,11 +121,11 @@ public class ClusterStatusTerminatedProcessor extends ClusterStatusProcessor {
private boolean clusterInstanceHasMembers(VMClusterMonitor monitor, String instanceId) {
boolean hasMember = false;
for (ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext :
- monitor.getAllNetworkPartitionCtxts().values()) {
+ monitor.getAllNetworkPartitionCtxts().values()) {
//minimum check per partition
- if(clusterLevelNetworkPartitionContext.containsClusterInstanceContext(instanceId)) {
- ClusterInstanceContext clusterInstanceContext = clusterLevelNetworkPartitionContext.
- getClusterInstanceContext(instanceId);
+ if (clusterLevelNetworkPartitionContext.containsClusterInstanceContext(instanceId)) {
+ ClusterInstanceContext clusterInstanceContext = clusterLevelNetworkPartitionContext.
+ getClusterInstanceContext(instanceId);
for (ClusterLevelPartitionContext partitionContext :
clusterInstanceContext.getPartitionCtxts()) {
if (partitionContext.getNonTerminatedMemberCount() > 0) {
http://git-wip-us.apache.org/repos/asf/stratos/blob/39162a58/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/StatusChecker.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/StatusChecker.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/StatusChecker.java
deleted file mode 100644
index d0a993e..0000000
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/StatusChecker.java
+++ /dev/null
@@ -1,519 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.stratos.autoscaler.util;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.autoscaler.context.AutoscalerContext;
-import org.apache.stratos.autoscaler.context.partition.network.ClusterLevelNetworkPartitionContext;
-import org.apache.stratos.autoscaler.applications.ApplicationHolder;
-import org.apache.stratos.autoscaler.applications.topic.ApplicationBuilder;
-import org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher;
-import org.apache.stratos.autoscaler.monitor.cluster.VMClusterMonitor;
-import org.apache.stratos.messaging.domain.applications.*;
-import org.apache.stratos.messaging.domain.instance.ClusterInstance;
-import org.apache.stratos.messaging.domain.instance.GroupInstance;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.stratos.messaging.domain.topology.ClusterStatus;
-import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
-
-import java.util.Map;
-
-
-/**
- * This will be used to evaluate the status of a group
- * and notify the interested parties about the status changes.
- */
-public class StatusChecker {
- private static final Log log = LogFactory.getLog(StatusChecker.class);
-
-
- private StatusChecker() {
-
- }
-
- public static StatusChecker getInstance() {
- return Holder.INSTANCE;
- }
-
- /**
- * Calculating whether the cluster has all min instances as active and send the
- * ClusterActivatedEvent.
- *
- * @param clusterId id of the cluster
- */
- public void onMemberStatusChange(final String clusterId) {
- Runnable group = new Runnable() {
- public void run() {
- VMClusterMonitor monitor = (VMClusterMonitor) AutoscalerContext.getInstance().getClusterMonitor(clusterId);
- boolean clusterActive = false;
- if (monitor != null) {
- clusterActive = clusterActive(monitor);
-
- }
- log.info("Status processor running for [cluster] " + clusterId +
- " the status [clusterActive] " + clusterActive);
- // if active then notify upper layer
- if (clusterActive) {
- //send event to cluster status topic
- monitor.setHasFaultyMember(false);
- if (log.isInfoEnabled()) {
- log.info("Publishing Cluster activated event for [application]: "
- + monitor.getAppId() + " [cluster]: " + clusterId);
- }
- ClusterStatusEventPublisher.sendClusterActivatedEvent(monitor.getAppId(),
- monitor.getServiceId(), monitor.getClusterId());
- }
- }
- };
- Thread groupThread = new Thread(group);
- groupThread.start();
- }
-
- /**
- * This will calculate the status of the cluster upon a member termination.
- * The possible states which cluster can change upon member termination are
- * Active --> InActive, Terminating-->Terminated, Terminating-->Reset(Created)
- *
- * @param clusterId id of the cluster
- */
- public void onMemberTermination(final String clusterId, final String instanceId) {
- Runnable group = new Runnable() {
- public void run() {
- VMClusterMonitor monitor = (VMClusterMonitor) AutoscalerContext.getInstance().getClusterMonitor(clusterId);
- boolean clusterMonitorHasMembers = clusterMonitorHasMembers(monitor);
- boolean clusterActive = clusterActive(monitor);
-
- try {
- TopologyManager.acquireReadLockForCluster(monitor.getServiceId(), monitor.getClusterId());
- Service service = TopologyManager.getTopology().getService(monitor.getServiceId());
- Cluster cluster;
- String appId = monitor.getAppId();
- if (service != null) {
- cluster = service.getCluster(monitor.getClusterId());
- if (cluster != null) {
- try {
- ApplicationHolder.acquireReadLock();
- Application application = ApplicationHolder.getApplications().getApplication(appId);
- //if all members removed from the cluster and cluster is in terminating,
- // either it has to be terminated or Reset
- if (!clusterMonitorHasMembers && cluster.getStatus(null) == ClusterStatus.Terminating) {
- if (application.getStatus(null) == ApplicationStatus.Terminating) {
- if (log.isInfoEnabled()) {
- log.info("Publishing Cluster terminated event for [application]: " + appId +
- " [cluster]: " + clusterId);
- }
- ClusterStatusEventPublisher.sendClusterTerminatedEvent(appId, monitor.getServiceId(),
- monitor.getClusterId(), instanceId);
- } else {
- if (log.isInfoEnabled()) {
- log.info("Publishing Cluster created event for [application]: " + appId +
- " [cluster]: " + clusterId);
- }
- ClusterStatusEventPublisher.sendClusterResetEvent(appId, monitor.getServiceId(),
- monitor.getClusterId(), instanceId);
- }
-
- } else {
- //if the cluster is not active and, if it is in Active state
- if (!clusterActive && cluster.getStatus(null) == ClusterStatus.Active) {
- if (log.isInfoEnabled()) {
- log.info("Publishing Cluster in-activate event for [application]: "
- + monitor.getAppId() + " [cluster]: " + clusterId);
- }
- ClusterStatusEventPublisher.sendClusterInActivateEvent(monitor.getAppId(),
- monitor.getServiceId(), clusterId, instanceId);
- } else {
- log.info("Cluster has non terminated [members] and in the [status] "
- + cluster.getStatus(null).toString());
- }
- }
- } finally {
- ApplicationHolder.releaseReadLock();
- }
- }
- }
-
-
- } finally {
- TopologyManager.releaseReadLockForCluster(monitor.getServiceId(), monitor.getClusterId());
-
- }
- }
- };
- Thread groupThread = new Thread(group);
- groupThread.start();
-
- }
-
- /**
- * Calculate whether the cluster is active based on the minimum count available in each partition
- *
- * @param monitor Cluster monitor which has the member
- * @return whether cluster is active or not
- */
- private boolean clusterActive(VMClusterMonitor monitor) {
- boolean clusterActive = false;
- /*for (ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext : monitor.getAllNetworkPartitionCtxts().values()) {
- //minimum check per partition
- for (ClusterLevelPartitionContext clusterMonitorPartitionContext : clusterLevelNetworkPartitionContext.getPartitionCtxts().values()) {
- if (clusterMonitorPartitionContext.getMinimumMemberCount() == clusterMonitorPartitionContext.getActiveMemberCount()) {
- clusterActive = true;
- } else if (clusterMonitorPartitionContext.getActiveMemberCount() > clusterMonitorPartitionContext.getMinimumMemberCount()) {
- log.info("cluster already activated...");
- clusterActive = true;
- } else {
- return false;
- }
- }
- }*/
- return clusterActive;
- }
-
- /**
- * Find out whether cluster monitor has any non terminated members
- *
- * @param monitor the cluster monitor
- * @return whether has members or not
- */
- private boolean clusterMonitorHasMembers(VMClusterMonitor monitor) {
- boolean hasMember = false;
- for (ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext : monitor.getAllNetworkPartitionCtxts().values()) {
- //minimum check per partition
- /*for (ClusterLevelPartitionContext partitionContext : clusterLevelNetworkPartitionContext.getPartitionCtxts().values()) {
- if (partitionContext.getNonTerminatedMemberCount() > 0) {
- hasMember = true;
- } else {
- hasMember = false;
- }
- }*/
- }
- return hasMember;
- }
-
- /**
- * This will calculate the status of the cluster upon a member fault event
- *
- * @param clusterId id of the cluster
- * @param partitionId is to decide in which partition has less members while others have active members
- */
- public void onMemberFaultEvent(final String clusterId, final String partitionId, final String instanceId) {
- Runnable group = new Runnable() {
- public void run() {
- VMClusterMonitor monitor = (VMClusterMonitor) AutoscalerContext.getInstance().getClusterMonitor(clusterId);
- boolean clusterInActive = getClusterInactive(monitor, partitionId);
- String appId = monitor.getAppId();
- if (clusterInActive) {
- //if the monitor is dependent, temporarily pausing it
- if (monitor.hasStartupDependents()) {
- monitor.setHasFaultyMember(true);
- }
- if (log.isInfoEnabled()) {
- log.info("Publishing Cluster in-activate event for [application]: "
- + monitor.getAppId() + " [cluster]: " + clusterId);
- }
- //send cluster In-Active event to cluster status topic
- ClusterStatusEventPublisher.sendClusterInActivateEvent(appId,
- monitor.getServiceId(), clusterId, instanceId);
-
- } else {
- boolean clusterActive = clusterActive(monitor);
- if (clusterActive) {
- if (log.isInfoEnabled()) {
- log.info("Publishing Cluster active event for [application]: "
- + monitor.getAppId() + " [cluster]: " + clusterId);
- }
- ClusterStatusEventPublisher.sendClusterActivatedEvent(appId, monitor.getServiceId(), clusterId);
- }
- }
-
- }
- };
- Thread groupThread = new Thread(group);
- groupThread.start();
- }
-
- /**
- * This will calculate whether all the minimum of partition in a cluster satisfy in order
- * to decide on the cluster status.
- *
- * @param monitor Cluster monitor of which the status needs to be calculated
- * @param partitionId partition which got the faulty member
- * @return whether cluster inActive or not
- */
- private boolean getClusterInactive(VMClusterMonitor monitor, String partitionId) {
- boolean clusterInActive = false;
- for (ClusterLevelNetworkPartitionContext clusterLevelNetworkPartitionContext : monitor.getAllNetworkPartitionCtxts().values()) {
- /*for (ClusterLevelPartitionContext partition : clusterLevelNetworkPartitionContext.getPartitionCtxts().values()) {
- if (partitionId.equals(partition.getPartitionId()) &&
- partition.getActiveMemberCount() <= partition.getMinimumMemberCount()) {
- clusterInActive = true;
- return clusterInActive;
- }
- }*/
-
- }
- return clusterInActive;
- }
-
- /**
- * This will use to calculate whether all children of a particular component is active by traversing Top
- *
- * @param appId application id
- * @param idOfComponent id of the component to which calculate the status
- * @param idOfChild children of the component as groups
- */
- public void onChildStatusChange(String idOfChild, String idOfComponent, String appId, String instanceId) {
- ParentComponent component;
- Map<String, Group> groups;
- Map<String, ClusterDataHolder> clusterData;
-
- if (log.isInfoEnabled()) {
- log.info("StatusChecker calculating the status for the group [ " + idOfChild + " ]");
- }
-
- try {
- ApplicationHolder.acquireWriteLock();
- if (idOfComponent.equals(appId)) {
- //it is an application
- component = ApplicationHolder.getApplications().
- getApplication(appId);
- } else {
- //it is a group
- component = ApplicationHolder.getApplications().
- getApplication(appId).getGroupRecursively(idOfComponent);
- }
- groups = component.getAliasToGroupMap();
- clusterData = component.getClusterDataMap();
-
- if(component.isGroupScalingEnabled()) {
- //TODO
- handleStateWithGroupScalingEnabled();
- } else {
- handleStateChangeGroupScalingDisabled(component, appId, instanceId, groups, clusterData);
- }
- } finally {
- ApplicationHolder.releaseWriteLock();
-
- }
-
- }
-
- private void handleStateWithGroupScalingEnabled() {
-
- }
-
- private void handleStateChangeGroupScalingDisabled(ParentComponent component, String appId,
- String instanceId,
- Map<String, Group> groups,
- Map<String, ClusterDataHolder> clusterData) {
- if (groups.isEmpty() && getAllClusterInSameState(clusterData, ClusterStatus.Active, instanceId) ||
- clusterData.isEmpty() && getAllGroupInSameState(groups, GroupStatus.Active, instanceId) ||
- getAllClusterInSameState(clusterData, ClusterStatus.Active, instanceId) &&
- getAllGroupInSameState(groups, GroupStatus.Active, instanceId)) {
- //send activation event
- if (component instanceof Application) {
- //send application activated event
- if (((Application) component).getStatus(null) != ApplicationStatus.Active) {
- log.info("sending app activate: " + appId);
- ApplicationBuilder.handleApplicationActivatedEvent(appId, instanceId);
- }
- } else if (component instanceof Group) {
- //send activation to the parent
- if (((Group) component).getStatus(null) != GroupStatus.Active) {
- log.info("sending group activate: " + component.getUniqueIdentifier());
- ApplicationBuilder.handleGroupActivatedEvent(appId, component.getUniqueIdentifier(), instanceId);
- }
- }
- } else if (groups.isEmpty() && getAllClusterInSameState(clusterData, ClusterStatus.Terminated, instanceId) ||
- clusterData.isEmpty() && getAllGroupInSameState(groups, GroupStatus.Terminated, instanceId) ||
- getAllClusterInSameState(clusterData, ClusterStatus.Terminated, instanceId) &&
- getAllGroupInSameState(groups, GroupStatus.Terminated, instanceId)) {
- //send the terminated event
- if (component instanceof Application) {
- log.info("sending app terminated: " + appId);
- ApplicationBuilder.handleApplicationTerminatedEvent(appId, null);
- } else if (component instanceof Group) {
- //send activation to the parent
- if (((Group) component).getStatus(null) != GroupStatus.Terminated) {
- log.info("sending group terminated : " + component.getUniqueIdentifier());
- ApplicationBuilder.handleGroupTerminatedEvent(appId, component.getUniqueIdentifier(), instanceId);
- }
- }
- } else if (groups.isEmpty() && getAllClusterInSameState(clusterData, ClusterStatus.Created, instanceId) ||
- clusterData.isEmpty() && getAllGroupInSameState(groups, GroupStatus.Created, instanceId) ||
- getAllClusterInSameState(clusterData, ClusterStatus.Created, instanceId) &&
- getAllGroupInSameState(groups, GroupStatus.Created, instanceId)) {
- if (component instanceof Application) {
- log.info("[Application] " + appId + "couldn't change to Created, since it is" +
- "already in " + ((Application) component).getStatus(null).toString());
- } else if (component instanceof Group) {
- //send activation to the parent
- if (((Group) component).getStatus(null) != GroupStatus.Created) {
- log.info("sending group created : " + component.getUniqueIdentifier());
- ApplicationBuilder.handleGroupCreatedEvent(appId, component.getUniqueIdentifier(), instanceId);
- }
- }
- } else if (groups.isEmpty() && getAllClusterInactive(clusterData) ||
- clusterData.isEmpty() && getAllGroupInActive(groups) ||
- getAllClusterInactive(clusterData) || getAllGroupInActive(groups)) {
- //send the in activation event
- if (component instanceof Application) {
- //send application activated event
- log.warn("Application can't be in in-active : " + appId);
- //StatusEventPublisher.sendApplicationInactivatedEvent(appId);
- } else if (component instanceof Group) {
- //send activation to the parent
- if (((Group) component).getStatus(null) != GroupStatus.Inactive) {
- log.info("sending group in-active: " + component.getUniqueIdentifier());
- ApplicationBuilder.handleGroupInActivateEvent(appId, component.getUniqueIdentifier(), instanceId);
- }
- }
- } else {
- if (component instanceof Application) {
- //send application activated event
- log.warn("Application can't be in in-active : " + appId);
- //StatusEventPublisher.sendApplicationInactivatedEvent(appId);
- } else if (component instanceof Group) {
- //send activation to the parent
- if (((Group) component).getStatus(null) != GroupStatus.Inactive) {
- log.info("sending group in-active: " + component.getUniqueIdentifier());
- ApplicationBuilder.handleGroupInActivateEvent(appId, component.getUniqueIdentifier(), "test*****");
- }
- }
- }
- }
-
- private boolean getAllInstancesOfGroupActive(Group group) {
- int activeGroupInstances = 0;
- for(GroupInstance context : group.getInstanceIdToInstanceContextMap().values()) {
- if(context.getStatus() == GroupStatus.Active) {
- activeGroupInstances++;
- }
- }
-
- return false;
- }
-
- /**
- * Find out whether all the any group is inActive
- *
- * @param groups groups of a group/application
- * @return whether inActive or not
- */
- private boolean getAllGroupInActive(Map<String, Group> groups) {
- boolean groupStat = false;
- for (Group group : groups.values()) {
- if (group.getStatus(null) == GroupStatus.Inactive) {
- groupStat = true;
- return groupStat;
- } else {
- groupStat = false;
- }
- }
- return groupStat;
- }
-
- /**
- * Find out whether all the groups of a group in the same state or not
- *
- * @param groups groups of a group/application
- * @param status the state to check in all groups
- * @return whether groups in the given state or not
- */
- private boolean getAllGroupInSameState(Map<String, Group> groups, GroupStatus status, String instanceId) {
- boolean groupStat = false;
- for (Group group : groups.values()) {
- GroupInstance context = group.getInstanceContexts(instanceId);
- if(context != null) {
- if(context.getStatus() == status) {
- groupStat = true;
- } else {
- groupStat = false;
- return groupStat;
- }
- } else {
- groupStat = false;
- return groupStat;
- }
- }
- return groupStat;
- }
-
-
- /**
- * Find out whether any of the clusters of a group in the InActive state
- *
- * @param clusterData clusters of the group
- * @return whether inActive or not
- */
- private boolean getAllClusterInactive(Map<String, ClusterDataHolder> clusterData) {
- boolean clusterStat = false;
- for (Map.Entry<String, ClusterDataHolder> clusterDataHolderEntry : clusterData.entrySet()) {
- Service service = TopologyManager.getTopology().getService(clusterDataHolderEntry.getValue().getServiceType());
- Cluster cluster = service.getCluster(clusterDataHolderEntry.getValue().getClusterId());
- if (cluster.getStatus(null) == ClusterStatus.Inactive) {
- clusterStat = true;
- return clusterStat;
- } else {
- clusterStat = false;
-
- }
- }
- return clusterStat;
- }
-
- /**
- * Find out whether all the clusters of a group are in the same state
- *
- * @param clusterData clusters of the group
- * @param status the status to check of the group
- * @return whether all groups in the same state or not
- */
- private boolean getAllClusterInSameState(Map<String, ClusterDataHolder> clusterData,
- ClusterStatus status, String instanceId) {
- boolean clusterStat = false;
- for (Map.Entry<String, ClusterDataHolder> clusterDataHolderEntry : clusterData.entrySet()) {
- String serviceName = clusterDataHolderEntry.getValue().getServiceType();
- String clusterId = clusterDataHolderEntry.getValue().getClusterId();
- TopologyManager.acquireReadLockForCluster(serviceName, clusterId);
- try {
- Service service = TopologyManager.getTopology().getService(serviceName);
- Cluster cluster = service.getCluster(clusterId);
- ClusterInstance context = cluster.getInstanceContexts(instanceId);
- if (context.getStatus() == status) {
- clusterStat = true;
- } else {
- clusterStat = false;
- return clusterStat;
- }
- } finally {
- TopologyManager.releaseReadLockForCluster(serviceName, clusterId);
- }
-
- }
- return clusterStat;
- }
-
- private static class Holder {
- private static final StatusChecker INSTANCE = new StatusChecker();
- }
-
-}