You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by la...@apache.org on 2014/12/03 07:56:39 UTC
[2/9] stratos git commit: merge with new changes
merge with new changes
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/9e6e91d6
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/9e6e91d6
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/9e6e91d6
Branch: refs/heads/master
Commit: 9e6e91d6280ca0e9d38f96dfeeaa9eb9d2f561cb
Parents: bfc263a
Author: gayan <ga...@puppet.gayan.org>
Authored: Tue Dec 2 16:33:45 2014 +0530
Committer: gayan <ga...@puppet.gayan.org>
Committed: Tue Dec 2 16:33:45 2014 +0530
----------------------------------------------------------------------
.../AutoscalerTopologyEventReceiver.java | 260 ++++++++-----------
1 file changed, 105 insertions(+), 155 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/9e6e91d6/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
index d59fa7d..bfdf30b 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -21,19 +21,20 @@ package org.apache.stratos.autoscaler.event.receiver.topology;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.applications.ApplicationHolder;
import org.apache.stratos.autoscaler.context.AutoscalerContext;
import org.apache.stratos.autoscaler.context.cluster.ClusterContextFactory;
import org.apache.stratos.autoscaler.context.cluster.VMClusterContext;
import org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher;
import org.apache.stratos.autoscaler.event.publisher.InstanceNotificationPublisher;
-import org.apache.stratos.autoscaler.exception.application.DependencyBuilderException;
import org.apache.stratos.autoscaler.exception.partition.PartitionValidationException;
import org.apache.stratos.autoscaler.exception.policy.PolicyValidationException;
-import org.apache.stratos.autoscaler.exception.application.TopologyInConsistentException;
-import org.apache.stratos.autoscaler.monitor.component.ApplicationMonitor;
-import org.apache.stratos.autoscaler.monitor.MonitorFactory;
import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.component.ApplicationMonitor;
import org.apache.stratos.autoscaler.monitor.events.ClusterStatusEvent;
+import org.apache.stratos.autoscaler.pojo.policy.PolicyManager;
+import org.apache.stratos.autoscaler.pojo.policy.deployment.DeploymentPolicy;
+import org.apache.stratos.autoscaler.util.AutoscalerUtil;
import org.apache.stratos.autoscaler.util.ServiceReferenceHolder;
import org.apache.stratos.messaging.domain.applications.Application;
import org.apache.stratos.messaging.domain.applications.Applications;
@@ -71,7 +72,7 @@ public class AutoscalerTopologyEventReceiver{
public void execute() {
//FIXME this activated before autoscaler deployer activated.
- topologyEventReceiver.setExecutorService(executorService);
+ topologyEventReceiver.setExecutorService(getExecutorService());
topologyEventReceiver.execute();
if (log.isInfoEnabled()) {
@@ -143,7 +144,17 @@ public class AutoscalerTopologyEventReceiver{
if (applications != null) {
for (Application application : applications.getApplications().values()) {
if (allClustersInitialized(application)) {
- startApplicationMonitor(application.getUniqueIdentifier());
+ DeploymentPolicy policy = PolicyManager.getInstance().
+ getDeploymentPolicyByApplication(
+ application.getUniqueIdentifier());
+ if (policy != null) {
+ AutoscalerUtil.getInstance().
+ startApplicationMonitor(application.getUniqueIdentifier());
+ } else {
+ log.info("The relevant application policy is not yet " +
+ "deployed for this [application] " +
+ application.getUniqueIdentifier());
+ }
} else {
log.error("Complete Topology is not consistent with the applications " +
"which got persisted");
@@ -175,7 +186,7 @@ public class AutoscalerTopologyEventReceiver{
//acquire read lock
ApplicationHolder.acquireReadLock();
//start the application monitor
- startApplicationMonitor(appId);
+ //startApplicationMonitor(appId);
} catch (Exception e) {
String msg = "Error processing event " + e.getLocalizedMessage();
log.error(msg, e);
@@ -204,7 +215,7 @@ public class AutoscalerTopologyEventReceiver{
if (null == monitor) {
if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
+ + "[cluster] %s", clusterId));
}
return;
}
@@ -225,7 +236,7 @@ public class AutoscalerTopologyEventReceiver{
if (null == monitor) {
if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
+ + "[cluster] %s", clusterId));
}
return;
}
@@ -255,7 +266,7 @@ public class AutoscalerTopologyEventReceiver{
if (null == monitor) {
if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
+ + "[cluster] %s", clusterId));
}
return;
}
@@ -277,7 +288,7 @@ public class AutoscalerTopologyEventReceiver{
if (null == monitor) {
if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
+ + "[cluster] %s", clusterId));
}
// if monitor does not exist, send cluster terminated event
ClusterStatusEventPublisher.sendClusterTerminatedEvent(clusterTerminatingEvent.getAppId(),
@@ -286,12 +297,12 @@ public class AutoscalerTopologyEventReceiver{
}
//changing the status in the monitor, will notify its parent monitor
if (monitor.getStatus() == ClusterStatus.Active) {
- // terminated gracefully
- monitor.setStatus(ClusterStatus.Terminating);
- InstanceNotificationPublisher.sendInstanceCleanupEventForCluster(clusterId);
+ // terminated gracefully
+ monitor.setStatus(ClusterStatus.Terminating);
+ InstanceNotificationPublisher.sendInstanceCleanupEventForCluster(clusterId);
} else {
- monitor.setStatus(ClusterStatus.Terminating);
- monitor.terminateAllMembers();
+ monitor.setStatus(ClusterStatus.Terminating);
+ monitor.terminateAllMembers();
}
ServiceReferenceHolder.getInstance().getClusterStatusProcessorChain().
process("", clusterId, instanceId);
@@ -310,11 +321,11 @@ public class AutoscalerTopologyEventReceiver{
if (null == monitor) {
if (log.isDebugEnabled()) {
log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
+ + "[cluster] %s", clusterId));
}
// if the cluster monitor is null, assume that its termianted
ApplicationMonitor appMonitor = AutoscalerContext.getInstance().getAppMonitor(clusterTerminatedEvent.getAppId());
- if (appMonitor != null) {
+ if (appMonitor != null) {
appMonitor.onChildStatusEvent(new ClusterStatusEvent(ClusterStatus.Terminated, clusterId, null));
}
return;
@@ -355,7 +366,7 @@ public class AutoscalerTopologyEventReceiver{
topologyEventReceiver.addEventListener(new MemberStartedEventListener() {
@Override
protected void onEvent(Event event) {
-
+
}
});
@@ -432,153 +443,92 @@ public class AutoscalerTopologyEventReceiver{
});
topologyEventReceiver.addEventListener(new ClusterInstanceCreatedEventListener() {
- @Override
- protected void onEvent(Event event) {
-
- ClusterInstanceCreatedEvent clusterInstanceCreatedEvent =
- (ClusterInstanceCreatedEvent) event;
- AbstractClusterMonitor clusterMonitor = AutoscalerContext.getInstance().
- getClusterMonitor(clusterInstanceCreatedEvent.getClusterId());
-
- if (clusterMonitor != null) {
- TopologyManager.acquireReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(),
- clusterInstanceCreatedEvent.getClusterId());
-
- try {
- Service service = TopologyManager.getTopology().
- getService(clusterInstanceCreatedEvent.getServiceName());
-
- if (service != null) {
- Cluster cluster = service.getCluster(clusterInstanceCreatedEvent.getClusterId());
- if (cluster != null) {
- // create and add Cluster Context
- try {
- if (cluster.isKubernetesCluster()) {
- clusterMonitor.addClusterContextForInstance(clusterInstanceCreatedEvent.getInstanceId(),
- ClusterContextFactory.getKubernetesClusterContext(cluster));
- } else if (cluster.isLbCluster()) {
- clusterMonitor.addClusterContextForInstance(clusterInstanceCreatedEvent.getInstanceId(),
- ClusterContextFactory.getVMLBClusterContext(cluster));
- } else {
- clusterMonitor.addClusterContextForInstance(clusterInstanceCreatedEvent.getInstanceId(),
- ClusterContextFactory.getVMServiceClusterContext(cluster));
- }
+ @Override
+ protected void onEvent(Event event) {
+
+ ClusterInstanceCreatedEvent clusterInstanceCreatedEvent =
+ (ClusterInstanceCreatedEvent) event;
+ AbstractClusterMonitor clusterMonitor = AutoscalerContext.getInstance().
+ getClusterMonitor(clusterInstanceCreatedEvent.getClusterId());
+ String instanceId = ((ClusterInstanceCreatedEvent) event).getInstanceId();
+ //FIXME to take lock when clusterMonitor is running
+ if (clusterMonitor != null) {
+ TopologyManager.acquireReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(),
+ clusterInstanceCreatedEvent.getClusterId());
+
+ try {
+ Service service = TopologyManager.getTopology().
+ getService(clusterInstanceCreatedEvent.getServiceName());
+
+ if (service != null) {
+ Cluster cluster = service.getCluster(clusterInstanceCreatedEvent.getClusterId());
+ if (cluster != null) {
+ try {
+ if (cluster.isKubernetesCluster()) {
+ clusterMonitor.setClusterContext(
+ ClusterContextFactory.getKubernetesClusterContext(
+ instanceId,
+ cluster));
+ } else {
+ VMClusterContext clusterContext =
+ (VMClusterContext) clusterMonitor.getClusterContext();
+ if (clusterContext == null) {
+ clusterMonitor.setClusterContext(
+ ClusterContextFactory.
+ getVMClusterContext(instanceId,
+ cluster));
+ }
+ clusterContext.addInstanceContext(instanceId, cluster);
+
+
+ }
+ if (clusterMonitor.hasMonitoringStarted().compareAndSet(false, true)) {
+ clusterMonitor.startScheduler();
+ log.info("Monitoring task for Cluster Monitor with cluster id " +
+ clusterInstanceCreatedEvent.getClusterId() + " started successfully");
+ }
+ } catch (PolicyValidationException e) {
+ log.error(e.getMessage(), e);
+ } catch (PartitionValidationException e) {
+ log.error(e.getMessage(), e);
+ }
+ }
- if (clusterMonitor.hasMonitoringStarted().compareAndSet(false, true)) {
- clusterMonitor.startScheduler();
- log.info("Monitoring task for Cluster Monitor with cluster id " +
- clusterInstanceCreatedEvent.getClusterId() + " started successfully");
- }
-
- } catch (PolicyValidationException e) {
- log.error(e.getMessage(), e);
- } catch (PartitionValidationException e) {
- log.error(e.getMessage(), e);
- }
-
- } else {
- log.error("Cluster not found for " + clusterInstanceCreatedEvent.getClusterId() +
- ", no cluster instance added to ClusterMonitor " +
- clusterInstanceCreatedEvent.getClusterId());
- }
} else {
- log.error("Service " + clusterInstanceCreatedEvent.getServiceName() +
- " not found, no cluster instance added to ClusterMonitor " +
- clusterInstanceCreatedEvent.getClusterId());
- }
-
- } finally {
- TopologyManager.releaseReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(),
- clusterInstanceCreatedEvent.getClusterId());
- }
-
- } else {
- log.error("No Cluster Monitor found for cluster id " +
- clusterInstanceCreatedEvent.getClusterId());
- }
- }
- });
+ log.error("Service " + clusterInstanceCreatedEvent.getServiceName() +
+ " not found, no cluster instance added to ClusterMonitor " +
+ clusterInstanceCreatedEvent.getClusterId());
+ }
+
+ } finally {
+ TopologyManager.releaseReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(),
+ clusterInstanceCreatedEvent.getClusterId());
+ }
+
+ } else {
+ log.error("No Cluster Monitor found for cluster id " +
+ clusterInstanceCreatedEvent.getClusterId());
+ }
+ }
+ }
+
+ );
}
/**
* Terminate load balancer topology receiver thread.
*/
+
public void terminate() {
topologyEventReceiver.terminate();
terminated = true;
}
- protected synchronized void startApplicationMonitor(String applicationId) {
- Thread th = null;
- if (AutoscalerContext.getInstance().getAppMonitor(applicationId) == null) {
- th = new Thread(new ApplicationMonitorAdder(applicationId));
- }
- if (th != null) {
- th.start();
- } else {
- if (log.isDebugEnabled()) {
- log.debug(String
- .format("Application monitor thread already exists: " +
- "[application] %s ", applicationId));
- }
- }
- }
-
- private class ApplicationMonitorAdder implements Runnable {
- private String appId;
-
- public ApplicationMonitorAdder(String appId) {
- this.appId = appId;
- }
-
- public void run() {
- ApplicationMonitor applicationMonitor = null;
- int retries = 5;
- boolean success = false;
- do {
- try {
- Thread.sleep(5000);
- } catch (InterruptedException e1) {
- }
- try {
- long start = System.currentTimeMillis();
- if (log.isDebugEnabled()) {
- log.debug("application monitor is going to be started for [application] " +
- appId);
- }
- try {
- applicationMonitor = MonitorFactory.getApplicationMonitor(appId);
- } catch (PolicyValidationException e) {
- String msg = "Application monitor creation failed for Application: ";
- log.warn(msg, e);
- retries--;
- }
- long end = System.currentTimeMillis();
- log.info("Time taken to start app monitor: " + (end - start) / 1000);
- success = true;
- } catch (DependencyBuilderException e) {
- String msg = "Application monitor creation failed for Application: ";
- log.warn(msg, e);
- retries--;
- } catch (TopologyInConsistentException e) {
- String msg = "Application monitor creation failed for Application: ";
- log.warn(msg, e);
- retries--;
- }
- } while (!success && retries != 0);
-
- if (applicationMonitor == null) {
- String msg = "Application monitor creation failed, even after retrying for 5 times, "
- + "for Application: " + appId;
- log.error(msg);
- throw new RuntimeException(msg);
- }
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
- AutoscalerContext.getInstance().addAppMonitor(applicationMonitor);
- if (log.isInfoEnabled()) {
- log.info(String.format("Application monitor has been added successfully: " +
- "[application] %s", applicationMonitor.getId()));
- }
- }
- }
+ public void setExecutorService(ExecutorService executorService) {
+ this.executorService = executorService;
+ }
}