You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by la...@apache.org on 2014/12/03 07:56:40 UTC

[3/9] stratos git commit: Remove unnecessary threads in messaging model

Remove unnecessary threads in messaging model

Conflicts:
	components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
	components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
	components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
	extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/8012f8c8
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/8012f8c8
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/8012f8c8

Branch: refs/heads/master
Commit: 8012f8c8d1b7ef808fae55d0f18826e651c04ed9
Parents: 9e6e91d
Author: gayan <ga...@puppet.gayan.org>
Authored: Mon Dec 1 16:25:22 2014 +0530
Committer: gayan <ga...@puppet.gayan.org>
Committed: Tue Dec 2 16:36:37 2014 +0530

----------------------------------------------------------------------
 .../AutoscalerTopologyEventReceiver.java        | 527 ++++++++++++++++++-
 .../internal/AutoscalerServerComponent.java     | 131 ++++-
 .../CloudControllerServiceComponent.java        | 142 ++++-
 .../application/ApplicationTopicReceiver.java   |  65 ++-
 .../status/ClusterStatusTopicReceiver.java      | 110 ++--
 .../status/InstanceStatusTopicReceiver.java     | 128 +++--
 .../extension/api/LoadBalancerExtension.java    | 256 +++++----
 .../extension/FaultHandlingWindowProcessor.java | 307 +++++++++--
 .../apache/stratos/haproxy/extension/Main.java  |  48 +-
 9 files changed, 1332 insertions(+), 382 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
index bfdf30b..1f14542 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -54,15 +54,16 @@ import java.util.concurrent.ExecutorService;
 /**
  * Autoscaler topology receiver.
  */
-public class AutoscalerTopologyEventReceiver{
+public class AutoscalerTopologyEventReceiver {
 
-    private static final Log log = LogFactory.getLog(AutoscalerTopologyEventReceiver.class);
+	private static final Log log = LogFactory.getLog(AutoscalerTopologyEventReceiver.class);
 
-    private TopologyEventReceiver topologyEventReceiver;
-    private boolean terminated;
-    private boolean topologyInitialized;
+	private TopologyEventReceiver topologyEventReceiver;
+	private boolean terminated;
+	private boolean topologyInitialized;
 	private ExecutorService executorService;
 
+<<<<<<< HEAD
     public AutoscalerTopologyEventReceiver() {
         this.topologyEventReceiver = new TopologyEventReceiver();
         addEventListeners();
@@ -523,6 +524,461 @@ public class AutoscalerTopologyEventReceiver{
         topologyEventReceiver.terminate();
         terminated = true;
     }
+=======
+	public AutoscalerTopologyEventReceiver() {
+		this.topologyEventReceiver = new TopologyEventReceiver();
+		addEventListeners();
+	}
+
+	public void execute() {
+		//FIXME this is activated before the autoscaler deployer is activated.
+
+		topologyEventReceiver.setExecutorService(executorService);
+		topologyEventReceiver.execute();
+
+		if (log.isInfoEnabled()) {
+			log.info("Autoscaler topology receiver thread started");
+		}
+
+	}
+
+	private boolean allClustersInitialized(Application application) {
+		boolean allClustersInitialized = false;
+		for (ClusterDataHolder holder : application.getClusterDataRecursively()) {
+			TopologyManager.acquireReadLockForCluster(holder.getServiceType(),
+			                                          holder.getClusterId());
+
+			try {
+				Topology topology = TopologyManager.getTopology();
+				if (topology != null) {
+					Service service = topology.getService(holder.getServiceType());
+					if (service != null) {
+						if (service.clusterExists(holder.getClusterId())) {
+							allClustersInitialized = true;
+							return allClustersInitialized;
+						} else {
+							if (log.isDebugEnabled()) {
+								log.debug("[Cluster] " + holder.getClusterId() + " is not found in " +
+								          "the Topology");
+							}
+							allClustersInitialized = false;
+						}
+					} else {
+						if (log.isDebugEnabled()) {
+							log.debug("Service is null in the CompleteTopologyEvent");
+						}
+					}
+				} else {
+					if (log.isDebugEnabled()) {
+						log.debug("Topology is null in the CompleteTopologyEvent");
+					}
+				}
+			} finally {
+				TopologyManager.releaseReadLockForCluster(holder.getServiceType(),
+				                                          holder.getClusterId());
+			}
+		}
+		return allClustersInitialized;
+	}
+
+	private void addEventListeners() {
+		// Listen to topology events that affect clusters
+		topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
+			@Override
+			protected void onEvent(Event event) {
+				if (!topologyInitialized) {
+					log.info("[CompleteTopologyEvent] Received: " + event.getClass());
+					ApplicationHolder.acquireReadLock();
+					try {
+						Applications applications = ApplicationHolder.getApplications();
+						if (applications != null) {
+							for (Application application : applications.getApplications().values()) {
+								if (allClustersInitialized(application)) {
+									startApplicationMonitor(application.getUniqueIdentifier());
+								} else {
+									log.error("Complete Topology is not consistent with the applications " +
+									          "which got persisted");
+								}
+							}
+							topologyInitialized = true;
+						} else {
+							log.info("No applications found in the complete topology");
+						}
+					} catch (Exception e) {
+						log.error("Error processing event", e);
+					} finally {
+						ApplicationHolder.releaseReadLock();
+					}
+				}
+			}
+		});
+
+		topologyEventReceiver.addEventListener(new ApplicationClustersCreatedEventListener() {
+			@Override
+			protected void onEvent(Event event) {
+				try {
+					log.info("[ApplicationClustersCreatedEvent] Received: " + event.getClass());
+					ApplicationClustersCreatedEvent applicationClustersCreatedEvent =
+							(ApplicationClustersCreatedEvent) event;
+					String appId = applicationClustersCreatedEvent.getAppId();
+					try {
+						//acquire read lock
+						ApplicationHolder.acquireReadLock();
+						//start the application monitor
+						startApplicationMonitor(appId);
+					} catch (Exception e) {
+						String msg = "Error processing event " + e.getLocalizedMessage();
+						log.error(msg, e);
+					} finally {
+						//release read lock
+						ApplicationHolder.releaseReadLock();
+
+					}
+				} catch (ClassCastException e) {
+					String msg = "Error while casting the event " + e.getLocalizedMessage();
+					log.error(msg, e);
+				}
+
+			}
+		});
+
+		topologyEventReceiver.addEventListener(new ClusterActivatedEventListener() {
+			@Override
+			protected void onEvent(Event event) {
+				log.info("[ClusterActivatedEvent] Received: " + event.getClass());
+				ClusterActivatedEvent clusterActivatedEvent = (ClusterActivatedEvent) event;
+				String clusterId = clusterActivatedEvent.getClusterId();
+				AutoscalerContext asCtx = AutoscalerContext.getInstance();
+				AbstractClusterMonitor monitor;
+				monitor = asCtx.getClusterMonitor(clusterId);
+				if (null == monitor) {
+					if (log.isDebugEnabled()) {
+						log.debug(String.format("A cluster monitor is not found in autoscaler context "
+						                        + "[cluster] %s", clusterId));
+					}
+					return;
+				}
+				//changing the status in the monitor, will notify its parent monitor
+
+			}
+		});
+
+		topologyEventReceiver.addEventListener(new ClusterResetEventListener() {
+			@Override
+			protected void onEvent(Event event) {
+				log.info("[ClusterCreatedEvent] Received: " + event.getClass());
+				ClusterResetEvent clusterResetEvent = (ClusterResetEvent) event;
+				String clusterId = clusterResetEvent.getClusterId();
+				AutoscalerContext asCtx = AutoscalerContext.getInstance();
+				AbstractClusterMonitor monitor;
+				monitor = asCtx.getClusterMonitor(clusterId);
+				if (null == monitor) {
+					if (log.isDebugEnabled()) {
+						log.debug(String.format("A cluster monitor is not found in autoscaler context "
+						                        + "[cluster] %s", clusterId));
+					}
+					return;
+				}
+				//changing the status in the monitor, will notify its parent monitor
+				monitor.destroy();
+				monitor.setStatus(ClusterStatus.Created);
+
+			}
+		});
+
+		topologyEventReceiver.addEventListener(new ClusterCreatedEventListener() {
+			@Override
+			protected void onEvent(Event event) {
+				log.info("[ClusterCreatedEvent] Received: " + event.getClass());
+			}
+		});
+
+		topologyEventReceiver.addEventListener(new ClusterInActivateEventListener() {
+			@Override
+			protected void onEvent(Event event) {
+				log.info("[ClusterInActivateEvent] Received: " + event.getClass());
+				ClusterInactivateEvent clusterInactivateEvent = (ClusterInactivateEvent) event;
+				String clusterId = clusterInactivateEvent.getClusterId();
+				AutoscalerContext asCtx = AutoscalerContext.getInstance();
+				AbstractClusterMonitor monitor;
+				monitor = asCtx.getClusterMonitor(clusterId);
+				if (null == monitor) {
+					if (log.isDebugEnabled()) {
+						log.debug(String.format("A cluster monitor is not found in autoscaler context "
+						                        + "[cluster] %s", clusterId));
+					}
+					return;
+				}
+				//changing the status in the monitor, will notify its parent monitor
+				monitor.setStatus(ClusterStatus.Inactive);
+			}
+		});
+
+		topologyEventReceiver.addEventListener(new ClusterTerminatingEventListener() {
+			@Override
+			protected void onEvent(Event event) {
+				log.info("[ClusterTerminatingEvent] Received: " + event.getClass());
+				ClusterTerminatingEvent clusterTerminatingEvent = (ClusterTerminatingEvent) event;
+				String clusterId = clusterTerminatingEvent.getClusterId();
+				String instanceId = clusterTerminatingEvent.getInstanceId();
+				AutoscalerContext asCtx = AutoscalerContext.getInstance();
+				AbstractClusterMonitor monitor;
+				monitor = asCtx.getClusterMonitor(clusterId);
+				if (null == monitor) {
+					if (log.isDebugEnabled()) {
+						log.debug(String.format("A cluster monitor is not found in autoscaler context "
+						                        + "[cluster] %s", clusterId));
+					}
+					// if monitor does not exist, send cluster terminated event
+					ClusterStatusEventPublisher.sendClusterTerminatedEvent(clusterTerminatingEvent.getAppId(),
+					                                                       clusterTerminatingEvent.getServiceName(),
+					                                                       clusterId, instanceId);
+					return;
+				}
+				//changing the status in the monitor, will notify its parent monitor
+				if (monitor.getStatus() == ClusterStatus.Active) {
+					// terminated gracefully
+					monitor.setStatus(ClusterStatus.Terminating);
+					InstanceNotificationPublisher.sendInstanceCleanupEventForCluster(clusterId);
+				} else {
+					monitor.setStatus(ClusterStatus.Terminating);
+					monitor.terminateAllMembers();
+				}
+				ServiceReferenceHolder.getInstance().getClusterStatusProcessorChain().
+						process("", clusterId, instanceId);
+			}
+		});
+
+		topologyEventReceiver.addEventListener(new ClusterTerminatedEventListener() {
+			@Override
+			protected void onEvent(Event event) {
+				log.info("[ClusterTerminatedEvent] Received: " + event.getClass());
+				ClusterTerminatedEvent clusterTerminatedEvent = (ClusterTerminatedEvent) event;
+				String clusterId = clusterTerminatedEvent.getClusterId();
+				AutoscalerContext asCtx = AutoscalerContext.getInstance();
+				AbstractClusterMonitor monitor;
+				monitor = asCtx.getClusterMonitor(clusterId);
+				if (null == monitor) {
+					if (log.isDebugEnabled()) {
+						log.debug(String.format("A cluster monitor is not found in autoscaler context "
+						                        + "[cluster] %s", clusterId));
+					}
+					// if the cluster monitor is null, assume that it's terminated
+					ApplicationMonitor appMonitor =
+							AutoscalerContext.getInstance().getAppMonitor(clusterTerminatedEvent.getAppId());
+					if (appMonitor != null) {
+						appMonitor
+								.onChildStatusEvent(new ClusterStatusEvent(ClusterStatus.Terminated, clusterId, null));
+					}
+					return;
+				}
+				//changing the status in the monitor, will notify its parent monitor
+				monitor.setStatus(ClusterStatus.Terminated);
+				//Destroying and Removing the Cluster monitor
+				monitor.destroy();
+				AutoscalerContext.getInstance().removeClusterMonitor(clusterId);
+			}
+		});
+
+		topologyEventReceiver.addEventListener(new MemberReadyToShutdownEventListener() {
+			@Override
+			protected void onEvent(Event event) {
+				try {
+					MemberReadyToShutdownEvent memberReadyToShutdownEvent = (MemberReadyToShutdownEvent) event;
+					String clusterId = memberReadyToShutdownEvent.getClusterId();
+					AutoscalerContext asCtx = AutoscalerContext.getInstance();
+					AbstractClusterMonitor monitor;
+					monitor = asCtx.getClusterMonitor(clusterId);
+					if (null == monitor) {
+						if (log.isDebugEnabled()) {
+							log.debug(String.format("A cluster monitor is not found in autoscaler context "
+							                        + "[cluster] %s", clusterId));
+						}
+						return;
+					}
+					monitor.handleMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
+				} catch (Exception e) {
+					String msg = "Error processing event " + e.getLocalizedMessage();
+					log.error(msg, e);
+				}
+			}
+		});
+
+		topologyEventReceiver.addEventListener(new MemberStartedEventListener() {
+			@Override
+			protected void onEvent(Event event) {
+
+			}
+		});
+
+		topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
+			@Override
+			protected void onEvent(Event event) {
+				try {
+					MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
+					String clusterId = memberTerminatedEvent.getClusterId();
+					AbstractClusterMonitor monitor;
+					AutoscalerContext asCtx = AutoscalerContext.getInstance();
+					monitor = asCtx.getClusterMonitor(clusterId);
+					if (null == monitor) {
+						if (log.isDebugEnabled()) {
+							log.debug(String.format("A cluster monitor is not found in autoscaler context "
+							                        + "[cluster] %s", clusterId));
+						}
+						return;
+					}
+					monitor.handleMemberTerminatedEvent(memberTerminatedEvent);
+				} catch (Exception e) {
+					String msg = "Error processing event " + e.getLocalizedMessage();
+					log.error(msg, e);
+				}
+			}
+		});
+
+		topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
+			@Override
+			protected void onEvent(Event event) {
+				try {
+					MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
+					String clusterId = memberActivatedEvent.getClusterId();
+					AbstractClusterMonitor monitor;
+					AutoscalerContext asCtx = AutoscalerContext.getInstance();
+					monitor = asCtx.getClusterMonitor(clusterId);
+					if (null == monitor) {
+						if (log.isDebugEnabled()) {
+							log.debug(String.format("A cluster monitor is not found in autoscaler context "
+							                        + "[cluster] %s", clusterId));
+						}
+						return;
+					}
+					monitor.handleMemberActivatedEvent(memberActivatedEvent);
+				} catch (Exception e) {
+					String msg = "Error processing event " + e.getLocalizedMessage();
+					log.error(msg, e);
+				}
+			}
+		});
+
+		topologyEventReceiver.addEventListener(new MemberMaintenanceListener() {
+			@Override
+			protected void onEvent(Event event) {
+				try {
+					MemberMaintenanceModeEvent maintenanceModeEvent = (MemberMaintenanceModeEvent) event;
+					String clusterId = maintenanceModeEvent.getClusterId();
+					AbstractClusterMonitor monitor;
+					AutoscalerContext asCtx = AutoscalerContext.getInstance();
+					monitor = asCtx.getClusterMonitor(clusterId);
+					if (null == monitor) {
+						if (log.isDebugEnabled()) {
+							log.debug(String.format("A cluster monitor is not found in autoscaler context "
+							                        + "[cluster] %s", clusterId));
+						}
+						return;
+					}
+					monitor.handleMemberMaintenanceModeEvent(maintenanceModeEvent);
+				} catch (Exception e) {
+					String msg = "Error processing event " + e.getLocalizedMessage();
+					log.error(msg, e);
+				}
+			}
+		});
+
+		topologyEventReceiver.addEventListener(new ClusterInstanceCreatedEventListener() {
+			@Override
+			protected void onEvent(Event event) {
+
+				ClusterInstanceCreatedEvent clusterInstanceCreatedEvent =
+						(ClusterInstanceCreatedEvent) event;
+				AbstractClusterMonitor clusterMonitor = AutoscalerContext.getInstance().
+						getClusterMonitor(clusterInstanceCreatedEvent.getClusterId());
+
+				if (clusterMonitor != null) {
+					TopologyManager.acquireReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(),
+					                                          clusterInstanceCreatedEvent.getClusterId());
+
+					try {
+						Service service = TopologyManager.getTopology().
+								getService(clusterInstanceCreatedEvent.getServiceName());
+
+						if (service != null) {
+							Cluster cluster = service.getCluster(clusterInstanceCreatedEvent.getClusterId());
+							if (cluster != null) {
+								// create and add Cluster Context
+								try {
+									if (cluster.isKubernetesCluster()) {
+										clusterMonitor.addClusterContextForInstance(
+												clusterInstanceCreatedEvent.getInstanceId(),
+												ClusterContextFactory.getKubernetesClusterContext(cluster));
+									} else if (cluster.isLbCluster()) {
+										clusterMonitor.addClusterContextForInstance(
+												clusterInstanceCreatedEvent.getInstanceId(),
+												ClusterContextFactory.getVMLBClusterContext(cluster));
+									} else {
+										clusterMonitor.addClusterContextForInstance(
+												clusterInstanceCreatedEvent.getInstanceId(),
+												ClusterContextFactory.getVMServiceClusterContext(cluster));
+									}
+
+									if (clusterMonitor.hasMonitoringStarted().compareAndSet(false, true)) {
+										clusterMonitor.startScheduler();
+										log.info("Monitoring task for Cluster Monitor with cluster id " +
+										         clusterInstanceCreatedEvent.getClusterId() + " started successfully");
+									}
+
+								} catch (PolicyValidationException e) {
+									log.error(e.getMessage(), e);
+								} catch (PartitionValidationException e) {
+									log.error(e.getMessage(), e);
+								}
+
+							} else {
+								log.error("Cluster not found for " + clusterInstanceCreatedEvent.getClusterId() +
+								          ", no cluster instance added to ClusterMonitor " +
+								          clusterInstanceCreatedEvent.getClusterId());
+							}
+						} else {
+							log.error("Service " + clusterInstanceCreatedEvent.getServiceName() +
+							          " not found, no cluster instance added to ClusterMonitor " +
+							          clusterInstanceCreatedEvent.getClusterId());
+						}
+
+					} finally {
+						TopologyManager.releaseReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(),
+						                                          clusterInstanceCreatedEvent.getClusterId());
+					}
+
+				} else {
+					log.error("No Cluster Monitor found for cluster id " +
+					          clusterInstanceCreatedEvent.getClusterId());
+				}
+			}
+		});
+	}
+
+	/**
+	 * Terminate autoscaler topology receiver thread.
+	 */
+	public void terminate() {
+		topologyEventReceiver.terminate();
+		terminated = true;
+	}
+
+	protected synchronized void startApplicationMonitor(String applicationId) {
+		Thread th = null;
+		if (AutoscalerContext.getInstance().getAppMonitor(applicationId) == null) {
+			th = new Thread(new ApplicationMonitorAdder(applicationId));
+		}
+		if (th != null) {
+			th.start();
+		} else {
+			if (log.isDebugEnabled()) {
+				log.debug(String
+						          .format("Application monitor thread already exists: " +
+						                  "[application] %s ", applicationId));
+			}
+		}
+	}
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
 
 	public ExecutorService getExecutorService() {
 		return executorService;
@@ -531,4 +987,65 @@ public class AutoscalerTopologyEventReceiver{
 	public void setExecutorService(ExecutorService executorService) {
 		this.executorService = executorService;
 	}
+<<<<<<< HEAD
+=======
+
+	private class ApplicationMonitorAdder implements Runnable {
+		private String appId;
+
+		public ApplicationMonitorAdder(String appId) {
+			this.appId = appId;
+		}
+
+		public void run() {
+			ApplicationMonitor applicationMonitor = null;
+			int retries = 5;
+			boolean success = false;
+			do {
+				try {
+					Thread.sleep(5000);
+				} catch (InterruptedException e1) {
+				}
+				try {
+					long start = System.currentTimeMillis();
+					if (log.isDebugEnabled()) {
+						log.debug("application monitor is going to be started for [application] " +
+						          appId);
+					}
+					try {
+						applicationMonitor = MonitorFactory.getApplicationMonitor(appId);
+					} catch (PolicyValidationException e) {
+						String msg = "Application monitor creation failed for Application: ";
+						log.warn(msg, e);
+						retries--;
+					}
+					long end = System.currentTimeMillis();
+					log.info("Time taken to start app monitor: " + (end - start) / 1000);
+					success = true;
+				} catch (DependencyBuilderException e) {
+					String msg = "Application monitor creation failed for Application: ";
+					log.warn(msg, e);
+					retries--;
+				} catch (TopologyInConsistentException e) {
+					String msg = "Application monitor creation failed for Application: ";
+					log.warn(msg, e);
+					retries--;
+				}
+			} while (!success && retries != 0);
+
+			if (applicationMonitor == null) {
+				String msg = "Application monitor creation failed, even after retrying for 5 times, "
+				             + "for Application: " + appId;
+				log.error(msg);
+				throw new RuntimeException(msg);
+			}
+
+			AutoscalerContext.getInstance().addAppMonitor(applicationMonitor);
+			if (log.isInfoEnabled()) {
+				log.info(String.format("Application monitor has been added successfully: " +
+				                       "[application] %s", applicationMonitor.getId()));
+			}
+		}
+	}
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
index 3694066..2e443de 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
@@ -65,16 +65,16 @@ public class AutoscalerServerComponent {
 
 	private static final String THREAD_IDENTIFIER_KEY = "threadPool.autoscaler.identifier";
 	private static final String DEFAULT_IDENTIFIER = "Auto-Scaler";
-	private static final String THREAD_POOL_SIZE_KEY= "threadPool.autoscaler.threadPoolSize";
+	private static final String THREAD_POOL_SIZE_KEY = "threadPool.autoscaler.threadPoolSize";
 	private static final String COMPONENTS_CONFIG = "components-config";
 	private static final int THREAD_POOL_SIZE = 10;
 	private static final Log log = LogFactory.getLog(AutoscalerServerComponent.class);
 
-
 	private AutoscalerTopologyEventReceiver asTopologyReceiver;
-    private AutoscalerHealthStatEventReceiver autoscalerHealthStatEventReceiver;
+	private AutoscalerHealthStatEventReceiver autoscalerHealthStatEventReceiver;
 
 	protected void activate(ComponentContext componentContext) throws Exception {
+<<<<<<< HEAD
         try {
             // Start topology receiver
 	        XMLConfiguration conf = ConfUtil.getInstance(COMPONENTS_CONFIG).getConfiguration();
@@ -197,3 +197,128 @@ public class AutoscalerServerComponent {
         ServiceReferenceHolder.getInstance().setTaskService(null);
     }
 }
+=======
+		try {
+			// Start topology receiver
+			XMLConfiguration conf = ConfUtil.getInstance(COMPONENTS_CONFIG).getConfiguration();
+			int threadPoolSize = conf.getInt(THREAD_POOL_SIZE_KEY, THREAD_POOL_SIZE);
+			String threadIdentifier = conf.getString(THREAD_IDENTIFIER_KEY, DEFAULT_IDENTIFIER);
+			ExecutorService executorService = StratosThreadPool.getExecutorService(threadIdentifier, threadPoolSize);
+			asTopologyReceiver = new AutoscalerTopologyEventReceiver();
+			asTopologyReceiver.setExecutorService(executorService);
+			asTopologyReceiver.execute();
+
+			if (log.isDebugEnabled()) {
+				log.debug("Topology receiver executor service started");
+			}
+
+			// Start health stat receiver
+			autoscalerHealthStatEventReceiver = new AutoscalerHealthStatEventReceiver();
+			Thread healthDelegatorThread = new Thread(autoscalerHealthStatEventReceiver);
+			healthDelegatorThread.start();
+			if (log.isDebugEnabled()) {
+				log.debug("Health statistics receiver thread started");
+			}
+
+			// Adding the registry stored partitions to the information model
+			List<Partition> partitions = RegistryManager.getInstance().retrievePartitions();
+			Iterator<Partition> partitionIterator = partitions.iterator();
+			while (partitionIterator.hasNext()) {
+				Partition partition = partitionIterator.next();
+				PartitionManager.getInstance().addPartitionToInformationModel(partition);
+			}
+
+			// Adding the network partitions stored in registry to the information model
+			List<NetworkPartitionLbHolder> nwPartitionHolders =
+					RegistryManager.getInstance().retrieveNetworkPartitionLbHolders();
+			Iterator<NetworkPartitionLbHolder> nwPartitionIterator = nwPartitionHolders.iterator();
+			while (nwPartitionIterator.hasNext()) {
+				NetworkPartitionLbHolder nwPartition = nwPartitionIterator.next();
+				PartitionManager.getInstance().addNetworkPartitionLbHolder(nwPartition);
+			}
+
+			List<AutoscalePolicy> asPolicies = RegistryManager.getInstance().retrieveASPolicies();
+			Iterator<AutoscalePolicy> asPolicyIterator = asPolicies.iterator();
+			while (asPolicyIterator.hasNext()) {
+				AutoscalePolicy asPolicy = asPolicyIterator.next();
+				PolicyManager.getInstance().addASPolicyToInformationModel(asPolicy);
+			}
+
+			List<DeploymentPolicy> depPolicies = RegistryManager.getInstance().retrieveDeploymentPolicies();
+			Iterator<DeploymentPolicy> depPolicyIterator = depPolicies.iterator();
+			while (depPolicyIterator.hasNext()) {
+				DeploymentPolicy depPolicy = depPolicyIterator.next();
+				PolicyManager.getInstance().addDeploymentPolicyToInformationModel(depPolicy);
+			}
+
+			// Adding KubernetesGroups stored in registry to the information model
+			List<KubernetesGroup> kubernetesGroupList = RegistryManager.getInstance().retrieveKubernetesGroups();
+			Iterator<KubernetesGroup> kubernetesGroupIterator = kubernetesGroupList.iterator();
+			while (kubernetesGroupIterator.hasNext()) {
+				KubernetesGroup kubernetesGroup = kubernetesGroupIterator.next();
+				KubernetesManager.getInstance().addNewKubernetesGroup(kubernetesGroup);
+			}
+
+			//starting the processor chain
+			ClusterStatusProcessorChain clusterStatusProcessorChain = new ClusterStatusProcessorChain();
+			ServiceReferenceHolder.getInstance().setClusterStatusProcessorChain(clusterStatusProcessorChain);
+
+			GroupStatusProcessorChain groupStatusProcessorChain = new GroupStatusProcessorChain();
+			ServiceReferenceHolder.getInstance().setGroupStatusProcessorChain(groupStatusProcessorChain);
+
+			if (log.isInfoEnabled()) {
+				log.info("Scheduling tasks to publish applications");
+			}
+
+			ApplicationSynchronizerTaskScheduler
+					.schedule(ServiceReferenceHolder.getInstance()
+					                                .getTaskService());
+
+			if (log.isInfoEnabled()) {
+				log.info("Autoscaler server Component activated");
+			}
+		} catch (Throwable e) {
+			log.error("Error in activating the autoscaler component ", e);
+		}
+	}
+
+	protected void deactivate(ComponentContext context) {
+		asTopologyReceiver.terminate();
+		autoscalerHealthStatEventReceiver.terminate();
+	}
+
+	protected void setRegistryService(RegistryService registryService) {
+		if (log.isDebugEnabled()) {
+			log.debug("Setting the Registry Service");
+		}
+		try {
+			ServiceReferenceHolder.getInstance().setRegistry(registryService.getGovernanceSystemRegistry());
+		} catch (RegistryException e) {
+			String msg = "Failed when retrieving Governance System Registry.";
+			log.error(msg, e);
+			throw new AutoScalerException(msg, e);
+		}
+	}
+
+	protected void unsetRegistryService(RegistryService registryService) {
+		if (log.isDebugEnabled()) {
+			log.debug("Un-setting the Registry Service");
+		}
+		ServiceReferenceHolder.getInstance().setRegistry(null);
+	}
+
+	protected void setTaskService(TaskService taskService) {
+		if (log.isDebugEnabled()) {
+			log.debug("Setting the Task Service");
+		}
+		ServiceReferenceHolder.getInstance().setTaskService(taskService);
+	}
+
+	protected void unsetTaskService(TaskService taskService) {
+		if (log.isDebugEnabled()) {
+			log.debug("Un-setting the Task Service");
+		}
+		ServiceReferenceHolder.getInstance().setTaskService(null);
+	}
+}
+>>>>>>> ddf277b... Remove unnessary threads in messaging model

http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index a47e924..dfbf1ec 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -20,9 +20,12 @@ package org.apache.stratos.cloud.controller.internal;
  *
 */
 
+<<<<<<< HEAD
 
 import com.hazelcast.core.HazelcastInstance;
 
+=======
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.cloud.controller.context.CloudControllerContext;
@@ -49,6 +52,7 @@ import org.wso2.carbon.utils.ConfigurationContextService;
  * Registering Cloud Controller Service.
  *
  * @scr.component name="org.apache.stratos.cloud.controller" immediate="true"
+<<<<<<< HEAD
  * @scr.reference name="distributedObjectProvider" interface="org.apache.stratos.common.clustering.DistributedObjectProvider"
  *                cardinality="1..1" policy="dynamic" bind="setDistributedObjectProvider" unbind="unsetDistributedObjectProvider"
  * @scr.reference name="ntask.component" interface="org.wso2.carbon.ntask.core.service.TaskService"
@@ -57,42 +61,72 @@ import org.wso2.carbon.utils.ConfigurationContextService;
  *                cardinality="1..1" policy="dynamic" bind="setRegistryService" unbind="unsetRegistryService"
  * @scr.reference name="config.context.service" interface="org.wso2.carbon.utils.ConfigurationContextService"
  *                cardinality="1..1" policy="dynamic" bind="setConfigurationContextService" unbind="unsetConfigurationContextService"
+=======
+ * @scr.reference name="distributedMapProvider" interface="org.wso2.carbon.caching.impl.DistributedMapProvider"
+ * cardinality="1..1" policy="dynamic" bind="setDistributedMapProvider" unbind="unsetDistributedMapProvider"
+ * @scr.reference name="ntask.component"
+ * interface="org.wso2.carbon.ntask.core.service.TaskService"
+ * cardinality="1..1" policy="dynamic" bind="setTaskService" unbind="unsetTaskService"
+ * @scr.reference name="registry.service"
+ * interface="org.wso2.carbon.registry.core.service.RegistryService"
+ * cardinality="1..1" policy="dynamic" bind="setRegistryService" unbind="unsetRegistryService"
+ * @scr.reference name="config.context.service"
+ * interface="org.wso2.carbon.utils.ConfigurationContextService"
+ * cardinality="1..1" policy="dynamic" bind="setConfigurationContextService" unbind="unsetConfigurationContextService"
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
  */
 public class CloudControllerServiceComponent {
 
-    private static final Log log = LogFactory.getLog(CloudControllerServiceComponent.class);
-    private ClusterStatusTopicReceiver clusterStatusTopicReceiver;
-    private InstanceStatusTopicReceiver instanceStatusTopicReceiver;
-    private ApplicationTopicReceiver applicationTopicReceiver;
+	private static final Log log = LogFactory.getLog(CloudControllerServiceComponent.class);
+	private ClusterStatusTopicReceiver clusterStatusTopicReceiver;
+	private InstanceStatusTopicReceiver instanceStatusTopicReceiver;
+	private ApplicationTopicReceiver applicationTopicReceiver;
 
-    protected void activate(ComponentContext context) {
-        try {
-            applicationTopicReceiver = new ApplicationTopicReceiver();
-            applicationTopicReceiver.execute();
+	protected void activate(ComponentContext context) {
+		try {
+			applicationTopicReceiver = new ApplicationTopicReceiver();
+			applicationTopicReceiver.execute();
 
+			if (log.isInfoEnabled()) {
+				log.info("Application Receiver thread started");
+			}
+
+<<<<<<< HEAD
             if (log.isInfoEnabled()) {
                 log.info("Application event receiver thread started");
             }
+=======
+			clusterStatusTopicReceiver = new ClusterStatusTopicReceiver();
+			clusterStatusTopicReceiver.execute();
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
 
-            clusterStatusTopicReceiver = new ClusterStatusTopicReceiver();
-	        clusterStatusTopicReceiver.execute();
+			if (log.isInfoEnabled()) {
+				log.info("Cluster status Receiver thread started");
+			}
 
+<<<<<<< HEAD
             if (log.isInfoEnabled()) {
                 log.info("Cluster status receiver thread started");
             }
+=======
+			instanceStatusTopicReceiver = new InstanceStatusTopicReceiver();
+			instanceStatusTopicReceiver.execute();
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
 
-            instanceStatusTopicReceiver = new InstanceStatusTopicReceiver();
-            instanceStatusTopicReceiver.execute();
+			if (log.isInfoEnabled()) {
+				log.info("Instance status message receiver thread started");
+			}
 
-            if(log.isInfoEnabled()) {
-                log.info("Instance status message receiver thread started");
-            }
+			// Register cloud controller service
+			BundleContext bundleContext = context.getBundleContext();
+			bundleContext.registerService(CloudControllerService.class.getName(),
+			                              new CloudControllerServiceImpl(), null);
 
-        	// Register cloud controller service
-            BundleContext bundleContext = context.getBundleContext();
-            bundleContext.registerService(CloudControllerService.class.getName(),
-                    new CloudControllerServiceImpl(), null);
+			if (log.isInfoEnabled()) {
+				log.info("Scheduling tasks");
+			}
 
+<<<<<<< HEAD
             if(log.isInfoEnabled()) {
                 log.info("Scheduling tasks");
             }
@@ -123,29 +157,71 @@ public class CloudControllerServiceComponent {
         ServiceReferenceHolder.getInstance().setTaskService(null);
     }
     
+=======
+			TopologySynchronizerTaskScheduler
+					.schedule(ServiceReferenceHolder.getInstance()
+					                                .getTaskService());
+
+		} catch (Throwable e) {
+			log.error("******* Cloud Controller Service bundle is failed to activate ****", e);
+		}
+	}
+
+	protected void setTaskService(TaskService taskService) {
+		if (log.isDebugEnabled()) {
+			log.debug("Setting the Task Service");
+		}
+		ServiceReferenceHolder.getInstance().setTaskService(taskService);
+	}
+
+	protected void unsetTaskService(TaskService taskService) {
+		if (log.isDebugEnabled()) {
+			log.debug("Unsetting the Task Service");
+		}
+		ServiceReferenceHolder.getInstance().setTaskService(null);
+	}
+
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
 	protected void setRegistryService(RegistryService registryService) {
 		if (log.isDebugEnabled()) {
 			log.debug("Setting the Registry Service");
 		}
-		
-		try {			
+
+		try {
 			UserRegistry registry = registryService.getGovernanceSystemRegistry();
+<<<<<<< HEAD
 	        ServiceReferenceHolder.getInstance().setRegistry(registry);
         } catch (RegistryException e) {
         	String msg = "Failed when retrieving Governance System Registry.";
         	log.error(msg, e);
         	throw new CloudControllerException(msg, e);
         } 
+=======
+			ServiceReferenceHolder.getInstance()
+			                      .setRegistry(registry);
+		} catch (RegistryException e) {
+			String msg = "Failed when retrieving Governance System Registry.";
+			log.error(msg, e);
+			throw new CloudControllerException(msg, e);
+		}
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
 	}
 
 	protected void unsetRegistryService(RegistryService registryService) {
 		if (log.isDebugEnabled()) {
+<<<<<<< HEAD
             log.debug("Un-setting the Registry Service");
         }
         ServiceReferenceHolder.getInstance().setRegistry(null);
+=======
+			log.debug("Unsetting the Registry Service");
+		}
+		ServiceReferenceHolder.getInstance().setRegistry(null);
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
 	}
-	
+
 	protected void setConfigurationContextService(ConfigurationContextService cfgCtxService) {
+<<<<<<< HEAD
         ServiceReferenceHolder.getInstance().setAxisConfiguration(
                 cfgCtxService.getServerConfigContext().getAxisConfiguration());
     }
@@ -162,8 +238,26 @@ public class CloudControllerServiceComponent {
         ServiceReferenceHolder.getInstance().setDistributedObjectProvider(null);
     }
 	
+=======
+		ServiceReferenceHolder.getInstance().setAxisConfiguration(
+				cfgCtxService.getServerConfigContext().getAxisConfiguration());
+	}
+
+	protected void unsetConfigurationContextService(ConfigurationContextService cfgCtxService) {
+		ServiceReferenceHolder.getInstance().setAxisConfiguration(null);
+	}
+
+	protected void setDistributedMapProvider(DistributedMapProvider mapProvider) {
+		ServiceReferenceHolder.getInstance().setDistributedMapProvider(mapProvider);
+	}
+
+	protected void unsetDistributedMapProvider(DistributedMapProvider mapProvider) {
+		ServiceReferenceHolder.getInstance().setDistributedMapProvider(null);
+	}
+
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
 	protected void deactivate(ComponentContext ctx) {
-        // Close event publisher connections to message broker
-        EventPublisherPool.close(Util.Topics.TOPOLOGY_TOPIC.getTopicName());
+		// Close event publisher connections to message broker
+		EventPublisherPool.close(Util.Topics.TOPOLOGY_TOPIC.getTopicName());
 	}
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
index 87dfe6e..d65b7f5 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
@@ -29,45 +29,44 @@ import org.apache.stratos.messaging.message.receiver.applications.ApplicationsEv
 /**
  * This is to receive the application topic messages.
  */
-public class ApplicationTopicReceiver{
-    private static final Log log = LogFactory.getLog(ApplicationTopicReceiver.class);
-    private ApplicationsEventReceiver applicationsEventReceiver;
-    private boolean terminated;
+public class ApplicationTopicReceiver {
+	private static final Log log = LogFactory.getLog(ApplicationTopicReceiver.class);
+	private ApplicationsEventReceiver applicationsEventReceiver;
+	private boolean terminated;
 
-    public ApplicationTopicReceiver() {
-        this.applicationsEventReceiver = new ApplicationsEventReceiver();
-        addEventListeners();
+	public ApplicationTopicReceiver() {
+		this.applicationsEventReceiver = new ApplicationsEventReceiver();
+		addEventListeners();
 
-    }
+	}
 
-    
+	public void execute() {
 
-    public void execute() {
+		if (log.isInfoEnabled()) {
+			log.info("Cloud controller application status thread started");
+		}
+		applicationsEventReceiver.execute();
 
-        if (log.isInfoEnabled()) {
-            log.info("Cloud controller application status thread started");
-        }
-	    applicationsEventReceiver.execute();
+		if (log.isInfoEnabled()) {
+			log.info("Cloud controller application status thread terminated");
+		}
 
-        if (log.isInfoEnabled()) {
-            log.info("Cloud controller application status thread terminated");
-        }
+	}
 
-    }
-    private void addEventListeners() {
-        applicationsEventReceiver.addEventListener(new ApplicationTerminatedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                //Remove the application related data
-                ApplicationTerminatedEvent terminatedEvent = (ApplicationTerminatedEvent)event;
-                log.info("ApplicationTerminatedEvent received for [application] " + terminatedEvent.getAppId());
-                String appId = terminatedEvent.getAppId();
-                TopologyBuilder.handleApplicationClustersRemoved(appId, terminatedEvent.getClusterData());
-            }
-        });
-    }
+	private void addEventListeners() {
+		applicationsEventReceiver.addEventListener(new ApplicationTerminatedEventListener() {
+			@Override
+			protected void onEvent(Event event) {
+				//Remove the application related data
+				ApplicationTerminatedEvent terminatedEvent = (ApplicationTerminatedEvent) event;
+				log.info("ApplicationTerminatedEvent received for [application] " + terminatedEvent.getAppId());
+				String appId = terminatedEvent.getAppId();
+				TopologyBuilder.handleApplicationClustersRemoved(appId, terminatedEvent.getClusterData());
+			}
+		});
+	}
 
-    public void setTerminated(boolean terminated) {
-        this.terminated = terminated;
-    }
+	public void setTerminated(boolean terminated) {
+		this.terminated = terminated;
+	}
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
index bd2fbf0..ca6d4ad 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
@@ -26,16 +26,16 @@ import org.apache.stratos.messaging.event.cluster.status.*;
 import org.apache.stratos.messaging.listener.cluster.status.*;
 import org.apache.stratos.messaging.message.receiver.cluster.status.ClusterStatusEventReceiver;
 
-public class ClusterStatusTopicReceiver{
-    private static final Log log = LogFactory.getLog(ClusterStatusTopicReceiver.class);
+public class ClusterStatusTopicReceiver {
+	private static final Log log = LogFactory.getLog(ClusterStatusTopicReceiver.class);
 
-    private ClusterStatusEventReceiver statusEventReceiver;
-    private boolean terminated;
+	private ClusterStatusEventReceiver statusEventReceiver;
+	private boolean terminated;
 
-    public ClusterStatusTopicReceiver() {
-        this.statusEventReceiver = new ClusterStatusEventReceiver();
-        addEventListeners();
-    }
+	public ClusterStatusTopicReceiver() {
+		this.statusEventReceiver = new ClusterStatusEventReceiver();
+		addEventListeners();
+	}
 
 	public void execute() {
 
@@ -47,58 +47,58 @@ public class ClusterStatusTopicReceiver{
 	}
 
 	private void addEventListeners() {
-        // Listen to topology events that affect clusters
-        statusEventReceiver.addEventListener(new ClusterStatusClusterResetEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                TopologyBuilder.handleClusterReset((ClusterStatusClusterResetEvent) event);
-            }
-        });
+		// Listen to topology events that affect clusters
+		statusEventReceiver.addEventListener(new ClusterStatusClusterResetEventListener() {
+			@Override
+			protected void onEvent(Event event) {
+				TopologyBuilder.handleClusterReset((ClusterStatusClusterResetEvent) event);
+			}
+		});
 
-        statusEventReceiver.addEventListener(new ClusterStatusClusterInstanceCreatedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                //TopologyBuilder.handleClusterInstanceCreated((ClusterStatusClusterInstanceCreatedEvent) event);
-            }
-        });
+		statusEventReceiver.addEventListener(new ClusterStatusClusterInstanceCreatedEventListener() {
+			@Override
+			protected void onEvent(Event event) {
+				//TopologyBuilder.handleClusterInstanceCreated((ClusterStatusClusterInstanceCreatedEvent) event);
+			}
+		});
 
-        statusEventReceiver.addEventListener(new ClusterStatusClusterCreatedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                TopologyBuilder.handleClusterCreated((ClusterStatusClusterCreatedEvent) event);
-            }
-        });
+		statusEventReceiver.addEventListener(new ClusterStatusClusterCreatedEventListener() {
+			@Override
+			protected void onEvent(Event event) {
+				TopologyBuilder.handleClusterCreated((ClusterStatusClusterCreatedEvent) event);
+			}
+		});
 
-        statusEventReceiver.addEventListener(new ClusterStatusClusterActivatedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                TopologyBuilder.handleClusterActivatedEvent((ClusterStatusClusterActivatedEvent) event);
-            }
-        });
+		statusEventReceiver.addEventListener(new ClusterStatusClusterActivatedEventListener() {
+			@Override
+			protected void onEvent(Event event) {
+				TopologyBuilder.handleClusterActivatedEvent((ClusterStatusClusterActivatedEvent) event);
+			}
+		});
 
-        statusEventReceiver.addEventListener(new ClusterStatusClusterTerminatedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                TopologyBuilder.handleClusterTerminatedEvent((ClusterStatusClusterTerminatedEvent) event);
-            }
-        });
+		statusEventReceiver.addEventListener(new ClusterStatusClusterTerminatedEventListener() {
+			@Override
+			protected void onEvent(Event event) {
+				TopologyBuilder.handleClusterTerminatedEvent((ClusterStatusClusterTerminatedEvent) event);
+			}
+		});
 
-        statusEventReceiver.addEventListener(new ClusterStatusClusterTerminatingEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                TopologyBuilder.handleClusterTerminatingEvent((ClusterStatusClusterTerminatingEvent) event);
-            }
-        });
+		statusEventReceiver.addEventListener(new ClusterStatusClusterTerminatingEventListener() {
+			@Override
+			protected void onEvent(Event event) {
+				TopologyBuilder.handleClusterTerminatingEvent((ClusterStatusClusterTerminatingEvent) event);
+			}
+		});
 
-        statusEventReceiver.addEventListener(new ClusterStatusClusterInactivateEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                TopologyBuilder.handleClusterInActivateEvent((ClusterStatusClusterInactivateEvent) event);
-            }
-        });
-    }
+		statusEventReceiver.addEventListener(new ClusterStatusClusterInactivateEventListener() {
+			@Override
+			protected void onEvent(Event event) {
+				TopologyBuilder.handleClusterInActivateEvent((ClusterStatusClusterInactivateEvent) event);
+			}
+		});
+	}
 
-    public void setTerminated(boolean terminated) {
-        this.terminated = terminated;
-    }
+	public void setTerminated(boolean terminated) {
+		this.terminated = terminated;
+	}
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
index d5475f0..42aabed 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
@@ -35,71 +35,67 @@ import org.apache.stratos.messaging.message.receiver.instance.status.InstanceSta
 /**
  * This will handle the instance status events
  */
-public class InstanceStatusTopicReceiver{
-    private static final Log log = LogFactory.getLog(InstanceStatusTopicReceiver.class);
-
-    private InstanceStatusEventReceiver statusEventReceiver;
-    private boolean terminated;
-
-    public InstanceStatusTopicReceiver() {
-        this.statusEventReceiver = new InstanceStatusEventReceiver();
-        addEventListeners();
-    }
-
-
-
-    public void execute() {
-        statusEventReceiver.execute();
-        if (log.isInfoEnabled()) {
-            log.info("Cloud controller application status thread started");
-        }
-
-
-        if (log.isInfoEnabled()) {
-            log.info("Cloud controller application status thread terminated");
-        }
-    }
-
-    private void addEventListeners() {
-        statusEventReceiver.addEventListener(new InstanceActivatedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                TopologyBuilder.handleMemberActivated((InstanceActivatedEvent) event);
-            }
-        });
-
-        statusEventReceiver.addEventListener(new InstanceStartedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                TopologyBuilder.handleMemberStarted((InstanceStartedEvent) event);
-            }
-        });
-
-        statusEventReceiver.addEventListener(new InstanceReadyToShutdownEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                try {
-                    TopologyBuilder.handleMemberReadyToShutdown((InstanceReadyToShutdownEvent) event);
-                } catch (Exception e) {
-                    String error = "Failed to retrieve the instance status event message";
-                    log.error(error, e);
-                }
-            }
-        });
-
-        statusEventReceiver.addEventListener(new InstanceMaintenanceListener() {
-            @Override
-            protected void onEvent(Event event) {
-                try {
-                    TopologyBuilder.handleMemberMaintenance((InstanceMaintenanceModeEvent) event);
-                } catch (Exception e) {
-                String error = "Failed to retrieve the instance status event message";
-                log.error(error, e);
-                }
-            }
-        });
-
-
-    }
+public class InstanceStatusTopicReceiver {
+	private static final Log log = LogFactory.getLog(InstanceStatusTopicReceiver.class);
+
+	private InstanceStatusEventReceiver statusEventReceiver;
+	private boolean terminated;
+
+	public InstanceStatusTopicReceiver() {
+		this.statusEventReceiver = new InstanceStatusEventReceiver();
+		addEventListeners();
+	}
+
+	public void execute() {
+		statusEventReceiver.execute();
+		if (log.isInfoEnabled()) {
+			log.info("Cloud controller application status thread started");
+		}
+
+		if (log.isInfoEnabled()) {
+			log.info("Cloud controller application status thread terminated");
+		}
+	}
+
+	private void addEventListeners() {
+		statusEventReceiver.addEventListener(new InstanceActivatedEventListener() {
+			@Override
+			protected void onEvent(Event event) {
+				TopologyBuilder.handleMemberActivated((InstanceActivatedEvent) event);
+			}
+		});
+
+		statusEventReceiver.addEventListener(new InstanceStartedEventListener() {
+			@Override
+			protected void onEvent(Event event) {
+				TopologyBuilder.handleMemberStarted((InstanceStartedEvent) event);
+			}
+		});
+
+		statusEventReceiver.addEventListener(new InstanceReadyToShutdownEventListener() {
+			@Override
+			protected void onEvent(Event event) {
+				try {
+					TopologyBuilder.handleMemberReadyToShutdown((InstanceReadyToShutdownEvent) event);
+				} catch (Exception e) {
+					String error = "Failed to retrieve the instance status event message";
+					log.error(error, e);
+				}
+			}
+		});
+
+		statusEventReceiver.addEventListener(new InstanceMaintenanceListener() {
+			@Override
+			protected void onEvent(Event event) {
+				try {
+					TopologyBuilder.handleMemberMaintenance((InstanceMaintenanceModeEvent) event);
+				} catch (Exception e) {
+					String error = "Failed to retrieve the instance status event message";
+					log.error(error, e);
+				}
+			}
+		});
+
+	}
 
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
index e74721f..188b2ac 100644
--- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
+++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
@@ -35,136 +35,134 @@ import java.util.concurrent.ExecutorService;
  * received from the message broker.
  */
 public class LoadBalancerExtension implements Runnable {
-    private static final Log log = LogFactory.getLog(LoadBalancerExtension.class);
-
-    private LoadBalancer loadBalancer;
-    private LoadBalancerStatisticsReader statsReader;
-    private boolean loadBalancerStarted;
-    private TopologyEventReceiver topologyEventReceiver;
-    private LoadBalancerStatisticsNotifier statisticsNotifier;
-    private boolean terminated;
+	private static final Log log = LogFactory.getLog(LoadBalancerExtension.class);
+
+	private LoadBalancer loadBalancer;
+	private LoadBalancerStatisticsReader statsReader;
+	private boolean loadBalancerStarted;
+	private TopologyEventReceiver topologyEventReceiver;
+	private LoadBalancerStatisticsNotifier statisticsNotifier;
+	private boolean terminated;
 	private ExecutorService executorService;
-    /**
-     * Load balancer extension constructor.
-     * @param loadBalancer Load balancer instance: Mandatory.
-     * @param statsReader Statistics reader: If null statistics notifier thread will not be started.
-     */
-    public LoadBalancerExtension(LoadBalancer loadBalancer, LoadBalancerStatisticsReader statsReader) {
-        this.loadBalancer = loadBalancer;
-        this.statsReader = statsReader;
-    }
-
-    @Override
-    public void run() {
-        try {
-            if(log.isInfoEnabled()) {
-                log.info("Load balancer extension started");
-            }
-
-            // Start topology receiver thread
-            topologyEventReceiver = new TopologyEventReceiver();
-            addEventListeners();
-	        topologyEventReceiver.setExecutorService(executorService);
-	        topologyEventReceiver.execute();
-
-
-            if(statsReader != null) {
-                // Start stats notifier thread
-                statisticsNotifier = new LoadBalancerStatisticsNotifier(statsReader);
-                Thread statsNotifierThread = new Thread(statisticsNotifier);
-                statsNotifierThread.start();
-            }
-            else {
-                if(log.isWarnEnabled()) {
-                    log.warn("Load balancer statistics reader not found");
-                }
-            }
-
-            // Keep the thread live until terminated
-            while (!terminated);
-        } catch (Exception e) {
-            if (log.isErrorEnabled()) {
-                log.error("Could not start load balancer extension", e);
-            }
-        }
-    }
-
-    private void addEventListeners() {
-        topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
-
-            @Override
-            protected void onEvent(Event event) {
-                try {
-
-                    if (!loadBalancerStarted) {
-                        // Configure load balancer
-                        loadBalancer.configure(TopologyManager.getTopology());
-
-                        // Start load balancer
-                        loadBalancer.start();
-                        loadBalancerStarted = true;
-                    }
-                } catch (Exception e) {
-                    if (log.isErrorEnabled()) {
-                        log.error("Could not start load balancer", e);
-                    }
-                    terminate();
-                }
-            }
-        });
-        topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                reloadConfiguration();
-            }
-        });
-        topologyEventReceiver.addEventListener(new MemberSuspendedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                reloadConfiguration();
-            }
-        });
-        topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                reloadConfiguration();
-            }
-        });
-        topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                reloadConfiguration();
-            }
-        });
-        topologyEventReceiver.addEventListener(new ServiceRemovedEventListener() {
-            @Override
-            protected void onEvent(Event event) {
-                reloadConfiguration();
-            }
-        });
-    }
-
-    private void reloadConfiguration() {
-        try {
-            if (loadBalancerStarted) {
-                loadBalancer.reload(TopologyManager.getTopology());
-            }
-        } catch (Exception e) {
-            if (log.isErrorEnabled()) {
-                log.error("Could not reload load balancer configuration", e);
-            }
-        }
-    }
-
-    public void terminate() {
-        if(topologyEventReceiver != null) {
-            topologyEventReceiver.terminate();
-        }
-        if(statisticsNotifier != null) {
-            statisticsNotifier.terminate();
-        }
-        terminated = true;
-    }
+
+	/**
+	 * Load balancer extension constructor.
+	 *
+	 * @param loadBalancer Load balancer instance: Mandatory.
+	 * @param statsReader  Statistics reader: If null statistics notifier thread will not be started.
+	 */
+	public LoadBalancerExtension(LoadBalancer loadBalancer, LoadBalancerStatisticsReader statsReader) {
+		this.loadBalancer = loadBalancer;
+		this.statsReader = statsReader;
+	}
+
+	@Override
+	public void run() {
+		try {
+			if (log.isInfoEnabled()) {
+				log.info("Load balancer extension started");
+			}
+
+			// Start topology receiver thread
+			topologyEventReceiver = new TopologyEventReceiver();
+			addEventListeners();
+			topologyEventReceiver.setExecutorService(executorService);
+			topologyEventReceiver.execute();
+
+			if (statsReader != null) {
+				// Start stats notifier thread
+				statisticsNotifier = new LoadBalancerStatisticsNotifier(statsReader);
+				Thread statsNotifierThread = new Thread(statisticsNotifier);
+				statsNotifierThread.start();
+			} else {
+				if (log.isWarnEnabled()) {
+					log.warn("Load balancer statistics reader not found");
+				}
+			}
+
+		} catch (Exception e) {
+			if (log.isErrorEnabled()) {
+				log.error("Could not start load balancer extension", e);
+			}
+		}
+	}
+
+	private void addEventListeners() {
+		topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
+
+			@Override
+			protected void onEvent(Event event) {
+				try {
+
+					if (!loadBalancerStarted) {
+						// Configure load balancer
+						loadBalancer.configure(TopologyManager.getTopology());
+
+						// Start load balancer
+						loadBalancer.start();
+						loadBalancerStarted = true;
+					}
+				} catch (Exception e) {
+					if (log.isErrorEnabled()) {
+						log.error("Could not start load balancer", e);
+					}
+					terminate();
+				}
+			}
+		});
+		topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
+			@Override
+			protected void onEvent(Event event) {
+				reloadConfiguration();
+			}
+		});
+		topologyEventReceiver.addEventListener(new MemberSuspendedEventListener() {
+			@Override
+			protected void onEvent(Event event) {
+				reloadConfiguration();
+			}
+		});
+		topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
+			@Override
+			protected void onEvent(Event event) {
+				reloadConfiguration();
+			}
+		});
+		topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() {
+			@Override
+			protected void onEvent(Event event) {
+				reloadConfiguration();
+			}
+		});
+		topologyEventReceiver.addEventListener(new ServiceRemovedEventListener() {
+			@Override
+			protected void onEvent(Event event) {
+				reloadConfiguration();
+			}
+		});
+	}
+
+	private void reloadConfiguration() {
+		try {
+			if (loadBalancerStarted) {
+				loadBalancer.reload(TopologyManager.getTopology());
+			}
+		} catch (Exception e) {
+			if (log.isErrorEnabled()) {
+				log.error("Could not reload load balancer configuration", e);
+			}
+		}
+	}
+
+	public void terminate() {
+		if (topologyEventReceiver != null) {
+			topologyEventReceiver.terminate();
+		}
+		if (statisticsNotifier != null) {
+			statisticsNotifier.terminate();
+		}
+		terminated = true;
+	}
 
 	public ExecutorService getExecutorService() {
 		return executorService;

http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
index 56a3fcf..5ee28d1 100644
--- a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
+++ b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -58,53 +58,56 @@ import java.util.concurrent.TimeUnit;
 @SiddhiExtension(namespace = "stratos", function = "faultHandling")
 public class FaultHandlingWindowProcessor extends WindowProcessor implements RunnableWindowProcessor {
 
-    private static final int TIME_OUT = 60 * 1000;
-    static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class);
-    private ScheduledExecutorService faultHandleScheduler;
-    private ThreadBarrier threadBarrier;
-    private long timeToKeep;
-    private ISchedulerSiddhiQueue<StreamEvent> window;
-    private EventPublisher healthStatPublisher = EventPublisherPool.getPublisher(Util.Topics.HEALTH_STAT_TOPIC.getTopicName());
-    private Map<String, Object> MemberFaultEventMap = new HashMap<String, Object>();
-    private Map<String, Object> memberFaultEventMessageMap = new HashMap<String, Object>();
-
-    // Map of member id's to their last received health event time stamp
-    private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>();
-
-    // Event receiver to receive topology events published by cloud-controller
-    private CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this);
-
-    // Stratos member id attribute index in stream execution plan
-    private int memberIdAttrIndex;
-
-    @Override
-    protected void processEvent(InEvent event) {
-        addDataToMap(event);
-    }
-
-    @Override
-    protected void processEvent(InListEvent listEvent) {
-        for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
-            addDataToMap((InEvent) listEvent.getEvent(i));
-        }
-    }
-
-    /**
-     * Add new entry to time stamp map from the received event.
-     *
-     * @param event Event received by Siddhi.
-     */
-    protected void addDataToMap(InEvent event) {
-        String id = (String) event.getData()[memberIdAttrIndex];
-        //checking whether this member is the topology.
-        //sometimes there can be a delay between publishing member terminated events
-        //and actually terminating instances. Hence CEP might get events for already terminated members
-        //so we are checking the topology for the member existence
-        Member member = getMemberFromId(id);
-        if (null == member) {
+	private static final int TIME_OUT = 60 * 1000; // fault threshold in milliseconds, compared with event age in run()
+	static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class);
+	public static final String IDENTIFIER = "AutoScaler"; // identifier handed to StratosThreadPool in init()
+	private ScheduledExecutorService faultHandleScheduler; // injected through setScheduledExecutorService()
+	private ThreadBarrier threadBarrier; // injected through setThreadBarrier(); passed at the start of run()
+	private long timeToKeep; // delay between runs, scheduled in milliseconds
+	private ISchedulerSiddhiQueue<StreamEvent> window; // created in init(), cleared in destroy()
+	private EventPublisher healthStatPublisher =
+			EventPublisherPool.getPublisher(Util.Topics.HEALTH_STAT_TOPIC.getTopicName());
+	private Map<String, Object> MemberFaultEventMap = new HashMap<String, Object>(); // outer map handed to the publisher
+	private Map<String, Object> memberFaultEventMessageMap = new HashMap<String, Object>(); // holds the event under "message"
+
+	// Map of member id's to their last received health event time stamp
+	private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>();
+
+	// Event receiver to receive topology events published by cloud-controller
+	private CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this);
+
+	// Stratos member id attribute index in stream execution plan
+	private int memberIdAttrIndex;
+
+	@Override
+	protected void processEvent(InEvent event) { // single event: hand it to addDataToMap
+		addDataToMap(event);
+	}
+
+	@Override
+	protected void processEvent(InListEvent listEvent) { // batch: handle each active event individually
+		for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
+			addDataToMap((InEvent) listEvent.getEvent(i)); // each element is cast to InEvent before being recorded
+		}
+	}
+
+	/**
+	 * Add new entry to time stamp map from the received event.
+	 *
+	 * @param event Event received by Siddhi.
+	 */
+	protected void addDataToMap(InEvent event) {
+		String id = (String) event.getData()[memberIdAttrIndex];
+		//checking whether this member is the topology.
+		//sometimes there can be a delay between publishing member terminated events
+		//and actually terminating instances. Hence CEP might get events for already terminated members
+		//so we are checking the topology for the member existence
+		Member member = getMemberFromId(id);
+		if (null == member) {
 			log.debug("Member not found in the toplogy. Event rejected");
 			return;
 		}
+<<<<<<< HEAD
         if (StringUtils.isNotEmpty(id)) {
             memberTimeStampMap.put(id, event.getTimeStamp());
         } else {
@@ -317,4 +320,220 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
     public ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
         return memberTimeStampMap;
     }
+=======
+		if (StringUtils.isNotEmpty(id)) {
+			memberTimeStampMap.put(id, event.getTimeStamp());
+		} else {
+			log.warn("NULL member id found in the event received. Event rejected.");
+		}
+		if (log.isDebugEnabled()) {
+			log.debug("Event received from [member-id] " + id + " [time-stamp] " + event.getTimeStamp());
+		}
+	}
+
+	@Override
+	public Iterator<StreamEvent> iterator() { // delegates to the underlying window queue
+		return window.iterator();
+	}
+
+	@Override
+	public Iterator<StreamEvent> iterator(String predicate) {
+		// The predicate is only forwarded in distributed mode; otherwise it is ignored.
+		if (!siddhiContext.isDistributedProcessingEnabled()) {
+			return window.iterator();
+		}
+		return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate);
+	}
+
+	/**
+	 * Seeds the time stamp map with every member the topology reports as activated, using the
+	 * current time as the initial stamp. Entries that are already present are left untouched.
+	 *
+	 * @param topology Topology model object
+	 * @return false when the topology or its service list is null, true once the map is seeded
+	 */
+	boolean loadTimeStampMapFromTopology(Topology topology) {
+		if (topology == null || topology.getServices() == null) {
+			return false;
+		}
+		final long seedTimeStamp = System.currentTimeMillis();
+		// TODO make this efficient by adding APIs to messaging component
+		for (Service service : topology.getServices()) {
+			if (service.getClusters() == null) {
+				continue;
+			}
+			for (Cluster cluster : service.getClusters()) {
+				if (cluster.getMembers() == null) {
+					continue;
+				}
+				for (Member member : cluster.getMembers()) {
+					// only members that were already activated are tracked for faults
+					if (member != null && MemberStatus.Activated.equals(member.getStatus())) {
+						memberTimeStampMap.putIfAbsent(member.getMemberId(), seedTimeStamp);
+					}
+				}
+			}
+		}
+		log.info("Member time stamp map was successfully loaded from the topology.");
+		if (log.isDebugEnabled()) {
+			log.debug("Member TimeStamp Map: " + memberTimeStampMap);
+		}
+		return true;
+	}
+
+	// Returns the topology member with the given id, or null when the id is empty, the topology
+	// is not initialized, no member matches, or reading the topology throws (logged with its trace).
+	private Member getMemberFromId(String memberId) {
+		if (StringUtils.isEmpty(memberId) || !TopologyManager.getTopology().isInitialized()) {
+			return null;
+		}
+		try {
+			TopologyManager.acquireReadLock();
+			if (TopologyManager.getTopology().getServices() == null) {
+				return null;
+			}
+			// TODO make this efficient by adding APIs to messaging component
+			for (Service service : TopologyManager.getTopology().getServices()) {
+				if (service.getClusters() != null) {
+					for (Cluster cluster : service.getClusters()) {
+						if (cluster.getMembers() != null) {
+							for (Member member : cluster.getMembers()) {
+								if (memberId.equals(member.getMemberId())) {
+									return member;
+								}
+							}
+						}
+					}
+				}
+			}
+		} catch (Exception e) {
+			log.error("Error while reading topology", e);
+		} finally {
+			TopologyManager.releaseReadLock();
+		}
+		return null;
+	}
+
+	// Publishes a MemberFaultEvent for the given member through the health stat publisher; logs an
+	// error and publishes nothing when the member can no longer be found in the topology.
+	private void publishMemberFault(String memberId) {
+		Member faultyMember = getMemberFromId(memberId);
+		if (faultyMember == null) {
+			log.error("Failed to publish member fault event. Member having [member-id] " + memberId +
+			          " does not exist in topology");
+			return;
+		}
+		log.info("Publishing member fault event for [member-id] " + memberId);
+		// init() places memberFaultEventMessageMap inside MemberFaultEventMap, so only the inner map is updated
+		memberFaultEventMessageMap.put("message", new MemberFaultEvent(
+				faultyMember.getClusterId(), faultyMember.getInstanceId(), faultyMember.getMemberId(),
+				faultyMember.getPartitionId(), 0));
+		healthStatPublisher.publish(MemberFaultEventMap, true);
+	}
+
+	// Periodic fault check: after passing the thread barrier, reports every member whose last
+	// health event is older than TIME_OUT, then always re-schedules itself after timeToKeep ms.
+	@Override
+	public void run() {
+		try {
+			threadBarrier.pass();
+
+			for (Map.Entry<String, Long> stamped : memberTimeStampMap.entrySet()) {
+				long now = System.currentTimeMillis();
+				Long lastSeen = stamped.getValue();
+				if ((now - lastSeen) > TIME_OUT) {
+					log.info("Faulty member detected [member-id] " + stamped.getKey() + " with [last time-stamp] " +
+					         lastSeen + " [time-out] " + TIME_OUT + " milliseconds");
+					publishMemberFault(stamped.getKey());
+				}
+			}
+			if (log.isDebugEnabled()) {
+				log.debug("Fault handling processor iteration completed with [time-stamp map length] " +
+				          memberTimeStampMap.size() + " [time-stamp map] " + memberTimeStampMap);
+			}
+		} catch (Throwable t) {
+			log.error(t.getMessage(), t);
+		} finally {
+			faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
+		}
+	}
+
+	@Override
+	protected Object[] currentState() { // snapshot: the window's own state wrapped as the sole array element
+		return new Object[] { window.currentState() };
+	}
+
+	@Override
+	protected void restoreState(Object[] data) { // counterpart of currentState(): data[0] holds the window state
+		window.restoreState(data); // NOTE(review): restores from the outer array, then again from data[0] below — confirm both calls are intended
+		window.restoreState((Object[]) data[0]);
+		window.reSchedule();
+	}
+
+	@Override
+	protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor,
+	                    AbstractDefinition streamDefinition, String elementId, boolean async,
+	                    SiddhiContext siddhiContext) { // reads the window parameters, creates the queue, starts the topology receiver
+
+		if (parameters[0] instanceof IntConstant) { // first parameter: the delay later used by schedule() in milliseconds
+			timeToKeep = ((IntConstant) parameters[0]).getValue();
+		} else {
+			timeToKeep = ((LongConstant) parameters[0]).getValue(); // any non-int constant is cast to LongConstant
+		}
+
+		String memberIdAttrName = ((Variable) parameters[1]).getAttributeName(); // second parameter names the member id attribute
+		memberIdAttrIndex = streamDefinition.getAttributePosition(memberIdAttrName);
+
+		if (this.siddhiContext.isDistributedProcessingEnabled()) { // reads the field, not the method parameter of the same name
+			window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async);
+		} else {
+			window = new SchedulerSiddhiQueue<StreamEvent>(this);
+		}
+		MemberFaultEventMap
+				.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap); // outer map wraps the message map under the event class name
+
+		ExecutorService executorService = StratosThreadPool.getExecutorService(IDENTIFIER, 10); // NOTE(review): 10 is presumably the pool size — confirm against StratosThreadPool
+		cepTopologyEventReceiver.setExecutorService(executorService);
+		executorService.execute(cepTopologyEventReceiver);
+
+		//Ordinary scheduling
+		window.schedule();
+		if (log.isDebugEnabled()) {
+			log.debug("Fault handling window processor initialized with [timeToKeep] " + timeToKeep +
+			          ", [memberIdAttrName] " + memberIdAttrName + ", [memberIdAttrIndex] " + memberIdAttrIndex +
+			          ", [distributed-enabled] " + this.siddhiContext.isDistributedProcessingEnabled());
+		}
+	}
+
+	@Override
+	public void schedule() { // queues one run of this processor after timeToKeep milliseconds
+		faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
+	}
+
+	@Override
+	public void scheduleNow() { // queues one run of this processor with zero delay
+		faultHandleScheduler.schedule(this, 0, TimeUnit.MILLISECONDS);
+	}
+
+	@Override
+	public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) { // scheduler used by schedule(), scheduleNow() and run()
+		this.faultHandleScheduler = scheduledExecutorService;
+	}
+
+	@Override
+	public void setThreadBarrier(ThreadBarrier threadBarrier) { // barrier that run() passes before each check
+		this.threadBarrier = threadBarrier;
+	}
+
+	@Override
+	public void destroy() { // releases what init() set up: the topology receiver and the window queue
+		// terminate topology listener thread
+		cepTopologyEventReceiver.terminate();
+		window = null; // drops the reference to the window queue
+	}
+
+	public ConcurrentHashMap<String, Long> getMemberTimeStampMap() { // returns the live map itself, not a copy
+		return memberTimeStampMap;
+	}
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
----------------------------------------------------------------------
diff --git a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
index 013aee9..7996672 100644
--- a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
+++ b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
@@ -31,32 +31,34 @@ import java.util.concurrent.ExecutorService;
  * HAProxy extension main class.
  */
 public class Main {
-    private static final Log log = LogFactory.getLog(Main.class);
+	private static final Log log = LogFactory.getLog(Main.class);
 	private static ExecutorService executorService;
 
 	public static void main(String[] args) {
 
-        LoadBalancerExtension extension = null;
-        try {
-            // Configure log4j properties
-            PropertyConfigurator.configure(System.getProperty("log4j.properties.file.path"));
+		LoadBalancerExtension extension = null;
+		try {
+			// Configure log4j properties
+			PropertyConfigurator.configure(System.getProperty("log4j.properties.file.path"));
 
-            if (log.isInfoEnabled()) {
-                log.info("HAProxy extension started");
-            }
-	        executorService = StratosThreadPool.getExecutorService("Load_Balance_Extension", 10);
-            // Validate runtime parameters
-            HAProxyContext.getInstance().validate();
-            extension = new LoadBalancerExtension(new HAProxy(), (HAProxyContext.getInstance().isCEPStatsPublisherEnabled() ? new HAProxyStatisticsReader() : null));
-            Thread thread = new Thread(extension);
-            thread.start();
-        } catch (Exception e) {
-            if (log.isErrorEnabled()) {
-                log.error(e);
-            }
-            if (extension != null) {
-                extension.terminate();
-            }
-        }
-    }
+			if (log.isInfoEnabled()) {
+				log.info("HAProxy extension started");
+			}
+			executorService = StratosThreadPool.getExecutorService("Load_Balance_Extension", 10);
+			// Validate runtime parameters
+			HAProxyContext.getInstance().validate();
+			extension = new LoadBalancerExtension(new HAProxy(),
+			                                      (HAProxyContext.getInstance().isCEPStatsPublisherEnabled() ?
+			                                       new HAProxyStatisticsReader() : null));
+			Thread thread = new Thread(extension);
+			thread.start();
+		} catch (Exception e) {
+			// pass the exception as the throwable argument so its stack trace is logged, not only its string form
+			log.error("HAProxy extension failed to start", e);
+			// stop the extension if it was constructed before the failure
+			if (extension != null) {
+				extension.terminate();
+			}
+		}
+	}
 }