You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/12/13 13:42:32 UTC

[2/5] stratos git commit: update the threads with executor service

update the threads with executor service


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/d9cdf102
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/d9cdf102
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/d9cdf102

Branch: refs/heads/4.1.0-test
Commit: d9cdf1025992cc7e323f0d57c723811f2a2f9381
Parents: 8dc3b56
Author: gayan <ga...@puppet.gayan.org>
Authored: Fri Dec 12 17:06:13 2014 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Sat Dec 13 18:02:57 2014 +0530

----------------------------------------------------------------------
 .../component/ParentComponentMonitor.java       | 192 ++++++++++---------
 .../stratos/autoscaler/util/AutoscalerUtil.java |  47 +++--
 2 files changed, 130 insertions(+), 109 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/d9cdf102/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java
index f694e23..f314c27 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java
@@ -54,17 +54,20 @@ import java.util.ArrayList;
 import java.util.Collection;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
 
 /**
  * Monitor is to monitor it's child monitors and
  * control them according to the dependencies respectively.
  */
 public abstract class ParentComponentMonitor extends Monitor {
-    private static final Log log = LogFactory.getLog(ParentComponentMonitor.class);
+	private static final Log log = LogFactory.getLog(ParentComponentMonitor.class);
+	public static final String IDENTIFIER = "Auto-Scaler";
+	public static final int THREAD_POOL_SIZE = 10;
 
-    //The monitors dependency tree with all the start-able/kill-able dependencies
+	//The monitors dependency tree with all the start-able/kill-able dependencies
     protected DependencyTree startupDependencyTree;
     //The monitors dependency tree with all the scaling dependencies
     protected Set<String> scalingDependencies;
@@ -76,6 +78,8 @@ public abstract class ParentComponentMonitor extends Monitor {
     protected Map<String, List<String>> inactiveInstancesMap;
     //terminating map, key=alias, value instanceIds
     protected Map<String, List<String>> terminatingInstancesMap;
+	//Executor service to maintain the thread pool
+	private ExecutorService executorService;
 
     public ParentComponentMonitor(ParentComponent component) throws DependencyBuilderException {
         aliasToActiveMonitorsMap = new ConcurrentHashMap<String, Monitor>();
@@ -86,7 +90,10 @@ public abstract class ParentComponentMonitor extends Monitor {
         this.id = component.getUniqueIdentifier();
         //Building the startup dependencies for this monitor within the immediate children
         startupDependencyTree = DependencyBuilder.getInstance().buildDependency(component);
-        scalingDependencies  =  DependencyBuilder.getInstance().buildScalingDependencies(component);
+        //Building the scaling dependencies for this monitor within the immediate children
+        scalingDependencies = DependencyBuilder.getInstance().buildScalingDependencies(component);
+        //Create the executor service with identifier and thread pool size
+        executorService = StratosThreadPool.getExecutorService(IDENTIFIER, THREAD_POOL_SIZE);
     }
 
     /**
@@ -483,8 +490,8 @@ public abstract class ParentComponentMonitor extends Monitor {
                     TopologyManager.acquireReadLockForCluster(monitor1.getServiceId(),
                             monitor1.getClusterId());
                     try {
-                        if (((ClusterInstance) monitor1.getInstance(instanceId)).getStatus()
-                                == ClusterStatus.Active) {
+                        if (((ClusterInstance)monitor1.getInstance(instanceId)).getStatus()
+                                                    == ClusterStatus.Active) {
                             parentsActive = true;
                         }
                     } finally {
@@ -500,7 +507,7 @@ public abstract class ParentComponentMonitor extends Monitor {
 
     // move to inactive monitors list to use in the Terminated event
     protected synchronized void markInstanceAsInactive(String childId, String instanceId) {
-        if (this.inactiveInstancesMap.containsKey(childId)) {
+        if (this.inactiveInstancesMap.containsKey(childId)) { // list already exists for this child: append to it
             this.inactiveInstancesMap.get(childId).add(instanceId);
         } else {
             List<String> instanceIds = new ArrayList<String>();
@@ -514,7 +521,7 @@ public abstract class ParentComponentMonitor extends Monitor {
         if (this.inactiveInstancesMap.containsKey(childId) &&
                 this.inactiveInstancesMap.get(childId).contains(instanceId)) {
             this.inactiveInstancesMap.get(childId).remove(instanceId);
-            if (this.inactiveInstancesMap.get(childId).isEmpty()) {
+            if(this.inactiveInstancesMap.get(childId).isEmpty()) {
                 this.inactiveInstancesMap.remove(childId);
             }
         }
@@ -525,7 +532,7 @@ public abstract class ParentComponentMonitor extends Monitor {
         if (this.terminatingInstancesMap.containsKey(childId) &&
                 this.terminatingInstancesMap.get(childId).contains(instanceId)) {
             this.terminatingInstancesMap.get(childId).remove(instanceId);
-            if (this.terminatingInstancesMap.get(childId).isEmpty()) {
+            if(this.terminatingInstancesMap.get(childId).isEmpty()) {
                 this.terminatingInstancesMap.remove(childId);
             }
         }
@@ -533,38 +540,34 @@ public abstract class ParentComponentMonitor extends Monitor {
 
     // move to inactive monitors list to use in the Terminated event
     protected synchronized void markInstanceAsTerminating(String childId, String instanceId) {
-        if (this.inactiveInstancesMap.containsKey(childId) &&
-                this.inactiveInstancesMap.get(childId).contains(instanceId)) {
-            this.inactiveInstancesMap.get(childId).remove(instanceId);
-        }
-        if (this.terminatingInstancesMap.containsKey(childId)) {
-            this.terminatingInstancesMap.get(childId).add(instanceId);
-        } else {
-            List<String> instanceIds = new ArrayList<String>();
-            instanceIds.add(instanceId);
-            this.terminatingInstancesMap.put(childId, instanceIds);
+        // An instance that starts terminating must no longer be tracked as inactive,
+        // so drop it from the inactive list first (if it was recorded there)
+        if (this.inactiveInstancesMap.containsKey(childId) &&
+                this.inactiveInstancesMap.get(childId).contains(instanceId)) {
+            this.inactiveInstancesMap.get(childId).remove(instanceId);
+        }
+        if (this.terminatingInstancesMap.containsKey(childId)) {
+            // the child already has a terminating list: append to it
+            this.terminatingInstancesMap.get(childId).add(instanceId);
+        } else {
+            // first terminating instance of this child: create its list
+            List<String> instanceIds = new ArrayList<String>();
+            instanceIds.add(instanceId);
+            this.terminatingInstancesMap.put(childId, instanceIds);
         }
     }
 
     protected synchronized void startMonitor(ParentComponentMonitor parent,
                                              ApplicationChildContext context, List<String> instanceId) {
-        Thread th = null;
-        if (!this.aliasToActiveMonitorsMap.containsKey(context.getId())) {
-            pendingMonitorsList.add(context.getId());
-            th = new Thread(
-                    new MonitorAdder(parent, context, this.appId, instanceId));
-            if (log.isDebugEnabled()) {
-                log.debug(String
-                        .format("Monitor Adder has been added: [cluster] %s ",
-                                context.getId()));
-            }
-        }
-        if (th != null) {
-            th.start();
-            log.info(String
-                    .format("Monitor thread has been started successfully: [cluster] %s ",
-                            context.getId()));
-        }
+
+	    if (!this.aliasToActiveMonitorsMap.containsKey(context.getId())) {
+		    pendingMonitorsList.add(context.getId());
+		    executorService.submit(new MonitorAdder(parent, context, this.appId, instanceId));
+		    if (log.isDebugEnabled()) {
+			    log.debug(String.format("Monitor Adder has been added: [cluster] %s ",context.getId()));
+		    }
+	    }
+
     }
 
     public Map<String, Monitor> getAliasToActiveMonitorsMap() {
@@ -638,74 +641,79 @@ public abstract class ParentComponentMonitor extends Monitor {
         return autoscaleAlgorithm;
     }
 
-private class MonitorAdder implements Runnable {
-    private ApplicationChildContext context;
-    private ParentComponentMonitor parent;
-    private String appId;
-    private List<String> instanceId;
+    private class MonitorAdder implements Runnable {
+        private ApplicationChildContext context;
+        private ParentComponentMonitor parent;
+        private String appId;
+        private List<String> instanceId;
 
-    public MonitorAdder(ParentComponentMonitor parent, ApplicationChildContext context,
-                        String appId, List<String> instanceId) {
-        this.parent = parent;
-        this.context = context;
-        this.appId = appId;
-        this.instanceId = instanceId;
-    }
+        public MonitorAdder(ParentComponentMonitor parent, ApplicationChildContext context,
+                            String appId, List<String> instanceId) {
+            this.parent = parent;
+            this.context = context;
+            this.appId = appId;
+            this.instanceId = instanceId;
+        }
 
-    public void run() {
-        Monitor monitor = null;
-        int retries = 5;
-        boolean success = false;
-        while (!success && retries != 0) {
-                /*//TODO remove thread.sleep, exectutor service
+        public void run() {
+	        Monitor monitor = null;
+	        int retries = 5;
+	        boolean success = false;
+	        while (!success && retries != 0) {
+	            /*//TODO remove thread.sleep, exectutor service
                 try {
                     Thread.sleep(5000);
                 } catch (InterruptedException e1) {
                 }*/
 
-            if (log.isInfoEnabled()) {
-                log.info("Monitor is going to be started for [group/cluster] "
-                        + context.getId());
-            }
-            try {
-                monitor = MonitorFactory.getMonitor(parent, context, appId, instanceId);
-            } catch (DependencyBuilderException e) {
-                String msg = "Monitor creation failed for: " + context.getId();
-                log.warn(msg, e);
-                retries--;
-            } catch (TopologyInConsistentException e) {
-                String msg = "Monitor creation failed for: " + context.getId();
-                log.warn(msg, e);
-                retries--;
-            } catch (PolicyValidationException e) {
-                String msg = "Monitor creation failed for: " + context.getId();
-                log.warn(msg, e);
-                retries--;
-            } catch (PartitionValidationException e) {
-                String msg = "Monitor creation failed for: " + context.getId();
-                log.warn(msg, e);
-                retries--;
+                if (log.isInfoEnabled()) {
+                    log.info("Monitor is going to be started for [group/cluster] "
+                            + context.getId());
+                }
+                try {
+                    monitor = MonitorFactory.getMonitor(parent, context, appId, instanceId);
+                    // reached only when creation did not throw, so the retry loop can stop
+                    success = true;
+                } catch (DependencyBuilderException e) {
+                    String msg = "Monitor creation failed for: " + context.getId();
+                    log.warn(msg, e);
+                    retries--;
+                } catch (TopologyInConsistentException e) {
+                    String msg = "Monitor creation failed for: " + context.getId();
+                    log.warn(msg, e);
+                    retries--;
+                } catch (PolicyValidationException e) {
+                    String msg = "Monitor creation failed for: " + context.getId();
+                    log.warn(msg, e);
+                    retries--;
+                } catch (PartitionValidationException e) {
+                    String msg = "Monitor creation failed for: " + context.getId();
+                    log.warn(msg, e);
+                    retries--;
+                }
+                if (success && log.isInfoEnabled()) {
+                    log.info(String.format("Monitor thread has been started successfully: [cluster] %s ",
+                            context.getId()));
+                }
             }
-            success = true;
-        }
 
-        if (monitor == null) {
-            String msg = "Monitor creation failed, even after retrying for 5 times, "
-                    + "for : " + context.getId();
-            log.error(msg);
-            //TODO parent.notify();
-            throw new RuntimeException(msg);
-        }
+            if (monitor == null) {
+                String msg = "Monitor creation failed, even after retrying for 5 times, "
+                        + "for : " + context.getId();
+                log.error(msg);
+                //TODO parent.notify();
+                throw new RuntimeException(msg);
+            }
 
-        aliasToActiveMonitorsMap.put(context.getId(), monitor);
-        pendingMonitorsList.remove(context.getId());
-        // ApplicationBuilder.
-        if (log.isInfoEnabled()) {
-            log.info(String.format("Monitor has been added successfully for: %s",
-                    context.getId()));
+            aliasToActiveMonitorsMap.put(context.getId(), monitor);
+            pendingMonitorsList.remove(context.getId());
+            // ApplicationBuilder.
+            if (log.isInfoEnabled()) {
+                log.info(String.format("Monitor has been added successfully for: %s",
+                        context.getId()));
+            }
         }
     }
-}
 
 	public Set<String> getScalingDependencies() {
 		return scalingDependencies;

http://git-wip-us.apache.org/repos/asf/stratos/blob/d9cdf102/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
index b0b8ebc..1d3ca4a 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
@@ -22,6 +22,9 @@ package org.apache.stratos.autoscaler.util;
 import java.util.ArrayList;
 import java.util.Iterator;
 import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
 
 import javax.xml.namespace.QName;
 
@@ -37,6 +40,7 @@ import org.apache.stratos.autoscaler.monitor.component.ApplicationMonitor;
 import org.apache.stratos.autoscaler.registry.RegistryManager;
 import org.apache.stratos.common.Properties;
 import org.apache.stratos.common.Property;
+import org.apache.stratos.common.threading.StratosThreadPool;
 import org.apache.stratos.messaging.domain.applications.Application;
 import org.apache.stratos.messaging.domain.applications.Applications;
 import org.apache.stratos.messaging.domain.applications.ClusterDataHolder;
@@ -50,7 +54,10 @@ import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
  */
 public class AutoscalerUtil {
 
-    private static final Log log = LogFactory.getLog(AutoscalerUtil.class);
+	private static final Log log = LogFactory.getLog(AutoscalerUtil.class);
+	public static final String IDENTIFIER = "Auto-Scaler";
+	public static final int THREAD_POOL_SIZE = 10;
+	private ExecutorService executorService= StratosThreadPool.getExecutorService(IDENTIFIER, THREAD_POOL_SIZE);
 
     private AutoscalerUtil() {
 
@@ -59,7 +66,16 @@ public class AutoscalerUtil {
     public static AutoscalerUtil getInstance() {
         return Holder.INSTANCE;
     }
-    private static class Holder {
+
+	public ExecutorService getExecutorService() {
+		return executorService;
+	}
+
+	public void setExecutorService(ExecutorService executorService) {
+		this.executorService = executorService;
+	}
+
+	private static class Holder {
         private static final AutoscalerUtil INSTANCE = new AutoscalerUtil();
     }
 
@@ -363,21 +379,18 @@ public class AutoscalerUtil {
     }
 
     public synchronized void startApplicationMonitor(String applicationId) {
-        Thread th = null;
-        AutoscalerContext autoscalerContext = AutoscalerContext.getInstance();
-        if (autoscalerContext.getAppMonitor(applicationId) == null) {
-            autoscalerContext.addPendingMonitor(applicationId);
-            th = new Thread(new ApplicationMonitorAdder(applicationId));
-        }
-        if (th != null) {
-            th.start();
-        } else {
-            if (log.isDebugEnabled()) {
-                log.debug(String
-                        .format("Application monitor thread already exists: " +
-                                "[application] %s ", applicationId));
-            }
-        }
+
+	    AutoscalerContext autoscalerContext = AutoscalerContext.getInstance();
+	    if (autoscalerContext.getAppMonitor(applicationId) == null) {
+		    autoscalerContext.addPendingMonitor(applicationId);
+		    executorService.submit(new ApplicationMonitorAdder(applicationId));
+	    } else {
+		    if (log.isDebugEnabled()) {
+			    log.debug(String
+					              .format("Application monitor thread already exists: " +
+					                      "[application] %s ", applicationId));
+		    }
+	    }
     }
 
     private class ApplicationMonitorAdder implements Runnable {