You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by la...@apache.org on 2014/12/03 07:56:40 UTC
[3/9] stratos git commit: Remove unnessary threads in messaging model
Remove unnessary threads in messaging model
Conflicts:
components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/8012f8c8
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/8012f8c8
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/8012f8c8
Branch: refs/heads/master
Commit: 8012f8c8d1b7ef808fae55d0f18826e651c04ed9
Parents: 9e6e91d
Author: gayan <ga...@puppet.gayan.org>
Authored: Mon Dec 1 16:25:22 2014 +0530
Committer: gayan <ga...@puppet.gayan.org>
Committed: Tue Dec 2 16:36:37 2014 +0530
----------------------------------------------------------------------
.../AutoscalerTopologyEventReceiver.java | 527 ++++++++++++++++++-
.../internal/AutoscalerServerComponent.java | 131 ++++-
.../CloudControllerServiceComponent.java | 142 ++++-
.../application/ApplicationTopicReceiver.java | 65 ++-
.../status/ClusterStatusTopicReceiver.java | 110 ++--
.../status/InstanceStatusTopicReceiver.java | 128 +++--
.../extension/api/LoadBalancerExtension.java | 256 +++++----
.../extension/FaultHandlingWindowProcessor.java | 307 +++++++++--
.../apache/stratos/haproxy/extension/Main.java | 48 +-
9 files changed, 1332 insertions(+), 382 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
index bfdf30b..1f14542 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/event/receiver/topology/AutoscalerTopologyEventReceiver.java
@@ -54,15 +54,16 @@ import java.util.concurrent.ExecutorService;
/**
* Autoscaler topology receiver.
*/
-public class AutoscalerTopologyEventReceiver{
+public class AutoscalerTopologyEventReceiver {
- private static final Log log = LogFactory.getLog(AutoscalerTopologyEventReceiver.class);
+ private static final Log log = LogFactory.getLog(AutoscalerTopologyEventReceiver.class);
- private TopologyEventReceiver topologyEventReceiver;
- private boolean terminated;
- private boolean topologyInitialized;
+ private TopologyEventReceiver topologyEventReceiver;
+ private boolean terminated;
+ private boolean topologyInitialized;
private ExecutorService executorService;
+<<<<<<< HEAD
public AutoscalerTopologyEventReceiver() {
this.topologyEventReceiver = new TopologyEventReceiver();
addEventListeners();
@@ -523,6 +524,461 @@ public class AutoscalerTopologyEventReceiver{
topologyEventReceiver.terminate();
terminated = true;
}
+=======
+ public AutoscalerTopologyEventReceiver() {
+ this.topologyEventReceiver = new TopologyEventReceiver();
+ addEventListeners();
+ }
+
+ public void execute() {
+ //FIXME this activated before autoscaler deployer activated.
+
+ topologyEventReceiver.setExecutorService(executorService);
+ topologyEventReceiver.execute();
+
+ if (log.isInfoEnabled()) {
+ log.info("Autoscaler topology receiver thread started");
+ }
+
+ }
+
+ private boolean allClustersInitialized(Application application) {
+ boolean allClustersInitialized = false;
+ for (ClusterDataHolder holder : application.getClusterDataRecursively()) {
+ TopologyManager.acquireReadLockForCluster(holder.getServiceType(),
+ holder.getClusterId());
+
+ try {
+ Topology topology = TopologyManager.getTopology();
+ if (topology != null) {
+ Service service = topology.getService(holder.getServiceType());
+ if (service != null) {
+ if (service.clusterExists(holder.getClusterId())) {
+ allClustersInitialized = true;
+ return allClustersInitialized;
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("[Cluster] " + holder.getClusterId() + " is not found in " +
+ "the Topology");
+ }
+ allClustersInitialized = false;
+ }
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("Service is null in the CompleteTopologyEvent");
+ }
+ }
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug("Topology is null in the CompleteTopologyEvent");
+ }
+ }
+ } finally {
+ TopologyManager.releaseReadLockForCluster(holder.getServiceType(),
+ holder.getClusterId());
+ }
+ }
+ return allClustersInitialized;
+ }
+
+ private void addEventListeners() {
+ // Listen to topology events that affect clusters
+ topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ if (!topologyInitialized) {
+ log.info("[CompleteTopologyEvent] Received: " + event.getClass());
+ ApplicationHolder.acquireReadLock();
+ try {
+ Applications applications = ApplicationHolder.getApplications();
+ if (applications != null) {
+ for (Application application : applications.getApplications().values()) {
+ if (allClustersInitialized(application)) {
+ startApplicationMonitor(application.getUniqueIdentifier());
+ } else {
+ log.error("Complete Topology is not consistent with the applications " +
+ "which got persisted");
+ }
+ }
+ topologyInitialized = true;
+ } else {
+ log.info("No applications found in the complete topology");
+ }
+ } catch (Exception e) {
+ log.error("Error processing event", e);
+ } finally {
+ ApplicationHolder.releaseReadLock();
+ }
+ }
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new ApplicationClustersCreatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ try {
+ log.info("[ApplicationClustersCreatedEvent] Received: " + event.getClass());
+ ApplicationClustersCreatedEvent applicationClustersCreatedEvent =
+ (ApplicationClustersCreatedEvent) event;
+ String appId = applicationClustersCreatedEvent.getAppId();
+ try {
+ //acquire read lock
+ ApplicationHolder.acquireReadLock();
+ //start the application monitor
+ startApplicationMonitor(appId);
+ } catch (Exception e) {
+ String msg = "Error processing event " + e.getLocalizedMessage();
+ log.error(msg, e);
+ } finally {
+ //release read lock
+ ApplicationHolder.releaseReadLock();
+
+ }
+ } catch (ClassCastException e) {
+ String msg = "Error while casting the event " + e.getLocalizedMessage();
+ log.error(msg, e);
+ }
+
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new ClusterActivatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ log.info("[ClusterActivatedEvent] Received: " + event.getClass());
+ ClusterActivatedEvent clusterActivatedEvent = (ClusterActivatedEvent) event;
+ String clusterId = clusterActivatedEvent.getClusterId();
+ AutoscalerContext asCtx = AutoscalerContext.getInstance();
+ AbstractClusterMonitor monitor;
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("A cluster monitor is not found in autoscaler context "
+ + "[cluster] %s", clusterId));
+ }
+ return;
+ }
+ //changing the status in the monitor, will notify its parent monitor
+
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new ClusterResetEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ log.info("[ClusterCreatedEvent] Received: " + event.getClass());
+ ClusterResetEvent clusterResetEvent = (ClusterResetEvent) event;
+ String clusterId = clusterResetEvent.getClusterId();
+ AutoscalerContext asCtx = AutoscalerContext.getInstance();
+ AbstractClusterMonitor monitor;
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("A cluster monitor is not found in autoscaler context "
+ + "[cluster] %s", clusterId));
+ }
+ return;
+ }
+ //changing the status in the monitor, will notify its parent monitor
+ monitor.destroy();
+ monitor.setStatus(ClusterStatus.Created);
+
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new ClusterCreatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ log.info("[ClusterCreatedEvent] Received: " + event.getClass());
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new ClusterInActivateEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ log.info("[ClusterInActivateEvent] Received: " + event.getClass());
+ ClusterInactivateEvent clusterInactivateEvent = (ClusterInactivateEvent) event;
+ String clusterId = clusterInactivateEvent.getClusterId();
+ AutoscalerContext asCtx = AutoscalerContext.getInstance();
+ AbstractClusterMonitor monitor;
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("A cluster monitor is not found in autoscaler context "
+ + "[cluster] %s", clusterId));
+ }
+ return;
+ }
+ //changing the status in the monitor, will notify its parent monitor
+ monitor.setStatus(ClusterStatus.Inactive);
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new ClusterTerminatingEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ log.info("[ClusterTerminatingEvent] Received: " + event.getClass());
+ ClusterTerminatingEvent clusterTerminatingEvent = (ClusterTerminatingEvent) event;
+ String clusterId = clusterTerminatingEvent.getClusterId();
+ String instanceId = clusterTerminatingEvent.getInstanceId();
+ AutoscalerContext asCtx = AutoscalerContext.getInstance();
+ AbstractClusterMonitor monitor;
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("A cluster monitor is not found in autoscaler context "
+ + "[cluster] %s", clusterId));
+ }
+ // if monitor does not exist, send cluster terminated event
+ ClusterStatusEventPublisher.sendClusterTerminatedEvent(clusterTerminatingEvent.getAppId(),
+ clusterTerminatingEvent.getServiceName(),
+ clusterId, instanceId);
+ return;
+ }
+ //changing the status in the monitor, will notify its parent monitor
+ if (monitor.getStatus() == ClusterStatus.Active) {
+ // terminated gracefully
+ monitor.setStatus(ClusterStatus.Terminating);
+ InstanceNotificationPublisher.sendInstanceCleanupEventForCluster(clusterId);
+ } else {
+ monitor.setStatus(ClusterStatus.Terminating);
+ monitor.terminateAllMembers();
+ }
+ ServiceReferenceHolder.getInstance().getClusterStatusProcessorChain().
+ process("", clusterId, instanceId);
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new ClusterTerminatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ log.info("[ClusterTerminatedEvent] Received: " + event.getClass());
+ ClusterTerminatedEvent clusterTerminatedEvent = (ClusterTerminatedEvent) event;
+ String clusterId = clusterTerminatedEvent.getClusterId();
+ AutoscalerContext asCtx = AutoscalerContext.getInstance();
+ AbstractClusterMonitor monitor;
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("A cluster monitor is not found in autoscaler context "
+ + "[cluster] %s", clusterId));
+ }
+ // if the cluster monitor is null, assume that its termianted
+ ApplicationMonitor appMonitor =
+ AutoscalerContext.getInstance().getAppMonitor(clusterTerminatedEvent.getAppId());
+ if (appMonitor != null) {
+ appMonitor
+ .onChildStatusEvent(new ClusterStatusEvent(ClusterStatus.Terminated, clusterId, null));
+ }
+ return;
+ }
+ //changing the status in the monitor, will notify its parent monitor
+ monitor.setStatus(ClusterStatus.Terminated);
+ //Destroying and Removing the Cluster monitor
+ monitor.destroy();
+ AutoscalerContext.getInstance().removeClusterMonitor(clusterId);
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new MemberReadyToShutdownEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ try {
+ MemberReadyToShutdownEvent memberReadyToShutdownEvent = (MemberReadyToShutdownEvent) event;
+ String clusterId = memberReadyToShutdownEvent.getClusterId();
+ AutoscalerContext asCtx = AutoscalerContext.getInstance();
+ AbstractClusterMonitor monitor;
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("A cluster monitor is not found in autoscaler context "
+ + "[cluster] %s", clusterId));
+ }
+ return;
+ }
+ monitor.handleMemberReadyToShutdownEvent(memberReadyToShutdownEvent);
+ } catch (Exception e) {
+ String msg = "Error processing event " + e.getLocalizedMessage();
+ log.error(msg, e);
+ }
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new MemberStartedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ try {
+ MemberTerminatedEvent memberTerminatedEvent = (MemberTerminatedEvent) event;
+ String clusterId = memberTerminatedEvent.getClusterId();
+ AbstractClusterMonitor monitor;
+ AutoscalerContext asCtx = AutoscalerContext.getInstance();
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("A cluster monitor is not found in autoscaler context "
+ + "[cluster] %s", clusterId));
+ }
+ return;
+ }
+ monitor.handleMemberTerminatedEvent(memberTerminatedEvent);
+ } catch (Exception e) {
+ String msg = "Error processing event " + e.getLocalizedMessage();
+ log.error(msg, e);
+ }
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ try {
+ MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
+ String clusterId = memberActivatedEvent.getClusterId();
+ AbstractClusterMonitor monitor;
+ AutoscalerContext asCtx = AutoscalerContext.getInstance();
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("A cluster monitor is not found in autoscaler context "
+ + "[cluster] %s", clusterId));
+ }
+ return;
+ }
+ monitor.handleMemberActivatedEvent(memberActivatedEvent);
+ } catch (Exception e) {
+ String msg = "Error processing event " + e.getLocalizedMessage();
+ log.error(msg, e);
+ }
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new MemberMaintenanceListener() {
+ @Override
+ protected void onEvent(Event event) {
+ try {
+ MemberMaintenanceModeEvent maintenanceModeEvent = (MemberMaintenanceModeEvent) event;
+ String clusterId = maintenanceModeEvent.getClusterId();
+ AbstractClusterMonitor monitor;
+ AutoscalerContext asCtx = AutoscalerContext.getInstance();
+ monitor = asCtx.getClusterMonitor(clusterId);
+ if (null == monitor) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("A cluster monitor is not found in autoscaler context "
+ + "[cluster] %s", clusterId));
+ }
+ return;
+ }
+ monitor.handleMemberMaintenanceModeEvent(maintenanceModeEvent);
+ } catch (Exception e) {
+ String msg = "Error processing event " + e.getLocalizedMessage();
+ log.error(msg, e);
+ }
+ }
+ });
+
+ topologyEventReceiver.addEventListener(new ClusterInstanceCreatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+
+ ClusterInstanceCreatedEvent clusterInstanceCreatedEvent =
+ (ClusterInstanceCreatedEvent) event;
+ AbstractClusterMonitor clusterMonitor = AutoscalerContext.getInstance().
+ getClusterMonitor(clusterInstanceCreatedEvent.getClusterId());
+
+ if (clusterMonitor != null) {
+ TopologyManager.acquireReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(),
+ clusterInstanceCreatedEvent.getClusterId());
+
+ try {
+ Service service = TopologyManager.getTopology().
+ getService(clusterInstanceCreatedEvent.getServiceName());
+
+ if (service != null) {
+ Cluster cluster = service.getCluster(clusterInstanceCreatedEvent.getClusterId());
+ if (cluster != null) {
+ // create and add Cluster Context
+ try {
+ if (cluster.isKubernetesCluster()) {
+ clusterMonitor.addClusterContextForInstance(
+ clusterInstanceCreatedEvent.getInstanceId(),
+ ClusterContextFactory.getKubernetesClusterContext(cluster));
+ } else if (cluster.isLbCluster()) {
+ clusterMonitor.addClusterContextForInstance(
+ clusterInstanceCreatedEvent.getInstanceId(),
+ ClusterContextFactory.getVMLBClusterContext(cluster));
+ } else {
+ clusterMonitor.addClusterContextForInstance(
+ clusterInstanceCreatedEvent.getInstanceId(),
+ ClusterContextFactory.getVMServiceClusterContext(cluster));
+ }
+
+ if (clusterMonitor.hasMonitoringStarted().compareAndSet(false, true)) {
+ clusterMonitor.startScheduler();
+ log.info("Monitoring task for Cluster Monitor with cluster id " +
+ clusterInstanceCreatedEvent.getClusterId() + " started successfully");
+ }
+
+ } catch (PolicyValidationException e) {
+ log.error(e.getMessage(), e);
+ } catch (PartitionValidationException e) {
+ log.error(e.getMessage(), e);
+ }
+
+ } else {
+ log.error("Cluster not found for " + clusterInstanceCreatedEvent.getClusterId() +
+ ", no cluster instance added to ClusterMonitor " +
+ clusterInstanceCreatedEvent.getClusterId());
+ }
+ } else {
+ log.error("Service " + clusterInstanceCreatedEvent.getServiceName() +
+ " not found, no cluster instance added to ClusterMonitor " +
+ clusterInstanceCreatedEvent.getClusterId());
+ }
+
+ } finally {
+ TopologyManager.releaseReadLockForCluster(clusterInstanceCreatedEvent.getServiceName(),
+ clusterInstanceCreatedEvent.getClusterId());
+ }
+
+ } else {
+ log.error("No Cluster Monitor found for cluster id " +
+ clusterInstanceCreatedEvent.getClusterId());
+ }
+ }
+ });
+ }
+
+ /**
+ * Terminate load balancer topology receiver thread.
+ */
+ public void terminate() {
+ topologyEventReceiver.terminate();
+ terminated = true;
+ }
+
+ protected synchronized void startApplicationMonitor(String applicationId) {
+ Thread th = null;
+ if (AutoscalerContext.getInstance().getAppMonitor(applicationId) == null) {
+ th = new Thread(new ApplicationMonitorAdder(applicationId));
+ }
+ if (th != null) {
+ th.start();
+ } else {
+ if (log.isDebugEnabled()) {
+ log.debug(String
+ .format("Application monitor thread already exists: " +
+ "[application] %s ", applicationId));
+ }
+ }
+ }
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
public ExecutorService getExecutorService() {
return executorService;
@@ -531,4 +987,65 @@ public class AutoscalerTopologyEventReceiver{
public void setExecutorService(ExecutorService executorService) {
this.executorService = executorService;
}
+<<<<<<< HEAD
+=======
+
+ private class ApplicationMonitorAdder implements Runnable {
+ private String appId;
+
+ public ApplicationMonitorAdder(String appId) {
+ this.appId = appId;
+ }
+
+ public void run() {
+ ApplicationMonitor applicationMonitor = null;
+ int retries = 5;
+ boolean success = false;
+ do {
+ try {
+ Thread.sleep(5000);
+ } catch (InterruptedException e1) {
+ }
+ try {
+ long start = System.currentTimeMillis();
+ if (log.isDebugEnabled()) {
+ log.debug("application monitor is going to be started for [application] " +
+ appId);
+ }
+ try {
+ applicationMonitor = MonitorFactory.getApplicationMonitor(appId);
+ } catch (PolicyValidationException e) {
+ String msg = "Application monitor creation failed for Application: ";
+ log.warn(msg, e);
+ retries--;
+ }
+ long end = System.currentTimeMillis();
+ log.info("Time taken to start app monitor: " + (end - start) / 1000);
+ success = true;
+ } catch (DependencyBuilderException e) {
+ String msg = "Application monitor creation failed for Application: ";
+ log.warn(msg, e);
+ retries--;
+ } catch (TopologyInConsistentException e) {
+ String msg = "Application monitor creation failed for Application: ";
+ log.warn(msg, e);
+ retries--;
+ }
+ } while (!success && retries != 0);
+
+ if (applicationMonitor == null) {
+ String msg = "Application monitor creation failed, even after retrying for 5 times, "
+ + "for Application: " + appId;
+ log.error(msg);
+ throw new RuntimeException(msg);
+ }
+
+ AutoscalerContext.getInstance().addAppMonitor(applicationMonitor);
+ if (log.isInfoEnabled()) {
+ log.info(String.format("Application monitor has been added successfully: " +
+ "[application] %s", applicationMonitor.getId()));
+ }
+ }
+ }
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
index 3694066..2e443de 100644
--- a/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
+++ b/components/org.apache.stratos.autoscaler/src/main/java/org/apache/stratos/autoscaler/internal/AutoscalerServerComponent.java
@@ -65,16 +65,16 @@ public class AutoscalerServerComponent {
private static final String THREAD_IDENTIFIER_KEY = "threadPool.autoscaler.identifier";
private static final String DEFAULT_IDENTIFIER = "Auto-Scaler";
- private static final String THREAD_POOL_SIZE_KEY= "threadPool.autoscaler.threadPoolSize";
+ private static final String THREAD_POOL_SIZE_KEY = "threadPool.autoscaler.threadPoolSize";
private static final String COMPONENTS_CONFIG = "components-config";
private static final int THREAD_POOL_SIZE = 10;
private static final Log log = LogFactory.getLog(AutoscalerServerComponent.class);
-
private AutoscalerTopologyEventReceiver asTopologyReceiver;
- private AutoscalerHealthStatEventReceiver autoscalerHealthStatEventReceiver;
+ private AutoscalerHealthStatEventReceiver autoscalerHealthStatEventReceiver;
protected void activate(ComponentContext componentContext) throws Exception {
+<<<<<<< HEAD
try {
// Start topology receiver
XMLConfiguration conf = ConfUtil.getInstance(COMPONENTS_CONFIG).getConfiguration();
@@ -197,3 +197,128 @@ public class AutoscalerServerComponent {
ServiceReferenceHolder.getInstance().setTaskService(null);
}
}
+=======
+ try {
+ // Start topology receiver
+ XMLConfiguration conf = ConfUtil.getInstance(COMPONENTS_CONFIG).getConfiguration();
+ int threadPoolSize = conf.getInt(THREAD_POOL_SIZE_KEY, THREAD_POOL_SIZE);
+ String threadIdentifier = conf.getString(THREAD_IDENTIFIER_KEY, DEFAULT_IDENTIFIER);
+ ExecutorService executorService = StratosThreadPool.getExecutorService(threadIdentifier, threadPoolSize);
+ asTopologyReceiver = new AutoscalerTopologyEventReceiver();
+ asTopologyReceiver.setExecutorService(executorService);
+ asTopologyReceiver.execute();
+
+ if (log.isDebugEnabled()) {
+ log.debug("Topology receiver executor service started");
+ }
+
+ // Start health stat receiver
+ autoscalerHealthStatEventReceiver = new AutoscalerHealthStatEventReceiver();
+ Thread healthDelegatorThread = new Thread(autoscalerHealthStatEventReceiver);
+ healthDelegatorThread.start();
+ if (log.isDebugEnabled()) {
+ log.debug("Health statistics receiver thread started");
+ }
+
+ // Adding the registry stored partitions to the information model
+ List<Partition> partitions = RegistryManager.getInstance().retrievePartitions();
+ Iterator<Partition> partitionIterator = partitions.iterator();
+ while (partitionIterator.hasNext()) {
+ Partition partition = partitionIterator.next();
+ PartitionManager.getInstance().addPartitionToInformationModel(partition);
+ }
+
+ // Adding the network partitions stored in registry to the information model
+ List<NetworkPartitionLbHolder> nwPartitionHolders =
+ RegistryManager.getInstance().retrieveNetworkPartitionLbHolders();
+ Iterator<NetworkPartitionLbHolder> nwPartitionIterator = nwPartitionHolders.iterator();
+ while (nwPartitionIterator.hasNext()) {
+ NetworkPartitionLbHolder nwPartition = nwPartitionIterator.next();
+ PartitionManager.getInstance().addNetworkPartitionLbHolder(nwPartition);
+ }
+
+ List<AutoscalePolicy> asPolicies = RegistryManager.getInstance().retrieveASPolicies();
+ Iterator<AutoscalePolicy> asPolicyIterator = asPolicies.iterator();
+ while (asPolicyIterator.hasNext()) {
+ AutoscalePolicy asPolicy = asPolicyIterator.next();
+ PolicyManager.getInstance().addASPolicyToInformationModel(asPolicy);
+ }
+
+ List<DeploymentPolicy> depPolicies = RegistryManager.getInstance().retrieveDeploymentPolicies();
+ Iterator<DeploymentPolicy> depPolicyIterator = depPolicies.iterator();
+ while (depPolicyIterator.hasNext()) {
+ DeploymentPolicy depPolicy = depPolicyIterator.next();
+ PolicyManager.getInstance().addDeploymentPolicyToInformationModel(depPolicy);
+ }
+
+ // Adding KubernetesGroups stored in registry to the information model
+ List<KubernetesGroup> kubernetesGroupList = RegistryManager.getInstance().retrieveKubernetesGroups();
+ Iterator<KubernetesGroup> kubernetesGroupIterator = kubernetesGroupList.iterator();
+ while (kubernetesGroupIterator.hasNext()) {
+ KubernetesGroup kubernetesGroup = kubernetesGroupIterator.next();
+ KubernetesManager.getInstance().addNewKubernetesGroup(kubernetesGroup);
+ }
+
+ //starting the processor chain
+ ClusterStatusProcessorChain clusterStatusProcessorChain = new ClusterStatusProcessorChain();
+ ServiceReferenceHolder.getInstance().setClusterStatusProcessorChain(clusterStatusProcessorChain);
+
+ GroupStatusProcessorChain groupStatusProcessorChain = new GroupStatusProcessorChain();
+ ServiceReferenceHolder.getInstance().setGroupStatusProcessorChain(groupStatusProcessorChain);
+
+ if (log.isInfoEnabled()) {
+ log.info("Scheduling tasks to publish applications");
+ }
+
+ ApplicationSynchronizerTaskScheduler
+ .schedule(ServiceReferenceHolder.getInstance()
+ .getTaskService());
+
+ if (log.isInfoEnabled()) {
+ log.info("Autoscaler server Component activated");
+ }
+ } catch (Throwable e) {
+ log.error("Error in activating the autoscaler component ", e);
+ }
+ }
+
+ protected void deactivate(ComponentContext context) {
+ asTopologyReceiver.terminate();
+ autoscalerHealthStatEventReceiver.terminate();
+ }
+
+ protected void setRegistryService(RegistryService registryService) {
+ if (log.isDebugEnabled()) {
+ log.debug("Setting the Registry Service");
+ }
+ try {
+ ServiceReferenceHolder.getInstance().setRegistry(registryService.getGovernanceSystemRegistry());
+ } catch (RegistryException e) {
+ String msg = "Failed when retrieving Governance System Registry.";
+ log.error(msg, e);
+ throw new AutoScalerException(msg, e);
+ }
+ }
+
+ protected void unsetRegistryService(RegistryService registryService) {
+ if (log.isDebugEnabled()) {
+ log.debug("Un-setting the Registry Service");
+ }
+ ServiceReferenceHolder.getInstance().setRegistry(null);
+ }
+
+ protected void setTaskService(TaskService taskService) {
+ if (log.isDebugEnabled()) {
+ log.debug("Setting the Task Service");
+ }
+ ServiceReferenceHolder.getInstance().setTaskService(taskService);
+ }
+
+ protected void unsetTaskService(TaskService taskService) {
+ if (log.isDebugEnabled()) {
+ log.debug("Un-setting the Task Service");
+ }
+ ServiceReferenceHolder.getInstance().setTaskService(null);
+ }
+}
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
index a47e924..dfbf1ec 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/internal/CloudControllerServiceComponent.java
@@ -20,9 +20,12 @@ package org.apache.stratos.cloud.controller.internal;
*
*/
+<<<<<<< HEAD
import com.hazelcast.core.HazelcastInstance;
+=======
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.cloud.controller.context.CloudControllerContext;
@@ -49,6 +52,7 @@ import org.wso2.carbon.utils.ConfigurationContextService;
* Registering Cloud Controller Service.
*
* @scr.component name="org.apache.stratos.cloud.controller" immediate="true"
+<<<<<<< HEAD
* @scr.reference name="distributedObjectProvider" interface="org.apache.stratos.common.clustering.DistributedObjectProvider"
* cardinality="1..1" policy="dynamic" bind="setDistributedObjectProvider" unbind="unsetDistributedObjectProvider"
* @scr.reference name="ntask.component" interface="org.wso2.carbon.ntask.core.service.TaskService"
@@ -57,42 +61,72 @@ import org.wso2.carbon.utils.ConfigurationContextService;
* cardinality="1..1" policy="dynamic" bind="setRegistryService" unbind="unsetRegistryService"
* @scr.reference name="config.context.service" interface="org.wso2.carbon.utils.ConfigurationContextService"
* cardinality="1..1" policy="dynamic" bind="setConfigurationContextService" unbind="unsetConfigurationContextService"
+=======
+ * @scr.reference name="distributedMapProvider" interface="org.wso2.carbon.caching.impl.DistributedMapProvider"
+ * cardinality="1..1" policy="dynamic" bind="setDistributedMapProvider" unbind="unsetDistributedMapProvider"
+ * @scr.reference name="ntask.component"
+ * interface="org.wso2.carbon.ntask.core.service.TaskService"
+ * cardinality="1..1" policy="dynamic" bind="setTaskService" unbind="unsetTaskService"
+ * @scr.reference name="registry.service"
+ * interface="org.wso2.carbon.registry.core.service.RegistryService"
+ * cardinality="1..1" policy="dynamic" bind="setRegistryService" unbind="unsetRegistryService"
+ * @scr.reference name="config.context.service"
+ * interface="org.wso2.carbon.utils.ConfigurationContextService"
+ * cardinality="1..1" policy="dynamic" bind="setConfigurationContextService" unbind="unsetConfigurationContextService"
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
*/
public class CloudControllerServiceComponent {
- private static final Log log = LogFactory.getLog(CloudControllerServiceComponent.class);
- private ClusterStatusTopicReceiver clusterStatusTopicReceiver;
- private InstanceStatusTopicReceiver instanceStatusTopicReceiver;
- private ApplicationTopicReceiver applicationTopicReceiver;
+ private static final Log log = LogFactory.getLog(CloudControllerServiceComponent.class);
+ private ClusterStatusTopicReceiver clusterStatusTopicReceiver;
+ private InstanceStatusTopicReceiver instanceStatusTopicReceiver;
+ private ApplicationTopicReceiver applicationTopicReceiver;
- protected void activate(ComponentContext context) {
- try {
- applicationTopicReceiver = new ApplicationTopicReceiver();
- applicationTopicReceiver.execute();
+ protected void activate(ComponentContext context) {
+ try {
+ applicationTopicReceiver = new ApplicationTopicReceiver();
+ applicationTopicReceiver.execute();
+ if (log.isInfoEnabled()) {
+ log.info("Application Receiver thread started");
+ }
+
+<<<<<<< HEAD
if (log.isInfoEnabled()) {
log.info("Application event receiver thread started");
}
+=======
+ clusterStatusTopicReceiver = new ClusterStatusTopicReceiver();
+ clusterStatusTopicReceiver.execute();
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
- clusterStatusTopicReceiver = new ClusterStatusTopicReceiver();
- clusterStatusTopicReceiver.execute();
+ if (log.isInfoEnabled()) {
+ log.info("Cluster status Receiver thread started");
+ }
+<<<<<<< HEAD
if (log.isInfoEnabled()) {
log.info("Cluster status receiver thread started");
}
+=======
+ instanceStatusTopicReceiver = new InstanceStatusTopicReceiver();
+ instanceStatusTopicReceiver.execute();
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
- instanceStatusTopicReceiver = new InstanceStatusTopicReceiver();
- instanceStatusTopicReceiver.execute();
+ if (log.isInfoEnabled()) {
+ log.info("Instance status message receiver thread started");
+ }
- if(log.isInfoEnabled()) {
- log.info("Instance status message receiver thread started");
- }
+ // Register cloud controller service
+ BundleContext bundleContext = context.getBundleContext();
+ bundleContext.registerService(CloudControllerService.class.getName(),
+ new CloudControllerServiceImpl(), null);
- // Register cloud controller service
- BundleContext bundleContext = context.getBundleContext();
- bundleContext.registerService(CloudControllerService.class.getName(),
- new CloudControllerServiceImpl(), null);
+ if (log.isInfoEnabled()) {
+ log.info("Scheduling tasks");
+ }
+<<<<<<< HEAD
if(log.isInfoEnabled()) {
log.info("Scheduling tasks");
}
@@ -123,29 +157,71 @@ public class CloudControllerServiceComponent {
ServiceReferenceHolder.getInstance().setTaskService(null);
}
+=======
+ TopologySynchronizerTaskScheduler
+ .schedule(ServiceReferenceHolder.getInstance()
+ .getTaskService());
+
+ } catch (Throwable e) {
+ log.error("******* Cloud Controller Service bundle is failed to activate ****", e);
+ }
+ }
+
+ protected void setTaskService(TaskService taskService) {
+ if (log.isDebugEnabled()) {
+ log.debug("Setting the Task Service");
+ }
+ ServiceReferenceHolder.getInstance().setTaskService(taskService);
+ }
+
+ protected void unsetTaskService(TaskService taskService) {
+ if (log.isDebugEnabled()) {
+ log.debug("Unsetting the Task Service");
+ }
+ ServiceReferenceHolder.getInstance().setTaskService(null);
+ }
+
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
protected void setRegistryService(RegistryService registryService) {
if (log.isDebugEnabled()) {
log.debug("Setting the Registry Service");
}
-
- try {
+
+ try {
UserRegistry registry = registryService.getGovernanceSystemRegistry();
+<<<<<<< HEAD
ServiceReferenceHolder.getInstance().setRegistry(registry);
} catch (RegistryException e) {
String msg = "Failed when retrieving Governance System Registry.";
log.error(msg, e);
throw new CloudControllerException(msg, e);
}
+=======
+ ServiceReferenceHolder.getInstance()
+ .setRegistry(registry);
+ } catch (RegistryException e) {
+ String msg = "Failed when retrieving Governance System Registry.";
+ log.error(msg, e);
+ throw new CloudControllerException(msg, e);
+ }
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
}
protected void unsetRegistryService(RegistryService registryService) {
if (log.isDebugEnabled()) {
+<<<<<<< HEAD
log.debug("Un-setting the Registry Service");
}
ServiceReferenceHolder.getInstance().setRegistry(null);
+=======
+ log.debug("Unsetting the Registry Service");
+ }
+ ServiceReferenceHolder.getInstance().setRegistry(null);
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
}
-
+
protected void setConfigurationContextService(ConfigurationContextService cfgCtxService) {
+<<<<<<< HEAD
ServiceReferenceHolder.getInstance().setAxisConfiguration(
cfgCtxService.getServerConfigContext().getAxisConfiguration());
}
@@ -162,8 +238,26 @@ public class CloudControllerServiceComponent {
ServiceReferenceHolder.getInstance().setDistributedObjectProvider(null);
}
+=======
+ ServiceReferenceHolder.getInstance().setAxisConfiguration(
+ cfgCtxService.getServerConfigContext().getAxisConfiguration());
+ }
+
+ protected void unsetConfigurationContextService(ConfigurationContextService cfgCtxService) {
+ ServiceReferenceHolder.getInstance().setAxisConfiguration(null);
+ }
+
+ protected void setDistributedMapProvider(DistributedMapProvider mapProvider) {
+ ServiceReferenceHolder.getInstance().setDistributedMapProvider(mapProvider);
+ }
+
+ protected void unsetDistributedMapProvider(DistributedMapProvider mapProvider) {
+ ServiceReferenceHolder.getInstance().setDistributedMapProvider(null);
+ }
+
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
protected void deactivate(ComponentContext ctx) {
- // Close event publisher connections to message broker
- EventPublisherPool.close(Util.Topics.TOPOLOGY_TOPIC.getTopicName());
+ // Close event publisher connections to message broker
+ EventPublisherPool.close(Util.Topics.TOPOLOGY_TOPIC.getTopicName());
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
index 87dfe6e..d65b7f5 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/application/ApplicationTopicReceiver.java
@@ -29,45 +29,44 @@ import org.apache.stratos.messaging.message.receiver.applications.ApplicationsEv
/**
* This is to receive the application topic messages.
*/
-public class ApplicationTopicReceiver{
- private static final Log log = LogFactory.getLog(ApplicationTopicReceiver.class);
- private ApplicationsEventReceiver applicationsEventReceiver;
- private boolean terminated;
+public class ApplicationTopicReceiver {
+ private static final Log log = LogFactory.getLog(ApplicationTopicReceiver.class);
+ private ApplicationsEventReceiver applicationsEventReceiver;
+ private boolean terminated;
- public ApplicationTopicReceiver() {
- this.applicationsEventReceiver = new ApplicationsEventReceiver();
- addEventListeners();
+ public ApplicationTopicReceiver() {
+ this.applicationsEventReceiver = new ApplicationsEventReceiver();
+ addEventListeners();
- }
+ }
-
+ public void execute() {
- public void execute() {
+ if (log.isInfoEnabled()) {
+ log.info("Cloud controller application status thread started");
+ }
+ applicationsEventReceiver.execute();
- if (log.isInfoEnabled()) {
- log.info("Cloud controller application status thread started");
- }
- applicationsEventReceiver.execute();
+ if (log.isInfoEnabled()) {
+ log.info("Cloud controller application status thread terminated");
+ }
- if (log.isInfoEnabled()) {
- log.info("Cloud controller application status thread terminated");
- }
+ }
- }
- private void addEventListeners() {
- applicationsEventReceiver.addEventListener(new ApplicationTerminatedEventListener() {
- @Override
- protected void onEvent(Event event) {
- //Remove the application related data
- ApplicationTerminatedEvent terminatedEvent = (ApplicationTerminatedEvent)event;
- log.info("ApplicationTerminatedEvent received for [application] " + terminatedEvent.getAppId());
- String appId = terminatedEvent.getAppId();
- TopologyBuilder.handleApplicationClustersRemoved(appId, terminatedEvent.getClusterData());
- }
- });
- }
+ private void addEventListeners() {
+ applicationsEventReceiver.addEventListener(new ApplicationTerminatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ //Remove the application related data
+ ApplicationTerminatedEvent terminatedEvent = (ApplicationTerminatedEvent) event;
+ log.info("ApplicationTerminatedEvent received for [application] " + terminatedEvent.getAppId());
+ String appId = terminatedEvent.getAppId();
+ TopologyBuilder.handleApplicationClustersRemoved(appId, terminatedEvent.getClusterData());
+ }
+ });
+ }
- public void setTerminated(boolean terminated) {
- this.terminated = terminated;
- }
+ public void setTerminated(boolean terminated) {
+ this.terminated = terminated;
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
index bd2fbf0..ca6d4ad 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/cluster/status/ClusterStatusTopicReceiver.java
@@ -26,16 +26,16 @@ import org.apache.stratos.messaging.event.cluster.status.*;
import org.apache.stratos.messaging.listener.cluster.status.*;
import org.apache.stratos.messaging.message.receiver.cluster.status.ClusterStatusEventReceiver;
-public class ClusterStatusTopicReceiver{
- private static final Log log = LogFactory.getLog(ClusterStatusTopicReceiver.class);
+public class ClusterStatusTopicReceiver {
+ private static final Log log = LogFactory.getLog(ClusterStatusTopicReceiver.class);
- private ClusterStatusEventReceiver statusEventReceiver;
- private boolean terminated;
+ private ClusterStatusEventReceiver statusEventReceiver;
+ private boolean terminated;
- public ClusterStatusTopicReceiver() {
- this.statusEventReceiver = new ClusterStatusEventReceiver();
- addEventListeners();
- }
+ public ClusterStatusTopicReceiver() {
+ this.statusEventReceiver = new ClusterStatusEventReceiver();
+ addEventListeners();
+ }
public void execute() {
@@ -47,58 +47,58 @@ public class ClusterStatusTopicReceiver{
}
private void addEventListeners() {
- // Listen to topology events that affect clusters
- statusEventReceiver.addEventListener(new ClusterStatusClusterResetEventListener() {
- @Override
- protected void onEvent(Event event) {
- TopologyBuilder.handleClusterReset((ClusterStatusClusterResetEvent) event);
- }
- });
+ // Listen to topology events that affect clusters
+ statusEventReceiver.addEventListener(new ClusterStatusClusterResetEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ TopologyBuilder.handleClusterReset((ClusterStatusClusterResetEvent) event);
+ }
+ });
- statusEventReceiver.addEventListener(new ClusterStatusClusterInstanceCreatedEventListener() {
- @Override
- protected void onEvent(Event event) {
- //TopologyBuilder.handleClusterInstanceCreated((ClusterStatusClusterInstanceCreatedEvent) event);
- }
- });
+ statusEventReceiver.addEventListener(new ClusterStatusClusterInstanceCreatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ //TopologyBuilder.handleClusterInstanceCreated((ClusterStatusClusterInstanceCreatedEvent) event);
+ }
+ });
- statusEventReceiver.addEventListener(new ClusterStatusClusterCreatedEventListener() {
- @Override
- protected void onEvent(Event event) {
- TopologyBuilder.handleClusterCreated((ClusterStatusClusterCreatedEvent) event);
- }
- });
+ statusEventReceiver.addEventListener(new ClusterStatusClusterCreatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ TopologyBuilder.handleClusterCreated((ClusterStatusClusterCreatedEvent) event);
+ }
+ });
- statusEventReceiver.addEventListener(new ClusterStatusClusterActivatedEventListener() {
- @Override
- protected void onEvent(Event event) {
- TopologyBuilder.handleClusterActivatedEvent((ClusterStatusClusterActivatedEvent) event);
- }
- });
+ statusEventReceiver.addEventListener(new ClusterStatusClusterActivatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ TopologyBuilder.handleClusterActivatedEvent((ClusterStatusClusterActivatedEvent) event);
+ }
+ });
- statusEventReceiver.addEventListener(new ClusterStatusClusterTerminatedEventListener() {
- @Override
- protected void onEvent(Event event) {
- TopologyBuilder.handleClusterTerminatedEvent((ClusterStatusClusterTerminatedEvent) event);
- }
- });
+ statusEventReceiver.addEventListener(new ClusterStatusClusterTerminatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ TopologyBuilder.handleClusterTerminatedEvent((ClusterStatusClusterTerminatedEvent) event);
+ }
+ });
- statusEventReceiver.addEventListener(new ClusterStatusClusterTerminatingEventListener() {
- @Override
- protected void onEvent(Event event) {
- TopologyBuilder.handleClusterTerminatingEvent((ClusterStatusClusterTerminatingEvent) event);
- }
- });
+ statusEventReceiver.addEventListener(new ClusterStatusClusterTerminatingEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ TopologyBuilder.handleClusterTerminatingEvent((ClusterStatusClusterTerminatingEvent) event);
+ }
+ });
- statusEventReceiver.addEventListener(new ClusterStatusClusterInactivateEventListener() {
- @Override
- protected void onEvent(Event event) {
- TopologyBuilder.handleClusterInActivateEvent((ClusterStatusClusterInactivateEvent) event);
- }
- });
- }
+ statusEventReceiver.addEventListener(new ClusterStatusClusterInactivateEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ TopologyBuilder.handleClusterInActivateEvent((ClusterStatusClusterInactivateEvent) event);
+ }
+ });
+ }
- public void setTerminated(boolean terminated) {
- this.terminated = terminated;
- }
+ public void setTerminated(boolean terminated) {
+ this.terminated = terminated;
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
index d5475f0..42aabed 100644
--- a/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
+++ b/components/org.apache.stratos.cloud.controller/src/main/java/org/apache/stratos/cloud/controller/messaging/receiver/instance/status/InstanceStatusTopicReceiver.java
@@ -35,71 +35,67 @@ import org.apache.stratos.messaging.message.receiver.instance.status.InstanceSta
/**
* This will handle the instance status events
*/
-public class InstanceStatusTopicReceiver{
- private static final Log log = LogFactory.getLog(InstanceStatusTopicReceiver.class);
-
- private InstanceStatusEventReceiver statusEventReceiver;
- private boolean terminated;
-
- public InstanceStatusTopicReceiver() {
- this.statusEventReceiver = new InstanceStatusEventReceiver();
- addEventListeners();
- }
-
-
-
- public void execute() {
- statusEventReceiver.execute();
- if (log.isInfoEnabled()) {
- log.info("Cloud controller application status thread started");
- }
-
-
- if (log.isInfoEnabled()) {
- log.info("Cloud controller application status thread terminated");
- }
- }
-
- private void addEventListeners() {
- statusEventReceiver.addEventListener(new InstanceActivatedEventListener() {
- @Override
- protected void onEvent(Event event) {
- TopologyBuilder.handleMemberActivated((InstanceActivatedEvent) event);
- }
- });
-
- statusEventReceiver.addEventListener(new InstanceStartedEventListener() {
- @Override
- protected void onEvent(Event event) {
- TopologyBuilder.handleMemberStarted((InstanceStartedEvent) event);
- }
- });
-
- statusEventReceiver.addEventListener(new InstanceReadyToShutdownEventListener() {
- @Override
- protected void onEvent(Event event) {
- try {
- TopologyBuilder.handleMemberReadyToShutdown((InstanceReadyToShutdownEvent) event);
- } catch (Exception e) {
- String error = "Failed to retrieve the instance status event message";
- log.error(error, e);
- }
- }
- });
-
- statusEventReceiver.addEventListener(new InstanceMaintenanceListener() {
- @Override
- protected void onEvent(Event event) {
- try {
- TopologyBuilder.handleMemberMaintenance((InstanceMaintenanceModeEvent) event);
- } catch (Exception e) {
- String error = "Failed to retrieve the instance status event message";
- log.error(error, e);
- }
- }
- });
-
-
- }
+public class InstanceStatusTopicReceiver {
+ private static final Log log = LogFactory.getLog(InstanceStatusTopicReceiver.class);
+
+ private InstanceStatusEventReceiver statusEventReceiver;
+ private boolean terminated;
+
+ public InstanceStatusTopicReceiver() {
+ this.statusEventReceiver = new InstanceStatusEventReceiver();
+ addEventListeners();
+ }
+
+ public void execute() {
+ statusEventReceiver.execute();
+ if (log.isInfoEnabled()) {
+ log.info("Cloud controller application status thread started");
+ }
+
+ if (log.isInfoEnabled()) {
+ log.info("Cloud controller application status thread terminated");
+ }
+ }
+
+ private void addEventListeners() {
+ statusEventReceiver.addEventListener(new InstanceActivatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ TopologyBuilder.handleMemberActivated((InstanceActivatedEvent) event);
+ }
+ });
+
+ statusEventReceiver.addEventListener(new InstanceStartedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ TopologyBuilder.handleMemberStarted((InstanceStartedEvent) event);
+ }
+ });
+
+ statusEventReceiver.addEventListener(new InstanceReadyToShutdownEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ try {
+ TopologyBuilder.handleMemberReadyToShutdown((InstanceReadyToShutdownEvent) event);
+ } catch (Exception e) {
+ String error = "Failed to retrieve the instance status event message";
+ log.error(error, e);
+ }
+ }
+ });
+
+ statusEventReceiver.addEventListener(new InstanceMaintenanceListener() {
+ @Override
+ protected void onEvent(Event event) {
+ try {
+ TopologyBuilder.handleMemberMaintenance((InstanceMaintenanceModeEvent) event);
+ } catch (Exception e) {
+ String error = "Failed to retrieve the instance status event message";
+ log.error(error, e);
+ }
+ }
+ });
+
+ }
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
index e74721f..188b2ac 100644
--- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
+++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
@@ -35,136 +35,134 @@ import java.util.concurrent.ExecutorService;
* received from the message broker.
*/
public class LoadBalancerExtension implements Runnable {
- private static final Log log = LogFactory.getLog(LoadBalancerExtension.class);
-
- private LoadBalancer loadBalancer;
- private LoadBalancerStatisticsReader statsReader;
- private boolean loadBalancerStarted;
- private TopologyEventReceiver topologyEventReceiver;
- private LoadBalancerStatisticsNotifier statisticsNotifier;
- private boolean terminated;
+ private static final Log log = LogFactory.getLog(LoadBalancerExtension.class);
+
+ private LoadBalancer loadBalancer;
+ private LoadBalancerStatisticsReader statsReader;
+ private boolean loadBalancerStarted;
+ private TopologyEventReceiver topologyEventReceiver;
+ private LoadBalancerStatisticsNotifier statisticsNotifier;
+ private boolean terminated;
private ExecutorService executorService;
- /**
- * Load balancer extension constructor.
- * @param loadBalancer Load balancer instance: Mandatory.
- * @param statsReader Statistics reader: If null statistics notifier thread will not be started.
- */
- public LoadBalancerExtension(LoadBalancer loadBalancer, LoadBalancerStatisticsReader statsReader) {
- this.loadBalancer = loadBalancer;
- this.statsReader = statsReader;
- }
-
- @Override
- public void run() {
- try {
- if(log.isInfoEnabled()) {
- log.info("Load balancer extension started");
- }
-
- // Start topology receiver thread
- topologyEventReceiver = new TopologyEventReceiver();
- addEventListeners();
- topologyEventReceiver.setExecutorService(executorService);
- topologyEventReceiver.execute();
-
-
- if(statsReader != null) {
- // Start stats notifier thread
- statisticsNotifier = new LoadBalancerStatisticsNotifier(statsReader);
- Thread statsNotifierThread = new Thread(statisticsNotifier);
- statsNotifierThread.start();
- }
- else {
- if(log.isWarnEnabled()) {
- log.warn("Load balancer statistics reader not found");
- }
- }
-
- // Keep the thread live until terminated
- while (!terminated);
- } catch (Exception e) {
- if (log.isErrorEnabled()) {
- log.error("Could not start load balancer extension", e);
- }
- }
- }
-
- private void addEventListeners() {
- topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
-
- @Override
- protected void onEvent(Event event) {
- try {
-
- if (!loadBalancerStarted) {
- // Configure load balancer
- loadBalancer.configure(TopologyManager.getTopology());
-
- // Start load balancer
- loadBalancer.start();
- loadBalancerStarted = true;
- }
- } catch (Exception e) {
- if (log.isErrorEnabled()) {
- log.error("Could not start load balancer", e);
- }
- terminate();
- }
- }
- });
- topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
- @Override
- protected void onEvent(Event event) {
- reloadConfiguration();
- }
- });
- topologyEventReceiver.addEventListener(new MemberSuspendedEventListener() {
- @Override
- protected void onEvent(Event event) {
- reloadConfiguration();
- }
- });
- topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
- @Override
- protected void onEvent(Event event) {
- reloadConfiguration();
- }
- });
- topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() {
- @Override
- protected void onEvent(Event event) {
- reloadConfiguration();
- }
- });
- topologyEventReceiver.addEventListener(new ServiceRemovedEventListener() {
- @Override
- protected void onEvent(Event event) {
- reloadConfiguration();
- }
- });
- }
-
- private void reloadConfiguration() {
- try {
- if (loadBalancerStarted) {
- loadBalancer.reload(TopologyManager.getTopology());
- }
- } catch (Exception e) {
- if (log.isErrorEnabled()) {
- log.error("Could not reload load balancer configuration", e);
- }
- }
- }
-
- public void terminate() {
- if(topologyEventReceiver != null) {
- topologyEventReceiver.terminate();
- }
- if(statisticsNotifier != null) {
- statisticsNotifier.terminate();
- }
- terminated = true;
- }
+
+ /**
+ * Load balancer extension constructor.
+ *
+ * @param loadBalancer Load balancer instance: Mandatory.
+ * @param statsReader Statistics reader: If null statistics notifier thread will not be started.
+ */
+ public LoadBalancerExtension(LoadBalancer loadBalancer, LoadBalancerStatisticsReader statsReader) {
+ this.loadBalancer = loadBalancer;
+ this.statsReader = statsReader;
+ }
+
+ @Override
+ public void run() {
+ try {
+ if (log.isInfoEnabled()) {
+ log.info("Load balancer extension started");
+ }
+
+ // Start topology receiver thread
+ topologyEventReceiver = new TopologyEventReceiver();
+ addEventListeners();
+ topologyEventReceiver.setExecutorService(executorService);
+ topologyEventReceiver.execute();
+
+ if (statsReader != null) {
+ // Start stats notifier thread
+ statisticsNotifier = new LoadBalancerStatisticsNotifier(statsReader);
+ Thread statsNotifierThread = new Thread(statisticsNotifier);
+ statsNotifierThread.start();
+ } else {
+ if (log.isWarnEnabled()) {
+ log.warn("Load balancer statistics reader not found");
+ }
+ }
+
+ } catch (Exception e) {
+ if (log.isErrorEnabled()) {
+ log.error("Could not start load balancer extension", e);
+ }
+ }
+ }
+
+ private void addEventListeners() {
+ topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
+
+ @Override
+ protected void onEvent(Event event) {
+ try {
+
+ if (!loadBalancerStarted) {
+ // Configure load balancer
+ loadBalancer.configure(TopologyManager.getTopology());
+
+ // Start load balancer
+ loadBalancer.start();
+ loadBalancerStarted = true;
+ }
+ } catch (Exception e) {
+ if (log.isErrorEnabled()) {
+ log.error("Could not start load balancer", e);
+ }
+ terminate();
+ }
+ }
+ });
+ topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ reloadConfiguration();
+ }
+ });
+ topologyEventReceiver.addEventListener(new MemberSuspendedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ reloadConfiguration();
+ }
+ });
+ topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ reloadConfiguration();
+ }
+ });
+ topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ reloadConfiguration();
+ }
+ });
+ topologyEventReceiver.addEventListener(new ServiceRemovedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ reloadConfiguration();
+ }
+ });
+ }
+
+ private void reloadConfiguration() {
+ try {
+ if (loadBalancerStarted) {
+ loadBalancer.reload(TopologyManager.getTopology());
+ }
+ } catch (Exception e) {
+ if (log.isErrorEnabled()) {
+ log.error("Could not reload load balancer configuration", e);
+ }
+ }
+ }
+
+ public void terminate() {
+ if (topologyEventReceiver != null) {
+ topologyEventReceiver.terminate();
+ }
+ if (statisticsNotifier != null) {
+ statisticsNotifier.terminate();
+ }
+ terminated = true;
+ }
public ExecutorService getExecutorService() {
return executorService;
http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
----------------------------------------------------------------------
diff --git a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
index 56a3fcf..5ee28d1 100644
--- a/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
+++ b/extensions/cep/stratos-cep-extension/src/main/java/org/apache/stratos/cep/extension/FaultHandlingWindowProcessor.java
@@ -58,53 +58,56 @@ import java.util.concurrent.TimeUnit;
@SiddhiExtension(namespace = "stratos", function = "faultHandling")
public class FaultHandlingWindowProcessor extends WindowProcessor implements RunnableWindowProcessor {
- private static final int TIME_OUT = 60 * 1000;
- static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class);
- private ScheduledExecutorService faultHandleScheduler;
- private ThreadBarrier threadBarrier;
- private long timeToKeep;
- private ISchedulerSiddhiQueue<StreamEvent> window;
- private EventPublisher healthStatPublisher = EventPublisherPool.getPublisher(Util.Topics.HEALTH_STAT_TOPIC.getTopicName());
- private Map<String, Object> MemberFaultEventMap = new HashMap<String, Object>();
- private Map<String, Object> memberFaultEventMessageMap = new HashMap<String, Object>();
-
- // Map of member id's to their last received health event time stamp
- private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>();
-
- // Event receiver to receive topology events published by cloud-controller
- private CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this);
-
- // Stratos member id attribute index in stream execution plan
- private int memberIdAttrIndex;
-
- @Override
- protected void processEvent(InEvent event) {
- addDataToMap(event);
- }
-
- @Override
- protected void processEvent(InListEvent listEvent) {
- for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
- addDataToMap((InEvent) listEvent.getEvent(i));
- }
- }
-
- /**
- * Add new entry to time stamp map from the received event.
- *
- * @param event Event received by Siddhi.
- */
- protected void addDataToMap(InEvent event) {
- String id = (String) event.getData()[memberIdAttrIndex];
- //checking whether this member is the topology.
- //sometimes there can be a delay between publishing member terminated events
- //and actually terminating instances. Hence CEP might get events for already terminated members
- //so we are checking the topology for the member existence
- Member member = getMemberFromId(id);
- if (null == member) {
+ private static final int TIME_OUT = 60 * 1000;
+ static final Logger log = Logger.getLogger(FaultHandlingWindowProcessor.class);
+ public static final String IDENTIFIER = "AutoScaler";
+ private ScheduledExecutorService faultHandleScheduler;
+ private ThreadBarrier threadBarrier;
+ private long timeToKeep;
+ private ISchedulerSiddhiQueue<StreamEvent> window;
+ private EventPublisher healthStatPublisher =
+ EventPublisherPool.getPublisher(Util.Topics.HEALTH_STAT_TOPIC.getTopicName());
+ private Map<String, Object> MemberFaultEventMap = new HashMap<String, Object>();
+ private Map<String, Object> memberFaultEventMessageMap = new HashMap<String, Object>();
+
+ // Map of member id's to their last received health event time stamp
+ private ConcurrentHashMap<String, Long> memberTimeStampMap = new ConcurrentHashMap<String, Long>();
+
+ // Event receiver to receive topology events published by cloud-controller
+ private CEPTopologyEventReceiver cepTopologyEventReceiver = new CEPTopologyEventReceiver(this);
+
+ // Stratos member id attribute index in stream execution plan
+ private int memberIdAttrIndex;
+
+ @Override
+ protected void processEvent(InEvent event) {
+ addDataToMap(event);
+ }
+
+ @Override
+ protected void processEvent(InListEvent listEvent) {
+ for (int i = 0, size = listEvent.getActiveEvents(); i < size; i++) {
+ addDataToMap((InEvent) listEvent.getEvent(i));
+ }
+ }
+
+ /**
+ * Add new entry to time stamp map from the received event.
+ *
+ * @param event Event received by Siddhi.
+ */
+ protected void addDataToMap(InEvent event) {
+ String id = (String) event.getData()[memberIdAttrIndex];
+ //checking whether this member is the topology.
+ //sometimes there can be a delay between publishing member terminated events
+ //and actually terminating instances. Hence CEP might get events for already terminated members
+ //so we are checking the topology for the member existence
+ Member member = getMemberFromId(id);
+ if (null == member) {
log.debug("Member not found in the toplogy. Event rejected");
return;
}
+<<<<<<< HEAD
if (StringUtils.isNotEmpty(id)) {
memberTimeStampMap.put(id, event.getTimeStamp());
} else {
@@ -317,4 +320,220 @@ public class FaultHandlingWindowProcessor extends WindowProcessor implements Run
public ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
return memberTimeStampMap;
}
+=======
+ if (StringUtils.isNotEmpty(id)) {
+ memberTimeStampMap.put(id, event.getTimeStamp());
+ } else {
+ log.warn("NULL member id found in the event received. Event rejected.");
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Event received from [member-id] " + id + " [time-stamp] " + event.getTimeStamp());
+ }
+ }
+
+ @Override
+ public Iterator<StreamEvent> iterator() {
+ return window.iterator();
+ }
+
+ @Override
+ public Iterator<StreamEvent> iterator(String predicate) {
+ if (siddhiContext.isDistributedProcessingEnabled()) {
+ return ((SchedulerSiddhiQueueGrid<StreamEvent>) window).iterator(predicate);
+ } else {
+ return window.iterator();
+ }
+ }
+
+ /**
+ * Retrieve the current activated members from the topology and initialize the time stamp map.
+ * This will allow the system to recover from a restart
+ *
+ * @param topology Topology model object
+ */
+ boolean loadTimeStampMapFromTopology(Topology topology) {
+
+ long currentTimeStamp = System.currentTimeMillis();
+ if (topology == null || topology.getServices() == null) {
+ return false;
+ }
+ // TODO make this efficient by adding APIs to messaging component
+ for (Service service : topology.getServices()) {
+ if (service.getClusters() != null) {
+ for (Cluster cluster : service.getClusters()) {
+ if (cluster.getMembers() != null) {
+ for (Member member : cluster.getMembers()) {
+ // we are checking faulty status only in previously activated members
+ if (member != null && MemberStatus.Activated.equals(member.getStatus())) {
+ // Initialize the member time stamp map from the topology at the beginning
+ memberTimeStampMap.putIfAbsent(member.getMemberId(), currentTimeStamp);
+ }
+ }
+ }
+ }
+ }
+ }
+
+ log.info("Member time stamp map was successfully loaded from the topology.");
+ if (log.isDebugEnabled()) {
+ log.debug("Member TimeStamp Map: " + memberTimeStampMap);
+ }
+ return true;
+ }
+
+ private Member getMemberFromId(String memberId) {
+ if (StringUtils.isEmpty(memberId)) {
+ return null;
+ }
+ if (TopologyManager.getTopology().isInitialized()) {
+ try {
+ TopologyManager.acquireReadLock();
+ if (TopologyManager.getTopology().getServices() == null) {
+ return null;
+ }
+ // TODO make this efficient by adding APIs to messaging component
+ for (Service service : TopologyManager.getTopology().getServices()) {
+ if (service.getClusters() != null) {
+ for (Cluster cluster : service.getClusters()) {
+ if (cluster.getMembers() != null) {
+ for (Member member : cluster.getMembers()) {
+ if (memberId.equals(member.getMemberId())) {
+ return member;
+ }
+ }
+ }
+ }
+ }
+ }
+ } catch (Exception e) {
+ log.error("Error while reading topology" + e);
+ } finally {
+ TopologyManager.releaseReadLock();
+ }
+ }
+ return null;
+ }
+
+ private void publishMemberFault(String memberId) {
+ Member member = getMemberFromId(memberId);
+ if (member == null) {
+ log.error("Failed to publish member fault event. Member having [member-id] " + memberId +
+ " does not exist in topology");
+ return;
+ }
+ log.info("Publishing member fault event for [member-id] " + memberId);
+
+ MemberFaultEvent memberFaultEvent =
+ new MemberFaultEvent(member.getClusterId(), member.getInstanceId(), member.getMemberId(),
+ member.getPartitionId(), 0);
+
+ memberFaultEventMessageMap.put("message", memberFaultEvent);
+ healthStatPublisher.publish(MemberFaultEventMap, true);
+ }
+
+ @Override
+ public void run() {
+ try {
+ threadBarrier.pass();
+
+ for (Object o : memberTimeStampMap.entrySet()) {
+ Map.Entry pair = (Map.Entry) o;
+ long currentTime = System.currentTimeMillis();
+ Long eventTimeStamp = (Long) pair.getValue();
+
+ if ((currentTime - eventTimeStamp) > TIME_OUT) {
+ log.info("Faulty member detected [member-id] " + pair.getKey() + " with [last time-stamp] " +
+ eventTimeStamp + " [time-out] " + TIME_OUT + " milliseconds");
+ publishMemberFault((String) pair.getKey());
+ }
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Fault handling processor iteration completed with [time-stamp map length] " +
+ memberTimeStampMap.size() + " [time-stamp map] " + memberTimeStampMap);
+ }
+ } catch (Throwable t) {
+ log.error(t.getMessage(), t);
+ } finally {
+ faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
+ }
+ }
+
+ @Override
+ protected Object[] currentState() {
+ return new Object[] { window.currentState() };
+ }
+
+ @Override
+ protected void restoreState(Object[] data) {
+ window.restoreState(data);
+ window.restoreState((Object[]) data[0]);
+ window.reSchedule();
+ }
+
+ @Override
+ protected void init(Expression[] parameters, QueryPostProcessingElement nextProcessor,
+ AbstractDefinition streamDefinition, String elementId, boolean async,
+ SiddhiContext siddhiContext) {
+
+ if (parameters[0] instanceof IntConstant) {
+ timeToKeep = ((IntConstant) parameters[0]).getValue();
+ } else {
+ timeToKeep = ((LongConstant) parameters[0]).getValue();
+ }
+
+ String memberIdAttrName = ((Variable) parameters[1]).getAttributeName();
+ memberIdAttrIndex = streamDefinition.getAttributePosition(memberIdAttrName);
+
+ if (this.siddhiContext.isDistributedProcessingEnabled()) {
+ window = new SchedulerSiddhiQueueGrid<StreamEvent>(elementId, this, this.siddhiContext, this.async);
+ } else {
+ window = new SchedulerSiddhiQueue<StreamEvent>(this);
+ }
+ MemberFaultEventMap
+ .put("org.apache.stratos.messaging.event.health.stat.MemberFaultEvent", memberFaultEventMessageMap);
+
+ ExecutorService executorService = StratosThreadPool.getExecutorService(IDENTIFIER, 10);
+ cepTopologyEventReceiver.setExecutorService(executorService);
+ executorService.execute(cepTopologyEventReceiver);
+
+ //Ordinary scheduling
+ window.schedule();
+ if (log.isDebugEnabled()) {
+ log.debug("Fault handling window processor initialized with [timeToKeep] " + timeToKeep +
+ ", [memberIdAttrName] " + memberIdAttrName + ", [memberIdAttrIndex] " + memberIdAttrIndex +
+ ", [distributed-enabled] " + this.siddhiContext.isDistributedProcessingEnabled());
+ }
+ }
+
+ @Override
+ public void schedule() {
+ faultHandleScheduler.schedule(this, timeToKeep, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void scheduleNow() {
+ faultHandleScheduler.schedule(this, 0, TimeUnit.MILLISECONDS);
+ }
+
+ @Override
+ public void setScheduledExecutorService(ScheduledExecutorService scheduledExecutorService) {
+ this.faultHandleScheduler = scheduledExecutorService;
+ }
+
+ @Override
+ public void setThreadBarrier(ThreadBarrier threadBarrier) {
+ this.threadBarrier = threadBarrier;
+ }
+
+ @Override
+ public void destroy() {
+ // terminate topology listener thread
+ cepTopologyEventReceiver.terminate();
+ window = null;
+ }
+
+ public ConcurrentHashMap<String, Long> getMemberTimeStampMap() {
+ return memberTimeStampMap;
+ }
+>>>>>>> ddf277b... Remove unnessary threads in messaging model
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/8012f8c8/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
----------------------------------------------------------------------
diff --git a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
index 013aee9..7996672 100644
--- a/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
+++ b/extensions/load-balancer/haproxy-extension/src/main/java/org/apache/stratos/haproxy/extension/Main.java
@@ -31,32 +31,34 @@ import java.util.concurrent.ExecutorService;
* HAProxy extension main class.
*/
public class Main {
- private static final Log log = LogFactory.getLog(Main.class);
+ private static final Log log = LogFactory.getLog(Main.class);
private static ExecutorService executorService;
public static void main(String[] args) {
- LoadBalancerExtension extension = null;
- try {
- // Configure log4j properties
- PropertyConfigurator.configure(System.getProperty("log4j.properties.file.path"));
+ LoadBalancerExtension extension = null;
+ try {
+ // Configure log4j properties
+ PropertyConfigurator.configure(System.getProperty("log4j.properties.file.path"));
- if (log.isInfoEnabled()) {
- log.info("HAProxy extension started");
- }
- executorService = StratosThreadPool.getExecutorService("Load_Balance_Extension", 10);
- // Validate runtime parameters
- HAProxyContext.getInstance().validate();
- extension = new LoadBalancerExtension(new HAProxy(), (HAProxyContext.getInstance().isCEPStatsPublisherEnabled() ? new HAProxyStatisticsReader() : null));
- Thread thread = new Thread(extension);
- thread.start();
- } catch (Exception e) {
- if (log.isErrorEnabled()) {
- log.error(e);
- }
- if (extension != null) {
- extension.terminate();
- }
- }
- }
+ if (log.isInfoEnabled()) {
+ log.info("HAProxy extension started");
+ }
+ executorService = StratosThreadPool.getExecutorService("Load_Balance_Extension", 10);
+ // Validate runtime parameters
+ HAProxyContext.getInstance().validate();
+ extension = new LoadBalancerExtension(new HAProxy(),
+ (HAProxyContext.getInstance().isCEPStatsPublisherEnabled() ?
+ new HAProxyStatisticsReader() : null));
+ Thread thread = new Thread(extension);
+ thread.start();
+ } catch (Exception e) {
+ if (log.isErrorEnabled()) {
+ log.error(e);
+ }
+ if (extension != null) {
+ extension.terminate();
+ }
+ }
+ }
}