You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2015/03/05 10:40:25 UTC

stratos git commit: Updating load balancer event receivers

Repository: stratos
Updated Branches:
  refs/heads/master d8d7ca445 -> bf688e445


Updating load balancer event receivers


Project: http://git-wip-us.apache.org/repos/asf/stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/stratos/commit/bf688e44
Tree: http://git-wip-us.apache.org/repos/asf/stratos/tree/bf688e44
Diff: http://git-wip-us.apache.org/repos/asf/stratos/diff/bf688e44

Branch: refs/heads/master
Commit: bf688e4457518fbce44ff6752dff33d1b20b04e9
Parents: d8d7ca4
Author: Imesh Gunaratne <im...@apache.org>
Authored: Thu Mar 5 15:10:14 2015 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Thu Mar 5 15:10:14 2015 +0530

----------------------------------------------------------------------
 .../extension/api/LoadBalancerExtension.java    | 158 ++++++++++++-------
 .../internal/LoadBalancerServiceComponent.java  |  63 ++++----
 2 files changed, 139 insertions(+), 82 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/stratos/blob/bf688e44/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
index 9d82fdb..7818390 100644
--- a/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
+++ b/components/org.apache.stratos.load.balancer.extension.api/src/main/java/org/apache/stratos/load/balancer/extension/api/LoadBalancerExtension.java
@@ -21,13 +21,17 @@ package org.apache.stratos.load.balancer.extension.api;
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.load.balancer.common.event.receivers.LoadBalancerCommonApplicationSignUpEventReceiver;
+import org.apache.stratos.load.balancer.common.event.receivers.LoadBalancerCommonDomainMappingEventReceiver;
 import org.apache.stratos.load.balancer.common.event.receivers.LoadBalancerCommonTopologyEventReceiver;
 import org.apache.stratos.load.balancer.common.statistics.LoadBalancerStatisticsReader;
 import org.apache.stratos.load.balancer.common.statistics.notifier.LoadBalancerStatisticsNotifier;
 import org.apache.stratos.load.balancer.common.topology.TopologyProvider;
 import org.apache.stratos.messaging.event.Event;
 import org.apache.stratos.messaging.listener.topology.*;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyEventReceiver;
+import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
+import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
 
 import java.util.concurrent.ExecutorService;
 
@@ -41,12 +45,15 @@ public class LoadBalancerExtension {
 	private LoadBalancer loadBalancer;
 	private LoadBalancerStatisticsReader statsReader;
 	private boolean loadBalancerStarted;
-    private TopologyProvider topologyProvider;
-	private TopologyEventReceiver topologyEventReceiver;
 	private LoadBalancerStatisticsNotifier statisticsNotifier;
 	private ExecutorService executorService;
 
-	/**
+    private TopologyProvider topologyProvider;
+    private LoadBalancerCommonTopologyEventReceiver topologyEventReceiver;
+    private LoadBalancerCommonDomainMappingEventReceiver domainMappingEventReceiver;
+    private LoadBalancerCommonApplicationSignUpEventReceiver applicationSignUpEventReceiver;
+
+    /**
 	 * Load balancer extension constructor.
 	 *
 	 * @param loadBalancer Load balancer instance: Mandatory.
@@ -66,10 +73,9 @@ public class LoadBalancerExtension {
 			}
 
 			// Start topology receiver thread
-			topologyEventReceiver = new LoadBalancerCommonTopologyEventReceiver(topologyProvider);
-			addEventListeners();
-			topologyEventReceiver.setExecutorService(executorService);
-			topologyEventReceiver.execute();
+			startTopologyEventReceiver(executorService, topologyProvider);
+            startApplicationSignUpEventReceiver(executorService, topologyProvider);
+            startDomainMappingEventReceiver(executorService, topologyProvider);
 
 			if (statsReader != null) {
 				// Start stats notifier thread
@@ -89,58 +95,104 @@ public class LoadBalancerExtension {
 		}
 	}
 
-	private void addEventListeners() {
+    private void startTopologyEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) {
+
+        topologyEventReceiver = new LoadBalancerCommonTopologyEventReceiver(topologyProvider);
+        addTopologyEventListeners(topologyEventReceiver);
+        topologyEventReceiver.setExecutorService(executorService);
+        topologyEventReceiver.execute();
+        if (log.isInfoEnabled()) {
+            log.info("Topology receiver thread started");
+        }
+
+        if (log.isInfoEnabled()) {
+            if(TopologyServiceFilter.getInstance().isActive()) {
+                log.info(String.format("Service filter activated: [filter] %s",
+                        TopologyServiceFilter.getInstance().toString()));
+            }
+
+            if(TopologyClusterFilter.getInstance().isActive()) {
+                log.info(String.format("Cluster filter activated: [filter] %s",
+                        TopologyClusterFilter.getInstance().toString()));
+            }
+
+            if(TopologyMemberFilter.getInstance().isActive()) {
+                log.info(String.format("Member filter activated: [filter] %s",
+                        TopologyMemberFilter.getInstance().toString()));
+            }
+        }
+    }
+
+    private void startDomainMappingEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) {
+        domainMappingEventReceiver = new LoadBalancerCommonDomainMappingEventReceiver(topologyProvider);
+        domainMappingEventReceiver.setExecutorService(executorService);
+        domainMappingEventReceiver.execute();
+        if (log.isInfoEnabled()) {
+            log.info("Domain mapping event receiver thread started");
+        }
+    }
+
+    private void startApplicationSignUpEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) {
+        applicationSignUpEventReceiver = new LoadBalancerCommonApplicationSignUpEventReceiver(topologyProvider);
+        applicationSignUpEventReceiver.setExecutorService(executorService);
+        applicationSignUpEventReceiver.execute();
+        if (log.isInfoEnabled()) {
+            log.info("Application signup event receiver thread started");
+        }
+    }
+
+    private void addTopologyEventListeners(LoadBalancerCommonTopologyEventReceiver topologyEventReceiver) {
 		topologyEventReceiver.addEventListener(new CompleteTopologyEventListener() {
 
-			@Override
-			protected void onEvent(Event event) {
-				try {
-					if (!loadBalancerStarted) {
-						// Configure load balancer
-						loadBalancer.configure(topologyProvider.getTopology());
-
-						// Start load balancer
-						loadBalancer.start();
-						loadBalancerStarted = true;
-					}
-				} catch (Exception e) {
-					if (log.isErrorEnabled()) {
-						log.error("Could not start load balancer", e);
-					}
-					terminate();
-				}
-			}
-		});
+            @Override
+            protected void onEvent(Event event) {
+                try {
+                    if (!loadBalancerStarted) {
+                        // Configure load balancer
+                        loadBalancer.configure(topologyProvider.getTopology());
+
+                        // Start load balancer
+                        loadBalancer.start();
+                        loadBalancerStarted = true;
+                    }
+                } catch (Exception e) {
+                    if (log.isErrorEnabled()) {
+                        log.error("Could not start load balancer", e);
+                    }
+                    terminate();
+                }
+            }
+        });
 		topologyEventReceiver.addEventListener(new MemberActivatedEventListener() {
-			@Override
-			protected void onEvent(Event event) {
-				reloadConfiguration();
-			}
-		});
+            @Override
+            protected void onEvent(Event event) {
+                reloadConfiguration();
+            }
+        });
 		topologyEventReceiver.addEventListener(new MemberSuspendedEventListener() {
-			@Override
-			protected void onEvent(Event event) {
-				reloadConfiguration();
-			}
-		});
+            @Override
+            protected void onEvent(Event event) {
+                reloadConfiguration();
+            }
+        });
 		topologyEventReceiver.addEventListener(new MemberTerminatedEventListener() {
-			@Override
-			protected void onEvent(Event event) {
-				reloadConfiguration();
-			}
-		});
+            @Override
+            protected void onEvent(Event event) {
+                reloadConfiguration();
+            }
+        });
 		topologyEventReceiver.addEventListener(new ClusterRemovedEventListener() {
-			@Override
-			protected void onEvent(Event event) {
-				reloadConfiguration();
-			}
-		});
+            @Override
+            protected void onEvent(Event event) {
+                reloadConfiguration();
+            }
+        });
 		topologyEventReceiver.addEventListener(new ServiceRemovedEventListener() {
-			@Override
-			protected void onEvent(Event event) {
-				reloadConfiguration();
-			}
-		});
+            @Override
+            protected void onEvent(Event event) {
+                reloadConfiguration();
+            }
+        });
 	}
 
 	private void reloadConfiguration() {

http://git-wip-us.apache.org/repos/asf/stratos/blob/bf688e44/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
index b391d21..fed6e84 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
@@ -42,7 +42,6 @@ import org.apache.stratos.load.balancer.util.LoadBalancerConstants;
 import org.apache.stratos.messaging.message.filter.topology.TopologyClusterFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyMemberFilter;
 import org.apache.stratos.messaging.message.filter.topology.TopologyServiceFilter;
-import org.apache.stratos.messaging.message.receiver.tenant.TenantEventReceiver;
 import org.apache.synapse.config.SynapseConfiguration;
 import org.apache.synapse.config.xml.MultiXMLConfigurationBuilder;
 import org.apache.synapse.core.SynapseEnvironment;
@@ -91,8 +90,7 @@ public class LoadBalancerServiceComponent {
 
     private boolean activated = false;
     private ExecutorService executorService;
-    private TenantEventReceiver tenantReceiver;
-    private LoadBalancerTopologyEventReceiver topologyReceiver;
+    private LoadBalancerTopologyEventReceiver topologyEventReceiver;
     private LoadBalancerDomainMappingEventReceiver domainMappingEventReceiver;
     private LoadBalancerCommonApplicationSignUpEventReceiver applicationSignUpEventReceiver;
     private LoadBalancerStatisticsNotifier statisticsNotifier;
@@ -132,15 +130,12 @@ public class LoadBalancerServiceComponent {
                 LoadBalancerConfiguration.getInstance().setTopologyProvider(topologyProvider);
             }
 
-            if (configuration.isMultiTenancyEnabled()) {
-                // Start tenant event receiver
-                startTenantEventReceiver(executorService, topologyProvider);
-            }
-
-            if(configuration.isDomainMappingEnabled()) {
+            if (configuration.isMultiTenancyEnabled() || configuration.isDomainMappingEnabled()) {
                 // Start application signup event receiver
                 startApplicationSignUpEventReceiver(executorService, topologyProvider);
+            }
 
+            if(configuration.isDomainMappingEnabled()) {
                 // Start domain mapping event receiver
                 startDomainMappingEventReceiver(executorService, topologyProvider);
             }
@@ -166,17 +161,11 @@ public class LoadBalancerServiceComponent {
         }
     }
 
-    private void startTenantEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) {
-
-        tenantReceiver = new TenantEventReceiver();
-        tenantReceiver.setExecutorService(executorService);
-	    tenantReceiver.execute();
-        if (log.isInfoEnabled()) {
-            log.info("Tenant event receiver thread started");
+    private void startDomainMappingEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) {
+        if(domainMappingEventReceiver != null) {
+            return;
         }
-    }
 
-    private void startDomainMappingEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) {
         domainMappingEventReceiver = new LoadBalancerDomainMappingEventReceiver(topologyProvider);
         domainMappingEventReceiver.setExecutorService(executorService);
         domainMappingEventReceiver.execute();
@@ -186,6 +175,10 @@ public class LoadBalancerServiceComponent {
     }
 
     private void startApplicationSignUpEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) {
+        if(applicationSignUpEventReceiver != null) {
+            return;
+        }
+
         applicationSignUpEventReceiver = new LoadBalancerCommonApplicationSignUpEventReceiver(topologyProvider);
         applicationSignUpEventReceiver.setExecutorService(executorService);
         applicationSignUpEventReceiver.execute();
@@ -195,10 +188,13 @@ public class LoadBalancerServiceComponent {
     }
 
     private void startTopologyEventReceiver(ExecutorService executorService, TopologyProvider topologyProvider) {
+        if(topologyEventReceiver != null) {
+            return;
+        }
 
-        topologyReceiver = new LoadBalancerTopologyEventReceiver(topologyProvider);
-	    topologyReceiver.setExecutorService(executorService);
-	    topologyReceiver.execute();
+        topologyEventReceiver = new LoadBalancerTopologyEventReceiver(topologyProvider);
+	    topologyEventReceiver.setExecutorService(executorService);
+	    topologyEventReceiver.execute();
         if (log.isInfoEnabled()) {
             log.info("Topology receiver thread started");
         }
@@ -243,21 +239,30 @@ public class LoadBalancerServiceComponent {
             log.warn("An error occurred while removing endpoint deployer", e);
         }
 
-        // Terminate tenant receiver
-        if(tenantReceiver != null) {
+        // Terminate topology receiver
+        if(topologyEventReceiver != null) {
             try {
-                tenantReceiver.terminate();
+                topologyEventReceiver.terminate();
             } catch (Exception e) {
-                log.warn("An error occurred while terminating tenant event receiver", e);
+                log.warn("An error occurred while terminating topology event receiver", e);
             }
         }
 
-        // Terminate topology receiver
-        if(topologyReceiver != null) {
+        // Terminate application signup event receiver
+        if(applicationSignUpEventReceiver != null) {
             try {
-                topologyReceiver.terminate();
+                applicationSignUpEventReceiver.terminate();
             } catch (Exception e) {
-                log.warn("An error occurred while terminating topology event receiver", e);
+                log.warn("An error occurred while terminating application signup event receiver", e);
+            }
+        }
+
+        // Terminate domain mapping event receiver
+        if(domainMappingEventReceiver != null) {
+            try {
+                domainMappingEventReceiver.terminate();
+            } catch (Exception e) {
+                log.warn("An error occurred while terminating domain mapping event receiver", e);
             }
         }