You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2015/03/05 10:40:25 UTC
stratos git commit: Updating load balancer event receivers
Repository: stratos
Updated Branches:
refs/heads/master d8d7ca445 -> bf688e445
Updating load balancer event receivers
Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/bf688e44
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/bf688e44
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/bf688e44
Branch: refs/heads/master
Commit: bf688e4457518fbce44ff6752dff33d1b20b04e9
Parents: d8d7ca4
Author: Imesh Gunaratne <im...@apache.org>
Authored: Thu Mar 5 15:10:14 2015 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Thu Mar 5 15:10:14 2015 +0530
----------------------------------------------------------------------
.../extension/api/LoadBalancerExtension.java | 158 ++++++++++++-------
.../internal/LoadBalancerServiceComponent.java | 63 ++++----
2 files changed, 139 insertions(+), 82 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/stratos/blob/bf688e44/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
index 9d82fdb..7818390 100644
--- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
+++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
@@ -21,13 +21,17 @@ package org.apache.stratos.load.balancer.extension.api;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.load.balancer.common.event.receivers.LoadBalancerCommonApplicationSignUpEventReceiver;
+import org.apache.stratos.load.balancer.common.event.receivers.LoadBalancerCommonDomainMappingEventReceiver;
import org.apache.stratos.load.balancer.common.event.receivers.LoadBalancerCommonTopologyEventReceiver;
import org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatisticsReader;
import org.apache.stratos.load.balancer.common.statistics.notifier.LoadBalancerStatisticsNotifier;
import org.apache.stratos.load.balancer.common.topology.TopologyProvider;
import org.apache.stratos.messaging.event.Event;
import org.apache.stratos.messaging.listener.topology.*;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
+import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
import java.util.concurrent.ExecutorService;
@@ -41,12 +45,15 @@ public class LoadBalancerExtension {
private LoadBalancer loadBalancer;
private LoadBalancerStatisticsReader statsReader;
private boolean loadBalancerStarted;
- private TopologyProvider topologyProvider;
- private TopologyEventReceiver topologyEventReceiver;
private LoadBalancerStatisticsNotifier statisticsNotifier;
private ExecutorService executorService;
- /**
+ private TopologyProvider topologyProvider;
+ private LoadBalancerCommonTopologyEventReceiver topologyEventReceiver;
+ private LoadBalancerCommonDomainMappingEventReceiver domainMappingEventReceiver;
+ private LoadBalancerCommonApplicationSignUpEventReceiver applicationSignUpEventReceiver;
+
+ /**
* Load balancer extension constructor.
*
* @param loadBalancer Load balancer instance: Mandatory.
@@ -66,10 +73,9 @@ public class LoadBalancerExtension {
}
// Start topology receiver thread
- topologyEventReceiver = new LoadBalancerCommonTopologyEventReceiver(topologyProvider);
- addEventListeners();
- topologyEventReceiver.setExecutorService(executorService);
- topologyEventReceiver.execute();
+ startTopologyEventReceiver(executorService, topologyProvider);
+ startApplicationSignUpEventReceiver(executorService, topologyProvider);
+ startDomainMappingEventReceiver(executorService, topologyProvider);
if (statsReader != null) {
// Start stats notifier thread
@@ -89,58 +95,104 @@ public class LoadBalancerExtension {
}
}
- private void addEventListeners() {
+ private void startTopologyEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) {
+
+ topologyEventReceiver = new LoadBalancerCommonTopologyEventReceiver(topologyProvider);
+ addTopologyEventListeners(topologyEventReceiver);
+ topologyEventReceiver.setExecutorService(executorService);
+ topologyEventReceiver.execute();
+ if (log.isInfoEnabled()) {
+ log.info("Topology receiver thread started");
+ }
+
+ if (log.isInfoEnabled()) {
+ if(TopologyServiceFilter.getInstance().isActive()) {
+ log.info(String.format("Service filter activated: [filter] %s",
+ TopologyServiceFilter.getInstance().toString()));
+ }
+
+ if(TopologyClusterFilter.getInstance().isActive()) {
+ log.info(String.format("Cluster filter activated: [filter] %s",
+ TopologyClusterFilter.getInstance().toString()));
+ }
+
+ if(TopologyMemberFilter.getInstance().isActive()) {
+ log.info(String.format("Member filter activated: [filter] %s",
+ TopologyMemberFilter.getInstance().toString()));
+ }
+ }
+ }
+
+ private void startDomainMappingEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) {
+ domainMappingEventReceiver = new LoadBalancerCommonDomainMappingEventReceiver(topologyProvider);
+ domainMappingEventReceiver.setExecutorService(executorService);
+ domainMappingEventReceiver.execute();
+ if (log.isInfoEnabled()) {
+ log.info("Domain mapping event receiver thread started");
+ }
+ }
+
+ private void startApplicationSignUpEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) {
+ applicationSignUpEventReceiver = new LoadBalancerCommonApplicationSignUpEventReceiver(topologyProvider);
+ applicationSignUpEventReceiver.setExecutorService(executorService);
+ applicationSignUpEventReceiver.execute();
+ if (log.isInfoEnabled()) {
+ log.info("Application signup event receiver thread started");
+ }
+ }
+
+ private void addTopologyEventListeners(LoadBalancerCommonTopologyEventReceiver topologyEventReceiver) {
topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
- @Override
- protected void onEvent(Event event) {
- try {
- if (!loadBalancerStarted) {
- // Configure load balancer
- loadBalancer.configure(topologyProvider.getTopology());
-
- // Start load balancer
- loadBalancer.start();
- loadBalancerStarted = true;
- }
- } catch (Exception e) {
- if (log.isErrorEnabled()) {
- log.error("Could not start load balancer", e);
- }
- terminate();
- }
- }
- });
+ @Override
+ protected void onEvent(Event event) {
+ try {
+ if (!loadBalancerStarted) {
+ // Configure load balancer
+ loadBalancer.configure(topologyProvider.getTopology());
+
+ // Start load balancer
+ loadBalancer.start();
+ loadBalancerStarted = true;
+ }
+ } catch (Exception e) {
+ if (log.isErrorEnabled()) {
+ log.error("Could not start load balancer", e);
+ }
+ terminate();
+ }
+ }
+ });
topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
- @Override
- protected void onEvent(Event event) {
- reloadConfiguration();
- }
- });
+ @Override
+ protected void onEvent(Event event) {
+ reloadConfiguration();
+ }
+ });
topologyEventReceiver.addEventListener(new MemberSuspendedEventListener() {
- @Override
- protected void onEvent(Event event) {
- reloadConfiguration();
- }
- });
+ @Override
+ protected void onEvent(Event event) {
+ reloadConfiguration();
+ }
+ });
topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
- @Override
- protected void onEvent(Event event) {
- reloadConfiguration();
- }
- });
+ @Override
+ protected void onEvent(Event event) {
+ reloadConfiguration();
+ }
+ });
topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() {
- @Override
- protected void onEvent(Event event) {
- reloadConfiguration();
- }
- });
+ @Override
+ protected void onEvent(Event event) {
+ reloadConfiguration();
+ }
+ });
topologyEventReceiver.addEventListener(new ServiceRemovedEventListener() {
- @Override
- protected void onEvent(Event event) {
- reloadConfiguration();
- }
- });
+ @Override
+ protected void onEvent(Event event) {
+ reloadConfiguration();
+ }
+ });
}
private void reloadConfiguration() {
http://git-wip-us.apache.org/repos/asf/stratos/blob/bf688e44/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
index b391d21..fed6e84 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
@@ -42,7 +42,6 @@ import org.apache.stratos.load.balancer.util.LoadBalancerConstants;
import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
-import org.apache.stratos.messaging.message.receiver.tenant.TenantEventReceiver;
import org.apache.synapse.config.SynapseConfiguration;
import org.apache.synapse.config.xml.MultiXMLConfigurationBuilder;
import org.apache.synapse.core.SynapseEnvironment;
@@ -91,8 +90,7 @@ public class LoadBalancerServiceComponent {
private boolean activated = false;
private ExecutorService executorService;
- private TenantEventReceiver tenantReceiver;
- private LoadBalancerTopologyEventReceiver topologyReceiver;
+ private LoadBalancerTopologyEventReceiver topologyEventReceiver;
private LoadBalancerDomainMappingEventReceiver domainMappingEventReceiver;
private LoadBalancerCommonApplicationSignUpEventReceiver applicationSignUpEventReceiver;
private LoadBalancerStatisticsNotifier statisticsNotifier;
@@ -132,15 +130,12 @@ public class LoadBalancerServiceComponent {
LoadBalancerConfiguration.getInstance().setTopologyProvider(topologyProvider);
}
- if (configuration.isMultiTenancyEnabled()) {
- // Start tenant event receiver
- startTenantEventReceiver(executorService, topologyProvider);
- }
-
- if(configuration.isDomainMappingEnabled()) {
+ if (configuration.isMultiTenancyEnabled() || configuration.isDomainMappingEnabled()) {
// Start application signup event receiver
startApplicationSignUpEventReceiver(executorService, topologyProvider);
+ }
+ if(configuration.isDomainMappingEnabled()) {
// Start domain mapping event receiver
startDomainMappingEventReceiver(executorService, topologyProvider);
}
@@ -166,17 +161,11 @@ public class LoadBalancerServiceComponent {
}
}
- private void startTenantEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) {
-
- tenantReceiver = new TenantEventReceiver();
- tenantReceiver.setExecutorService(executorService);
- tenantReceiver.execute();
- if (log.isInfoEnabled()) {
- log.info("Tenant event receiver thread started");
+ private void startDomainMappingEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) {
+ if(domainMappingEventReceiver != null) {
+ return;
}
- }
- private void startDomainMappingEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) {
domainMappingEventReceiver = new LoadBalancerDomainMappingEventReceiver(topologyProvider);
domainMappingEventReceiver.setExecutorService(executorService);
domainMappingEventReceiver.execute();
@@ -186,6 +175,10 @@ public class LoadBalancerServiceComponent {
}
private void startApplicationSignUpEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) {
+ if(applicationSignUpEventReceiver != null) {
+ return;
+ }
+
applicationSignUpEventReceiver = new LoadBalancerCommonApplicationSignUpEventReceiver(topologyProvider);
applicationSignUpEventReceiver.setExecutorService(executorService);
applicationSignUpEventReceiver.execute();
@@ -195,10 +188,13 @@ public class LoadBalancerServiceComponent {
}
private void startTopologyEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) {
+ if(topologyEventReceiver != null) {
+ return;
+ }
- topologyReceiver = new LoadBalancerTopologyEventReceiver(topologyProvider);
- topologyReceiver.setExecutorService(executorService);
- topologyReceiver.execute();
+ topologyEventReceiver = new LoadBalancerTopologyEventReceiver(topologyProvider);
+ topologyEventReceiver.setExecutorService(executorService);
+ topologyEventReceiver.execute();
if (log.isInfoEnabled()) {
log.info("Topology receiver thread started");
}
@@ -243,21 +239,30 @@ public class LoadBalancerServiceComponent {
log.warn("An error occurred while removing endpoint deployer", e);
}
- // Terminate tenant receiver
- if(tenantReceiver != null) {
+ // Terminate topology receiver
+ if(topologyEventReceiver != null) {
try {
- tenantReceiver.terminate();
+ topologyEventReceiver.terminate();
} catch (Exception e) {
- log.warn("An error occurred while terminating tenant event receiver", e);
+ log.warn("An error occurred while terminating topology event receiver", e);
}
}
- // Terminate topology receiver
- if(topologyReceiver != null) {
+ // Terminate application signup event receiver
+ if(applicationSignUpEventReceiver != null) {
try {
- topologyReceiver.terminate();
+ applicationSignUpEventReceiver.terminate();
} catch (Exception e) {
- log.warn("An error occurred while terminating topology event receiver", e);
+ log.warn("An error occurred while terminating application signup event receiver", e);
+ }
+ }
+
+ // Terminate domain mapping event receiver
+ if(domainMappingEventReceiver != null) {
+ try {
+ domainMappingEventReceiver.terminate();
+ } catch (Exception e) {
+ log.warn("An error occurred while terminating domain mapping event receiver", e);
}
}