You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/12/13 13:42:32 UTC
[2/5] stratos git commit: update the threads with executor service
update the threads with executor service
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/d9cdf102
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/d9cdf102
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/d9cdf102
Branch: refs/heads/4.1.0-test
Commit: d9cdf1025992cc7e323f0d57c723811f2a2f9381
Parents: 8dc3b56
Author: gayan <ga...@puppet.gayan.org>
Authored: Fri Dec 12 17:06:13 2014 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Sat Dec 13 18:02:57 2014 +0530
----------------------------------------------------------------------
.../component/ParentComponentMonitor.java | 192 ++++++++++---------
.../stratos/autoscaler/util/AutoscalerUtil.java | 47 +++--
2 files changed, 130 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/d9cdf102/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java
index f694e23..f314c27 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/monitor/component/ParentComponentMonitor.java
@@ -54,17 +54,20 @@ import java.util.ArrayList;
import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.ExecutorService;
/**
* Monitor is to monitor it's child monitors and
* control them according to the dependencies respectively.
*/
public abstract class ParentComponentMonitor extends Monitor {
- private static final Log log = LogFactory.getLog(ParentComponentMonitor.class);
+ private static final Log log = LogFactory.getLog(ParentComponentMonitor.class);
+ public static final String IDENTIFIER = "Auto-Scaler";
+ public static final int THREAD_POOL_SIZE = 10;
- //The monitors dependency tree with all the start-able/kill-able dependencies
+ //The monitors dependency tree with all the start-able/kill-able dependencies
protected DependencyTree startupDependencyTree;
//The monitors dependency tree with all the scaling dependencies
protected Set<String> scalingDependencies;
@@ -76,6 +78,8 @@ public abstract class ParentComponentMonitor extends Monitor {
protected Map<String, List<String>> inactiveInstancesMap;
//terminating map, key=alias, value instanceIds
protected Map<String, List<String>> terminatingInstancesMap;
+ //Executor service to maintain the thread pool
+ private ExecutorService executorService;
public ParentComponentMonitor(ParentComponent component) throws DependencyBuilderException {
aliasToActiveMonitorsMap = new ConcurrentHashMap<String, Monitor>();
@@ -86,7 +90,10 @@ public abstract class ParentComponentMonitor extends Monitor {
this.id = component.getUniqueIdentifier();
//Building the startup dependencies for this monitor within the immediate children
startupDependencyTree = DependencyBuilder.getInstance().buildDependency(component);
- scalingDependencies = DependencyBuilder.getInstance().buildScalingDependencies(component);
+ //Building the scaling dependencies for this monitor within the immediate children
+ scalingDependencies = DependencyBuilder.getInstance().buildScalingDependencies(component);
+ //Create the executor service with identifier and thread pool size
+ executorService = StratosThreadPool.getExecutorService(IDENTIFIER, THREAD_POOL_SIZE);
}
/**
@@ -483,8 +490,8 @@ public abstract class ParentComponentMonitor extends Monitor {
TopologyManager.acquireReadLockForCluster(monitor1.getServiceId(),
monitor1.getClusterId());
try {
- if (((ClusterInstance) monitor1.getInstance(instanceId)).getStatus()
- == ClusterStatus.Active) {
+ if (((ClusterInstance)monitor1.getInstance(instanceId)).getStatus()
+ == ClusterStatus.Active) {
parentsActive = true;
}
} finally {
@@ -500,7 +507,7 @@ public abstract class ParentComponentMonitor extends Monitor {
// move to inactive monitors list to use in the Terminated event
protected synchronized void markInstanceAsInactive(String childId, String instanceId) {
- if (this.inactiveInstancesMap.containsKey(childId)) {
+ if (this.inactiveInstancesMap.containsKey(childId)) {
this.inactiveInstancesMap.get(childId).add(instanceId);
} else {
List<String> instanceIds = new ArrayList<String>();
@@ -514,7 +521,7 @@ public abstract class ParentComponentMonitor extends Monitor {
if (this.inactiveInstancesMap.containsKey(childId) &&
this.inactiveInstancesMap.get(childId).contains(instanceId)) {
this.inactiveInstancesMap.get(childId).remove(instanceId);
- if (this.inactiveInstancesMap.get(childId).isEmpty()) {
+ if(this.inactiveInstancesMap.get(childId).isEmpty()) {
this.inactiveInstancesMap.remove(childId);
}
}
@@ -525,7 +532,7 @@ public abstract class ParentComponentMonitor extends Monitor {
if (this.terminatingInstancesMap.containsKey(childId) &&
this.terminatingInstancesMap.get(childId).contains(instanceId)) {
this.terminatingInstancesMap.get(childId).remove(instanceId);
- if (this.terminatingInstancesMap.get(childId).isEmpty()) {
+ if(this.terminatingInstancesMap.get(childId).isEmpty()) {
this.terminatingInstancesMap.remove(childId);
}
}
@@ -533,38 +540,34 @@ public abstract class ParentComponentMonitor extends Monitor {
- // move to inactive monitors list to use in the Terminated event
+ // move to terminating instances list to use in the Terminated event
protected synchronized void markInstanceAsTerminating(String childId, String instanceId) {
- if (this.inactiveInstancesMap.containsKey(childId) &&
- this.inactiveInstancesMap.get(childId).contains(instanceId)) {
- this.inactiveInstancesMap.get(childId).remove(instanceId);
- }
- if (this.terminatingInstancesMap.containsKey(childId)) {
- this.terminatingInstancesMap.get(childId).add(instanceId);
- } else {
- List<String> instanceIds = new ArrayList<String>();
- instanceIds.add(instanceId);
- this.terminatingInstancesMap.put(childId, instanceIds);
+ // Drop the instance from the inactive list first, if it is tracked there,
+ // so that it is not listed as both inactive and terminating
+ if (this.inactiveInstancesMap.containsKey(childId) &&
+ this.inactiveInstancesMap.get(childId).contains(instanceId)) {
+ this.inactiveInstancesMap.get(childId).remove(instanceId);
+ }
+ // Append to the child's terminating list; the else branch creates the
+ // list when this is the first terminating instance seen for the child
+ if (this.terminatingInstancesMap.containsKey(childId)) {
+ this.terminatingInstancesMap.get(childId).add(instanceId);
+ } else {
+ List<String> instanceIds = new ArrayList<String>();
+ instanceIds.add(instanceId);
+ this.terminatingInstancesMap.put(childId, instanceIds);
+ }
}
protected synchronized void startMonitor(ParentComponentMonitor parent,
ApplicationChildContext context, List<String> instanceId) {
- Thread th = null;
- if (!this.aliasToActiveMonitorsMap.containsKey(context.getId())) {
- pendingMonitorsList.add(context.getId());
- th = new Thread(
- new MonitorAdder(parent, context, this.appId, instanceId));
- if (log.isDebugEnabled()) {
- log.debug(String
- .format("Monitor Adder has been added: [cluster] %s ",
- context.getId()));
- }
- }
- if (th != null) {
- th.start();
- log.info(String
- .format("Monitor thread has been started successfully: [cluster] %s ",
- context.getId()));
- }
+
+ if (!this.aliasToActiveMonitorsMap.containsKey(context.getId())) {
+ pendingMonitorsList.add(context.getId());
+ executorService.submit(new MonitorAdder(parent, context, this.appId, instanceId));
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Monitor Adder has been added: [cluster] %s ",context.getId()));
+ }
+ }
+
}
public Map<String, Monitor> getAliasToActiveMonitorsMap() {
@@ -638,74 +641,79 @@ public abstract class ParentComponentMonitor extends Monitor {
return autoscaleAlgorithm;
}
-private class MonitorAdder implements Runnable {
- private ApplicationChildContext context;
- private ParentComponentMonitor parent;
- private String appId;
- private List<String> instanceId;
+ private class MonitorAdder implements Runnable {
+ private ApplicationChildContext context;
+ private ParentComponentMonitor parent;
+ private String appId;
+ private List<String> instanceId;
- public MonitorAdder(ParentComponentMonitor parent, ApplicationChildContext context,
- String appId, List<String> instanceId) {
- this.parent = parent;
- this.context = context;
- this.appId = appId;
- this.instanceId = instanceId;
- }
+ public MonitorAdder(ParentComponentMonitor parent, ApplicationChildContext context,
+ String appId, List<String> instanceId) {
+ this.parent = parent;
+ this.context = context;
+ this.appId = appId;
+ this.instanceId = instanceId;
+ }
- public void run() {
- Monitor monitor = null;
- int retries = 5;
- boolean success = false;
- while (!success && retries != 0) {
- /*//TODO remove thread.sleep, exectutor service
+ public void run() {
+ Monitor monitor = null;
+ int retries = 5; // upper bound on monitor creation attempts
+ boolean success = false;
+ while (!success && retries != 0) {
+ /*//TODO remove thread.sleep, executor service
try {
Thread.sleep(5000);
} catch (InterruptedException e1) {
}*/
- if (log.isInfoEnabled()) {
- log.info("Monitor is going to be started for [group/cluster] "
- + context.getId());
- }
- try {
- monitor = MonitorFactory.getMonitor(parent, context, appId, instanceId);
- } catch (DependencyBuilderException e) {
- String msg = "Monitor creation failed for: " + context.getId();
- log.warn(msg, e);
- retries--;
- } catch (TopologyInConsistentException e) {
- String msg = "Monitor creation failed for: " + context.getId();
- log.warn(msg, e);
- retries--;
- } catch (PolicyValidationException e) {
- String msg = "Monitor creation failed for: " + context.getId();
- log.warn(msg, e);
- retries--;
- } catch (PartitionValidationException e) {
- String msg = "Monitor creation failed for: " + context.getId();
- log.warn(msg, e);
- retries--;
+ if (log.isInfoEnabled()) {
+ log.info("Monitor is going to be started for [group/cluster] "
+ + context.getId());
+ }
+ try {
+ monitor = MonitorFactory.getMonitor(parent, context, appId, instanceId);
+ success = true; // reached only when getMonitor did not throw; a failure leaves it false so the loop retries
+ if (log.isInfoEnabled()) {
+ log.info(String
+ .format("Monitor thread has been started successfully: [cluster] %s ",
+ context.getId()));
+ }
+ } catch (DependencyBuilderException e) {
+ String msg = "Monitor creation failed for: " + context.getId();
+ log.warn(msg, e);
+ retries--;
+ } catch (TopologyInConsistentException e) {
+ String msg = "Monitor creation failed for: " + context.getId();
+ log.warn(msg, e);
+ retries--;
+ } catch (PolicyValidationException e) {
+ String msg = "Monitor creation failed for: " + context.getId();
+ log.warn(msg, e);
+ retries--;
+ } catch (PartitionValidationException e) {
+ String msg = "Monitor creation failed for: " + context.getId();
+ log.warn(msg, e);
+ retries--;
+ }
+ }
- success = true;
- }
- if (monitor == null) {
- String msg = "Monitor creation failed, even after retrying for 5 times, "
- + "for : " + context.getId();
- log.error(msg);
- //TODO parent.notify();
- throw new RuntimeException(msg);
- }
+ if (monitor == null) {
+ String msg = "Monitor creation failed, even after retrying for 5 times, "
+ + "for : " + context.getId();
+ log.error(msg);
+ //TODO parent.notify();
+ throw new RuntimeException(msg);
+ }
- aliasToActiveMonitorsMap.put(context.getId(), monitor);
- pendingMonitorsList.remove(context.getId());
- // ApplicationBuilder.
- if (log.isInfoEnabled()) {
- log.info(String.format("Monitor has been added successfully for: %s",
- context.getId()));
+ aliasToActiveMonitorsMap.put(context.getId(), monitor);
+ pendingMonitorsList.remove(context.getId());
+ // ApplicationBuilder.
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Monitor has been added successfully for: %s",
+ context.getId()));
+ }
+ }
}
-}
public Set<String> getScalingDependencies() {
return scalingDependencies;
http://git-wip-us.apache.org/repos/asf/stratos/blob/d9cdf102/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
index b0b8ebc..1d3ca4a 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/util/AutoscalerUtil.java
@@ -22,6 +22,9 @@ package org.apache.stratos.autoscaler.util;
import java.util.ArrayList;
import java.util.Iterator;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
import javax.xml.namespace.QName;
@@ -37,6 +40,7 @@ import org.apache.stratos.autoscaler.monitor.component.ApplicationMonitor;
import org.apache.stratos.autoscaler.registry.RegistryManager;
import org.apache.stratos.common.Properties;
import org.apache.stratos.common.Property;
+import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.messaging.domain.applications.Application;
import org.apache.stratos.messaging.domain.applications.Applications;
import org.apache.stratos.messaging.domain.applications.ClusterDataHolder;
@@ -50,7 +54,10 @@ import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
*/
public class AutoscalerUtil {
- private static final Log log = LogFactory.getLog(AutoscalerUtil.class);
+ private static final Log log = LogFactory.getLog(AutoscalerUtil.class);
+ public static final String IDENTIFIER = "Auto-Scaler";
+ public static final int THREAD_POOL_SIZE = 10;
+ private ExecutorService executorService= StratosThreadPool.getExecutorService(IDENTIFIER, THREAD_POOL_SIZE);
private AutoscalerUtil() {
@@ -59,7 +66,16 @@ public class AutoscalerUtil {
public static AutoscalerUtil getInstance() {
return Holder.INSTANCE;
}
- private static class Holder {
+
+ public ExecutorService getExecutorService() {
+ return executorService;
+ }
+
+ public void setExecutorService(ExecutorService executorService) {
+ this.executorService = executorService;
+ }
+
+ private static class Holder {
private static final AutoscalerUtil INSTANCE = new AutoscalerUtil();
}
@@ -363,21 +379,18 @@ public class AutoscalerUtil {
}
public synchronized void startApplicationMonitor(String applicationId) {
- Thread th = null;
- AutoscalerContext autoscalerContext = AutoscalerContext.getInstance();
- if (autoscalerContext.getAppMonitor(applicationId) == null) {
- autoscalerContext.addPendingMonitor(applicationId);
- th = new Thread(new ApplicationMonitorAdder(applicationId));
- }
- if (th != null) {
- th.start();
- } else {
- if (log.isDebugEnabled()) {
- log.debug(String
- .format("Application monitor thread already exists: " +
- "[application] %s ", applicationId));
- }
- }
+
+ AutoscalerContext autoscalerContext = AutoscalerContext.getInstance();
+ if (autoscalerContext.getAppMonitor(applicationId) == null) {
+ autoscalerContext.addPendingMonitor(applicationId);
+ executorService.submit(new ApplicationMonitorAdder(applicationId));
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String
+ .format("Application monitor thread already exists: " +
+ "[application] %s ", applicationId));
+ }
+ }
}
private class ApplicationMonitorAdder implements Runnable {