You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2013/12/07 19:22:23 UTC
[1/2] git commit: Implemented load balancer tenant receiver and
introduced two separate maps for single tenant and multi-tenant clusters
Updated Branches:
refs/heads/master 91970c3ac -> d05eda2f7
Implemented load balancer tenant receiver and introduced two separate maps for single tenant and multi-tenant clusters
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/25c76201
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/25c76201
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/25c76201
Branch: refs/heads/master
Commit: 25c76201f7c904da112ba555cfdd29469880b068
Parents: aa550b6
Author: Imesh Gunaratne <im...@apache.org>
Authored: Sat Dec 7 23:51:58 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Sat Dec 7 23:51:58 2013 +0530
----------------------------------------------------------------------
.../load/balancer/LoadBalancerContext.java | 231 ---------------
.../balancer/LoadBalancerTenantReceiver.java | 160 ++++++++++
.../balancer/LoadBalancerTopologyReceiver.java | 147 ++++++----
.../stratos/load/balancer/RequestDelegator.java | 89 ++++--
.../conf/LoadBalancerConfiguration.java | 7 +-
.../conf/configurator/JndiConfigurator.java | 7 +
.../TopologyFilterConfigurator.java | 1 -
.../load/balancer/conf/structure/Node.java | 10 +-
.../balancer/context/LoadBalancerContext.java | 291 +++++++++++++++++++
.../TenantAwareLoadBalanceEndpoint.java | 2 +-
.../internal/LoadBalancerServiceComponent.java | 40 ++-
...adBalancerInFlightRequestCountCollector.java | 8 +-
.../WSO2CEPInFlightRequestCountObserver.java | 6 +-
.../sample/configuration/loadbalancer1.conf | 4 +-
.../sample/configuration/loadbalancer2.conf | 4 +-
.../sample/configuration/loadbalancer3.conf | 4 +-
.../src/main/conf/loadbalancer.conf | 4 +-
.../config/lb/repository/conf/loadbalancer.conf | 4 +-
18 files changed, 657 insertions(+), 362 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/25c76201/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerContext.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerContext.java
deleted file mode 100644
index ec1c464..0000000
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerContext.java
+++ /dev/null
@@ -1,231 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-
-package org.apache.stratos.load.balancer;
-
-import org.apache.axis2.context.ConfigurationContext;
-import org.apache.axis2.engine.AxisConfiguration;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.messaging.domain.topology.Cluster;
-import org.apache.synapse.config.SynapseConfiguration;
-import org.wso2.carbon.mediation.dependency.mgt.services.DependencyManagementService;
-import org.wso2.carbon.mediation.initializer.services.SynapseEnvironmentService;
-import org.wso2.carbon.registry.core.session.UserRegistry;
-import org.wso2.carbon.user.core.service.RealmService;
-
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-
-/**
- * Defines load balancer context information.
- */
-public class LoadBalancerContext {
-
- private static final Log log = LogFactory.getLog(LoadBalancerContext.class);
- private static volatile LoadBalancerContext instance;
-
- private SynapseConfiguration synapseConfiguration;
- private ConfigurationContext configCtxt;
- private AxisConfiguration axisConfiguration;
- private UserRegistry configRegistry;
- private UserRegistry governanceRegistry;
- private DependencyManagementService dependencyManager;
-
- // <TenantId, SynapseEnvironmentService> Map
- private Map<Integer, SynapseEnvironmentService> synapseEnvironmentServiceMap;
- // <ServiceName, ServiceContext> Map
- private Map<String, ServiceContext> serviceContextMap;
- // <ClusterId, ClusterContext> Map
- private Map<String, ClusterContext> clusterContextMap;
- // <Hostname, Cluster> Map
- private Map<String, Cluster> clusterMap;
-
- private LoadBalancerContext() {
- synapseEnvironmentServiceMap = new ConcurrentHashMap<Integer, SynapseEnvironmentService>();
- serviceContextMap = new ConcurrentHashMap<String, ServiceContext>();
- clusterContextMap = new ConcurrentHashMap<String, ClusterContext>();
- clusterMap = new ConcurrentHashMap<String, Cluster>();
- }
-
- public static LoadBalancerContext getInstance() {
- if (instance == null) {
- synchronized (LoadBalancerContext.class){
- if (instance == null) {
- instance = new LoadBalancerContext ();
- }
- }
- }
- return instance;
- }
-
- public void clear() {
- synapseEnvironmentServiceMap.clear();
- serviceContextMap.clear();
- clusterContextMap.clear();
- clusterMap.clear();
- }
-
- public RealmService getRealmService() {
- return realmService;
- }
-
- public void setRealmService(RealmService realmService) {
- this.realmService = realmService;
- }
-
- private RealmService realmService;
-
- public SynapseConfiguration getSynapseConfiguration() throws TenantAwareLoadBalanceEndpointException{
- assertNull("SynapseConfiguration", synapseConfiguration);
- return synapseConfiguration;
- }
-
- public void setSynapseConfiguration(SynapseConfiguration synapseConfiguration) {
- this.synapseConfiguration = synapseConfiguration;
- }
-
- public AxisConfiguration getAxisConfiguration() throws TenantAwareLoadBalanceEndpointException {
- assertNull("AxisConfiguration", axisConfiguration);
- return axisConfiguration;
- }
-
- public void setAxisConfiguration(AxisConfiguration axisConfiguration) {
- this.axisConfiguration = axisConfiguration;
- }
-
- public UserRegistry getConfigRegistry() throws TenantAwareLoadBalanceEndpointException {
- assertNull("Registry", configRegistry);
- return configRegistry;
- }
-
- public void setConfigRegistry(UserRegistry configRegistry) {
- this.configRegistry = configRegistry;
- }
-
- public DependencyManagementService getDependencyManager() {
- return dependencyManager;
- }
-
- public void setDependencyManager(DependencyManagementService dependencyManager) {
- this.dependencyManager = dependencyManager;
- }
-
- private void assertNull(String name, Object object) throws TenantAwareLoadBalanceEndpointException {
- if (object == null) {
- String message = name + " reference in the proxy admin config holder is null";
- log.error(message);
- throw new TenantAwareLoadBalanceEndpointException(message);
- }
- }
-
- public UserRegistry getGovernanceRegistry() {
- return governanceRegistry;
- }
-
- public void setGovernanceRegistry(UserRegistry governanceRegistry) {
- this.governanceRegistry = governanceRegistry;
- }
-
- public SynapseEnvironmentService getSynapseEnvironmentService(int tenantId) {
- return synapseEnvironmentServiceMap.get(tenantId);
- }
-
- public void addSynapseEnvironmentService(int tenantId, SynapseEnvironmentService synapseEnvironmentService) {
- synapseEnvironmentServiceMap.put(tenantId, synapseEnvironmentService);
- }
-
- public void removeSynapseEnvironmentService(int tenantId) {
- synapseEnvironmentServiceMap.remove(tenantId);
- }
-
- public Map<Integer, SynapseEnvironmentService> getSynapseEnvironmentServiceMap() {
- return synapseEnvironmentServiceMap;
- }
-
- public ConfigurationContext getConfigCtxt() {
- return configCtxt;
- }
-
- public void setConfigCtxt(ConfigurationContext configCtxt) {
- this.configCtxt = configCtxt;
- }
-
- // ServiceContextMap methods START
- public Collection<ServiceContext> getServiceContexts() {
- return serviceContextMap.values();
- }
-
- public ServiceContext getServiceContext(String serviceName) {
- return serviceContextMap.get(serviceName);
- }
-
- public void addServiceContext(ServiceContext serviceContext) {
- serviceContextMap.put(serviceContext.getServiceName(), serviceContext);
- }
-
- public void removeServiceContext(String serviceName) {
- serviceContextMap.remove(serviceName);
- }
- // ServiceContextMap methods END
-
- // ClusterContextMap methods START
- public Collection<ClusterContext> getClusterContexts() {
- return clusterContextMap.values();
- }
-
- public ClusterContext getClusterContext(String clusterId) {
- return clusterContextMap.get(clusterId);
- }
-
- public void addClusterContext(ClusterContext clusterContext) {
- clusterContextMap.put(clusterContext.getClusterId(), clusterContext);
- }
-
- public void removeClusterContext(String clusterId) {
- clusterContextMap.remove(clusterId);
- }
- // ClusterContextMap methods END
-
- // ClusterMap methods START
- public Cluster getCluster(String hostName) {
- return clusterMap.get(hostName);
- }
-
- public boolean clusterExists(String hostName) {
- return clusterMap.containsKey(hostName);
- }
-
- public void addCluster(Cluster cluster) {
- for(String hostName : cluster.getHostNames()) {
- addCluster(hostName, cluster);
- }
- }
-
- public void addCluster(String hostName, Cluster cluster) {
- clusterMap.put(hostName, cluster);
- }
-
- public void removeCluster(String hostName) {
- clusterMap.remove(hostName);
- }
- // ClusterMap methods END
-}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/25c76201/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantReceiver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantReceiver.java
new file mode 100644
index 0000000..ad17ec9
--- /dev/null
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantReceiver.java
@@ -0,0 +1,160 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.load.balancer;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.load.balancer.context.LoadBalancerContext;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.event.tenant.TenantSubscribedEvent;
+import org.apache.stratos.messaging.event.tenant.TenantUnSubscribedEvent;
+import org.apache.stratos.messaging.listener.tenant.TenantSubscribedEventListener;
+import org.apache.stratos.messaging.listener.tenant.TenantUnSubscribedEventListener;
+import org.apache.stratos.messaging.message.processor.tenant.TenantMessageProcessorChain;
+import org.apache.stratos.messaging.message.receiver.tenant.TenantEventMessageDelegator;
+import org.apache.stratos.messaging.message.receiver.tenant.TenantReceiver;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+import java.util.HashMap;
+import java.util.Map;
+
+/**
+ * Load balancer tenant receiver updates load balancer context according to
+ * incoming tenant events.
+ */
+public class LoadBalancerTenantReceiver implements Runnable {
+
+ private static final Log log = LogFactory.getLog(LoadBalancerTenantReceiver.class);
+
+ private final TenantReceiver tenantReiver;
+ private boolean terminated;
+
+ public LoadBalancerTenantReceiver() {
+ tenantReiver = new TenantReceiver(createMessageDelegator());
+ }
+
+ private TenantEventMessageDelegator createMessageDelegator() {
+ TenantMessageProcessorChain processorChain = createEventProcessorChain();
+ return new TenantEventMessageDelegator(processorChain);
+ }
+
+ private TenantMessageProcessorChain createEventProcessorChain() {
+ TenantMessageProcessorChain messageProcessorChain = new TenantMessageProcessorChain();
+ messageProcessorChain.addEventListener(new TenantSubscribedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ TenantSubscribedEvent tenantSubscribedEvent = (TenantSubscribedEvent) event;
+
+ // Find cluster of tenant
+ Cluster cluster = findCluster(tenantSubscribedEvent.getServiceName(), tenantSubscribedEvent.getTenantId());
+ if(cluster != null) {
+ for(String hostName : cluster.getHostNames()) {
+ // Add hostName, tenantId, cluster to multi-tenant map
+ Map<Integer, Cluster> clusterMap = LoadBalancerContext.getInstance().getMultiTenantClusters(hostName);
+ if(clusterMap == null) {
+ clusterMap = new HashMap<Integer, Cluster>();
+ clusterMap.put(tenantSubscribedEvent.getTenantId(), cluster);
+ LoadBalancerContext.getInstance().addMultiTenantClusters(hostName, clusterMap);
+ }
+ else {
+ clusterMap.put(tenantSubscribedEvent.getTenantId(), cluster);
+ }
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Cluster added to multi-tenant clusters map: [host-name] %s [tenant-id] %d [cluster] %s",
+ hostName, tenantSubscribedEvent.getTenantId(), cluster.getClusterId()));
+ }
+ }
+ }
+ else {
+ if(log.isErrorEnabled()) {
+ log.error(String.format("Could not find cluster of tenant: [service] %s [tenant-id] %d",
+ tenantSubscribedEvent.getServiceName(), tenantSubscribedEvent.getTenantId()));
+ }
+ }
+ }
+ });
+ messageProcessorChain.addEventListener(new TenantUnSubscribedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ TenantUnSubscribedEvent tenantUnSubscribedEvent = (TenantUnSubscribedEvent) event;
+
+ // Find cluster of tenant
+ Cluster cluster = findCluster(tenantUnSubscribedEvent.getServiceName(), tenantUnSubscribedEvent.getTenantId());
+ if(cluster != null) {
+ for(String hostName : cluster.getHostNames()) {
+ LoadBalancerContext.getInstance().removeMultiTenantClusters(hostName);
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Cluster removed from multi-tenant clusters map: [host-name] %s [tenant-id] %d [cluster] %s",
+ hostName, tenantUnSubscribedEvent.getTenantId(), cluster.getClusterId()));
+ }
+ }
+ }
+ else {
+ if(log.isErrorEnabled()) {
+ log.error(String.format("Could not find cluster of tenant: [service] %s [tenant-id] %d",
+ tenantUnSubscribedEvent.getServiceName(), tenantUnSubscribedEvent.getTenantId()));
+ }
+ }
+ }
+ });
+ return messageProcessorChain;
+ }
+
+ private Cluster findCluster(String serviceName, int tenantId) {
+ try {
+ TopologyManager.acquireReadLock();
+ Service service = TopologyManager.getTopology().getService(serviceName);
+ if(service == null) {
+ throw new RuntimeException(String.format("Service not found: %s", serviceName));
+ }
+ for(Cluster cluster : service.getClusters()) {
+ if(cluster.tenantIdInRange(tenantId)) {
+ return cluster;
+ }
+ }
+ return null;
+ }
+ finally {
+ TopologyManager.releaseReadLock();
+ }
+ }
+
+ @Override
+ public void run() {
+ Thread tenantReceiverThread = new Thread(tenantReiver);
+ tenantReceiverThread.start();
+
+ // Keep the thread live until terminated
+ while (!terminated);
+ if (log.isInfoEnabled()) {
+ log.info("Load balancer tenant receiver thread terminated");
+ }
+ }
+
+ /**
+ * Terminate load balancer tenant receiver thread.
+ */
+ public void terminate() {
+ tenantReiver.terminate();
+ terminated = true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/25c76201/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
index ee43eae..55ff817 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
@@ -21,24 +21,27 @@ package org.apache.stratos.load.balancer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.load.balancer.context.LoadBalancerContext;
import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.Member;
import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.ServiceType;
import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
+import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
+import org.apache.stratos.messaging.event.topology.ServiceRemovedEvent;
import org.apache.stratos.messaging.listener.topology.ClusterRemovedEventListener;
import org.apache.stratos.messaging.listener.topology.CompleteTopologyEventListener;
import org.apache.stratos.messaging.listener.topology.MemberActivatedEventListener;
import org.apache.stratos.messaging.listener.topology.ServiceRemovedEventListener;
-import org.apache.stratos.messaging.event.topology.*;
import org.apache.stratos.messaging.message.processor.topology.TopologyMessageProcessorChain;
import org.apache.stratos.messaging.message.receiver.topology.TopologyEventMessageDelegator;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.stratos.messaging.message.receiver.topology.TopologyReceiver;
-import java.util.Collection;
-
/**
- * Load balancer topology receiver.
+ * Load balancer topology receiver updates load balancer context according to
+ * incoming topology events.
*/
public class LoadBalancerTopologyReceiver implements Runnable {
@@ -55,52 +58,51 @@ public class LoadBalancerTopologyReceiver implements Runnable {
public void run() {
Thread thread = new Thread(topologyReceiver);
thread.start();
- if(log.isInfoEnabled()) {
+ if (log.isInfoEnabled()) {
log.info("Load balancer topology receiver thread started");
}
// Keep the thread live until terminated
- while (!terminated);
- if(log.isInfoEnabled()) {
+ while (!terminated) ;
+ if (log.isInfoEnabled()) {
log.info("Load balancer topology receiver thread terminated");
}
}
private TopologyEventMessageDelegator createMessageDelegator() {
TopologyMessageProcessorChain processorChain = createEventProcessorChain();
+ return new TopologyEventMessageDelegator(processorChain);
+ }
+
+ private TopologyMessageProcessorChain createEventProcessorChain() {
+ // Listen to topology events that affect clusters
+ TopologyMessageProcessorChain processorChain = new TopologyMessageProcessorChain();
processorChain.addEventListener(new CompleteTopologyEventListener() {
@Override
protected void onEvent(Event event) {
try {
TopologyManager.acquireReadLock();
- for(Service service : TopologyManager.getTopology().getServices()) {
- for(Cluster cluster : service.getClusters()) {
- if(hasActiveMembers(cluster)) {
+ for (Service service : TopologyManager.getTopology().getServices()) {
+ for (Cluster cluster : service.getClusters()) {
+ if (hasActiveMembers(cluster)) {
addClusterToLbContext(cluster);
}
}
}
- }
- finally {
+ } finally {
TopologyManager.releaseReadLock();
}
}
private boolean hasActiveMembers(Cluster cluster) {
- for(Member member : cluster.getMembers()) {
- if(member.isActive()) {
+ for (Member member : cluster.getMembers()) {
+ if (member.isActive()) {
return true;
}
}
return false;
}
});
- return new TopologyEventMessageDelegator(processorChain);
- }
-
- private TopologyMessageProcessorChain createEventProcessorChain() {
- // Listen to topology events that affect clusters
- TopologyMessageProcessorChain processorChain = new TopologyMessageProcessorChain();
processorChain.addEventListener(new MemberActivatedEventListener() {
@Override
protected void onEvent(Event event) {
@@ -108,16 +110,17 @@ public class LoadBalancerTopologyReceiver implements Runnable {
TopologyManager.acquireReadLock();
// Add cluster to the context when its first member is activated
- MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent)event;
- Cluster cluster = findCluster(memberActivatedEvent.getClusterId());
- if(cluster == null) {
- if(log.isErrorEnabled()) {
- log.error(String.format("Cluster not found in topology: [cluster] %s", memberActivatedEvent.getClusterId()));
+ MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
+ Cluster cluster = LoadBalancerContext.getInstance().getCluster(memberActivatedEvent.getClusterId());
+ if (cluster != null) {
+ addClusterToLbContext(cluster);
+ }
+ else {
+ if(log.isWarnEnabled()) {
+ log.warn(String.format("Cluster not found in cluster id cluster map: [cluster] %s", memberActivatedEvent.getClusterId()));
}
}
- addClusterToLbContext(cluster);
- }
- finally {
+ } finally {
TopologyManager.releaseReadLock();
}
}
@@ -129,10 +132,19 @@ public class LoadBalancerTopologyReceiver implements Runnable {
TopologyManager.acquireReadLock();
// Remove cluster from context
- ClusterRemovedEvent clusterRemovedEvent = (ClusterRemovedEvent)event;
- removeClusterFromLbContext(clusterRemovedEvent.getHostName());
- }
- finally {
+ ClusterRemovedEvent clusterRemovedEvent = (ClusterRemovedEvent) event;
+ Cluster cluster = LoadBalancerContext.getInstance().getCluster(clusterRemovedEvent.getClusterId());
+ if (cluster != null) {
+ for (String hostName : cluster.getHostNames()) {
+ removeClusterFromLbContext(hostName);
+ }
+ }
+ else {
+ if(log.isWarnEnabled()) {
+ log.warn(String.format("Cluster not found in cluster id cluster map: [cluster] %s", clusterRemovedEvent.getClusterId()));
+ }
+ }
+ } finally {
TopologyManager.releaseReadLock();
}
}
@@ -144,16 +156,21 @@ public class LoadBalancerTopologyReceiver implements Runnable {
TopologyManager.acquireReadLock();
// Remove all clusters of given service from context
- ServiceRemovedEvent serviceRemovedEvent = (ServiceRemovedEvent)event;
- for(Service service : TopologyManager.getTopology().getServices()) {
- for(Cluster cluster : service.getClusters()) {
- for(String hostName : cluster.getHostNames()) {
+ ServiceRemovedEvent serviceRemovedEvent = (ServiceRemovedEvent) event;
+ Service service = TopologyManager.getTopology().getService(serviceRemovedEvent.getServiceName());
+ if (service != null) {
+ for (Cluster cluster : service.getClusters()) {
+ for (String hostName : cluster.getHostNames()) {
removeClusterFromLbContext(hostName);
}
}
}
- }
- finally {
+ else {
+ if(log.isWarnEnabled()) {
+ log.warn(String.format("Service not found in topology: [service] %s", serviceRemovedEvent.getServiceName()));
+ }
+ }
+ } finally {
TopologyManager.releaseReadLock();
}
}
@@ -162,40 +179,42 @@ public class LoadBalancerTopologyReceiver implements Runnable {
}
private void addClusterToLbContext(Cluster cluster) {
- for(String hostName : cluster.getHostNames()) {
- if(!LoadBalancerContext.getInstance().clusterExists(hostName)) {
- LoadBalancerContext.getInstance().addCluster(hostName, cluster);
- if(log.isDebugEnabled()) {
- log.debug(String.format("Cluster added to load balancer context: [cluster] %s [hostname] %s", cluster.getClusterId(), hostName));
+ // Add cluster to Map<ClusterId, Cluster>
+ LoadBalancerContext.getInstance().addCluster(cluster);
+
+ Service service = TopologyManager.getTopology().getService(cluster.getServiceName());
+ if (service.getServiceType() == ServiceType.SingleTenant) {
+ // Add cluster to SingleTenantClusterMap
+ for (String hostName : cluster.getHostNames()) {
+ if (!LoadBalancerContext.getInstance().singleTenantClusterExists((hostName))) {
+ LoadBalancerContext.getInstance().addSingleTenantCluster(hostName, cluster);
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster added to single tenant cluster map: [cluster] %s [hostname] %s", cluster.getClusterId(), hostName));
+ }
}
}
}
+ // MultiTenantClusterMap is updated by tenant receiver.
}
- private void removeClusterFromLbContext(String hostName) {
- if(LoadBalancerContext.getInstance().clusterExists(hostName)) {
- Cluster cluster = LoadBalancerContext.getInstance().getCluster(hostName);
- LoadBalancerContext.getInstance().removeCluster(hostName);
- if(log.isDebugEnabled()) {
- log.debug(String.format("Cluster removed from load balancer context: [cluster] %s [hostname] %s", cluster.getClusterId(), hostName));
- }
- }
- }
-
- private Cluster findCluster(String clusterId) {
- if(clusterId == null) {
- return null;
- }
-
- Collection<Service> services = TopologyManager.getTopology().getServices();
- for (Service service : services) {
- for (Cluster cluster : service.getClusters()) {
- if (clusterId.equals(cluster.getClusterId())) {
- return cluster;
+ private void removeClusterFromLbContext(String clusterId) {
+ Cluster cluster = LoadBalancerContext.getInstance().getCluster(clusterId);
+ Service service = TopologyManager.getTopology().getService(cluster.getServiceName());
+ if (service.getServiceType() == ServiceType.SingleTenant) {
+ // Remove cluster from SingleTenantClusterMap
+ for (String hostName : cluster.getHostNames()) {
+ if (LoadBalancerContext.getInstance().singleTenantClusterExists(hostName)) {
+ LoadBalancerContext.getInstance().removeSingleTenantCluster(hostName);
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster removed from single tenant cluster map: [cluster] %s [hostname] %s", cluster.getClusterId(), hostName));
+ }
}
}
}
- return null;
+ // MultiTenantClusterMap is updated by tenant receiver.
+
+ // Remove cluster from Map<ClusterId,Cluster>
+ LoadBalancerContext.getInstance().removeCluster(clusterId);
}
/**
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/25c76201/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/RequestDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/RequestDelegator.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/RequestDelegator.java
index 11ea81d..96cb3eb 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/RequestDelegator.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/RequestDelegator.java
@@ -19,18 +19,16 @@
package org.apache.stratos.load.balancer;
-import org.apache.commons.lang.NotImplementedException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.load.balancer.algorithm.AlgorithmContext;
import org.apache.stratos.load.balancer.algorithm.LoadBalanceAlgorithm;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import org.apache.stratos.load.balancer.context.LoadBalancerContext;
import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.Member;
-import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import java.util.ArrayList;
-import java.util.Collection;
/**
* Implements core load balancing logic for identifying the next member
@@ -45,53 +43,86 @@ public class RequestDelegator {
this.algorithm = algorithm;
}
- public Member findNextMember(String hostName) {
-
+ public Member findNextMemberFromHostName(String hostName) {
try {
- if(hostName == null)
+ if (hostName == null)
return null;
TopologyManager.acquireReadLock();
long startTime = System.currentTimeMillis();
- Cluster cluster = LoadBalancerContext.getInstance().getCluster(hostName);
- if(cluster != null) {
- // Find algorithm context of the cluster
- ClusterContext clusterContext = LoadBalancerContext.getInstance().getClusterContext(cluster.getClusterId());
- if(clusterContext == null) {
- clusterContext = new ClusterContext(cluster.getServiceName(), cluster.getClusterId());
- LoadBalancerContext.getInstance().addClusterContext(clusterContext);
+ Cluster cluster = LoadBalancerContext.getInstance().getSingleTenantCluster(hostName);
+ if (cluster != null) {
+ Member member = findNextMemberInCluster(cluster);
+ if (member != null) {
+ long endTime = System.currentTimeMillis();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Next member identified in %dms: [service] %s [cluster] %s [member] %s", (endTime - startTime), member.getServiceName(), member.getClusterId(), member.getMemberId()));
+ }
}
+ return member;
+ }
+ return null;
+ } finally {
+ TopologyManager.releaseReadLock();
+ }
+ }
- AlgorithmContext algorithmContext = clusterContext.getAlgorithmContext();
- if(algorithmContext == null) {
- algorithmContext = new AlgorithmContext(cluster.getServiceName(), cluster.getClusterId());
- clusterContext.setAlgorithmContext(algorithmContext);
- }
- algorithm.setMembers(new ArrayList<Member>(cluster.getMembers()));
- Member member = algorithm.getNextMember(algorithmContext);
- long endTime = System.currentTimeMillis();
- if(log.isDebugEnabled()) {
- log.debug(String.format("Next member identified in %dms: [service] %s [cluster] %s [member] %s", (endTime - startTime), member.getServiceName(), member.getClusterId(), member.getMemberId()));
+ public Member findNextMemberFromTenantId(String hostName, int tenantId) {
+ try {
+ TopologyManager.acquireReadLock();
+ long startTime = System.currentTimeMillis();
+
+ // Find cluster from host name and tenant id
+ Cluster cluster = LoadBalancerContext.getInstance().getMultiTenantCluster(hostName, tenantId);
+ if (cluster != null) {
+ Member member = findNextMemberInCluster(cluster);
+ if (member != null) {
+ long endTime = System.currentTimeMillis();
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Next member identified in %dms: [service] %s [cluster] %s [tenant-id] %d [member] %s",
+ (endTime - startTime), member.getServiceName(), member.getClusterId(), tenantId, member.getMemberId()));
+ }
}
return member;
}
return null;
- }
- finally {
+ } finally {
TopologyManager.releaseReadLock();
}
}
+ private Member findNextMemberInCluster(Cluster cluster) {
+ // Find algorithm context of the cluster
+ ClusterContext clusterContext = LoadBalancerContext.getInstance().getClusterContext(cluster.getClusterId());
+ if (clusterContext == null) {
+ clusterContext = new ClusterContext(cluster.getServiceName(), cluster.getClusterId());
+ LoadBalancerContext.getInstance().addClusterContext(clusterContext);
+ }
+
+ AlgorithmContext algorithmContext = clusterContext.getAlgorithmContext();
+ if (algorithmContext == null) {
+ algorithmContext = new AlgorithmContext(cluster.getServiceName(), cluster.getClusterId());
+ clusterContext.setAlgorithmContext(algorithmContext);
+ }
+ algorithm.setMembers(new ArrayList<Member>(cluster.getMembers()));
+ Member member = algorithm.getNextMember(algorithmContext);
+ if (member == null) {
+ if (log.isWarnEnabled()) {
+ log.warn(String.format("Could not find a member in cluster: [service] %s [cluster] %s", cluster.getServiceName(), cluster.getClusterId()));
+ }
+ }
+ return member;
+ }
+
public boolean isTargetHostValid(String hostName) {
try {
- if(hostName == null)
+ if (hostName == null)
return false;
TopologyManager.acquireReadLock();
return LoadBalancerContext.getInstance().clusterExists(hostName);
- }
- finally {
+ } finally {
TopologyManager.releaseReadLock();
}
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/25c76201/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
index 6b29482..c94d55f 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
@@ -22,18 +22,21 @@ package org.apache.stratos.load.balancer.conf;
import org.apache.commons.lang.StringUtils;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.load.balancer.context.LoadBalancerContext;
import org.apache.stratos.load.balancer.conf.domain.Algorithm;
import org.apache.stratos.load.balancer.conf.domain.TenantIdentifier;
import org.apache.stratos.load.balancer.conf.structure.Node;
import org.apache.stratos.load.balancer.conf.structure.NodeBuilder;
import org.apache.stratos.load.balancer.conf.util.Constants;
+import org.apache.stratos.load.balancer.context.LoadBalancerContext;
import org.apache.stratos.load.balancer.exception.InvalidConfigurationException;
import org.apache.stratos.messaging.domain.topology.*;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import java.io.File;
-import java.util.*;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Scanner;
import java.util.regex.Pattern;
/**
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/25c76201/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/configurator/JndiConfigurator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/configurator/JndiConfigurator.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/configurator/JndiConfigurator.java
index 97ec06c..f2526d4 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/configurator/JndiConfigurator.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/configurator/JndiConfigurator.java
@@ -33,8 +33,11 @@ import java.io.IOException;
*/
public class JndiConfigurator {
+ private static boolean configured;
+
public static void configure(LoadBalancerConfiguration configuration) {
generateJndiPropertiesFile(configuration);
+ configured = true;
}
private static void generateJndiPropertiesFile(LoadBalancerConfiguration configuration) {
@@ -73,4 +76,8 @@ public class JndiConfigurator {
FileOutputStream outputStream = new FileOutputStream(filePath);
IOUtils.write(content, outputStream);
}
+
+ public static boolean isConfigured() {
+ return configured;
+ }
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/25c76201/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/configurator/TopologyFilterConfigurator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/configurator/TopologyFilterConfigurator.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/configurator/TopologyFilterConfigurator.java
index 0a0e51f..e4bac54 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/configurator/TopologyFilterConfigurator.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/configurator/TopologyFilterConfigurator.java
@@ -19,7 +19,6 @@
package org.apache.stratos.load.balancer.conf.configurator;
-import org.apache.commons.lang3.StringUtils;
import org.apache.stratos.load.balancer.conf.LoadBalancerConfiguration;
/**
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/25c76201/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/structure/Node.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/structure/Node.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/structure/Node.java
index dc24172..5399784 100755
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/structure/Node.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/structure/Node.java
@@ -18,16 +18,12 @@
*/
package org.apache.stratos.load.balancer.conf.structure;
+import org.apache.commons.lang.builder.HashCodeBuilder;
+
import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.List;
-import java.util.Map;
+import java.util.*;
import java.util.Map.Entry;
-import org.apache.commons.lang.builder.HashCodeBuilder;
-
/**
* This is the basic data structure which holds a <i>Nginx</i> formatted configuration file.
*
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/25c76201/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContext.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContext.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContext.java
new file mode 100644
index 0000000..e5fcfce
--- /dev/null
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContext.java
@@ -0,0 +1,291 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.load.balancer.context;
+
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.engine.AxisConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.load.balancer.ClusterContext;
+import org.apache.stratos.load.balancer.ServiceContext;
+import org.apache.stratos.load.balancer.TenantAwareLoadBalanceEndpointException;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.synapse.config.SynapseConfiguration;
+import org.wso2.carbon.mediation.dependency.mgt.services.DependencyManagementService;
+import org.wso2.carbon.mediation.initializer.services.SynapseEnvironmentService;
+import org.wso2.carbon.registry.core.session.UserRegistry;
+import org.wso2.carbon.user.core.service.RealmService;
+
+import java.util.Collection;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+
+/**
+ * Defines load balancer context information.
+ */
+public class LoadBalancerContext {
+
+ private static final Log log = LogFactory.getLog(LoadBalancerContext.class);
+ private static volatile LoadBalancerContext instance;
+
+ private SynapseConfiguration synapseConfiguration;
+ private ConfigurationContext configCtxt;
+ private AxisConfiguration axisConfiguration;
+ private UserRegistry configRegistry;
+ private UserRegistry governanceRegistry;
+ private DependencyManagementService dependencyManager;
+
+ // Following map is updated by the service component.
+ // <TenantId, SynapseEnvironmentService> Map
+ private Map<Integer, SynapseEnvironmentService> tenantIdSynapseEnvironmentServiceMap;
+
+ // Following maps are updated on demand by the request delegator.
+ // <ServiceName, ServiceContext> Map
+ private Map<String, ServiceContext> serviceNameServiceContextMap;
+ // <ClusterId, ClusterContext> Map
+ private Map<String, ClusterContext> clusterIdClusterContextMap;
+
+ // Following maps are updated by load balancer topology receiver.
+ // <ClusterId, Cluster> Map
+ private Map<String, Cluster> clusterIdClusterMap;
+ // <Hostname, Cluster> Map
+ private Map<String, Cluster> singleTenantClusterMap;
+ // <Hostname, Map<TenantId, Cluster>> Map
+ private Map<String, Map<Integer, Cluster>> multiTenantClusterMap;
+
+ private LoadBalancerContext() {
+ tenantIdSynapseEnvironmentServiceMap = new ConcurrentHashMap<Integer, SynapseEnvironmentService>();
+ serviceNameServiceContextMap = new ConcurrentHashMap<String, ServiceContext>();
+ clusterIdClusterContextMap = new ConcurrentHashMap<String, ClusterContext>();
+ clusterIdClusterMap = new ConcurrentHashMap<String, Cluster>();
+ singleTenantClusterMap = new ConcurrentHashMap<String, Cluster>();
+ multiTenantClusterMap = new ConcurrentHashMap<String, Map<Integer, Cluster>>();
+ }
+
+ public static LoadBalancerContext getInstance() {
+ if (instance == null) {
+ synchronized (LoadBalancerContext.class){
+ if (instance == null) {
+ instance = new LoadBalancerContext ();
+ }
+ }
+ }
+ return instance;
+ }
+
+ public void clear() {
+ tenantIdSynapseEnvironmentServiceMap.clear();
+ serviceNameServiceContextMap.clear();
+ clusterIdClusterContextMap.clear();
+ multiTenantClusterMap.clear();
+ }
+
+ public RealmService getRealmService() {
+ return realmService;
+ }
+
+ public void setRealmService(RealmService realmService) {
+ this.realmService = realmService;
+ }
+
+ private RealmService realmService;
+
+ public SynapseConfiguration getSynapseConfiguration() throws TenantAwareLoadBalanceEndpointException {
+ assertNull("SynapseConfiguration", synapseConfiguration);
+ return synapseConfiguration;
+ }
+
+ public void setSynapseConfiguration(SynapseConfiguration synapseConfiguration) {
+ this.synapseConfiguration = synapseConfiguration;
+ }
+
+ public AxisConfiguration getAxisConfiguration() throws TenantAwareLoadBalanceEndpointException {
+ assertNull("AxisConfiguration", axisConfiguration);
+ return axisConfiguration;
+ }
+
+ public void setAxisConfiguration(AxisConfiguration axisConfiguration) {
+ this.axisConfiguration = axisConfiguration;
+ }
+
+ public UserRegistry getConfigRegistry() throws TenantAwareLoadBalanceEndpointException {
+ assertNull("Registry", configRegistry);
+ return configRegistry;
+ }
+
+ public void setConfigRegistry(UserRegistry configRegistry) {
+ this.configRegistry = configRegistry;
+ }
+
+ public DependencyManagementService getDependencyManager() {
+ return dependencyManager;
+ }
+
+ public void setDependencyManager(DependencyManagementService dependencyManager) {
+ this.dependencyManager = dependencyManager;
+ }
+
+ private void assertNull(String name, Object object) throws TenantAwareLoadBalanceEndpointException {
+ if (object == null) {
+ String message = name + " reference in the proxy admin config holder is null";
+ log.error(message);
+ throw new TenantAwareLoadBalanceEndpointException(message);
+ }
+ }
+
+ public UserRegistry getGovernanceRegistry() {
+ return governanceRegistry;
+ }
+
+ public void setGovernanceRegistry(UserRegistry governanceRegistry) {
+ this.governanceRegistry = governanceRegistry;
+ }
+
+ public SynapseEnvironmentService getSynapseEnvironmentService(int tenantId) {
+ return tenantIdSynapseEnvironmentServiceMap.get(tenantId);
+ }
+
+ public void addSynapseEnvironmentService(int tenantId, SynapseEnvironmentService synapseEnvironmentService) {
+ tenantIdSynapseEnvironmentServiceMap.put(tenantId, synapseEnvironmentService);
+ }
+
+ public void removeSynapseEnvironmentService(int tenantId) {
+ tenantIdSynapseEnvironmentServiceMap.remove(tenantId);
+ }
+
+ public Map<Integer, SynapseEnvironmentService> getTenantIdSynapseEnvironmentServiceMap() {
+ return tenantIdSynapseEnvironmentServiceMap;
+ }
+
+ public ConfigurationContext getConfigCtxt() {
+ return configCtxt;
+ }
+
+ public void setConfigCtxt(ConfigurationContext configCtxt) {
+ this.configCtxt = configCtxt;
+ }
+
+ // ServiceNameServiceContextMap methods START
+ public Collection<ServiceContext> getServiceContexts() {
+ return serviceNameServiceContextMap.values();
+ }
+
+ public ServiceContext getServiceContext(String serviceName) {
+ return serviceNameServiceContextMap.get(serviceName);
+ }
+
+ public void addServiceContext(ServiceContext serviceContext) {
+ serviceNameServiceContextMap.put(serviceContext.getServiceName(), serviceContext);
+ }
+
+ public void removeServiceContext(String serviceName) {
+ serviceNameServiceContextMap.remove(serviceName);
+ }
+ // ServiceNameServiceContextMap methods END
+
+ // ClusterIdClusterContextMap methods START
+ public Collection<ClusterContext> getClusterContexts() {
+ return clusterIdClusterContextMap.values();
+ }
+
+ public ClusterContext getClusterContext(String clusterId) {
+ return clusterIdClusterContextMap.get(clusterId);
+ }
+
+ public void addClusterContext(ClusterContext clusterContext) {
+ clusterIdClusterContextMap.put(clusterContext.getClusterId(), clusterContext);
+ }
+
+ public void removeClusterContext(String clusterId) {
+ clusterIdClusterContextMap.remove(clusterId);
+ }
+ // ClusterIdClusterContextMap methods END
+
+ // ClusterIdClusterMap methods START
+ public Cluster getCluster(String clusterId) {
+ return clusterIdClusterMap.get(clusterId);
+ }
+
+ public boolean clusterExists(String clusterId) {
+ return clusterIdClusterMap.containsKey(clusterId);
+ }
+
+ public void addCluster(Cluster cluster) {
+ clusterIdClusterMap.put(cluster.getClusterId(), cluster);
+ }
+
+ public void removeCluster(String clusterId) {
+ clusterIdClusterMap.remove(clusterId);
+ }
+ // ClusterIdClusterMap methods END
+
+ // SingleTenantClusterMap methods START
+ public Cluster getSingleTenantCluster(String hostName) {
+ return singleTenantClusterMap.get(hostName);
+ }
+
+ public boolean singleTenantClusterExists(String hostName) {
+ return singleTenantClusterMap.containsKey(hostName);
+ }
+
+ public void addSingleTenantCluster(String hostName, Cluster cluster) {
+ singleTenantClusterMap.put(hostName, cluster);
+ }
+
+ public void removeSingleTenantCluster(Cluster cluster) {
+ for(String hostName : cluster.getHostNames()) {
+ removeSingleTenantCluster(hostName);
+ }
+ }
+
+ public void removeSingleTenantCluster(String hostName) {
+ singleTenantClusterMap.remove(hostName);
+ }
+ // SingleTenantClusterMap methods END
+
+ // MultiTenantClusterMap methods START
+ public Cluster getMultiTenantCluster(String hostName, int tenantId) {
+ Map<Integer, Cluster> clusterMap = getMultiTenantClusters(hostName);
+ if(clusterMap != null) {
+ return clusterMap.get(tenantId);
+ }
+ return null;
+ }
+
+ public Map<Integer, Cluster> getMultiTenantClusters(String hostName) {
+ if(multiTenantClustersExists(hostName)) {
+ return null;
+ }
+ return multiTenantClusterMap.get(hostName);
+ }
+
+ public boolean multiTenantClustersExists(String hostName) {
+ return multiTenantClusterMap.containsKey(hostName);
+ }
+
+ public void addMultiTenantClusters(String hostname, Map<Integer, Cluster> clusters) {
+ multiTenantClusterMap.put(hostname, clusters);
+ }
+
+ public void removeMultiTenantClusters(String hostName) {
+ multiTenantClusterMap.remove(hostName);
+ }
+ // MultiTenantClusterMap methods END
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/25c76201/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
index 7b8c816..95ffb1f 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
@@ -189,7 +189,7 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
if(!requestDelegator.isTargetHostValid(targetHost)) {
throwSynapseException(synCtx, 404, String.format("Unknown host name %s", targetHost));
}
- Member member = requestDelegator.findNextMember(targetHost);
+ Member member = requestDelegator.findNextMemberFromHostName(targetHost);
if (member == null)
return null;
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/25c76201/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
index 15f9600..6ebf1ca 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/internal/LoadBalancerServiceComponent.java
@@ -21,15 +21,17 @@ package org.apache.stratos.load.balancer.internal;
import org.apache.axis2.deployment.DeploymentEngine;
import org.apache.axis2.engine.AxisConfiguration;
-import org.apache.stratos.load.balancer.LoadBalancerContext;
-import org.apache.stratos.load.balancer.LoadBalancerTopologyReceiver;
-import org.apache.stratos.load.balancer.TenantAwareLoadBalanceEndpointException;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.load.balancer.EndpointDeployer;
+import org.apache.stratos.load.balancer.LoadBalancerTenantReceiver;
+import org.apache.stratos.load.balancer.LoadBalancerTopologyReceiver;
+import org.apache.stratos.load.balancer.TenantAwareLoadBalanceEndpointException;
+import org.apache.stratos.load.balancer.conf.LoadBalancerConfiguration;
import org.apache.stratos.load.balancer.conf.configurator.CEPConfigurator;
import org.apache.stratos.load.balancer.conf.configurator.JndiConfigurator;
-import org.apache.stratos.load.balancer.conf.LoadBalancerConfiguration;
import org.apache.stratos.load.balancer.conf.configurator.SynapseConfigurator;
+import org.apache.stratos.load.balancer.context.LoadBalancerContext;
import org.apache.stratos.messaging.message.filter.topology.ClusterFilter;
import org.apache.stratos.messaging.message.filter.topology.ServiceFilter;
import org.apache.synapse.config.SynapseConfiguration;
@@ -46,10 +48,9 @@ import org.wso2.carbon.mediation.initializer.services.SynapseEnvironmentService;
import org.wso2.carbon.mediation.initializer.services.SynapseRegistrationsService;
import org.wso2.carbon.registry.core.exceptions.RegistryException;
import org.wso2.carbon.registry.core.service.RegistryService;
+import org.wso2.carbon.user.core.service.RealmService;
import org.wso2.carbon.utils.ConfigurationContextService;
import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
-import org.wso2.carbon.user.core.service.RealmService;
-import org.apache.stratos.load.balancer.EndpointDeployer;
import java.io.File;
import java.util.Map;
@@ -96,6 +97,7 @@ public class LoadBalancerServiceComponent {
private boolean activated = false;
private LoadBalancerTopologyReceiver topologyReceiver;
+ private LoadBalancerTenantReceiver tenantReceiver;
protected void activate(ComponentContext ctxt) {
try {
@@ -112,10 +114,24 @@ public class LoadBalancerServiceComponent {
// Configure cep settings
CEPConfigurator.configure(configuration);
- if (configuration.isTopologyEventListenerEnabled()) {
+ if (configuration.isMultiTenancyEnabled()) {
// Configure jndi.properties
JndiConfigurator.configure(configuration);
+ tenantReceiver = new LoadBalancerTenantReceiver();
+ Thread tenantReceiverThread = new Thread(tenantReceiver);
+ tenantReceiverThread.start();
+ if (log.isInfoEnabled()) {
+ log.info("Tenant receiver thread started");
+ }
+ }
+
+ if (configuration.isTopologyEventListenerEnabled()) {
+ if (!JndiConfigurator.isConfigured()) {
+ // Configure jndi.properties
+ JndiConfigurator.configure(configuration);
+ }
+
// Start topology receiver
topologyReceiver = new LoadBalancerTopologyReceiver();
Thread topologyReceiverThread = new Thread(topologyReceiver);
@@ -153,7 +169,7 @@ public class LoadBalancerServiceComponent {
log.info("Load balancer service component is activated ");
}
} catch (Exception e) {
- if(log.isFatalEnabled()) {
+ if (log.isFatalEnabled()) {
log.fatal("Failed to activate load balancer service component", e);
}
}
@@ -162,7 +178,7 @@ public class LoadBalancerServiceComponent {
protected void deactivate(ComponentContext context) {
try {
Set<Map.Entry<Integer, SynapseEnvironmentService>> entrySet = LoadBalancerContext
- .getInstance().getSynapseEnvironmentServiceMap().entrySet();
+ .getInstance().getTenantIdSynapseEnvironmentServiceMap().entrySet();
for (Map.Entry<Integer, SynapseEnvironmentService> entry : entrySet) {
unregisterDeployer(entry.getValue().getConfigurationContext()
.getAxisConfiguration(), entry.getValue()
@@ -171,6 +187,8 @@ public class LoadBalancerServiceComponent {
} catch (Exception e) {
log.warn("Couldn't remove the endpoint deployer");
}
+ // Terminate tenant receiver
+ tenantReceiver.terminate();
// Terminate topology receiver
topologyReceiver.terminate();
}
@@ -258,7 +276,7 @@ public class LoadBalancerServiceComponent {
*/
protected void setSynapseEnvironmentService(SynapseEnvironmentService synapseEnvironmentService) {
boolean alreadyCreated = LoadBalancerContext.getInstance()
- .getSynapseEnvironmentServiceMap()
+ .getTenantIdSynapseEnvironmentServiceMap()
.containsKey(synapseEnvironmentService.getTenantId());
LoadBalancerContext.getInstance().addSynapseEnvironmentService(
@@ -337,7 +355,7 @@ public class LoadBalancerServiceComponent {
protected void unsetSynapseRegistrationsService(
SynapseRegistrationsService synapseRegistrationsService) {
int tenantId = synapseRegistrationsService.getTenantId();
- if (LoadBalancerContext.getInstance().getSynapseEnvironmentServiceMap()
+ if (LoadBalancerContext.getInstance().getTenantIdSynapseEnvironmentServiceMap()
.containsKey(tenantId)) {
SynapseEnvironment env = LoadBalancerContext.getInstance()
.getSynapseEnvironmentService(tenantId)
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/25c76201/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java
index 0ac8e0c..cd9fa1a 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/LoadBalancerInFlightRequestCountCollector.java
@@ -18,15 +18,15 @@
*/
package org.apache.stratos.load.balancer.statistics;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.load.balancer.statistics.observers.WSO2CEPInFlightRequestCountObserver;
+
import java.util.HashMap;
import java.util.Map;
import java.util.Observable;
import java.util.concurrent.ConcurrentHashMap;
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.stratos.load.balancer.statistics.observers.WSO2CEPInFlightRequestCountObserver;
-
/**
* This is the load balancing stats collector and any observer can get registered here
* and receive notifications periodically.
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/25c76201/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java
index 9a77778..8b7bf5a 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/statistics/observers/WSO2CEPInFlightRequestCountObserver.java
@@ -18,12 +18,14 @@
*/
package org.apache.stratos.load.balancer.statistics.observers;
-import java.util.*;
-
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.load.balancer.common.statistics.WSO2CEPInFlightRequestPublisher;
+import java.util.Map;
+import java.util.Observable;
+import java.util.Observer;
+
public class WSO2CEPInFlightRequestCountObserver implements Observer {
private static final Log log = LogFactory.getLog(WSO2CEPInFlightRequestCountObserver.class);
private WSO2CEPInFlightRequestPublisher publisher;
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/25c76201/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer1.conf
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer1.conf b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer1.conf
index 08e7f27..4993db7 100755
--- a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer1.conf
+++ b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer1.conf
@@ -47,7 +47,7 @@ loadbalancer {
topology-event-listener: true;
# Message broker endpoint
- # Provide message broker ip address and port if topology_event_listener_enabled is set to true.
+ # Provide message broker ip address and port if topology-event-listener or multi-tenancy is set to true.
mb-ip: localhost;
mb-port: 5677;
@@ -67,7 +67,7 @@ loadbalancer {
cep-stats-publisher: true;
# Complex event processor endpoint
- # Provide CEP ip address and port if stats_publisher_enabled is set to true.
+ # Provide CEP ip address and port if cep-stats-publisher is set to true.
cep-ip: localhost;
cep-port: 7615;
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/25c76201/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf
index b60919a..1a345a9 100755
--- a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf
+++ b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf
@@ -47,7 +47,7 @@ loadbalancer {
topology-event-listener: false;
# Message broker endpoint
- # Provide message broker ip address and port if topology_event_listener_enabled is set to true.
+ # Provide message broker ip address and port if topology-event-listener or multi-tenancy is set to true.
# mb-ip: localhost;
# mb-port: 5677;
@@ -67,7 +67,7 @@ loadbalancer {
cep-stats-publisher: true;
# Complex event processor endpoint
- # Provide CEP ip address and port if stats_publisher_enabled is set to true.
+ # Provide CEP ip address and port if cep-stats-publisher is set to true.
cep-ip: localhost;
cep-port: 7615;
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/25c76201/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer3.conf
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer3.conf b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer3.conf
index 8fbe48d..e9332d0 100755
--- a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer3.conf
+++ b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer3.conf
@@ -47,7 +47,7 @@ loadbalancer {
topology-event-listener: true;
# Message broker endpoint
- # Provide message broker ip address and port if topology_event_listener_enabled is set to true.
+ # Provide message broker ip address and port if topology-event-listener or multi-tenancy is set to true.
mb-ip: localhost;
mb-port: 5677;
@@ -67,7 +67,7 @@ loadbalancer {
cep-stats-publisher: false;
# Complex event processor endpoint
- # Provide CEP ip address and port if stats_publisher_enabled is set to true.
+ # Provide CEP ip address and port if cep-stats-publisher is set to true.
# cep-ip: localhost;
# cep-port: 7615;
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/25c76201/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf
----------------------------------------------------------------------
diff --git a/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf b/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf
index 038aa19..a3b621d 100644
--- a/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf
+++ b/products/load-balancer/modules/distribution/src/main/conf/loadbalancer.conf
@@ -41,7 +41,7 @@ loadbalancer {
topology-event-listener: true;
# Message broker endpoint
- # Provide message broker ip address and port if topology_event_listener_enabled is set to true.
+ # Provide message broker ip address and port if topology-event-listener or multi-tenancy is set to true.
mb-ip: localhost;
mb-port: 5677;
@@ -61,7 +61,7 @@ loadbalancer {
cep-stats-publisher: true;
# Complex event processor endpoint
- # Provide CEP ip address and port if stats_publisher_enabled is set to true.
+ # Provide CEP ip address and port if cep-stats-publisher is set to true.
cep-ip: localhost;
cep-port: 7615;
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/25c76201/tools/stratos-installer/config/lb/repository/conf/loadbalancer.conf
----------------------------------------------------------------------
diff --git a/tools/stratos-installer/config/lb/repository/conf/loadbalancer.conf b/tools/stratos-installer/config/lb/repository/conf/loadbalancer.conf
index 6a6e4ee..971abd5 100644
--- a/tools/stratos-installer/config/lb/repository/conf/loadbalancer.conf
+++ b/tools/stratos-installer/config/lb/repository/conf/loadbalancer.conf
@@ -41,7 +41,7 @@ loadbalancer {
topology-event-listener: true;
# Message broker endpoint
- # Provide message broker ip address and port if topology_event_listener_enabled is set to true.
+ # Provide message broker ip address and port if topology-event-listener or multi-tenancy is set to true.
mb-ip: MB_IP;
mb-port: MB_LISTEN_PORT;
@@ -61,7 +61,7 @@ loadbalancer {
cep-stats-publisher: true;
# Complex event processor endpoint
- # Provide CEP ip address and port if stats_publisher_enabled is set to true.
+ # Provide CEP ip address and port if cep-stats-publisher is set to true.
cep-ip: CEP_IP;
cep-port: CEP_LISTEN_PORT;
[2/2] git commit: Merge remote-tracking branch 'origin/master'
Posted by im...@apache.org.
Merge remote-tracking branch 'origin/master'
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/d05eda2f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/d05eda2f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/d05eda2f
Branch: refs/heads/master
Commit: d05eda2f7536cab4818abd1fecba9384d75f21b2
Parents: 25c7620 91970c3
Author: Imesh Gunaratne <im...@apache.org>
Authored: Sat Dec 7 23:52:13 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Sat Dec 7 23:52:13 2013 +0530
----------------------------------------------------------------------
.../DatabaseBasedPersistenceManager.java | 19 +-
.../ApplicationCartridgeSubscription.java | 86 ++++
.../mgt/subscription/CartridgeSubscription.java | 46 +-
.../subscription/CartridgeSubscription_old.java | 431 +++++++++++++++++++
.../subscription/DataCartridgeSubscription.java | 49 ++-
.../DataCartridgeSubscription_old.java | 127 ++++++
.../FrameworkCartridgeSubscription.java | 78 ++++
.../subscription/LBCartridgeSubscription.java | 51 +++
.../MultiTenantCartridgeSubscription.java | 268 ++++++------
.../SingleTenantCartridgeSubscription.java | 272 ++++++------
.../factory/CartridgeSubscriptionFactory.java | 33 +-
.../SubscriptionMultiTenantBehaviour.java | 119 +++++
.../SubscriptionSingleTenantBehaviour.java | 120 ++++++
.../tenancy/SubscriptionTenancyBehaviour.java | 54 +++
.../adc/mgt/test/CartridgeSubscriptionTest.java | 1 -
15 files changed, 1429 insertions(+), 325 deletions(-)
----------------------------------------------------------------------