You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by la...@apache.org on 2014/12/03 07:56:45 UTC
[8/9] stratos git commit: merge with new changes
merge with new changes
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/f70aa9ed
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/f70aa9ed
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/f70aa9ed
Branch: refs/heads/master
Commit: f70aa9edc4d0e32d3b726b99eca217dfd4d3a475
Parents: 4e73393
Author: gayan <ga...@puppet.gayan.org>
Authored: Tue Dec 2 17:42:21 2014 +0530
Committer: gayan <ga...@puppet.gayan.org>
Committed: Tue Dec 2 17:42:21 2014 +0530
----------------------------------------------------------------------
.../AutoscalerTopologyEventReceiver.java | 498 +------------------
.../internal/AutoscalerServerComponent.java | 230 ++-------
.../CloudControllerServiceComponent.java | 143 +-----
.../internal/LoadBalancerServiceComponent.java | 29 +-
.../extension/FaultHandlingWindowProcessor.java | 217 --------
5 files changed, 87 insertions(+), 1030 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/f70aa9ed/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
index 1f14542..f4a5169 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -27,8 +27,11 @@ import org.apache.stratos.autoscaler.context.cluster.ClusterContextFactory;
import org.apache.stratos.autoscaler.context.cluster.VMClusterContext;
import org.apache.stratos.autoscaler.event.publisher.ClusterStatusEventPublisher;
import org.apache.stratos.autoscaler.event.publisher.InstanceNotificationPublisher;
+import org.apache.stratos.autoscaler.exception.application.DependencyBuilderException;
+import org.apache.stratos.autoscaler.exception.application.TopologyInConsistentException;
import org.apache.stratos.autoscaler.exception.partition.PartitionValidationException;
import org.apache.stratos.autoscaler.exception.policy.PolicyValidationException;
+import org.apache.stratos.autoscaler.monitor.MonitorFactory;
import org.apache.stratos.autoscaler.monitor.cluster.AbstractClusterMonitor;
import org.apache.stratos.autoscaler.monitor.component.ApplicationMonitor;
import org.apache.stratos.autoscaler.monitor.events.ClusterStatusEvent;
@@ -63,34 +66,22 @@ public class AutoscalerTopologyEventReceiver {
private boolean topologyInitialized;
private ExecutorService executorService;
-<<<<<<< HEAD
- public AutoscalerTopologyEventReceiver() {
- this.topologyEventReceiver = new TopologyEventReceiver();
- addEventListeners();
- }
-
+ public AutoscalerTopologyEventReceiver() {
+ this.topologyEventReceiver = new TopologyEventReceiver();
+ addEventListeners();
+ }
- public void execute() {
- //FIXME this activated before autoscaler deployer activated.
+ public void execute() {
+ //FIXME this activated before autoscaler deployer activated.
- topologyEventReceiver.setExecutorService(getExecutorService());
- topologyEventReceiver.execute();
+ topologyEventReceiver.setExecutorService(getExecutorService());
+ topologyEventReceiver.execute();
- if (log.isInfoEnabled()) {
- log.info("Autoscaler topology receiver thread started");
- }
+ if (log.isInfoEnabled()) {
+ log.info("Autoscaler topology receiver thread started");
+ }
- // Keep the thread live until terminated
- while (!terminated) {
- try {
- Thread.sleep(1000);
- } catch (InterruptedException ignore) {
- }
- }
- if (log.isInfoEnabled()) {
- log.info("Autoscaler topology receiver thread terminated");
- }
- }
+ }
private boolean allClustersInitialized(Application application) {
boolean allClustersInitialized = false;
@@ -524,461 +515,6 @@ public class AutoscalerTopologyEventReceiver {
topologyEventReceiver.terminate();
terminated = true;
}
-=======
- public AutoscalerTopologyEventReceiver() {
- this.topologyEventReceiver = new TopologyEventReceiver();
- addEventListeners();
- }
-
- public void execute() {
- //FIXME this activated before autoscaler deployer activated.
-
- topologyEventReceiver.setExecutorService(executorService);
- topologyEventReceiver.execute();
-
- if (log.isInfoEnabled()) {
- log.info("Autoscaler topology receiver thread started");
- }
-
- }
-
- private boolean allClustersInitialized(Application application) {
- boolean allClustersInitialized = false;
- for (ClusterDataHolder holder : application.getClusterDataRecursively()) {
- TopologyManager.acquireReadLockForCluster(holder.getServiceType(),
- holder.getClusterId());
-
- try {
- Topology topology = TopologyManager.getTopology();
- if (topology != null) {
- Service service = topology.getService(holder.getServiceType());
- if (service != null) {
- if (service.clusterExists(holder.getClusterId())) {
- allClustersInitialized = true;
- return allClustersInitialized;
- } else {
- if (log.isDebugEnabled()) {
- log.debug("[Cluster] " + holder.getClusterId() + " is not found in " +
- "the Topology");
- }
- allClustersInitialized = false;
- }
- } else {
- if (log.isDebugEnabled()) {
- log.debug("Service is null in the CompleteTopologyEvent");
- }
- }
- } else {
- if (log.isDebugEnabled()) {
- log.debug("Topology is null in the CompleteTopologyEvent");
- }
- }
- } finally {
- TopologyManager.releaseReadLockForCluster(holder.getServiceType(),
- holder.getClusterId());
- }
- }
- return allClustersInitialized;
- }
-
- private void addEventListeners() {
- // Listen to topology events that affect clusters
- topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
- @Override
- protected void onEvent(Event event) {
- if (!topologyInitialized) {
- log.info("[CompleteTopologyEvent] Received: " + event.getClass());
- ApplicationHolder.acquireReadLock();
- try {
- Applications applications = ApplicationHolder.getApplications();
- if (applications != null) {
- for (Application application : applications.getApplications().values()) {
- if (allClustersInitialized(application)) {
- startApplicationMonitor(application.getUniqueIdentifier());
- } else {
- log.error("Complete Topology is not consistent with the applications " +
- "which got persisted");
- }
- }
- topologyInitialized = true;
- } else {
- log.info("No applications found in the complete topology");
- }
- } catch (Exception e) {
- log.error("Error processing event", e);
- } finally {
- ApplicationHolder.releaseReadLock();
- }
- }
- }
- });
-
- topologyEventReceiver.addEventListener(new ApplicationClustersCreatedEventListener() {
- @Override
- protected void onEvent(Event event) {
- try {
- log.info("[ApplicationClustersCreatedEvent] Received: " + event.getClass());
- ApplicationClustersCreatedEvent applicationClustersCreatedEvent =
- (ApplicationClustersCreatedEvent) event;
- String appId = applicationClustersCreatedEvent.getAppId();
- try {
- //acquire read lock
- ApplicationHolder.acquireReadLock();
- //start the application monitor
- startApplicationMonitor(appId);
- } catch (Exception e) {
- String msg = "Error processing event " + e.getLocalizedMessage();
- log.error(msg, e);
- } finally {
- //release read lock
- ApplicationHolder.releaseReadLock();
-
- }
- } catch (ClassCastException e) {
- String msg = "Error while casting the event " + e.getLocalizedMessage();
- log.error(msg, e);
- }
-
- }
- });
-
- topologyEventReceiver.addEventListener(new ClusterActivatedEventListener() {
- @Override
- protected void onEvent(Event event) {
- log.info("[ClusterActivatedEvent] Received: " + event.getClass());
- ClusterActivatedEvent clusterActivatedEvent = (ClusterActivatedEvent) event;
- String clusterId = clusterActivatedEvent.getClusterId();
- AutoscalerContext asCtx = AutoscalerContext.getInstance();
- AbstractClusterMonitor monitor;
- monitor = asCtx.getClusterMonitor(clusterId);
- if (null == monitor) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
- }
- return;
- }
- //changing the status in the monitor, will notify its parent monitor
-
- }
- });
-
- topologyEventReceiver.addEventListener(new ClusterResetEventListener() {
- @Override
- protected void onEvent(Event event) {
- log.info("[ClusterCreatedEvent] Received: " + event.getClass());
- ClusterResetEvent clusterResetEvent = (ClusterResetEvent) event;
- String clusterId = clusterResetEvent.getClusterId();
- AutoscalerContext asCtx = AutoscalerContext.getInstance();
- AbstractClusterMonitor monitor;
- monitor = asCtx.getClusterMonitor(clusterId);
- if (null == monitor) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
- }
- return;
- }
- //changing the status in the monitor, will notify its parent monitor
- monitor.destroy();
- monitor.setStatus(ClusterStatus.Created);
-
- }
- });
-
- topologyEventReceiver.addEventListener(new ClusterCreatedEventListener() {
- @Override
- protected void onEvent(Event event) {
- log.info("[ClusterCreatedEvent] Received: " + event.getClass());
- }
- });
-
- topologyEventReceiver.addEventListener(new ClusterInActivateEventListener() {
- @Override
- protected void onEvent(Event event) {
- log.info("[ClusterInActivateEvent] Received: " + event.getClass());
- ClusterInactivateEvent clusterInactivateEvent = (ClusterInactivateEvent) event;
- String clusterId = clusterInactivateEvent.getClusterId();
- AutoscalerContext asCtx = AutoscalerContext.getInstance();
- AbstractClusterMonitor monitor;
- monitor = asCtx.getClusterMonitor(clusterId);
- if (null == monitor) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
- }
- return;
- }
- //changing the status in the monitor, will notify its parent monitor
- monitor.setStatus(ClusterStatus.Inactive);
- }
- });
-
- topologyEventReceiver.addEventListener(new ClusterTerminatingEventListener() {
- @Override
- protected void onEvent(Event event) {
- log.info("[ClusterTerminatingEvent] Received: " + event.getClass());
- ClusterTerminatingEvent clusterTerminatingEvent = (ClusterTerminatingEvent) event;
- String clusterId = clusterTerminatingEvent.getClusterId();
- String instanceId = clusterTerminatingEvent.getInstanceId();
- AutoscalerContext asCtx = AutoscalerContext.getInstance();
- AbstractClusterMonitor monitor;
- monitor = asCtx.getClusterMonitor(clusterId);
- if (null == monitor) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
- }
- // if monitor does not exist, send cluster terminated event
- ClusterStatusEventPublisher.sendClusterTerminatedEvent(clusterTerminatingEvent.getAppId(),
- clusterTerminatingEvent.getServiceName(),
- clusterId, instanceId);
- return;
- }
- //changing the status in the monitor, will notify its parent monitor
- if (monitor.getStatus() == ClusterStatus.Active) {
- // terminated gracefully
- monitor.setStatus(ClusterStatus.Terminating);
- InstanceNotificationPublisher.sendInstanceCleanupEventForCluster(clusterId);
- } else {
- monitor.setStatus(ClusterStatus.Terminating);
- monitor.terminateAllMembers();
- }
- ServiceReferenceHolder.getInstance().getClusterStatusProcessorChain().
- process("", clusterId, instanceId);
- }
- });
-
- topologyEventReceiver.addEventListener(new ClusterTerminatedEventListener() {
- @Override
- protected void onEvent(Event event) {
- log.info("[ClusterTerminatedEvent] Received: " + event.getClass());
- ClusterTerminatedEvent clusterTerminatedEvent = (ClusterTerminatedEvent) event;
- String clusterId = clusterTerminatedEvent.getClusterId();
- AutoscalerContext asCtx = AutoscalerContext.getInstance();
- AbstractClusterMonitor monitor;
- monitor = asCtx.getClusterMonitor(clusterId);
- if (null == monitor) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
- }
- // if the cluster monitor is null, assume that it is terminated
- ApplicationMonitor appMonitor =
- AutoscalerContext.getInstance().getAppMonitor(clusterTerminatedEvent.getAppId());
- if (appMonitor != null) {
- appMonitor
- .onChildStatusEvent(new ClusterStatusEvent(ClusterStatus.Terminated, clusterId, null));
- }
- return;
- }
- //changing the status in the monitor, will notify its parent monitor
- monitor.setStatus(ClusterStatus.Terminated);
- //Destroying and Removing the Cluster monitor
- monitor.destroy();
- AutoscalerContext.getInstance().removeClusterMonitor(clusterId);
- }
- });
-
- topologyEventReceiver.addEventListener(new MemberReadyToShutdownEventListener() {
- @Override
- protected void onEvent(Event event) {
- try {
- MemberReadyToShutdownEvent memberReadyToShutdownEvent = (MemberReadyToShutdownEvent) event;
- String clusterId = memberReadyToShutdownEvent.getClusterId();
- AutoscalerContext asCtx = AutoscalerContext.getInstance();
- AbstractClusterMonitor monitor;
- monitor = asCtx.getClusterMonitor(clusterId);
- if (null == monitor) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
- }
- return;
- }
- monitor.handleMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
- } catch (Exception e) {
- String msg = "Error processing event " + e.getLocalizedMessage();
- log.error(msg, e);
- }
- }
- });
-
- topologyEventReceiver.addEventListener(new MemberStartedEventListener() {
- @Override
- protected void onEvent(Event event) {
-
- }
- });
-
- topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
- @Override
- protected void onEvent(Event event) {
- try {
- MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
- String clusterId = memberTerminatedEvent.getClusterId();
- AbstractClusterMonitor monitor;
- AutoscalerContext asCtx = AutoscalerContext.getInstance();
- monitor = asCtx.getClusterMonitor(clusterId);
- if (null == monitor) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
- }
- return;
- }
- monitor.handleMemberTerminatedEvent(memberTerminatedEvent);
- } catch (Exception e) {
- String msg = "Error processing event " + e.getLocalizedMessage();
- log.error(msg, e);
- }
- }
- });
-
- topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
- @Override
- protected void onEvent(Event event) {
- try {
- MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
- String clusterId = memberActivatedEvent.getClusterId();
- AbstractClusterMonitor monitor;
- AutoscalerContext asCtx = AutoscalerContext.getInstance();
- monitor = asCtx.getClusterMonitor(clusterId);
- if (null == monitor) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
- }
- return;
- }
- monitor.handleMemberActivatedEvent(memberActivatedEvent);
- } catch (Exception e) {
- String msg = "Error processing event " + e.getLocalizedMessage();
- log.error(msg, e);
- }
- }
- });
-
- topologyEventReceiver.addEventListener(new MemberMaintenanceListener() {
- @Override
- protected void onEvent(Event event) {
- try {
- MemberMaintenanceModeEvent maintenanceModeEvent = (MemberMaintenanceModeEvent) event;
- String clusterId = maintenanceModeEvent.getClusterId();
- AbstractClusterMonitor monitor;
- AutoscalerContext asCtx = AutoscalerContext.getInstance();
- monitor = asCtx.getClusterMonitor(clusterId);
- if (null == monitor) {
- if (log.isDebugEnabled()) {
- log.debug(String.format("A cluster monitor is not found in autoscaler context "
- + "[cluster] %s", clusterId));
- }
- return;
- }
- monitor.handleMemberMaintenanceModeEvent(maintenanceModeEvent);
- } catch (Exception e) {
- String msg = "Error processing event " + e.getLocalizedMessage();
- log.error(msg, e);
- }
- }
- });
-
- topologyEventReceiver.addEventListener(new ClusterInstanceCreatedEventListener() {
- @Override
- protected void onEvent(Event event) {
-
- ClusterInstanceCreatedEvent clusterInstanceCreatedEvent =
- (ClusterInstanceCreatedEvent) event;
- AbstractClusterMonitor clusterMonitor = AutoscalerContext.getInstance().
- getClusterMonitor(clusterInstanceCreatedEvent.getClusterId());
-
- if (clusterMonitor != null) {
- TopologyManager.acquireReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(),
- clusterInstanceCreatedEvent.getClusterId());
-
- try {
- Service service = TopologyManager.getTopology().
- getService(clusterInstanceCreatedEvent.getServiceName());
-
- if (service != null) {
- Cluster cluster = service.getCluster(clusterInstanceCreatedEvent.getClusterId());
- if (cluster != null) {
- // create and add Cluster Context
- try {
- if (cluster.isKubernetesCluster()) {
- clusterMonitor.addClusterContextForInstance(
- clusterInstanceCreatedEvent.getInstanceId(),
- ClusterContextFactory.getKubernetesClusterContext(cluster));
- } else if (cluster.isLbCluster()) {
- clusterMonitor.addClusterContextForInstance(
- clusterInstanceCreatedEvent.getInstanceId(),
- ClusterContextFactory.getVMLBClusterContext(cluster));
- } else {
- clusterMonitor.addClusterContextForInstance(
- clusterInstanceCreatedEvent.getInstanceId(),
- ClusterContextFactory.getVMServiceClusterContext(cluster));
- }
-
- if (clusterMonitor.hasMonitoringStarted().compareAndSet(false, true)) {
- clusterMonitor.startScheduler();
- log.info("Monitoring task for Cluster Monitor with cluster id " +
- clusterInstanceCreatedEvent.getClusterId() + " started successfully");
- }
-
- } catch (PolicyValidationException e) {
- log.error(e.getMessage(), e);
- } catch (PartitionValidationException e) {
- log.error(e.getMessage(), e);
- }
-
- } else {
- log.error("Cluster not found for " + clusterInstanceCreatedEvent.getClusterId() +
- ", no cluster instance added to ClusterMonitor " +
- clusterInstanceCreatedEvent.getClusterId());
- }
- } else {
- log.error("Service " + clusterInstanceCreatedEvent.getServiceName() +
- " not found, no cluster instance added to ClusterMonitor " +
- clusterInstanceCreatedEvent.getClusterId());
- }
-
- } finally {
- TopologyManager.releaseReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(),
- clusterInstanceCreatedEvent.getClusterId());
- }
-
- } else {
- log.error("No Cluster Monitor found for cluster id " +
- clusterInstanceCreatedEvent.getClusterId());
- }
- }
- });
- }
-
- /**
- * Terminate the autoscaler topology event receiver.
- */
- public void terminate() {
- topologyEventReceiver.terminate();
- terminated = true;
- }
-
- protected synchronized void startApplicationMonitor(String applicationId) {
- Thread th = null;
- if (AutoscalerContext.getInstance().getAppMonitor(applicationId) == null) {
- th = new Thread(new ApplicationMonitorAdder(applicationId));
- }
- if (th != null) {
- th.start();
- } else {
- if (log.isDebugEnabled()) {
- log.debug(String
- .format("Application monitor thread already exists: " +
- "[application] %s ", applicationId));
- }
- }
- }
->>>>>>> ddf277b... Remove unnessary threads in messaging model
public ExecutorService getExecutorService() {
return executorService;
@@ -987,8 +523,6 @@ public class AutoscalerTopologyEventReceiver {
public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}
-<<<<<<< HEAD
-=======
private class ApplicationMonitorAdder implements Runnable {
private String appId;
@@ -1047,5 +581,5 @@ public class AutoscalerTopologyEventReceiver {
}
}
}
->>>>>>> ddf277b... Remove unnessary threads in messaging model
+
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/f70aa9ed/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
index bb5e167..91d52d3 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
@@ -39,6 +39,7 @@ import org.apache.stratos.autoscaler.util.ServiceReferenceHolder;
import org.apache.stratos.cloud.controller.stub.domain.Partition;
import org.apache.stratos.common.kubernetes.KubernetesGroup;
import org.apache.stratos.common.threading.StratosThreadPool;
+import org.drools.reteoo.PartitionManager;
import org.osgi.service.component.ComponentContext;
import org.wso2.carbon.ntask.core.service.TaskService;
import org.wso2.carbon.registry.api.RegistryException;
@@ -75,28 +76,27 @@ public class AutoscalerServerComponent {
protected void activate(ComponentContext componentContext) throws Exception {
-<<<<<<< HEAD
- try {
- // Start topology receiver
- XMLConfiguration conf = ConfUtil.getInstance(COMPONENTS_CONFIG).getConfiguration();
- int threadPoolSize = conf.getInt(THREAD_POOL_SIZE_KEY, THREAD_POOL_SIZE);
- String threadIdentifier=conf.getString(THREAD_IDENTIFIER_KEY, DEFAULT_IDENTIFIER);
- ExecutorService executorService = StratosThreadPool.getExecutorService(threadIdentifier, threadPoolSize);
- asTopologyReceiver = new AutoscalerTopologyEventReceiver();
- asTopologyReceiver.setExecutorService(executorService);
- asTopologyReceiver.execute();
+ try {
+ // Start topology receiver
+ XMLConfiguration conf = ConfUtil.getInstance(COMPONENTS_CONFIG).getConfiguration();
+ int threadPoolSize = conf.getInt(THREAD_POOL_SIZE_KEY, THREAD_POOL_SIZE);
+ String threadIdentifier = conf.getString(THREAD_IDENTIFIER_KEY, DEFAULT_IDENTIFIER);
+ ExecutorService executorService = StratosThreadPool.getExecutorService(threadIdentifier, threadPoolSize);
+ asTopologyReceiver = new AutoscalerTopologyEventReceiver();
+ asTopologyReceiver.setExecutorService(executorService);
+ asTopologyReceiver.execute();
- if (log.isDebugEnabled()) {
- log.debug("Topology receiver executor service started");
- }
+ if (log.isDebugEnabled()) {
+ log.debug("Topology receiver executor service started");
+ }
- // Start health stat receiver
- autoscalerHealthStatEventReceiver = new AutoscalerHealthStatEventReceiver();
- Thread healthDelegatorThread = new Thread(autoscalerHealthStatEventReceiver);
- healthDelegatorThread.start();
- if (log.isDebugEnabled()) {
- log.debug("Health statistics receiver thread started");
- }
+ // Start health stat receiver
+ autoscalerHealthStatEventReceiver = new AutoscalerHealthStatEventReceiver();
+ autoscalerHealthStatEventReceiver.setExecutorService(executorService);
+ autoscalerHealthStatEventReceiver.execute();
+ if (log.isDebugEnabled()) {
+ log.debug("Health statistics receiver thread started");
+ }
// Adding the registry stored partitions to the information model
List<Partition> partitions = RegistryManager.getInstance().retrievePartitions();
@@ -105,7 +105,7 @@ public class AutoscalerServerComponent {
Partition partition = partitionIterator.next();
// PartitionManager.getInstance().addPartitionToInformationModel(partition);
}
-
+
// Adding the network partitions stored in registry to the information model
// List<NetworkPartitionLbHolder> nwPartitionHolders = RegistryManager.getInstance().retrieveNetworkPartitionLbHolders();
// Iterator<NetworkPartitionLbHolder> nwPartitionIterator = nwPartitionHolders.iterator();
@@ -113,7 +113,7 @@ public class AutoscalerServerComponent {
// NetworkPartitionLbHolder nwPartition = nwPartitionIterator.next();
// PartitionManager.getInstance().addNetworkPartitionLbHolder(nwPartition);
// }
-
+
List<AutoscalePolicy> asPolicies = RegistryManager.getInstance().retrieveASPolicies();
Iterator<AutoscalePolicy> asPolicyIterator = asPolicies.iterator();
while (asPolicyIterator.hasNext()) {
@@ -121,43 +121,43 @@ public class AutoscalerServerComponent {
PolicyManager.getInstance().addASPolicyToInformationModel(asPolicy);
}
- List<DeploymentPolicy> depPolicies = RegistryManager.getInstance().retrieveDeploymentPolicies();
- Iterator<DeploymentPolicy> depPolicyIterator = depPolicies.iterator();
- while (depPolicyIterator.hasNext()) {
- DeploymentPolicy depPolicy = depPolicyIterator.next();
- PolicyManager.getInstance().addDeploymentPolicyToInformationModel(depPolicy);
- }
+ List<DeploymentPolicy> depPolicies = RegistryManager.getInstance().retrieveDeploymentPolicies();
+ Iterator<DeploymentPolicy> depPolicyIterator = depPolicies.iterator();
+ while (depPolicyIterator.hasNext()) {
+ DeploymentPolicy depPolicy = depPolicyIterator.next();
+ PolicyManager.getInstance().addDeploymentPolicyToInformationModel(depPolicy);
+ }
- // Adding KubernetesGroups stored in registry to the information model
- List<KubernetesGroup> kubernetesGroupList = RegistryManager.getInstance().retrieveKubernetesGroups();
- Iterator<KubernetesGroup> kubernetesGroupIterator = kubernetesGroupList.iterator();
- while (kubernetesGroupIterator.hasNext()) {
- KubernetesGroup kubernetesGroup = kubernetesGroupIterator.next();
- KubernetesManager.getInstance().addNewKubernetesGroup(kubernetesGroup);
- }
+ // Adding KubernetesGroups stored in registry to the information model
+ List<KubernetesGroup> kubernetesGroupList = RegistryManager.getInstance().retrieveKubernetesGroups();
+ Iterator<KubernetesGroup> kubernetesGroupIterator = kubernetesGroupList.iterator();
+ while (kubernetesGroupIterator.hasNext()) {
+ KubernetesGroup kubernetesGroup = kubernetesGroupIterator.next();
+ KubernetesManager.getInstance().addNewKubernetesGroup(kubernetesGroup);
+ }
- //starting the processor chain
- ClusterStatusProcessorChain clusterStatusProcessorChain = new ClusterStatusProcessorChain();
- ServiceReferenceHolder.getInstance().setClusterStatusProcessorChain(clusterStatusProcessorChain);
+ //starting the processor chain
+ ClusterStatusProcessorChain clusterStatusProcessorChain = new ClusterStatusProcessorChain();
+ ServiceReferenceHolder.getInstance().setClusterStatusProcessorChain(clusterStatusProcessorChain);
- GroupStatusProcessorChain groupStatusProcessorChain = new GroupStatusProcessorChain();
- ServiceReferenceHolder.getInstance().setGroupStatusProcessorChain(groupStatusProcessorChain);
+ GroupStatusProcessorChain groupStatusProcessorChain = new GroupStatusProcessorChain();
+ ServiceReferenceHolder.getInstance().setGroupStatusProcessorChain(groupStatusProcessorChain);
- if (log.isInfoEnabled()) {
- log.info("Scheduling tasks to publish applications");
- }
+ if (log.isInfoEnabled()) {
+ log.info("Scheduling tasks to publish applications");
+ }
- ApplicationSynchronizerTaskScheduler
- .schedule(ServiceReferenceHolder.getInstance()
- .getTaskService());
+ ApplicationSynchronizerTaskScheduler
+ .schedule(ServiceReferenceHolder.getInstance()
+ .getTaskService());
- if (log.isInfoEnabled()) {
- log.info("Autoscaler server Component activated");
- }
- } catch (Throwable e) {
- log.error("Error in activating the autoscaler component ", e);
- }
- }
+ if (log.isInfoEnabled()) {
+ log.info("Autoscaler server Component activated");
+ }
+ } catch (Throwable e) {
+ log.error("Error in activating the autoscaler component ", e);
+ }
+ }
protected void deactivate(ComponentContext context) {
asTopologyReceiver.terminate();
@@ -198,128 +198,4 @@ public class AutoscalerServerComponent {
ServiceReferenceHolder.getInstance().setTaskService(null);
}
}
-=======
- try {
- // Start topology receiver
- XMLConfiguration conf = ConfUtil.getInstance(COMPONENTS_CONFIG).getConfiguration();
- int threadPoolSize = conf.getInt(THREAD_POOL_SIZE_KEY, THREAD_POOL_SIZE);
- String threadIdentifier = conf.getString(THREAD_IDENTIFIER_KEY, DEFAULT_IDENTIFIER);
- ExecutorService executorService = StratosThreadPool.getExecutorService(threadIdentifier, threadPoolSize);
- asTopologyReceiver = new AutoscalerTopologyEventReceiver();
- asTopologyReceiver.setExecutorService(executorService);
- asTopologyReceiver.execute();
-
- if (log.isDebugEnabled()) {
- log.debug("Topology receiver executor service started");
- }
-
- // Start health stat receiver
- autoscalerHealthStatEventReceiver = new AutoscalerHealthStatEventReceiver();
- autoscalerHealthStatEventReceiver.setExecutorService(executorService);
- autoscalerHealthStatEventReceiver.execute();
- if (log.isDebugEnabled()) {
- log.debug("Health statistics receiver thread started");
- }
-
- // Adding the registry stored partitions to the information model
- List<Partition> partitions = RegistryManager.getInstance().retrievePartitions();
- Iterator<Partition> partitionIterator = partitions.iterator();
- while (partitionIterator.hasNext()) {
- Partition partition = partitionIterator.next();
- PartitionManager.getInstance().addPartitionToInformationModel(partition);
- }
-
- // Adding the network partitions stored in registry to the information model
- List<NetworkPartitionLbHolder> nwPartitionHolders =
- RegistryManager.getInstance().retrieveNetworkPartitionLbHolders();
- Iterator<NetworkPartitionLbHolder> nwPartitionIterator = nwPartitionHolders.iterator();
- while (nwPartitionIterator.hasNext()) {
- NetworkPartitionLbHolder nwPartition = nwPartitionIterator.next();
- PartitionManager.getInstance().addNetworkPartitionLbHolder(nwPartition);
- }
-
- List<AutoscalePolicy> asPolicies = RegistryManager.getInstance().retrieveASPolicies();
- Iterator<AutoscalePolicy> asPolicyIterator = asPolicies.iterator();
- while (asPolicyIterator.hasNext()) {
- AutoscalePolicy asPolicy = asPolicyIterator.next();
- PolicyManager.getInstance().addASPolicyToInformationModel(asPolicy);
- }
-
- List<DeploymentPolicy> depPolicies = RegistryManager.getInstance().retrieveDeploymentPolicies();
- Iterator<DeploymentPolicy> depPolicyIterator = depPolicies.iterator();
- while (depPolicyIterator.hasNext()) {
- DeploymentPolicy depPolicy = depPolicyIterator.next();
- PolicyManager.getInstance().addDeploymentPolicyToInformationModel(depPolicy);
- }
-
- // Adding KubernetesGroups stored in registry to the information model
- List<KubernetesGroup> kubernetesGroupList = RegistryManager.getInstance().retrieveKubernetesGroups();
- Iterator<KubernetesGroup> kubernetesGroupIterator = kubernetesGroupList.iterator();
- while (kubernetesGroupIterator.hasNext()) {
- KubernetesGroup kubernetesGroup = kubernetesGroupIterator.next();
- KubernetesManager.getInstance().addNewKubernetesGroup(kubernetesGroup);
- }
-
- //starting the processor chain
- ClusterStatusProcessorChain clusterStatusProcessorChain = new ClusterStatusProcessorChain();
- ServiceReferenceHolder.getInstance().setClusterStatusProcessorChain(clusterStatusProcessorChain);
- GroupStatusProcessorChain groupStatusProcessorChain = new GroupStatusProcessorChain();
- ServiceReferenceHolder.getInstance().setGroupStatusProcessorChain(groupStatusProcessorChain);
-
- if (log.isInfoEnabled()) {
- log.info("Scheduling tasks to publish applications");
- }
-
- ApplicationSynchronizerTaskScheduler
- .schedule(ServiceReferenceHolder.getInstance()
- .getTaskService());
-
- if (log.isInfoEnabled()) {
- log.info("Autoscaler server Component activated");
- }
- } catch (Throwable e) {
- log.error("Error in activating the autoscaler component ", e);
- }
- }
-
- protected void deactivate(ComponentContext context) {
- asTopologyReceiver.terminate();
- autoscalerHealthStatEventReceiver.terminate();
- }
-
- protected void setRegistryService(RegistryService registryService) {
- if (log.isDebugEnabled()) {
- log.debug("Setting the Registry Service");
- }
- try {
- ServiceReferenceHolder.getInstance().setRegistry(registryService.getGovernanceSystemRegistry());
- } catch (RegistryException e) {
- String msg = "Failed when retrieving Governance System Registry.";
- log.error(msg, e);
- throw new AutoScalerException(msg, e);
- }
- }
-
- protected void unsetRegistryService(RegistryService registryService) {
- if (log.isDebugEnabled()) {
- log.debug("Un-setting the Registry Service");
- }
- ServiceReferenceHolder.getInstance().setRegistry(null);
- }
-
- protected void setTaskService(TaskService taskService) {
- if (log.isDebugEnabled()) {
- log.debug("Setting the Task Service");
- }
- ServiceReferenceHolder.getInstance().setTaskService(taskService);
- }
-
- protected void unsetTaskService(TaskService taskService) {
- if (log.isDebugEnabled()) {
- log.debug("Un-setting the Task Service");
- }
- ServiceReferenceHolder.getInstance().setTaskService(null);
- }
-}
->>>>>>> ddf277b... Remove unnessary threads in messaging model
http://git-wip-us.apache.org/repos/asf/stratos/blob/f70aa9ed/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index a413218..700efb9 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -20,41 +20,23 @@ package org.apache.stratos.cloud.controller.internal;
*
*/
-<<<<<<< HEAD
-<<<<<<< HEAD
+import org.apache.commons.configuration.XMLConfiguration;
-<<<<<<< HEAD
import com.hazelcast.core.HazelcastInstance;
-=======
->>>>>>> ddf277b... Remove unnessary threads in messaging model
-=======
-
->>>>>>> ad3e45c... Remove unnessary threads in messaging model
-=======
-import org.apache.commons.configuration.XMLConfiguration;
->>>>>>> 1b26a96... Adding executor service for threads and remove unnecessary threads
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.cloud.controller.context.CloudControllerContext;
import org.apache.stratos.cloud.controller.messaging.receiver.application.ApplicationTopicReceiver;
import org.apache.stratos.cloud.controller.messaging.receiver.cluster.status.ClusterStatusTopicReceiver;
import org.apache.stratos.cloud.controller.exception.CloudControllerException;
-<<<<<<< HEAD
import org.apache.stratos.cloud.controller.services.CloudControllerService;
import org.apache.stratos.cloud.controller.services.impl.CloudControllerServiceImpl;
import org.apache.stratos.cloud.controller.messaging.publisher.TopologySynchronizerTaskScheduler;
import org.apache.stratos.cloud.controller.messaging.receiver.instance.status.InstanceStatusTopicReceiver;
import org.apache.stratos.common.clustering.DistributedObjectProvider;
-=======
-import org.apache.stratos.cloud.controller.impl.CloudControllerServiceImpl;
-import org.apache.stratos.cloud.controller.interfaces.CloudControllerService;
-import org.apache.stratos.cloud.controller.publisher.TopologySynchronizerTaskScheduler;
-import org.apache.stratos.cloud.controller.receiver.instance.status.InstanceStatusTopicReceiver;
-import org.apache.stratos.cloud.controller.util.ServiceReferenceHolder;
import org.apache.stratos.common.threading.StratosThreadPool;
import org.apache.stratos.common.util.ConfUtil;
->>>>>>> 1b26a96... Adding executor service for threads and remove unnecessary threads
import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.util.Util;
import org.osgi.framework.BundleContext;
@@ -72,7 +54,6 @@ import java.util.concurrent.ExecutorService;
* Registering Cloud Controller Service.
*
* @scr.component name="org.apache.stratos.cloud.controller" immediate="true"
-<<<<<<< HEAD
* @scr.reference name="distributedObjectProvider" interface="org.apache.stratos.common.clustering.DistributedObjectProvider"
* cardinality="1..1" policy="dynamic" bind="setDistributedObjectProvider" unbind="unsetDistributedObjectProvider"
* @scr.reference name="ntask.component" interface="org.wso2.carbon.ntask.core.service.TaskService"
@@ -81,24 +62,6 @@ import java.util.concurrent.ExecutorService;
* cardinality="1..1" policy="dynamic" bind="setRegistryService" unbind="unsetRegistryService"
* @scr.reference name="config.context.service" interface="org.wso2.carbon.utils.ConfigurationContextService"
* cardinality="1..1" policy="dynamic" bind="setConfigurationContextService" unbind="unsetConfigurationContextService"
-=======
- * @scr.reference name="distributedMapProvider" interface="org.wso2.carbon.caching.impl.DistributedMapProvider"
- * cardinality="1..1" policy="dynamic" bind="setDistributedMapProvider" unbind="unsetDistributedMapProvider"
- * @scr.reference name="ntask.component"
- * interface="org.wso2.carbon.ntask.core.service.TaskService"
- * cardinality="1..1" policy="dynamic" bind="setTaskService" unbind="unsetTaskService"
- * @scr.reference name="registry.service"
- * interface="org.wso2.carbon.registry.core.service.RegistryService"
- * cardinality="1..1" policy="dynamic" bind="setRegistryService" unbind="unsetRegistryService"
- * @scr.reference name="config.context.service"
-<<<<<<< HEAD
- * interface="org.wso2.carbon.utils.ConfigurationContextService"
- * cardinality="1..1" policy="dynamic" bind="setConfigurationContextService" unbind="unsetConfigurationContextService"
->>>>>>> ddf277b... Remove unnessary threads in messaging model
-=======
- * interface="org.wso2.carbon.utils.ConfigurationContextService"
- * cardinality="1..1" policy="dynamic" bind="setConfigurationContextService" unbind="unsetConfigurationContextService"
->>>>>>> ad3e45c... Remove unnessary threads in messaging model
*/
public class CloudControllerServiceComponent {
@@ -107,8 +70,8 @@ public class CloudControllerServiceComponent {
private InstanceStatusTopicReceiver instanceStatusTopicReceiver;
private ApplicationTopicReceiver applicationTopicReceiver;
private static final String THREAD_IDENTIFIER_KEY = "threadPool.autoscaler.identifier";
- private static final String DEFAULT_IDENTIFIER = "Auto-Scaler";
- private static final String THREAD_POOL_SIZE_KEY = "threadPool.autoscaler.threadPoolSize";
+ private static final String DEFAULT_IDENTIFIER = "Cloud-Controller";
+ private static final String THREAD_POOL_SIZE_KEY = "threadPool.cloudcontroller.threadPoolSize";
private static final String COMPONENTS_CONFIG = "stratos-config";
private static final int THREAD_POOL_SIZE = 10;
@@ -127,28 +90,16 @@ public class CloudControllerServiceComponent {
log.info("Application Receiver thread started");
}
-<<<<<<< HEAD
- if (log.isInfoEnabled()) {
- log.info("Application event receiver thread started");
- }
-=======
clusterStatusTopicReceiver = new ClusterStatusTopicReceiver();
clusterStatusTopicReceiver.setExecutorService(executorService);
clusterStatusTopicReceiver.execute();
->>>>>>> ddf277b... Remove unnessary threads in messaging model
if (log.isInfoEnabled()) {
log.info("Cluster status Receiver thread started");
}
-<<<<<<< HEAD
- if (log.isInfoEnabled()) {
- log.info("Cluster status receiver thread started");
- }
-=======
instanceStatusTopicReceiver = new InstanceStatusTopicReceiver();
instanceStatusTopicReceiver.execute();
->>>>>>> ddf277b... Remove unnessary threads in messaging model
if (log.isInfoEnabled()) {
log.info("Instance status message receiver thread started");
@@ -163,23 +114,15 @@ public class CloudControllerServiceComponent {
log.info("Scheduling tasks");
}
-<<<<<<< HEAD
- if(log.isInfoEnabled()) {
- log.info("Scheduling tasks");
- }
+ TopologySynchronizerTaskScheduler
+ .schedule(ServiceReferenceHolder.getInstance()
+ .getTaskService());
- if ((!CloudControllerContext.getInstance().isClustered()) ||
- (CloudControllerContext.getInstance().isCoordinator())) {
- TopologySynchronizerTaskScheduler.schedule(ServiceReferenceHolder.getInstance().getTaskService());
- if(log.isInfoEnabled()) {
- log.info("Topology synchronizer task scheduled");
- }
- }
- } catch (Throwable e) {
- log.error("**** Cloud controller service bundle is failed to activate ****", e);
+ } catch (Throwable e) {
+ log.error("******* Cloud Controller Service bundle is failed to activate ****", e);
}
}
-
+
protected void setTaskService(TaskService taskService) {
if (log.isDebugEnabled()) {
log.debug("Setting the Task Service");
@@ -193,32 +136,7 @@ public class CloudControllerServiceComponent {
}
ServiceReferenceHolder.getInstance().setTaskService(null);
}
-
-=======
- TopologySynchronizerTaskScheduler
- .schedule(ServiceReferenceHolder.getInstance()
- .getTaskService());
-
- } catch (Throwable e) {
- log.error("******* Cloud Controller Service bundle is failed to activate ****", e);
- }
- }
-
- protected void setTaskService(TaskService taskService) {
- if (log.isDebugEnabled()) {
- log.debug("Setting the Task Service");
- }
- ServiceReferenceHolder.getInstance().setTaskService(taskService);
- }
- protected void unsetTaskService(TaskService taskService) {
- if (log.isDebugEnabled()) {
- log.debug("Unsetting the Task Service");
- }
- ServiceReferenceHolder.getInstance().setTaskService(null);
- }
-
->>>>>>> ddf277b... Remove unnessary threads in messaging model
protected void setRegistryService(RegistryService registryService) {
if (log.isDebugEnabled()) {
log.debug("Setting the Registry Service");
@@ -226,39 +144,22 @@ public class CloudControllerServiceComponent {
try {
UserRegistry registry = registryService.getGovernanceSystemRegistry();
-<<<<<<< HEAD
ServiceReferenceHolder.getInstance().setRegistry(registry);
} catch (RegistryException e) {
String msg = "Failed when retrieving Governance System Registry.";
log.error(msg, e);
throw new CloudControllerException(msg, e);
- }
-=======
- ServiceReferenceHolder.getInstance()
- .setRegistry(registry);
- } catch (RegistryException e) {
- String msg = "Failed when retrieving Governance System Registry.";
- log.error(msg, e);
- throw new CloudControllerException(msg, e);
- }
->>>>>>> ddf277b... Remove unnessary threads in messaging model
+ }
}
protected void unsetRegistryService(RegistryService registryService) {
if (log.isDebugEnabled()) {
-<<<<<<< HEAD
log.debug("Un-setting the Registry Service");
}
ServiceReferenceHolder.getInstance().setRegistry(null);
-=======
- log.debug("Unsetting the Registry Service");
- }
- ServiceReferenceHolder.getInstance().setRegistry(null);
->>>>>>> ddf277b... Remove unnessary threads in messaging model
}
protected void setConfigurationContextService(ConfigurationContextService cfgCtxService) {
-<<<<<<< HEAD
ServiceReferenceHolder.getInstance().setAxisConfiguration(
cfgCtxService.getServerConfigContext().getAxisConfiguration());
}
@@ -274,27 +175,9 @@ public class CloudControllerServiceComponent {
protected void unsetDistributedObjectProvider(DistributedObjectProvider distributedObjectProvider) {
ServiceReferenceHolder.getInstance().setDistributedObjectProvider(null);
}
-
-=======
- ServiceReferenceHolder.getInstance().setAxisConfiguration(
- cfgCtxService.getServerConfigContext().getAxisConfiguration());
- }
-
- protected void unsetConfigurationContextService(ConfigurationContextService cfgCtxService) {
- ServiceReferenceHolder.getInstance().setAxisConfiguration(null);
- }
-
- protected void setDistributedMapProvider(DistributedMapProvider mapProvider) {
- ServiceReferenceHolder.getInstance().setDistributedMapProvider(mapProvider);
- }
-
- protected void unsetDistributedMapProvider(DistributedMapProvider mapProvider) {
- ServiceReferenceHolder.getInstance().setDistributedMapProvider(null);
- }
->>>>>>> ddf277b... Remove unnessary threads in messaging model
protected void deactivate(ComponentContext ctx) {
- // Close event publisher connections to message broker
- EventPublisherPool.close(Util.Topics.TOPOLOGY_TOPIC.getTopicName());
+ // Close event publisher connections to message broker
+ EventPublisherPool.close(Util.Topics.TOPOLOGY_TOPIC.getTopicName());
}
-}
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/stratos/blob/f70aa9ed/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
index 3aa77a8..509bc74 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
@@ -120,30 +120,13 @@ public class LoadBalancerServiceComponent {
TopologyFilterConfigurator.configure(configuration);
if (configuration.isMultiTenancyEnabled()) {
-<<<<<<< HEAD
// Start tenant event receiver
startTenantEventReceiver();
-=======
-
- tenantReceiver = new LoadBalancerTenantEventReceiver();
- tenantReceiver.execute();
-
- if (log.isInfoEnabled()) {
- log.info("Tenant receiver thread started");
- }
->>>>>>> ae876c1... Remove unnessary threads in messaging model
}
if (configuration.isTopologyEventListenerEnabled()) {
// Start topology receiver
-<<<<<<< HEAD
startTopologyEventReceiver();
-=======
- topologyReceiver = new LoadBalancerTopologyEventReceiver();
- topologyReceiver.execute();
- if (log.isInfoEnabled()) {
- log.info("Topology receiver thread started");
- }
if (log.isInfoEnabled()) {
if (TopologyServiceFilter.getInstance().isActive()) {
@@ -177,7 +160,7 @@ public class LoadBalancerServiceComponent {
log.info(String.format("Member filter activated: [lb-cluster-ids] %s", sb.toString()));
}
}
->>>>>>> ae876c1... Remove unnessary threads in messaging model
+
}
if(configuration.isCepStatsPublisherEnabled()) {
@@ -197,18 +180,16 @@ public class LoadBalancerServiceComponent {
}
private void startTenantEventReceiver() {
- tenantReceiver = new LoadBalancerTenantEventReceiver();
- Thread tenantReceiverThread = new Thread(tenantReceiver);
- tenantReceiverThread.start();
+ tenantReceiver = new LoadBalancerTenantEventReceiver();
+ tenantReceiver.execute();
if (log.isInfoEnabled()) {
log.info("Tenant receiver thread started");
}
}
private void startTopologyEventReceiver() {
- topologyReceiver = new LoadBalancerTopologyEventReceiver();
- Thread topologyReceiverThread = new Thread(topologyReceiver);
- topologyReceiverThread.start();
+ topologyReceiver = new LoadBalancerTopologyEventReceiver();
+ topologyReceiver.execute();
if (log.isInfoEnabled()) {
log.info("Topology receiver thread started");
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/f70aa9ed/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
index 5ee28d1..8bfcb2c 100644
--- a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
+++ b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -107,7 +107,6 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
log.debug("Member not found in the toplogy. Event rejected");
return;
}
-<<<<<<< HEAD
if (StringUtils.isNotEmpty(id)) {
memberTimeStampMap.put(id, event.getTimeStamp());
} else {
@@ -320,220 +319,4 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
public ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
return memberTimeStampMap;
}
-=======
- if (StringUtils.isNotEmpty(id)) {
- memberTimeStampMap.put(id, event.getTimeStamp());
- } else {
- log.warn("NULL member id found in the event received. Event rejected.");
- }
- if (log.isDebugEnabled()) {
- log.debug("Event received from [member-id] " + id + " [time-stamp] " + event.getTimeStamp());
- }
- }
-
- @Override
- public Iterator<StreamEvent> iterator() {
- return window.iterator();
- }
-
- @Override
- public Iterator<StreamEvent> iterator(String predicate) {
- if (siddhiContext.isDistributedProcessingEnabled()) {
- return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate);
- } else {
- return window.iterator();
- }
- }
-
- /**
- * Retrieve the current activated members from the topology and initialize the time stamp map.
- * This will allow the system to recover from a restart
- *
- * @param topology Topology model object
- */
- boolean loadTimeStampMapFromTopology(Topology topology) {
-
- long currentTimeStamp = System.currentTimeMillis();
- if (topology == null || topology.getServices() == null) {
- return false;
- }
- // TODO make this efficient by adding APIs to messaging component
- for (Service service : topology.getServices()) {
- if (service.getClusters() != null) {
- for (Cluster cluster : service.getClusters()) {
- if (cluster.getMembers() != null) {
- for (Member member : cluster.getMembers()) {
- // we are checking faulty status only in previously activated members
- if (member != null && MemberStatus.Activated.equals(member.getStatus())) {
- // Initialize the member time stamp map from the topology at the beginning
- memberTimeStampMap.putIfAbsent(member.getMemberId(), currentTimeStamp);
- }
- }
- }
- }
- }
- }
-
- log.info("Member time stamp map was successfully loaded from the topology.");
- if (log.isDebugEnabled()) {
- log.debug("Member TimeStamp Map: " + memberTimeStampMap);
- }
- return true;
- }
-
- private Member getMemberFromId(String memberId) {
- if (StringUtils.isEmpty(memberId)) {
- return null;
- }
- if (TopologyManager.getTopology().isInitialized()) {
- try {
- TopologyManager.acquireReadLock();
- if (TopologyManager.getTopology().getServices() == null) {
- return null;
- }
- // TODO make this efficient by adding APIs to messaging component
- for (Service service : TopologyManager.getTopology().getServices()) {
- if (service.getClusters() != null) {
- for (Cluster cluster : service.getClusters()) {
- if (cluster.getMembers() != null) {
- for (Member member : cluster.getMembers()) {
- if (memberId.equals(member.getMemberId())) {
- return member;
- }
- }
- }
- }
- }
- }
- } catch (Exception e) {
- log.error("Error while reading topology" + e);
- } finally {
- TopologyManager.releaseReadLock();
- }
- }
- return null;
- }
-
- private void publishMemberFault(String memberId) {
- Member member = getMemberFromId(memberId);
- if (member == null) {
- log.error("Failed to publish member fault event. Member having [member-id] " + memberId +
- " does not exist in topology");
- return;
- }
- log.info("Publishing member fault event for [member-id] " + memberId);
-
- MemberFaultEvent memberFaultEvent =
- new MemberFaultEvent(member.getClusterId(), member.getInstanceId(), member.getMemberId(),
- member.getPartitionId(), 0);
-
- memberFaultEventMessageMap.put("message", memberFaultEvent);
- healthStatPublisher.publish(MemberFaultEventMap, true);
- }
-
- @Override
- public void run() {
- try {
- threadBarrier.pass();
-
- for (Object o : memberTimeStampMap.entrySet()) {
- Map.Entry pair = (Map.Entry) o;
- long currentTime = System.currentTimeMillis();
- Long eventTimeStamp = (Long) pair.getValue();
-
- if ((currentTime - eventTimeStamp) > TIME_OUT) {
- log.info("Faulty member detected [member-id] " + pair.getKey() + " with [last time-stamp] " +
- eventTimeStamp + " [time-out] " + TIME_OUT + " milliseconds");
- publishMemberFault((String) pair.getKey());
- }
- }
- if (log.isDebugEnabled()) {
- log.debug("Fault handling processor iteration completed with [time-stamp map length] " +
- memberTimeStampMap.size() + " [time-stamp map] " + memberTimeStampMap);
- }
- } catch (Throwable t) {
- log.error(t.getMessage(), t);
- } finally {
- faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
- }
- }
-
- @Override
- protected Object[] currentState() {
- return new Object[] { window.currentState() };
- }
-
- @Override
- protected void restoreState(Object[] data) {
- window.restoreState(data);
- window.restoreState((Object[]) data[0]);
- window.reSchedule();
- }
-
- @Override
- protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor,
- AbstractDefinition streamDefinition, String elementId, boolean async,
- SiddhiContext siddhiContext) {
-
- if (parameters[0] instanceof IntConstant) {
- timeToKeep = ((IntConstant) parameters[0]).getValue();
- } else {
- timeToKeep = ((LongConstant) parameters[0]).getValue();
- }
-
- String memberIdAttrName = ((Variable) parameters[1]).getAttributeName();
- memberIdAttrIndex = streamDefinition.getAttributePosition(memberIdAttrName);
-
- if (this.siddhiContext.isDistributedProcessingEnabled()) {
- window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async);
- } else {
- window = new SchedulerSiddhiQueue<StreamEvent>(this);
- }
- MemberFaultEventMap
- .put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap);
-
- ExecutorService executorService = StratosThreadPool.getExecutorService(IDENTIFIER, 10);
- cepTopologyEventReceiver.setExecutorService(executorService);
- executorService.execute(cepTopologyEventReceiver);
-
- //Ordinary scheduling
- window.schedule();
- if (log.isDebugEnabled()) {
- log.debug("Fault handling window processor initialized with [timeToKeep] " + timeToKeep +
- ", [memberIdAttrName] " + memberIdAttrName + ", [memberIdAttrIndex] " + memberIdAttrIndex +
- ", [distributed-enabled] " + this.siddhiContext.isDistributedProcessingEnabled());
- }
- }
-
- @Override
- public void schedule() {
- faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
- }
-
- @Override
- public void scheduleNow() {
- faultHandleScheduler.schedule(this, 0, TimeUnit.MILLISECONDS);
- }
-
- @Override
- public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
- this.faultHandleScheduler = scheduledExecutorService;
- }
-
- @Override
- public void setThreadBarrier(ThreadBarrier threadBarrier) {
- this.threadBarrier = threadBarrier;
- }
-
- @Override
- public void destroy() {
- // terminate topology listener thread
- cepTopologyEventReceiver.terminate();
- window = null;
- }
-
- public ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
- return memberTimeStampMap;
- }
->>>>>>> ddf277b... Remove unnessary threads in messaging model
}