You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by la...@apache.org on 2014/12/03 07:56:45 UTC

[8/9] stratos git commit: merge with new changes

merge with new changes


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/f70aa9ed
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/f70aa9ed
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/f70aa9ed

Branch: refs/heads/master
Commit: f70aa9edc4d0e32d3b726b99eca217dfd4d3a475
Parents: 4e73393
Author: gayan <ga...@puppet.gayan.org>
Authored: Tue Dec 2 17:42:21 2014 +0530
Committer: gayan <ga...@puppet.gayan.org>
Committed: Tue Dec 2 17:42:21 2014 +0530

----------------------------------------------------------------------
 .../AutoscalerTopologyEventReceiver.java        | 498 +------------------
 .../internal/AutoscalerServerComponent.java     | 230 ++-------
 .../CloudControllerServiceComponent.java        | 143 +-----
 .../internal/LoadBalancerServiceComponent.java  |  29 +-
 .../extension/FaultHandlingWindowProcessor.java | 217 --------
 5 files changed, 87 insertions(+), 1030 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/f70aa9ed/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
index 1f14542..f4a5169 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -27,8 +27,11 @@ import org.apache.stratos.autoscaler.context.cluster.ClusterContextFactory;
 import org.apache.stratos.autoscaler.context.cluster.VMClusterContext;
 import org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher;
 import org.apache.stratos.autoscaler.event.publisher.InstanceNotificationPublisher;
+import org.apache.stratos.autoscaler.exception.application.DependencyBuilderException;
+import org.apache.stratos.autoscaler.exception.application.TopologyInConsistentException;
 import org.apache.stratos.autoscaler.exception.partition.PartitionValidationException;
 import org.apache.stratos.autoscaler.exception.policy.PolicyValidationException;
+import org.apache.stratos.autoscaler.monitor.MonitorFactory;
 import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
 import org.apache.stratos.autoscaler.monitor.component.ApplicationMonitor;
 import org.apache.stratos.autoscaler.monitor.events.ClusterStatusEvent;
@@ -63,34 +66,22 @@ public class AutoscalerTopologyEventReceiver {
 	private boolean topologyInitialized;
 	private ExecutorService executorService;
 
-<<<<<<< HEAD
-    public AutoscalerTopologyEventReceiver() {
-        this.topologyEventReceiver = new TopologyEventReceiver();
-        addEventListeners();
-    }
-
+	public AutoscalerTopologyEventReceiver() {
+		this.topologyEventReceiver = new TopologyEventReceiver();
+		addEventListeners();
+	}
 
-    public void execute() {
-        //FIXME this activated before autoscaler deployer activated.
+	public void execute() {
+		//FIXME this activated before autoscaler deployer activated.
 
-	    topologyEventReceiver.setExecutorService(getExecutorService());
-	    topologyEventReceiver.execute();
+		topologyEventReceiver.setExecutorService(getExecutorService());
+		topologyEventReceiver.execute();
 
-	    if (log.isInfoEnabled()) {
-            log.info("Autoscaler topology receiver thread started");
-        }
+		if (log.isInfoEnabled()) {
+			log.info("Autoscaler topology receiver thread started");
+		}
 
-        // Keep the thread live until terminated
-        while (!terminated) {
-            try {
-                Thread.sleep(1000);
-            } catch (InterruptedException ignore) {
-            }
-        }
-        if (log.isInfoEnabled()) {
-            log.info("Autoscaler topology receiver thread terminated");
-        }
-    }
+	}
 
     private boolean allClustersInitialized(Application application) {
         boolean allClustersInitialized = false;
@@ -524,461 +515,6 @@ public class AutoscalerTopologyEventReceiver {
         topologyEventReceiver.terminate();
         terminated = true;
     }
-=======
-	public AutoscalerTopologyEventReceiver() {
-		this.topologyEventReceiver = new TopologyEventReceiver();
-		addEventListeners();
-	}
-
-	public void execute() {
-		//FIXME this activated before autoscaler deployer activated.
-
-		topologyEventReceiver.setExecutorService(executorService);
-		topologyEventReceiver.execute();
-
-		if (log.isInfoEnabled()) {
-			log.info("Autoscaler topology receiver thread started");
-		}
-
-	}
-
-	private boolean allClustersInitialized(Application application) {
-		boolean allClustersInitialized = false;
-		for (ClusterDataHolder holder : application.getClusterDataRecursively()) {
-			TopologyManager.acquireReadLockForCluster(holder.getServiceType(),
-			                                          holder.getClusterId());
-
-			try {
-				Topology topology = TopologyManager.getTopology();
-				if (topology != null) {
-					Service service = topology.getService(holder.getServiceType());
-					if (service != null) {
-						if (service.clusterExists(holder.getClusterId())) {
-							allClustersInitialized = true;
-							return allClustersInitialized;
-						} else {
-							if (log.isDebugEnabled()) {
-								log.debug("[Cluster] " + holder.getClusterId() + " is not found in " +
-								          "the Topology");
-							}
-							allClustersInitialized = false;
-						}
-					} else {
-						if (log.isDebugEnabled()) {
-							log.debug("Service is null in the CompleteTopologyEvent");
-						}
-					}
-				} else {
-					if (log.isDebugEnabled()) {
-						log.debug("Topology is null in the CompleteTopologyEvent");
-					}
-				}
-			} finally {
-				TopologyManager.releaseReadLockForCluster(holder.getServiceType(),
-				                                          holder.getClusterId());
-			}
-		}
-		return allClustersInitialized;
-	}
-
-	private void addEventListeners() {
-		// Listen to topology events that affect clusters
-		topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
-			@Override
-			protected void onEvent(Event event) {
-				if (!topologyInitialized) {
-					log.info("[CompleteTopologyEvent] Received: " + event.getClass());
-					ApplicationHolder.acquireReadLock();
-					try {
-						Applications applications = ApplicationHolder.getApplications();
-						if (applications != null) {
-							for (Application application : applications.getApplications().values()) {
-								if (allClustersInitialized(application)) {
-									startApplicationMonitor(application.getUniqueIdentifier());
-								} else {
-									log.error("Complete Topology is not consistent with the applications " +
-									          "which got persisted");
-								}
-							}
-							topologyInitialized = true;
-						} else {
-							log.info("No applications found in the complete topology");
-						}
-					} catch (Exception e) {
-						log.error("Error processing event", e);
-					} finally {
-						ApplicationHolder.releaseReadLock();
-					}
-				}
-			}
-		});
-
-		topologyEventReceiver.addEventListener(new ApplicationClustersCreatedEventListener() {
-			@Override
-			protected void onEvent(Event event) {
-				try {
-					log.info("[ApplicationClustersCreatedEvent] Received: " + event.getClass());
-					ApplicationClustersCreatedEvent applicationClustersCreatedEvent =
-							(ApplicationClustersCreatedEvent) event;
-					String appId = applicationClustersCreatedEvent.getAppId();
-					try {
-						//acquire read lock
-						ApplicationHolder.acquireReadLock();
-						//start the application monitor
-						startApplicationMonitor(appId);
-					} catch (Exception e) {
-						String msg = "Error processing event " + e.getLocalizedMessage();
-						log.error(msg, e);
-					} finally {
-						//release read lock
-						ApplicationHolder.releaseReadLock();
-
-					}
-				} catch (ClassCastException e) {
-					String msg = "Error while casting the event " + e.getLocalizedMessage();
-					log.error(msg, e);
-				}
-
-			}
-		});
-
-		topologyEventReceiver.addEventListener(new ClusterActivatedEventListener() {
-			@Override
-			protected void onEvent(Event event) {
-				log.info("[ClusterActivatedEvent] Received: " + event.getClass());
-				ClusterActivatedEvent clusterActivatedEvent = (ClusterActivatedEvent) event;
-				String clusterId = clusterActivatedEvent.getClusterId();
-				AutoscalerContext asCtx = AutoscalerContext.getInstance();
-				AbstractClusterMonitor monitor;
-				monitor = asCtx.getClusterMonitor(clusterId);
-				if (null == monitor) {
-					if (log.isDebugEnabled()) {
-						log.debug(String.format("A cluster monitor is not found in autoscaler context "
-						                        + "[cluster] %s", clusterId));
-					}
-					return;
-				}
-				//changing the status in the monitor, will notify its parent monitor
-
-			}
-		});
-
-		topologyEventReceiver.addEventListener(new ClusterResetEventListener() {
-			@Override
-			protected void onEvent(Event event) {
-				log.info("[ClusterCreatedEvent] Received: " + event.getClass());
-				ClusterResetEvent clusterResetEvent = (ClusterResetEvent) event;
-				String clusterId = clusterResetEvent.getClusterId();
-				AutoscalerContext asCtx = AutoscalerContext.getInstance();
-				AbstractClusterMonitor monitor;
-				monitor = asCtx.getClusterMonitor(clusterId);
-				if (null == monitor) {
-					if (log.isDebugEnabled()) {
-						log.debug(String.format("A cluster monitor is not found in autoscaler context "
-						                        + "[cluster] %s", clusterId));
-					}
-					return;
-				}
-				//changing the status in the monitor, will notify its parent monitor
-				monitor.destroy();
-				monitor.setStatus(ClusterStatus.Created);
-
-			}
-		});
-
-		topologyEventReceiver.addEventListener(new ClusterCreatedEventListener() {
-			@Override
-			protected void onEvent(Event event) {
-				log.info("[ClusterCreatedEvent] Received: " + event.getClass());
-			}
-		});
-
-		topologyEventReceiver.addEventListener(new ClusterInActivateEventListener() {
-			@Override
-			protected void onEvent(Event event) {
-				log.info("[ClusterInActivateEvent] Received: " + event.getClass());
-				ClusterInactivateEvent clusterInactivateEvent = (ClusterInactivateEvent) event;
-				String clusterId = clusterInactivateEvent.getClusterId();
-				AutoscalerContext asCtx = AutoscalerContext.getInstance();
-				AbstractClusterMonitor monitor;
-				monitor = asCtx.getClusterMonitor(clusterId);
-				if (null == monitor) {
-					if (log.isDebugEnabled()) {
-						log.debug(String.format("A cluster monitor is not found in autoscaler context "
-						                        + "[cluster] %s", clusterId));
-					}
-					return;
-				}
-				//changing the status in the monitor, will notify its parent monitor
-				monitor.setStatus(ClusterStatus.Inactive);
-			}
-		});
-
-		topologyEventReceiver.addEventListener(new ClusterTerminatingEventListener() {
-			@Override
-			protected void onEvent(Event event) {
-				log.info("[ClusterTerminatingEvent] Received: " + event.getClass());
-				ClusterTerminatingEvent clusterTerminatingEvent = (ClusterTerminatingEvent) event;
-				String clusterId = clusterTerminatingEvent.getClusterId();
-				String instanceId = clusterTerminatingEvent.getInstanceId();
-				AutoscalerContext asCtx = AutoscalerContext.getInstance();
-				AbstractClusterMonitor monitor;
-				monitor = asCtx.getClusterMonitor(clusterId);
-				if (null == monitor) {
-					if (log.isDebugEnabled()) {
-						log.debug(String.format("A cluster monitor is not found in autoscaler context "
-						                        + "[cluster] %s", clusterId));
-					}
-					// if monitor does not exist, send cluster terminated event
-					ClusterStatusEventPublisher.sendClusterTerminatedEvent(clusterTerminatingEvent.getAppId(),
-					                                                       clusterTerminatingEvent.getServiceName(),
-					                                                       clusterId, instanceId);
-					return;
-				}
-				//changing the status in the monitor, will notify its parent monitor
-				if (monitor.getStatus() == ClusterStatus.Active) {
-					// terminated gracefully
-					monitor.setStatus(ClusterStatus.Terminating);
-					InstanceNotificationPublisher.sendInstanceCleanupEventForCluster(clusterId);
-				} else {
-					monitor.setStatus(ClusterStatus.Terminating);
-					monitor.terminateAllMembers();
-				}
-				ServiceReferenceHolder.getInstance().getClusterStatusProcessorChain().
-						process("", clusterId, instanceId);
-			}
-		});
-
-		topologyEventReceiver.addEventListener(new ClusterTerminatedEventListener() {
-			@Override
-			protected void onEvent(Event event) {
-				log.info("[ClusterTerminatedEvent] Received: " + event.getClass());
-				ClusterTerminatedEvent clusterTerminatedEvent = (ClusterTerminatedEvent) event;
-				String clusterId = clusterTerminatedEvent.getClusterId();
-				AutoscalerContext asCtx = AutoscalerContext.getInstance();
-				AbstractClusterMonitor monitor;
-				monitor = asCtx.getClusterMonitor(clusterId);
-				if (null == monitor) {
-					if (log.isDebugEnabled()) {
-						log.debug(String.format("A cluster monitor is not found in autoscaler context "
-						                        + "[cluster] %s", clusterId));
-					}
-					// if the cluster monitor is null, assume that its termianted
-					ApplicationMonitor appMonitor =
-							AutoscalerContext.getInstance().getAppMonitor(clusterTerminatedEvent.getAppId());
-					if (appMonitor != null) {
-						appMonitor
-								.onChildStatusEvent(new ClusterStatusEvent(ClusterStatus.Terminated, clusterId, null));
-					}
-					return;
-				}
-				//changing the status in the monitor, will notify its parent monitor
-				monitor.setStatus(ClusterStatus.Terminated);
-				//Destroying and Removing the Cluster monitor
-				monitor.destroy();
-				AutoscalerContext.getInstance().removeClusterMonitor(clusterId);
-			}
-		});
-
-		topologyEventReceiver.addEventListener(new MemberReadyToShutdownEventListener() {
-			@Override
-			protected void onEvent(Event event) {
-				try {
-					MemberReadyToShutdownEvent memberReadyToShutdownEvent = (MemberReadyToShutdownEvent) event;
-					String clusterId = memberReadyToShutdownEvent.getClusterId();
-					AutoscalerContext asCtx = AutoscalerContext.getInstance();
-					AbstractClusterMonitor monitor;
-					monitor = asCtx.getClusterMonitor(clusterId);
-					if (null == monitor) {
-						if (log.isDebugEnabled()) {
-							log.debug(String.format("A cluster monitor is not found in autoscaler context "
-							                        + "[cluster] %s", clusterId));
-						}
-						return;
-					}
-					monitor.handleMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
-				} catch (Exception e) {
-					String msg = "Error processing event " + e.getLocalizedMessage();
-					log.error(msg, e);
-				}
-			}
-		});
-
-		topologyEventReceiver.addEventListener(new MemberStartedEventListener() {
-			@Override
-			protected void onEvent(Event event) {
-
-			}
-		});
-
-		topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
-			@Override
-			protected void onEvent(Event event) {
-				try {
-					MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
-					String clusterId = memberTerminatedEvent.getClusterId();
-					AbstractClusterMonitor monitor;
-					AutoscalerContext asCtx = AutoscalerContext.getInstance();
-					monitor = asCtx.getClusterMonitor(clusterId);
-					if (null == monitor) {
-						if (log.isDebugEnabled()) {
-							log.debug(String.format("A cluster monitor is not found in autoscaler context "
-							                        + "[cluster] %s", clusterId));
-						}
-						return;
-					}
-					monitor.handleMemberTerminatedEvent(memberTerminatedEvent);
-				} catch (Exception e) {
-					String msg = "Error processing event " + e.getLocalizedMessage();
-					log.error(msg, e);
-				}
-			}
-		});
-
-		topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
-			@Override
-			protected void onEvent(Event event) {
-				try {
-					MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
-					String clusterId = memberActivatedEvent.getClusterId();
-					AbstractClusterMonitor monitor;
-					AutoscalerContext asCtx = AutoscalerContext.getInstance();
-					monitor = asCtx.getClusterMonitor(clusterId);
-					if (null == monitor) {
-						if (log.isDebugEnabled()) {
-							log.debug(String.format("A cluster monitor is not found in autoscaler context "
-							                        + "[cluster] %s", clusterId));
-						}
-						return;
-					}
-					monitor.handleMemberActivatedEvent(memberActivatedEvent);
-				} catch (Exception e) {
-					String msg = "Error processing event " + e.getLocalizedMessage();
-					log.error(msg, e);
-				}
-			}
-		});
-
-		topologyEventReceiver.addEventListener(new MemberMaintenanceListener() {
-			@Override
-			protected void onEvent(Event event) {
-				try {
-					MemberMaintenanceModeEvent maintenanceModeEvent = (MemberMaintenanceModeEvent) event;
-					String clusterId = maintenanceModeEvent.getClusterId();
-					AbstractClusterMonitor monitor;
-					AutoscalerContext asCtx = AutoscalerContext.getInstance();
-					monitor = asCtx.getClusterMonitor(clusterId);
-					if (null == monitor) {
-						if (log.isDebugEnabled()) {
-							log.debug(String.format("A cluster monitor is not found in autoscaler context "
-							                        + "[cluster] %s", clusterId));
-						}
-						return;
-					}
-					monitor.handleMemberMaintenanceModeEvent(maintenanceModeEvent);
-				} catch (Exception e) {
-					String msg = "Error processing event " + e.getLocalizedMessage();
-					log.error(msg, e);
-				}
-			}
-		});
-
-		topologyEventReceiver.addEventListener(new ClusterInstanceCreatedEventListener() {
-			@Override
-			protected void onEvent(Event event) {
-
-				ClusterInstanceCreatedEvent clusterInstanceCreatedEvent =
-						(ClusterInstanceCreatedEvent) event;
-				AbstractClusterMonitor clusterMonitor = AutoscalerContext.getInstance().
-						getClusterMonitor(clusterInstanceCreatedEvent.getClusterId());
-
-				if (clusterMonitor != null) {
-					TopologyManager.acquireReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(),
-					                                          clusterInstanceCreatedEvent.getClusterId());
-
-					try {
-						Service service = TopologyManager.getTopology().
-								getService(clusterInstanceCreatedEvent.getServiceName());
-
-						if (service != null) {
-							Cluster cluster = service.getCluster(clusterInstanceCreatedEvent.getClusterId());
-							if (cluster != null) {
-								// create and add Cluster Context
-								try {
-									if (cluster.isKubernetesCluster()) {
-										clusterMonitor.addClusterContextForInstance(
-												clusterInstanceCreatedEvent.getInstanceId(),
-												ClusterContextFactory.getKubernetesClusterContext(cluster));
-									} else if (cluster.isLbCluster()) {
-										clusterMonitor.addClusterContextForInstance(
-												clusterInstanceCreatedEvent.getInstanceId(),
-												ClusterContextFactory.getVMLBClusterContext(cluster));
-									} else {
-										clusterMonitor.addClusterContextForInstance(
-												clusterInstanceCreatedEvent.getInstanceId(),
-												ClusterContextFactory.getVMServiceClusterContext(cluster));
-									}
-
-									if (clusterMonitor.hasMonitoringStarted().compareAndSet(false, true)) {
-										clusterMonitor.startScheduler();
-										log.info("Monitoring task for Cluster Monitor with cluster id " +
-										         clusterInstanceCreatedEvent.getClusterId() + " started successfully");
-									}
-
-								} catch (PolicyValidationException e) {
-									log.error(e.getMessage(), e);
-								} catch (PartitionValidationException e) {
-									log.error(e.getMessage(), e);
-								}
-
-							} else {
-								log.error("Cluster not found for " + clusterInstanceCreatedEvent.getClusterId() +
-								          ", no cluster instance added to ClusterMonitor " +
-								          clusterInstanceCreatedEvent.getClusterId());
-							}
-						} else {
-							log.error("Service " + clusterInstanceCreatedEvent.getServiceName() +
-							          " not found, no cluster instance added to ClusterMonitor " +
-							          clusterInstanceCreatedEvent.getClusterId());
-						}
-
-					} finally {
-						TopologyManager.releaseReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(),
-						                                          clusterInstanceCreatedEvent.getClusterId());
-					}
-
-				} else {
-					log.error("No Cluster Monitor found for cluster id " +
-					          clusterInstanceCreatedEvent.getClusterId());
-				}
-			}
-		});
-	}
-
-	/**
-	 * Terminate load balancer topology receiver thread.
-	 */
-	public void terminate() {
-		topologyEventReceiver.terminate();
-		terminated = true;
-	}
-
-	protected synchronized void startApplicationMonitor(String applicationId) {
-		Thread th = null;
-		if (AutoscalerContext.getInstance().getAppMonitor(applicationId) == null) {
-			th = new Thread(new ApplicationMonitorAdder(applicationId));
-		}
-		if (th != null) {
-			th.start();
-		} else {
-			if (log.isDebugEnabled()) {
-				log.debug(String
-						          .format("Application monitor thread already exists: " +
-						                  "[application] %s ", applicationId));
-			}
-		}
-	}
->>>>>>> ddf277b... Remove unnessary threads in messaging model
 
 	public ExecutorService getExecutorService() {
 		return executorService;
@@ -987,8 +523,6 @@ public class AutoscalerTopologyEventReceiver {
 	public void setExecutorService(ExecutorService executorService) {
 		this.executorService = executorService;
 	}
-<<<<<<< HEAD
-=======
 
 	private class ApplicationMonitorAdder implements Runnable {
 		private String appId;
@@ -1047,5 +581,5 @@ public class AutoscalerTopologyEventReceiver {
 			}
 		}
 	}
->>>>>>> ddf277b... Remove unnessary threads in messaging model
+
 }

http://git-wip-us.apache.org/repos/asf/stratos/blob/f70aa9ed/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
index bb5e167..91d52d3 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
@@ -39,6 +39,7 @@ import org.apache.stratos.autoscaler.util.ServiceReferenceHolder;
 import org.apache.stratos.cloud.controller.stub.domain.Partition;
 import org.apache.stratos.common.kubernetes.KubernetesGroup;
 import org.apache.stratos.common.threading.StratosThreadPool;
+import org.drools.reteoo.PartitionManager;
 import org.osgi.service.component.ComponentContext;
 import org.wso2.carbon.ntask.core.service.TaskService;
 import org.wso2.carbon.registry.api.RegistryException;
@@ -75,28 +76,27 @@ public class AutoscalerServerComponent {
 
 
 	protected void activate(ComponentContext componentContext) throws Exception {
-<<<<<<< HEAD
-        try {
-            // Start topology receiver
-	        XMLConfiguration conf = ConfUtil.getInstance(COMPONENTS_CONFIG).getConfiguration();
-	        int threadPoolSize = conf.getInt(THREAD_POOL_SIZE_KEY, THREAD_POOL_SIZE);
-	        String threadIdentifier=conf.getString(THREAD_IDENTIFIER_KEY, DEFAULT_IDENTIFIER);
-	        ExecutorService executorService = StratosThreadPool.getExecutorService(threadIdentifier, threadPoolSize);
-            asTopologyReceiver = new AutoscalerTopologyEventReceiver();
-	        asTopologyReceiver.setExecutorService(executorService);
-	        asTopologyReceiver.execute();
+		try {
+			// Start topology receiver
+			XMLConfiguration conf = ConfUtil.getInstance(COMPONENTS_CONFIG).getConfiguration();
+			int threadPoolSize = conf.getInt(THREAD_POOL_SIZE_KEY, THREAD_POOL_SIZE);
+			String threadIdentifier = conf.getString(THREAD_IDENTIFIER_KEY, DEFAULT_IDENTIFIER);
+			ExecutorService executorService = StratosThreadPool.getExecutorService(threadIdentifier, threadPoolSize);
+			asTopologyReceiver = new AutoscalerTopologyEventReceiver();
+			asTopologyReceiver.setExecutorService(executorService);
+			asTopologyReceiver.execute();
 
-            if (log.isDebugEnabled()) {
-                log.debug("Topology receiver executor service started");
-            }
+			if (log.isDebugEnabled()) {
+				log.debug("Topology receiver executor service started");
+			}
 
-            // Start health stat receiver
-            autoscalerHealthStatEventReceiver = new AutoscalerHealthStatEventReceiver();
-            Thread healthDelegatorThread = new Thread(autoscalerHealthStatEventReceiver);
-            healthDelegatorThread.start();
-            if (log.isDebugEnabled()) {
-                log.debug("Health statistics receiver thread started");
-            }
+			// Start health stat receiver
+			autoscalerHealthStatEventReceiver = new AutoscalerHealthStatEventReceiver();
+			autoscalerHealthStatEventReceiver.setExecutorService(executorService);
+			autoscalerHealthStatEventReceiver.execute();
+			if (log.isDebugEnabled()) {
+				log.debug("Health statistics receiver thread started");
+			}
 
             // Adding the registry stored partitions to the information model
             List<Partition> partitions = RegistryManager.getInstance().retrievePartitions();
@@ -105,7 +105,7 @@ public class AutoscalerServerComponent {
                 Partition partition = partitionIterator.next();
 //                PartitionManager.getInstance().addPartitionToInformationModel(partition);
             }
-            
+
             // Adding the network partitions stored in registry to the information model
 //            List<NetworkPartitionLbHolder> nwPartitionHolders = RegistryManager.getInstance().retrieveNetworkPartitionLbHolders();
 //            Iterator<NetworkPartitionLbHolder> nwPartitionIterator = nwPartitionHolders.iterator();
@@ -113,7 +113,7 @@ public class AutoscalerServerComponent {
 //                NetworkPartitionLbHolder nwPartition = nwPartitionIterator.next();
 //                PartitionManager.getInstance().addNetworkPartitionLbHolder(nwPartition);
 //            }
-            
+
             List<AutoscalePolicy> asPolicies = RegistryManager.getInstance().retrieveASPolicies();
             Iterator<AutoscalePolicy> asPolicyIterator = asPolicies.iterator();
             while (asPolicyIterator.hasNext()) {
@@ -121,43 +121,43 @@ public class AutoscalerServerComponent {
                 PolicyManager.getInstance().addASPolicyToInformationModel(asPolicy);
             }
 
-            List<DeploymentPolicy> depPolicies = RegistryManager.getInstance().retrieveDeploymentPolicies();
-            Iterator<DeploymentPolicy> depPolicyIterator = depPolicies.iterator();
-            while (depPolicyIterator.hasNext()) {
-                DeploymentPolicy depPolicy = depPolicyIterator.next();
-                PolicyManager.getInstance().addDeploymentPolicyToInformationModel(depPolicy);
-            }
+			List<DeploymentPolicy> depPolicies = RegistryManager.getInstance().retrieveDeploymentPolicies();
+			Iterator<DeploymentPolicy> depPolicyIterator = depPolicies.iterator();
+			while (depPolicyIterator.hasNext()) {
+				DeploymentPolicy depPolicy = depPolicyIterator.next();
+				PolicyManager.getInstance().addDeploymentPolicyToInformationModel(depPolicy);
+			}
 
-            // Adding KubernetesGroups stored in registry to the information model
-            List<KubernetesGroup> kubernetesGroupList = RegistryManager.getInstance().retrieveKubernetesGroups();
-            Iterator<KubernetesGroup> kubernetesGroupIterator = kubernetesGroupList.iterator();
-            while (kubernetesGroupIterator.hasNext()) {
-                KubernetesGroup kubernetesGroup = kubernetesGroupIterator.next();
-                KubernetesManager.getInstance().addNewKubernetesGroup(kubernetesGroup);
-            }
+			// Adding KubernetesGroups stored in registry to the information model
+			List<KubernetesGroup> kubernetesGroupList = RegistryManager.getInstance().retrieveKubernetesGroups();
+			Iterator<KubernetesGroup> kubernetesGroupIterator = kubernetesGroupList.iterator();
+			while (kubernetesGroupIterator.hasNext()) {
+				KubernetesGroup kubernetesGroup = kubernetesGroupIterator.next();
+				KubernetesManager.getInstance().addNewKubernetesGroup(kubernetesGroup);
+			}
 
-            //starting the processor chain
-            ClusterStatusProcessorChain clusterStatusProcessorChain = new ClusterStatusProcessorChain();
-            ServiceReferenceHolder.getInstance().setClusterStatusProcessorChain(clusterStatusProcessorChain);
+			//starting the processor chain
+			ClusterStatusProcessorChain clusterStatusProcessorChain = new ClusterStatusProcessorChain();
+			ServiceReferenceHolder.getInstance().setClusterStatusProcessorChain(clusterStatusProcessorChain);
 
-            GroupStatusProcessorChain groupStatusProcessorChain = new GroupStatusProcessorChain();
-            ServiceReferenceHolder.getInstance().setGroupStatusProcessorChain(groupStatusProcessorChain);
+			GroupStatusProcessorChain groupStatusProcessorChain = new GroupStatusProcessorChain();
+			ServiceReferenceHolder.getInstance().setGroupStatusProcessorChain(groupStatusProcessorChain);
 
-            if (log.isInfoEnabled()) {
-                log.info("Scheduling tasks to publish applications");
-            }
+			if (log.isInfoEnabled()) {
+				log.info("Scheduling tasks to publish applications");
+			}
 
-            ApplicationSynchronizerTaskScheduler
-                    .schedule(ServiceReferenceHolder.getInstance()
-                            .getTaskService());
+			ApplicationSynchronizerTaskScheduler
+					.schedule(ServiceReferenceHolder.getInstance()
+					                                .getTaskService());
 
-            if (log.isInfoEnabled()) {
-                log.info("Autoscaler server Component activated");
-            }
-        } catch (Throwable e) {
-            log.error("Error in activating the autoscaler component ", e);
-        }
-    }
+			if (log.isInfoEnabled()) {
+				log.info("Autoscaler server Component activated");
+			}
+		} catch (Throwable e) {
+			log.error("Error in activating the autoscaler component ", e);
+		}
+	}
 
     protected void deactivate(ComponentContext context) {
         asTopologyReceiver.terminate();
@@ -198,128 +198,4 @@ public class AutoscalerServerComponent {
         ServiceReferenceHolder.getInstance().setTaskService(null);
     }
 }
-=======
-		try {
-			// Start topology receiver
-			XMLConfiguration conf = ConfUtil.getInstance(COMPONENTS_CONFIG).getConfiguration();
-			int threadPoolSize = conf.getInt(THREAD_POOL_SIZE_KEY, THREAD_POOL_SIZE);
-			String threadIdentifier = conf.getString(THREAD_IDENTIFIER_KEY, DEFAULT_IDENTIFIER);
-			ExecutorService executorService = StratosThreadPool.getExecutorService(threadIdentifier, threadPoolSize);
-			asTopologyReceiver = new AutoscalerTopologyEventReceiver();
-			asTopologyReceiver.setExecutorService(executorService);
-			asTopologyReceiver.execute();
-
-			if (log.isDebugEnabled()) {
-				log.debug("Topology receiver executor service started");
-			}
-
-			// Start health stat receiver
-			autoscalerHealthStatEventReceiver = new AutoscalerHealthStatEventReceiver();
-			autoscalerHealthStatEventReceiver.setExecutorService(executorService);
-			autoscalerHealthStatEventReceiver.execute();
-			if (log.isDebugEnabled()) {
-				log.debug("Health statistics receiver thread started");
-			}
-
-			// Adding the registry stored partitions to the information model
-			List<Partition> partitions = RegistryManager.getInstance().retrievePartitions();
-			Iterator<Partition> partitionIterator = partitions.iterator();
-			while (partitionIterator.hasNext()) {
-				Partition partition = partitionIterator.next();
-				PartitionManager.getInstance().addPartitionToInformationModel(partition);
-			}
-
-			// Adding the network partitions stored in registry to the information model
-			List<NetworkPartitionLbHolder> nwPartitionHolders =
-					RegistryManager.getInstance().retrieveNetworkPartitionLbHolders();
-			Iterator<NetworkPartitionLbHolder> nwPartitionIterator = nwPartitionHolders.iterator();
-			while (nwPartitionIterator.hasNext()) {
-				NetworkPartitionLbHolder nwPartition = nwPartitionIterator.next();
-				PartitionManager.getInstance().addNetworkPartitionLbHolder(nwPartition);
-			}
-
-			List<AutoscalePolicy> asPolicies = RegistryManager.getInstance().retrieveASPolicies();
-			Iterator<AutoscalePolicy> asPolicyIterator = asPolicies.iterator();
-			while (asPolicyIterator.hasNext()) {
-				AutoscalePolicy asPolicy = asPolicyIterator.next();
-				PolicyManager.getInstance().addASPolicyToInformationModel(asPolicy);
-			}
-
-			List<DeploymentPolicy> depPolicies = RegistryManager.getInstance().retrieveDeploymentPolicies();
-			Iterator<DeploymentPolicy> depPolicyIterator = depPolicies.iterator();
-			while (depPolicyIterator.hasNext()) {
-				DeploymentPolicy depPolicy = depPolicyIterator.next();
-				PolicyManager.getInstance().addDeploymentPolicyToInformationModel(depPolicy);
-			}
-
-			// Adding KubernetesGroups stored in registry to the information model
-			List<KubernetesGroup> kubernetesGroupList = RegistryManager.getInstance().retrieveKubernetesGroups();
-			Iterator<KubernetesGroup> kubernetesGroupIterator = kubernetesGroupList.iterator();
-			while (kubernetesGroupIterator.hasNext()) {
-				KubernetesGroup kubernetesGroup = kubernetesGroupIterator.next();
-				KubernetesManager.getInstance().addNewKubernetesGroup(kubernetesGroup);
-			}
-
-			//starting the processor chain
-			ClusterStatusProcessorChain clusterStatusProcessorChain = new ClusterStatusProcessorChain();
-			ServiceReferenceHolder.getInstance().setClusterStatusProcessorChain(clusterStatusProcessorChain);
 
-			GroupStatusProcessorChain groupStatusProcessorChain = new GroupStatusProcessorChain();
-			ServiceReferenceHolder.getInstance().setGroupStatusProcessorChain(groupStatusProcessorChain);
-
-			if (log.isInfoEnabled()) {
-				log.info("Scheduling tasks to publish applications");
-			}
-
-			ApplicationSynchronizerTaskScheduler
-					.schedule(ServiceReferenceHolder.getInstance()
-					                                .getTaskService());
-
-			if (log.isInfoEnabled()) {
-				log.info("Autoscaler server Component activated");
-			}
-		} catch (Throwable e) {
-			log.error("Error in activating the autoscaler component ", e);
-		}
-	}
-
-	protected void deactivate(ComponentContext context) {
-		asTopologyReceiver.terminate();
-		autoscalerHealthStatEventReceiver.terminate();
-	}
-
-	protected void setRegistryService(RegistryService registryService) {
-		if (log.isDebugEnabled()) {
-			log.debug("Setting the Registry Service");
-		}
-		try {
-			ServiceReferenceHolder.getInstance().setRegistry(registryService.getGovernanceSystemRegistry());
-		} catch (RegistryException e) {
-			String msg = "Failed when retrieving Governance System Registry.";
-			log.error(msg, e);
-			throw new AutoScalerException(msg, e);
-		}
-	}
-
-	protected void unsetRegistryService(RegistryService registryService) {
-		if (log.isDebugEnabled()) {
-			log.debug("Un-setting the Registry Service");
-		}
-		ServiceReferenceHolder.getInstance().setRegistry(null);
-	}
-
-	protected void setTaskService(TaskService taskService) {
-		if (log.isDebugEnabled()) {
-			log.debug("Setting the Task Service");
-		}
-		ServiceReferenceHolder.getInstance().setTaskService(taskService);
-	}
-
-	protected void unsetTaskService(TaskService taskService) {
-		if (log.isDebugEnabled()) {
-			log.debug("Un-setting the Task Service");
-		}
-		ServiceReferenceHolder.getInstance().setTaskService(null);
-	}
-}
->>>>>>> ddf277b... Remove unnessary threads in messaging model

http://git-wip-us.apache.org/repos/asf/stratos/blob/f70aa9ed/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index a413218..700efb9 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -20,41 +20,23 @@ package org.apache.stratos.cloud.controller.internal;
  *
 */
 
-<<<<<<< HEAD
-<<<<<<< HEAD
+import org.apache.commons.configuration.XMLConfiguration;
 
-<<<<<<< HEAD
 import com.hazelcast.core.HazelcastInstance;
 
-=======
->>>>>>> ddf277b... Remove unnessary threads in messaging model
-=======
-
->>>>>>> ad3e45c... Remove unnessary threads in messaging model
-=======
-import org.apache.commons.configuration.XMLConfiguration;
->>>>>>> 1b26a96... Adding executor service for threads and remove unnecessary threads
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.stratos.cloud.controller.context.CloudControllerContext;
 import org.apache.stratos.cloud.controller.messaging.receiver.application.ApplicationTopicReceiver;
 import org.apache.stratos.cloud.controller.messaging.receiver.cluster.status.ClusterStatusTopicReceiver;
 import org.apache.stratos.cloud.controller.exception.CloudControllerException;
-<<<<<<< HEAD
 import org.apache.stratos.cloud.controller.services.CloudControllerService;
 import org.apache.stratos.cloud.controller.services.impl.CloudControllerServiceImpl;
 import org.apache.stratos.cloud.controller.messaging.publisher.TopologySynchronizerTaskScheduler;
 import org.apache.stratos.cloud.controller.messaging.receiver.instance.status.InstanceStatusTopicReceiver;
 import org.apache.stratos.common.clustering.DistributedObjectProvider;
-=======
-import org.apache.stratos.cloud.controller.impl.CloudControllerServiceImpl;
-import org.apache.stratos.cloud.controller.interfaces.CloudControllerService;
-import org.apache.stratos.cloud.controller.publisher.TopologySynchronizerTaskScheduler;
-import org.apache.stratos.cloud.controller.receiver.instance.status.InstanceStatusTopicReceiver;
-import org.apache.stratos.cloud.controller.util.ServiceReferenceHolder;
 import org.apache.stratos.common.threading.StratosThreadPool;
 import org.apache.stratos.common.util.ConfUtil;
->>>>>>> 1b26a96... Adding executor service for threads and remove unnecessary threads
 import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
 import org.apache.stratos.messaging.util.Util;
 import org.osgi.framework.BundleContext;
@@ -72,7 +54,6 @@ import java.util.concurrent.ExecutorService;
  * Registering Cloud Controller Service.
  *
  * @scr.component name="org.apache.stratos.cloud.controller" immediate="true"
-<<<<<<< HEAD
  * @scr.reference name="distributedObjectProvider" interface="org.apache.stratos.common.clustering.DistributedObjectProvider"
  *                cardinality="1..1" policy="dynamic" bind="setDistributedObjectProvider" unbind="unsetDistributedObjectProvider"
  * @scr.reference name="ntask.component" interface="org.wso2.carbon.ntask.core.service.TaskService"
@@ -81,24 +62,6 @@ import java.util.concurrent.ExecutorService;
  *                cardinality="1..1" policy="dynamic" bind="setRegistryService" unbind="unsetRegistryService"
  * @scr.reference name="config.context.service" interface="org.wso2.carbon.utils.ConfigurationContextService"
  *                cardinality="1..1" policy="dynamic" bind="setConfigurationContextService" unbind="unsetConfigurationContextService"
-=======
- * @scr.reference name="distributedMapProvider" interface="org.wso2.carbon.caching.impl.DistributedMapProvider"
- *                cardinality="1..1" policy="dynamic" bind="setDistributedMapProvider" unbind="unsetDistributedMapProvider"
- * @scr.reference name="ntask.component"
- *                interface="org.wso2.carbon.ntask.core.service.TaskService"
- *                cardinality="1..1" policy="dynamic" bind="setTaskService" unbind="unsetTaskService"
- * @scr.reference name="registry.service"
- *                interface="org.wso2.carbon.registry.core.service.RegistryService"
- *                cardinality="1..1" policy="dynamic" bind="setRegistryService" unbind="unsetRegistryService"
- * @scr.reference name="config.context.service"
-<<<<<<< HEAD
- * interface="org.wso2.carbon.utils.ConfigurationContextService"
- * cardinality="1..1" policy="dynamic" bind="setConfigurationContextService" unbind="unsetConfigurationContextService"
->>>>>>> ddf277b... Remove unnessary threads in messaging model
-=======
- *                interface="org.wso2.carbon.utils.ConfigurationContextService"
- *                cardinality="1..1" policy="dynamic" bind="setConfigurationContextService" unbind="unsetConfigurationContextService"
->>>>>>> ad3e45c... Remove unnessary threads in messaging model
  */
 public class CloudControllerServiceComponent {
 
@@ -107,8 +70,8 @@ public class CloudControllerServiceComponent {
 	private InstanceStatusTopicReceiver instanceStatusTopicReceiver;
 	private ApplicationTopicReceiver applicationTopicReceiver;
 	private static final String THREAD_IDENTIFIER_KEY = "threadPool.autoscaler.identifier";
-	private static final String DEFAULT_IDENTIFIER = "Auto-Scaler";
-	private static final String THREAD_POOL_SIZE_KEY = "threadPool.autoscaler.threadPoolSize";
+	private static final String DEFAULT_IDENTIFIER = "Cloud-Controller";
+	private static final String THREAD_POOL_SIZE_KEY = "threadPool.cloudcontroller.threadPoolSize";
 	private static final String COMPONENTS_CONFIG = "stratos-config";
 	private static final int THREAD_POOL_SIZE = 10;
 
@@ -127,28 +90,16 @@ public class CloudControllerServiceComponent {
 				log.info("Application Receiver thread started");
 			}
 
-<<<<<<< HEAD
-            if (log.isInfoEnabled()) {
-                log.info("Application event receiver thread started");
-            }
-=======
 			clusterStatusTopicReceiver = new ClusterStatusTopicReceiver();
 			clusterStatusTopicReceiver.setExecutorService(executorService);
 			clusterStatusTopicReceiver.execute();
->>>>>>> ddf277b... Remove unnessary threads in messaging model
 
 			if (log.isInfoEnabled()) {
 				log.info("Cluster status Receiver thread started");
 			}
 
-<<<<<<< HEAD
-            if (log.isInfoEnabled()) {
-                log.info("Cluster status receiver thread started");
-            }
-=======
 			instanceStatusTopicReceiver = new InstanceStatusTopicReceiver();
 			instanceStatusTopicReceiver.execute();
->>>>>>> ddf277b... Remove unnessary threads in messaging model
 
 			if (log.isInfoEnabled()) {
 				log.info("Instance status message receiver thread started");
@@ -163,23 +114,15 @@ public class CloudControllerServiceComponent {
 				log.info("Scheduling tasks");
 			}
 
-<<<<<<< HEAD
-            if(log.isInfoEnabled()) {
-                log.info("Scheduling tasks");
-            }
+			TopologySynchronizerTaskScheduler
+					.schedule(ServiceReferenceHolder.getInstance()
+					                                .getTaskService());
 
-            if ((!CloudControllerContext.getInstance().isClustered()) ||
-                    (CloudControllerContext.getInstance().isCoordinator())) {
-                TopologySynchronizerTaskScheduler.schedule(ServiceReferenceHolder.getInstance().getTaskService());
-                if(log.isInfoEnabled()) {
-                    log.info("Topology synchronizer task scheduled");
-                }
-            }
-        } catch (Throwable e) {
-            log.error("**** Cloud controller service bundle is failed to activate ****", e);
+		} catch (Throwable e) {
+			log.error("******* Cloud Controller Service bundle is failed to activate ****", e);
         }
     }
-    
+
     protected void setTaskService(TaskService taskService) {
         if (log.isDebugEnabled()) {
             log.debug("Setting the Task Service");
@@ -193,32 +136,7 @@ public class CloudControllerServiceComponent {
         }
         ServiceReferenceHolder.getInstance().setTaskService(null);
     }
-    
-=======
-			TopologySynchronizerTaskScheduler
-					.schedule(ServiceReferenceHolder.getInstance()
-					                                .getTaskService());
-
-		} catch (Throwable e) {
-			log.error("******* Cloud Controller Service bundle is failed to activate ****", e);
-		}
-	}
-
-	protected void setTaskService(TaskService taskService) {
-		if (log.isDebugEnabled()) {
-			log.debug("Setting the Task Service");
-		}
-		ServiceReferenceHolder.getInstance().setTaskService(taskService);
-	}
 
-	protected void unsetTaskService(TaskService taskService) {
-		if (log.isDebugEnabled()) {
-			log.debug("Unsetting the Task Service");
-		}
-		ServiceReferenceHolder.getInstance().setTaskService(null);
-	}
-
->>>>>>> ddf277b... Remove unnessary threads in messaging model
 	protected void setRegistryService(RegistryService registryService) {
 		if (log.isDebugEnabled()) {
 			log.debug("Setting the Registry Service");
@@ -226,39 +144,22 @@ public class CloudControllerServiceComponent {
 
 		try {
 			UserRegistry registry = registryService.getGovernanceSystemRegistry();
-<<<<<<< HEAD
 	        ServiceReferenceHolder.getInstance().setRegistry(registry);
         } catch (RegistryException e) {
         	String msg = "Failed when retrieving Governance System Registry.";
         	log.error(msg, e);
         	throw new CloudControllerException(msg, e);
-        } 
-=======
-			ServiceReferenceHolder.getInstance()
-			                      .setRegistry(registry);
-		} catch (RegistryException e) {
-			String msg = "Failed when retrieving Governance System Registry.";
-			log.error(msg, e);
-			throw new CloudControllerException(msg, e);
-		}
->>>>>>> ddf277b... Remove unnessary threads in messaging model
+        }
 	}
 
 	protected void unsetRegistryService(RegistryService registryService) {
 		if (log.isDebugEnabled()) {
-<<<<<<< HEAD
             log.debug("Un-setting the Registry Service");
         }
         ServiceReferenceHolder.getInstance().setRegistry(null);
-=======
-			log.debug("Unsetting the Registry Service");
-		}
-		ServiceReferenceHolder.getInstance().setRegistry(null);
->>>>>>> ddf277b... Remove unnessary threads in messaging model
 	}
 
 	protected void setConfigurationContextService(ConfigurationContextService cfgCtxService) {
-<<<<<<< HEAD
         ServiceReferenceHolder.getInstance().setAxisConfiguration(
                 cfgCtxService.getServerConfigContext().getAxisConfiguration());
     }
@@ -274,27 +175,9 @@ public class CloudControllerServiceComponent {
     protected void unsetDistributedObjectProvider(DistributedObjectProvider distributedObjectProvider) {
         ServiceReferenceHolder.getInstance().setDistributedObjectProvider(null);
     }
-	
-=======
-		ServiceReferenceHolder.getInstance().setAxisConfiguration(
-				cfgCtxService.getServerConfigContext().getAxisConfiguration());
-	}
-
-	protected void unsetConfigurationContextService(ConfigurationContextService cfgCtxService) {
-		ServiceReferenceHolder.getInstance().setAxisConfiguration(null);
-	}
-
-	protected void setDistributedMapProvider(DistributedMapProvider mapProvider) {
-		ServiceReferenceHolder.getInstance().setDistributedMapProvider(mapProvider);
-	}
-
-	protected void unsetDistributedMapProvider(DistributedMapProvider mapProvider) {
-		ServiceReferenceHolder.getInstance().setDistributedMapProvider(null);
-	}
 
->>>>>>> ddf277b... Remove unnessary threads in messaging model
 	protected void deactivate(ComponentContext ctx) {
-		// Close event publisher connections to message broker
-		EventPublisherPool.close(Util.Topics.TOPOLOGY_TOPIC.getTopicName());
+        // Close event publisher connections to message broker
+        EventPublisherPool.close(Util.Topics.TOPOLOGY_TOPIC.getTopicName());
 	}
-}
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/stratos/blob/f70aa9ed/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
index 3aa77a8..509bc74 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
@@ -120,30 +120,13 @@ public class LoadBalancerServiceComponent {
             TopologyFilterConfigurator.configure(configuration);
 
             if (configuration.isMultiTenancyEnabled()) {
-<<<<<<< HEAD
                 // Start tenant event receiver
                 startTenantEventReceiver();
-=======
-
-                tenantReceiver = new LoadBalancerTenantEventReceiver();
-				tenantReceiver.execute();
-
-                if (log.isInfoEnabled()) {
-                    log.info("Tenant receiver thread started");
-                }
->>>>>>> ae876c1... Remove unnessary threads in messaging model
             }
 
             if (configuration.isTopologyEventListenerEnabled()) {
                 // Start topology receiver
-<<<<<<< HEAD
                 startTopologyEventReceiver();
-=======
-                topologyReceiver = new LoadBalancerTopologyEventReceiver();
-                topologyReceiver.execute();
-                if (log.isInfoEnabled()) {
-                    log.info("Topology receiver thread started");
-                }
 
                 if (log.isInfoEnabled()) {
                     if (TopologyServiceFilter.getInstance().isActive()) {
@@ -177,7 +160,7 @@ public class LoadBalancerServiceComponent {
                         log.info(String.format("Member filter activated: [lb-cluster-ids] %s", sb.toString()));
                     }
                 }
->>>>>>> ae876c1... Remove unnessary threads in messaging model
+
             }
 
             if(configuration.isCepStatsPublisherEnabled()) {
@@ -197,18 +180,16 @@ public class LoadBalancerServiceComponent {
     }
 
     private void startTenantEventReceiver() {
-        tenantReceiver = new LoadBalancerTenantEventReceiver();
-        Thread tenantReceiverThread = new Thread(tenantReceiver);
-        tenantReceiverThread.start();
+	    tenantReceiver = new LoadBalancerTenantEventReceiver();
+	    tenantReceiver.execute();
         if (log.isInfoEnabled()) {
             log.info("Tenant receiver thread started");
         }
     }
 
     private void startTopologyEventReceiver() {
-        topologyReceiver = new LoadBalancerTopologyEventReceiver();
-        Thread topologyReceiverThread = new Thread(topologyReceiver);
-        topologyReceiverThread.start();
+	    topologyReceiver = new LoadBalancerTopologyEventReceiver();
+	    topologyReceiver.execute();
         if (log.isInfoEnabled()) {
             log.info("Topology receiver thread started");
         }

http://git-wip-us.apache.org/repos/asf/stratos/blob/f70aa9ed/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
index 5ee28d1..8bfcb2c 100644
--- a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
+++ b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -107,7 +107,6 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
 			log.debug("Member not found in the toplogy. Event rejected");
 			return;
 		}
-<<<<<<< HEAD
         if (StringUtils.isNotEmpty(id)) {
             memberTimeStampMap.put(id, event.getTimeStamp());
         } else {
@@ -320,220 +319,4 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
     public ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
         return memberTimeStampMap;
     }
-=======
-		if (StringUtils.isNotEmpty(id)) {
-			memberTimeStampMap.put(id, event.getTimeStamp());
-		} else {
-			log.warn("NULL member id found in the event received. Event rejected.");
-		}
-		if (log.isDebugEnabled()) {
-			log.debug("Event received from [member-id] " + id + " [time-stamp] " + event.getTimeStamp());
-		}
-	}
-
-	@Override
-	public Iterator<StreamEvent> iterator() {
-		return window.iterator();
-	}
-
-	@Override
-	public Iterator<StreamEvent> iterator(String predicate) {
-		if (siddhiContext.isDistributedProcessingEnabled()) {
-			return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate);
-		} else {
-			return window.iterator();
-		}
-	}
-
-	/**
-	 * Retrieve the current activated members from the topology and initialize the time stamp map.
-	 * This will allow the system to recover from a restart
-	 *
-	 * @param topology Topology model object
-	 */
-	boolean loadTimeStampMapFromTopology(Topology topology) {
-
-		long currentTimeStamp = System.currentTimeMillis();
-		if (topology == null || topology.getServices() == null) {
-			return false;
-		}
-		// TODO make this efficient by adding APIs to messaging component
-		for (Service service : topology.getServices()) {
-			if (service.getClusters() != null) {
-				for (Cluster cluster : service.getClusters()) {
-					if (cluster.getMembers() != null) {
-						for (Member member : cluster.getMembers()) {
-							// we are checking faulty status only in previously activated members
-							if (member != null && MemberStatus.Activated.equals(member.getStatus())) {
-								// Initialize the member time stamp map from the topology at the beginning
-								memberTimeStampMap.putIfAbsent(member.getMemberId(), currentTimeStamp);
-							}
-						}
-					}
-				}
-			}
-		}
-
-		log.info("Member time stamp map was successfully loaded from the topology.");
-		if (log.isDebugEnabled()) {
-			log.debug("Member TimeStamp Map: " + memberTimeStampMap);
-		}
-		return true;
-	}
-
-	private Member getMemberFromId(String memberId) {
-		if (StringUtils.isEmpty(memberId)) {
-			return null;
-		}
-		if (TopologyManager.getTopology().isInitialized()) {
-			try {
-				TopologyManager.acquireReadLock();
-				if (TopologyManager.getTopology().getServices() == null) {
-					return null;
-				}
-				// TODO make this efficient by adding APIs to messaging component
-				for (Service service : TopologyManager.getTopology().getServices()) {
-					if (service.getClusters() != null) {
-						for (Cluster cluster : service.getClusters()) {
-							if (cluster.getMembers() != null) {
-								for (Member member : cluster.getMembers()) {
-									if (memberId.equals(member.getMemberId())) {
-										return member;
-									}
-								}
-							}
-						}
-					}
-				}
-			} catch (Exception e) {
-				log.error("Error while reading topology" + e);
-			} finally {
-				TopologyManager.releaseReadLock();
-			}
-		}
-		return null;
-	}
-
-	private void publishMemberFault(String memberId) {
-		Member member = getMemberFromId(memberId);
-		if (member == null) {
-			log.error("Failed to publish member fault event. Member having [member-id] " + memberId +
-			          " does not exist in topology");
-			return;
-		}
-		log.info("Publishing member fault event for [member-id] " + memberId);
-
-		MemberFaultEvent memberFaultEvent =
-				new MemberFaultEvent(member.getClusterId(), member.getInstanceId(), member.getMemberId(),
-				                     member.getPartitionId(), 0);
-
-		memberFaultEventMessageMap.put("message", memberFaultEvent);
-		healthStatPublisher.publish(MemberFaultEventMap, true);
-	}
-
-	@Override
-	public void run() {
-		try {
-			threadBarrier.pass();
-
-			for (Object o : memberTimeStampMap.entrySet()) {
-				Map.Entry pair = (Map.Entry) o;
-				long currentTime = System.currentTimeMillis();
-				Long eventTimeStamp = (Long) pair.getValue();
-
-				if ((currentTime - eventTimeStamp) > TIME_OUT) {
-					log.info("Faulty member detected [member-id] " + pair.getKey() + " with [last time-stamp] " +
-					         eventTimeStamp + " [time-out] " + TIME_OUT + " milliseconds");
-					publishMemberFault((String) pair.getKey());
-				}
-			}
-			if (log.isDebugEnabled()) {
-				log.debug("Fault handling processor iteration completed with [time-stamp map length] " +
-				          memberTimeStampMap.size() + " [time-stamp map] " + memberTimeStampMap);
-			}
-		} catch (Throwable t) {
-			log.error(t.getMessage(), t);
-		} finally {
-			faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
-		}
-	}
-
-	@Override
-	protected Object[] currentState() {
-		return new Object[] { window.currentState() };
-	}
-
-	@Override
-	protected void restoreState(Object[] data) {
-		window.restoreState(data);
-		window.restoreState((Object[]) data[0]);
-		window.reSchedule();
-	}
-
-	@Override
-	protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor,
-	                    AbstractDefinition streamDefinition, String elementId, boolean async,
-	                    SiddhiContext siddhiContext) {
-
-		if (parameters[0] instanceof IntConstant) {
-			timeToKeep = ((IntConstant) parameters[0]).getValue();
-		} else {
-			timeToKeep = ((LongConstant) parameters[0]).getValue();
-		}
-
-		String memberIdAttrName = ((Variable) parameters[1]).getAttributeName();
-		memberIdAttrIndex = streamDefinition.getAttributePosition(memberIdAttrName);
-
-		if (this.siddhiContext.isDistributedProcessingEnabled()) {
-			window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async);
-		} else {
-			window = new SchedulerSiddhiQueue<StreamEvent>(this);
-		}
-		MemberFaultEventMap
-				.put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap);
-
-		ExecutorService executorService = StratosThreadPool.getExecutorService(IDENTIFIER, 10);
-		cepTopologyEventReceiver.setExecutorService(executorService);
-		executorService.execute(cepTopologyEventReceiver);
-
-		//Ordinary scheduling
-		window.schedule();
-		if (log.isDebugEnabled()) {
-			log.debug("Fault handling window processor initialized with [timeToKeep] " + timeToKeep +
-			          ", [memberIdAttrName] " + memberIdAttrName + ", [memberIdAttrIndex] " + memberIdAttrIndex +
-			          ", [distributed-enabled] " + this.siddhiContext.isDistributedProcessingEnabled());
-		}
-	}
-
-	@Override
-	public void schedule() {
-		faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
-	}
-
-	@Override
-	public void scheduleNow() {
-		faultHandleScheduler.schedule(this, 0, TimeUnit.MILLISECONDS);
-	}
-
-	@Override
-	public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
-		this.faultHandleScheduler = scheduledExecutorService;
-	}
-
-	@Override
-	public void setThreadBarrier(ThreadBarrier threadBarrier) {
-		this.threadBarrier = threadBarrier;
-	}
-
-	@Override
-	public void destroy() {
-		// terminate topology listener thread
-		cepTopologyEventReceiver.terminate();
-		window = null;
-	}
-
-	public ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
-		return memberTimeStampMap;
-	}
->>>>>>> ddf277b... Remove unnessary threads in messaging model
 }