You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by la...@apache.org on 2014/12/03 07:56:39 UTC

[2/9] stratos git commit: merge with new changes

merge with new changes


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/9e6e91d6
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/9e6e91d6
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/9e6e91d6

Branch: refs/heads/master
Commit: 9e6e91d6280ca0e9d38f96dfeeaa9eb9d2f561cb
Parents: bfc263a
Author: gayan <ga...@puppet.gayan.org>
Authored: Tue Dec 2 16:33:45 2014 +0530
Committer: gayan <ga...@puppet.gayan.org>
Committed: Tue Dec 2 16:33:45 2014 +0530

----------------------------------------------------------------------
 .../AutoscalerTopologyEventReceiver.java        | 260 ++++++++-----------
 1 file changed, 105 insertions(+), 155 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/9e6e91d6/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
index d59fa7d..bfdf30b 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -21,19 +21,20 @@ package org.apache.stratos.autoscaler.event.receiver.topology;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.autoscaler.applications.ApplicationHolder;
 import org.apache.stratos.autoscaler.context.AutoscalerContext;
 import org.apache.stratos.autoscaler.context.cluster.ClusterContextFactory;
 import org.apache.stratos.autoscaler.context.cluster.VMClusterContext;
 import org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher;
 import org.apache.stratos.autoscaler.event.publisher.InstanceNotificationPublisher;
-import org.apache.stratos.autoscaler.exception.application.DependencyBuilderException;
 import org.apache.stratos.autoscaler.exception.partition.PartitionValidationException;
 import org.apache.stratos.autoscaler.exception.policy.PolicyValidationException;
-import org.apache.stratos.autoscaler.exception.application.TopologyInConsistentException;
-import org.apache.stratos.autoscaler.monitor.component.ApplicationMonitor;
-import org.apache.stratos.autoscaler.monitor.MonitorFactory;
 import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
+import org.apache.stratos.autoscaler.monitor.component.ApplicationMonitor;
 import org.apache.stratos.autoscaler.monitor.events.ClusterStatusEvent;
+import org.apache.stratos.autoscaler.pojo.policy.PolicyManager;
+import org.apache.stratos.autoscaler.pojo.policy.deployment.DeploymentPolicy;
+import org.apache.stratos.autoscaler.util.AutoscalerUtil;
 import org.apache.stratos.autoscaler.util.ServiceReferenceHolder;
 import org.apache.stratos.messaging.domain.applications.Application;
 import org.apache.stratos.messaging.domain.applications.Applications;
@@ -71,7 +72,7 @@ public class AutoscalerTopologyEventReceiver{
     public void execute() {
         //FIXME this activated before autoscaler deployer activated.
 
-	    topologyEventReceiver.setExecutorService(executorService);
+	    topologyEventReceiver.setExecutorService(getExecutorService());
 	    topologyEventReceiver.execute();
 
 	    if (log.isInfoEnabled()) {
@@ -143,7 +144,17 @@ public class AutoscalerTopologyEventReceiver{
                         if (applications != null) {
                             for (Application application : applications.getApplications().values()) {
                                 if (allClustersInitialized(application)) {
-                                    startApplicationMonitor(application.getUniqueIdentifier());
+                                    DeploymentPolicy policy = PolicyManager.getInstance().
+                                            getDeploymentPolicyByApplication(
+                                                    application.getUniqueIdentifier());
+                                    if (policy != null) {
+                                        AutoscalerUtil.getInstance().
+                                                startApplicationMonitor(application.getUniqueIdentifier());
+                                    } else {
+                                        log.info("The relevant application policy is not yet " +
+                                                "deployed for this [application] " +
+                                                application.getUniqueIdentifier());
+                                    }
                                 } else {
                                     log.error("Complete Topology is not consistent with the applications " +
                                             "which got persisted");
@@ -175,7 +186,7 @@ public class AutoscalerTopologyEventReceiver{
                         //acquire read lock
                         ApplicationHolder.acquireReadLock();
                         //start the application monitor
-                        startApplicationMonitor(appId);
+                        //startApplicationMonitor(appId);
                     } catch (Exception e) {
                         String msg = "Error processing event " + e.getLocalizedMessage();
                         log.error(msg, e);
@@ -204,7 +215,7 @@ public class AutoscalerTopologyEventReceiver{
                 if (null == monitor) {
                     if (log.isDebugEnabled()) {
                         log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                                                + "[cluster] %s", clusterId));
+                                + "[cluster] %s", clusterId));
                     }
                     return;
                 }
@@ -225,7 +236,7 @@ public class AutoscalerTopologyEventReceiver{
                 if (null == monitor) {
                     if (log.isDebugEnabled()) {
                         log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                                                + "[cluster] %s", clusterId));
+                                + "[cluster] %s", clusterId));
                     }
                     return;
                 }
@@ -255,7 +266,7 @@ public class AutoscalerTopologyEventReceiver{
                 if (null == monitor) {
                     if (log.isDebugEnabled()) {
                         log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                                                + "[cluster] %s", clusterId));
+                                + "[cluster] %s", clusterId));
                     }
                     return;
                 }
@@ -277,7 +288,7 @@ public class AutoscalerTopologyEventReceiver{
                 if (null == monitor) {
                     if (log.isDebugEnabled()) {
                         log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                                                + "[cluster] %s", clusterId));
+                                + "[cluster] %s", clusterId));
                     }
                     // if monitor does not exist, send cluster terminated event
                     ClusterStatusEventPublisher.sendClusterTerminatedEvent(clusterTerminatingEvent.getAppId(),
@@ -286,12 +297,12 @@ public class AutoscalerTopologyEventReceiver{
                 }
                 //changing the status in the monitor, will notify its parent monitor
                 if (monitor.getStatus() == ClusterStatus.Active) {
-                	// terminated gracefully
-                	monitor.setStatus(ClusterStatus.Terminating);
-                	InstanceNotificationPublisher.sendInstanceCleanupEventForCluster(clusterId);
+                    // terminated gracefully
+                    monitor.setStatus(ClusterStatus.Terminating);
+                    InstanceNotificationPublisher.sendInstanceCleanupEventForCluster(clusterId);
                 } else {
-                	monitor.setStatus(ClusterStatus.Terminating);
-                	monitor.terminateAllMembers();
+                    monitor.setStatus(ClusterStatus.Terminating);
+                    monitor.terminateAllMembers();
                 }
                 ServiceReferenceHolder.getInstance().getClusterStatusProcessorChain().
                         process("", clusterId, instanceId);
@@ -310,11 +321,11 @@ public class AutoscalerTopologyEventReceiver{
                 if (null == monitor) {
                     if (log.isDebugEnabled()) {
                         log.debug(String.format("A cluster monitor is not found in autoscaler context "
-                                                + "[cluster] %s", clusterId));
+                                + "[cluster] %s", clusterId));
                     }
                     // if the cluster monitor is null, assume that its termianted
                     ApplicationMonitor appMonitor = AutoscalerContext.getInstance().getAppMonitor(clusterTerminatedEvent.getAppId());
-                    if (appMonitor != null)  {
+                    if (appMonitor != null) {
                         appMonitor.onChildStatusEvent(new ClusterStatusEvent(ClusterStatus.Terminated, clusterId, null));
                     }
                     return;
@@ -355,7 +366,7 @@ public class AutoscalerTopologyEventReceiver{
         topologyEventReceiver.addEventListener(new MemberStartedEventListener() {
             @Override
             protected void onEvent(Event event) {
-            	
+
             }
         });
 
@@ -432,153 +443,92 @@ public class AutoscalerTopologyEventReceiver{
         });
 
         topologyEventReceiver.addEventListener(new ClusterInstanceCreatedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-
-                ClusterInstanceCreatedEvent clusterInstanceCreatedEvent =
-                        (ClusterInstanceCreatedEvent) event;
-                AbstractClusterMonitor clusterMonitor = AutoscalerContext.getInstance().
-                        getClusterMonitor(clusterInstanceCreatedEvent.getClusterId());
-
-                if (clusterMonitor != null) {
-                    TopologyManager.acquireReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(),
-                            clusterInstanceCreatedEvent.getClusterId());
-
-                    try {
-                        Service service = TopologyManager.getTopology().
-                                getService(clusterInstanceCreatedEvent.getServiceName());
-
-                        if (service != null) {
-                            Cluster cluster = service.getCluster(clusterInstanceCreatedEvent.getClusterId());
-                            if (cluster != null) {
-                                // create and add Cluster Context
-                                try {
-                                    if (cluster.isKubernetesCluster()) {
-                                        clusterMonitor.addClusterContextForInstance(clusterInstanceCreatedEvent.getInstanceId(),
-                                                ClusterContextFactory.getKubernetesClusterContext(cluster));
-                                    } else if (cluster.isLbCluster()) {
-                                        clusterMonitor.addClusterContextForInstance(clusterInstanceCreatedEvent.getInstanceId(),
-                                                ClusterContextFactory.getVMLBClusterContext(cluster));
-                                    } else {
-                                        clusterMonitor.addClusterContextForInstance(clusterInstanceCreatedEvent.getInstanceId(),
-                                                ClusterContextFactory.getVMServiceClusterContext(cluster));
-                                    }
+           @Override
+           protected void onEvent(Event event) {
+
+               ClusterInstanceCreatedEvent clusterInstanceCreatedEvent =
+                       (ClusterInstanceCreatedEvent) event;
+               AbstractClusterMonitor clusterMonitor = AutoscalerContext.getInstance().
+                       getClusterMonitor(clusterInstanceCreatedEvent.getClusterId());
+               String instanceId = ((ClusterInstanceCreatedEvent) event).getInstanceId();
+               //FIXME to take lock when clusterMonitor is running
+               if (clusterMonitor != null) {
+                   TopologyManager.acquireReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(),
+                           clusterInstanceCreatedEvent.getClusterId());
+
+                   try {
+                       Service service = TopologyManager.getTopology().
+                               getService(clusterInstanceCreatedEvent.getServiceName());
+
+                       if (service != null) {
+                           Cluster cluster = service.getCluster(clusterInstanceCreatedEvent.getClusterId());
+                           if (cluster != null) {
+                               try {
+                                   if (cluster.isKubernetesCluster()) {
+                                       clusterMonitor.setClusterContext(
+                                               ClusterContextFactory.getKubernetesClusterContext(
+                                                       instanceId,
+                                                       cluster));
+                                   } else {
+                                       VMClusterContext clusterContext =
+                                               (VMClusterContext) clusterMonitor.getClusterContext();
+                                       if (clusterContext == null) {
+                                           clusterMonitor.setClusterContext(
+                                                   ClusterContextFactory.
+                                                           getVMClusterContext(instanceId,
+                                                                   cluster));
+                                       }
+                                       clusterContext.addInstanceContext(instanceId, cluster);
+
+
+                                   }
+                                   if (clusterMonitor.hasMonitoringStarted().compareAndSet(false, true)) {
+                                       clusterMonitor.startScheduler();
+                                       log.info("Monitoring task for Cluster Monitor with cluster id " +
+                                               clusterInstanceCreatedEvent.getClusterId() + " started successfully");
+                                   }
+                               } catch (PolicyValidationException e) {
+                                   log.error(e.getMessage(), e);
+                               } catch (PartitionValidationException e) {
+                                   log.error(e.getMessage(), e);
+                               }
+                           }
 
-                                    if (clusterMonitor.hasMonitoringStarted().compareAndSet(false, true)) {
-                                        clusterMonitor.startScheduler();
-                                        log.info("Monitoring task for Cluster Monitor with cluster id " +
-                                                clusterInstanceCreatedEvent.getClusterId() + " started successfully");
-                                    }
-
-                                } catch (PolicyValidationException e) {
-                                    log.error(e.getMessage(), e);
-                                } catch (PartitionValidationException e) {
-                                    log.error(e.getMessage(), e);
-                                }
-
-                            } else {
-                                log.error("Cluster not found for " + clusterInstanceCreatedEvent.getClusterId() +
-                                        ", no cluster instance added to ClusterMonitor " +
-                                        clusterInstanceCreatedEvent.getClusterId());
-                            }
                         } else {
-                            log.error("Service " + clusterInstanceCreatedEvent.getServiceName() +
-                                    " not found, no cluster instance added to ClusterMonitor " +
-                            clusterInstanceCreatedEvent.getClusterId());
-                        }
-
-                    } finally {
-                        TopologyManager.releaseReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(),
-                                clusterInstanceCreatedEvent.getClusterId());
-                    }
-
-                } else {
-                    log.error("No Cluster Monitor found for cluster id " +
-                            clusterInstanceCreatedEvent.getClusterId());
-                }
-            }
-        });
+                           log.error("Service " + clusterInstanceCreatedEvent.getServiceName() +
+                                   " not found, no cluster instance added to ClusterMonitor " +
+                                   clusterInstanceCreatedEvent.getClusterId());
+                       }
+
+                   } finally {
+                       TopologyManager.releaseReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(),
+                               clusterInstanceCreatedEvent.getClusterId());
+                   }
+
+               } else {
+                   log.error("No Cluster Monitor found for cluster id " +
+                             clusterInstanceCreatedEvent.getClusterId());
+               }
+           }
+       }
+
+        );
     }
 
     /**
      * Terminate load balancer topology receiver thread.
      */
+
     public void terminate() {
         topologyEventReceiver.terminate();
         terminated = true;
     }
 
-    protected synchronized void startApplicationMonitor(String applicationId) {
-        Thread th = null;
-        if (AutoscalerContext.getInstance().getAppMonitor(applicationId) == null) {
-            th = new Thread(new ApplicationMonitorAdder(applicationId));
-        }
-        if (th != null) {
-            th.start();
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug(String
-                        .format("Application monitor thread already exists: " +
-                                "[application] %s ", applicationId));
-            }
-        }
-    }
-
-    private class ApplicationMonitorAdder implements Runnable {
-        private String appId;
-
-        public ApplicationMonitorAdder(String appId) {
-            this.appId = appId;
-        }
-
-        public void run() {
-            ApplicationMonitor applicationMonitor = null;
-            int retries = 5;
-            boolean success = false;
-            do {
-                try {
-                    Thread.sleep(5000);
-                } catch (InterruptedException e1) {
-                }
-                try {
-                    long start = System.currentTimeMillis();
-                    if (log.isDebugEnabled()) {
-                        log.debug("application monitor is going to be started for [application] " +
-                                appId);
-                    }
-                    try {
-                        applicationMonitor = MonitorFactory.getApplicationMonitor(appId);
-                    } catch (PolicyValidationException e) {
-                        String msg = "Application monitor creation failed for Application: ";
-                        log.warn(msg, e);
-                        retries--;
-                    }
-                    long end = System.currentTimeMillis();
-                    log.info("Time taken to start app monitor: " + (end - start) / 1000);
-                    success = true;
-                } catch (DependencyBuilderException e) {
-                    String msg = "Application monitor creation failed for Application: ";
-                    log.warn(msg, e);
-                    retries--;
-                } catch (TopologyInConsistentException e) {
-                    String msg = "Application monitor creation failed for Application: ";
-                    log.warn(msg, e);
-                    retries--;
-                }
-            } while (!success && retries != 0);
-
-            if (applicationMonitor == null) {
-                String msg = "Application monitor creation failed, even after retrying for 5 times, "
-                        + "for Application: " + appId;
-                log.error(msg);
-                throw new RuntimeException(msg);
-            }
+	public ExecutorService getExecutorService() {
+		return executorService;
+	}
 
-            AutoscalerContext.getInstance().addAppMonitor(applicationMonitor);
-            if (log.isInfoEnabled()) {
-                log.info(String.format("Application monitor has been added successfully: " +
-                        "[application] %s", applicationMonitor.getId()));
-            }
-        }
-    }
+	public void setExecutorService(ExecutorService executorService) {
+		this.executorService = executorService;
+	}
 }