You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2013/12/08 05:31:36 UTC
git commit: Fixed tenant subscription publishing/receiving logic
Updated Branches:
refs/heads/master 2dbf5598a -> 2fbd0fd38
Fixed tenant subscription publishing/receiving logic
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/2fbd0fd3
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/2fbd0fd3
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/2fbd0fd3
Branch: refs/heads/master
Commit: 2fbd0fd380a1c617eef64128758ad87d4ecb7c3c
Parents: 2dbf559
Author: Imesh Gunaratne <im...@apache.org>
Authored: Sun Dec 8 10:01:20 2013 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Sun Dec 8 10:01:20 2013 +0530
----------------------------------------------------------------------
.../mgt/publisher/TenantSynzhronizerTask.java | 26 ++++-
.../balancer/LoadBalancerTenantReceiver.java | 105 +++++++++++--------
.../balancer/LoadBalancerTopologyReceiver.java | 64 ++---------
.../stratos/load/balancer/RequestDelegator.java | 14 +--
.../conf/LoadBalancerConfiguration.java | 17 +--
.../context/LoadBalancerContextUtil.java | 73 +++++++++++++
.../sample/configuration/loadbalancer2.conf | 4 +-
.../stratos/messaging/domain/tenant/Tenant.java | 5 +
.../distribution/src/main/conf/log4j.properties | 2 +-
9 files changed, 192 insertions(+), 118 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2fbd0fd3/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/publisher/TenantSynzhronizerTask.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/publisher/TenantSynzhronizerTask.java b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/publisher/TenantSynzhronizerTask.java
index 62c5103..d777c2d 100644
--- a/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/publisher/TenantSynzhronizerTask.java
+++ b/components/org.apache.stratos.adc.mgt/src/main/java/org/apache/stratos/adc/mgt/publisher/TenantSynzhronizerTask.java
@@ -21,7 +21,9 @@ package org.apache.stratos.adc.mgt.publisher;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.adc.mgt.dao.CartridgeSubscriptionInfo;
import org.apache.stratos.adc.mgt.internal.DataHolder;
+import org.apache.stratos.adc.mgt.utils.PersistenceManager;
import org.apache.stratos.messaging.broker.publish.EventPublisher;
import org.apache.stratos.messaging.domain.tenant.Tenant;
import org.apache.stratos.messaging.event.tenant.CompleteTenantEvent;
@@ -48,20 +50,34 @@ public class TenantSynzhronizerTask implements Task {
@Override
public void execute() {
try {
- if(log.isInfoEnabled()) {
+ if (log.isInfoEnabled()) {
log.info(String.format("Publishing complete tenant event"));
}
+ Tenant tenant;
List<Tenant> tenants = new ArrayList<Tenant>();
TenantManager tenantManager = DataHolder.getRealmService().getTenantManager();
org.wso2.carbon.user.api.Tenant[] carbonTenants = tenantManager.getAllTenants();
- for(org.wso2.carbon.user.api.Tenant carbonTenant : carbonTenants) {
- tenants.add(new Tenant(carbonTenant.getId(), carbonTenant.getDomain()));
+ for (org.wso2.carbon.user.api.Tenant carbonTenant : carbonTenants) {
+ // Create tenant
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Tenant found: [tenant-id] %d [tenant-domain] %s", carbonTenant.getId(), carbonTenant.getDomain()));
+ }
+ tenant = new Tenant(carbonTenant.getId(), carbonTenant.getDomain());
+ // Add subscriptions
+ List<CartridgeSubscriptionInfo> subscriptions = PersistenceManager.getSubscriptionsForTenant(tenant.getTenantId());
+ for (CartridgeSubscriptionInfo subscription : subscriptions) {
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Tenant subscription found: [tenant-id] %d [tenant-domain] %s [service] %s",
+ carbonTenant.getId(), carbonTenant.getDomain(), subscription.getCartridge()));
+ }
+ tenant.addServiceSubscription(subscription.getCartridge());
+ }
+ tenants.add(tenant);
}
CompleteTenantEvent event = new CompleteTenantEvent(tenants);
EventPublisher eventPublisher = new EventPublisher(Constants.TENANT_TOPIC);
eventPublisher.publish(event);
- }
- catch (Exception e) {
+ } catch (Exception e) {
if (log.isErrorEnabled()) {
log.error("Could not publish complete tenant event", e);
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2fbd0fd3/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantReceiver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantReceiver.java
index ad17ec9..dc6e402 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantReceiver.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTenantReceiver.java
@@ -22,11 +22,14 @@ package org.apache.stratos.load.balancer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.load.balancer.context.LoadBalancerContext;
+import org.apache.stratos.messaging.domain.tenant.Tenant;
import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.Service;
import org.apache.stratos.messaging.event.Event;
+import org.apache.stratos.messaging.event.tenant.CompleteTenantEvent;
import org.apache.stratos.messaging.event.tenant.TenantSubscribedEvent;
import org.apache.stratos.messaging.event.tenant.TenantUnSubscribedEvent;
+import org.apache.stratos.messaging.listener.tenant.CompleteTenantEventListener;
import org.apache.stratos.messaging.listener.tenant.TenantSubscribedEventListener;
import org.apache.stratos.messaging.listener.tenant.TenantUnSubscribedEventListener;
import org.apache.stratos.messaging.message.processor.tenant.TenantMessageProcessorChain;
@@ -59,64 +62,80 @@ public class LoadBalancerTenantReceiver implements Runnable {
private TenantMessageProcessorChain createEventProcessorChain() {
TenantMessageProcessorChain messageProcessorChain = new TenantMessageProcessorChain();
- messageProcessorChain.addEventListener(new TenantSubscribedEventListener() {
+ messageProcessorChain.addEventListener(new CompleteTenantEventListener() {
@Override
protected void onEvent(Event event) {
- TenantSubscribedEvent tenantSubscribedEvent = (TenantSubscribedEvent) event;
-
- // Find cluster of tenant
- Cluster cluster = findCluster(tenantSubscribedEvent.getServiceName(), tenantSubscribedEvent.getTenantId());
- if(cluster != null) {
- for(String hostName : cluster.getHostNames()) {
- // Add hostName, tenantId, cluster to multi-tenant map
- Map<Integer, Cluster> clusterMap = LoadBalancerContext.getInstance().getMultiTenantClusters(hostName);
- if(clusterMap == null) {
- clusterMap = new HashMap<Integer, Cluster>();
- clusterMap.put(tenantSubscribedEvent.getTenantId(), cluster);
- LoadBalancerContext.getInstance().addMultiTenantClusters(hostName, clusterMap);
- }
- else {
- clusterMap.put(tenantSubscribedEvent.getTenantId(), cluster);
- }
- if(log.isDebugEnabled()) {
- log.debug(String.format("Cluster added to multi-tenant clusters map: [host-name] %s [tenant-id] %d [cluster] %s",
- hostName, tenantSubscribedEvent.getTenantId(), cluster.getClusterId()));
- }
- }
- }
- else {
- if(log.isErrorEnabled()) {
- log.error(String.format("Could not find cluster of tenant: [service] %s [tenant-id] %d",
- tenantSubscribedEvent.getServiceName(), tenantSubscribedEvent.getTenantId()));
+ CompleteTenantEvent completeTenantEvent = (CompleteTenantEvent) event;
+ for(Tenant tenant : completeTenantEvent.getTenants()) {
+ for(String serviceName : tenant.getServiceSubscriptions()) {
+ addTenantSubscriptionToLbContext(serviceName, tenant.getTenantId());
}
}
}
});
+ messageProcessorChain.addEventListener(new TenantSubscribedEventListener() {
+ @Override
+ protected void onEvent(Event event) {
+ TenantSubscribedEvent tenantSubscribedEvent = (TenantSubscribedEvent) event;
+ addTenantSubscriptionToLbContext(tenantSubscribedEvent.getServiceName(), tenantSubscribedEvent.getTenantId());
+ }
+ });
messageProcessorChain.addEventListener(new TenantUnSubscribedEventListener() {
@Override
protected void onEvent(Event event) {
TenantUnSubscribedEvent tenantUnSubscribedEvent = (TenantUnSubscribedEvent) event;
+ removeTenantSubscriptionFromLbContext(tenantUnSubscribedEvent.getServiceName(), tenantUnSubscribedEvent.getTenantId());
+ }
+ });
+ return messageProcessorChain;
+ }
- // Find cluster of tenant
- Cluster cluster = findCluster(tenantUnSubscribedEvent.getServiceName(), tenantUnSubscribedEvent.getTenantId());
- if(cluster != null) {
- for(String hostName : cluster.getHostNames()) {
- LoadBalancerContext.getInstance().removeMultiTenantClusters(hostName);
- if(log.isDebugEnabled()) {
- log.debug(String.format("Cluster removed from multi-tenant clusters map: [host-name] %s [tenant-id] %d [cluster] %s",
- hostName, tenantUnSubscribedEvent.getTenantId(), cluster.getClusterId()));
- }
- }
+ private void addTenantSubscriptionToLbContext(String serviceName, int tenantId) {
+ // Find cluster of tenant
+ Cluster cluster = findCluster(serviceName, tenantId);
+ if(cluster != null) {
+ for(String hostName : cluster.getHostNames()) {
+ // Add hostName, tenantId, cluster to multi-tenant map
+ Map<Integer, Cluster> clusterMap = LoadBalancerContext.getInstance().getMultiTenantClusters(hostName);
+ if(clusterMap == null) {
+ clusterMap = new HashMap<Integer, Cluster>();
+ clusterMap.put(tenantId, cluster);
+ LoadBalancerContext.getInstance().addMultiTenantClusters(hostName, clusterMap);
}
else {
- if(log.isErrorEnabled()) {
- log.error(String.format("Could not find cluster of tenant: [service] %s [tenant-id] %d",
- tenantUnSubscribedEvent.getServiceName(), tenantUnSubscribedEvent.getTenantId()));
- }
+ clusterMap.put(tenantId, cluster);
+ }
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Cluster added to multi-tenant clusters map: [host-name] %s [tenant-id] %d [cluster] %s",
+ hostName, tenantId, cluster.getClusterId()));
}
}
- });
- return messageProcessorChain;
+ }
+ else {
+ if(log.isErrorEnabled()) {
+ log.error(String.format("Could not find cluster of tenant: [service] %s [tenant-id] %d",
+ serviceName, tenantId));
+ }
+ }
+ }
+
+ private void removeTenantSubscriptionFromLbContext(String serviceName, int tenantId) {
+ // Find cluster of tenant
+ Cluster cluster = findCluster(serviceName, tenantId);
+ if (cluster != null) {
+ for (String hostName : cluster.getHostNames()) {
+ LoadBalancerContext.getInstance().removeMultiTenantClusters(hostName);
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster removed from multi-tenant clusters map: [host-name] %s [tenant-id] %d [cluster] %s",
+ hostName, tenantId, cluster.getClusterId()));
+ }
+ }
+ } else {
+ if (log.isErrorEnabled()) {
+ log.error(String.format("Could not find cluster of tenant: [service] %s [tenant-id] %d",
+ serviceName, tenantId));
+ }
+ }
}
private Cluster findCluster(String serviceName, int tenantId) {
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2fbd0fd3/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
index 55ff817..bd5a0b1 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/LoadBalancerTopologyReceiver.java
@@ -22,10 +22,10 @@ package org.apache.stratos.load.balancer;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.load.balancer.context.LoadBalancerContext;
+import org.apache.stratos.load.balancer.context.LoadBalancerContextUtil;
import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.Member;
import org.apache.stratos.messaging.domain.topology.Service;
-import org.apache.stratos.messaging.domain.topology.ServiceType;
import org.apache.stratos.messaging.event.Event;
import org.apache.stratos.messaging.event.topology.ClusterRemovedEvent;
import org.apache.stratos.messaging.event.topology.MemberActivatedEvent;
@@ -85,7 +85,7 @@ public class LoadBalancerTopologyReceiver implements Runnable {
for (Service service : TopologyManager.getTopology().getServices()) {
for (Cluster cluster : service.getClusters()) {
if (hasActiveMembers(cluster)) {
- addClusterToLbContext(cluster);
+ LoadBalancerContextUtil.addClusterToLbContext(cluster);
}
}
}
@@ -113,10 +113,9 @@ public class LoadBalancerTopologyReceiver implements Runnable {
MemberActivatedEvent memberActivatedEvent = (MemberActivatedEvent) event;
Cluster cluster = LoadBalancerContext.getInstance().getCluster(memberActivatedEvent.getClusterId());
if (cluster != null) {
- addClusterToLbContext(cluster);
- }
- else {
- if(log.isWarnEnabled()) {
+ LoadBalancerContextUtil.addClusterToLbContext(cluster);
+ } else {
+ if (log.isWarnEnabled()) {
log.warn(String.format("Cluster not found in cluster id cluster map: [cluster] %s", memberActivatedEvent.getClusterId()));
}
}
@@ -136,11 +135,10 @@ public class LoadBalancerTopologyReceiver implements Runnable {
Cluster cluster = LoadBalancerContext.getInstance().getCluster(clusterRemovedEvent.getClusterId());
if (cluster != null) {
for (String hostName : cluster.getHostNames()) {
- removeClusterFromLbContext(hostName);
+ LoadBalancerContextUtil.removeClusterFromLbContext(hostName);
}
- }
- else {
- if(log.isWarnEnabled()) {
+ } else {
+ if (log.isWarnEnabled()) {
log.warn(String.format("Cluster not found in cluster id cluster map: [cluster] %s", clusterRemovedEvent.getClusterId()));
}
}
@@ -161,12 +159,11 @@ public class LoadBalancerTopologyReceiver implements Runnable {
if (service != null) {
for (Cluster cluster : service.getClusters()) {
for (String hostName : cluster.getHostNames()) {
- removeClusterFromLbContext(hostName);
+ LoadBalancerContextUtil.removeClusterFromLbContext(hostName);
}
}
- }
- else {
- if(log.isWarnEnabled()) {
+ } else {
+ if (log.isWarnEnabled()) {
log.warn(String.format("Service not found in topology: [service] %s", serviceRemovedEvent.getServiceName()));
}
}
@@ -178,45 +175,6 @@ public class LoadBalancerTopologyReceiver implements Runnable {
return processorChain;
}
- private void addClusterToLbContext(Cluster cluster) {
- // Add cluster to Map<ClusterId, Cluster>
- LoadBalancerContext.getInstance().addCluster(cluster);
-
- Service service = TopologyManager.getTopology().getService(cluster.getServiceName());
- if (service.getServiceType() == ServiceType.SingleTenant) {
- // Add cluster to SingleTenantClusterMap
- for (String hostName : cluster.getHostNames()) {
- if (!LoadBalancerContext.getInstance().singleTenantClusterExists((hostName))) {
- LoadBalancerContext.getInstance().addSingleTenantCluster(hostName, cluster);
- if (log.isDebugEnabled()) {
- log.debug(String.format("Cluster added to single tenant cluster map: [cluster] %s [hostname] %s", cluster.getClusterId(), hostName));
- }
- }
- }
- }
- // MultiTenantClusterMap is updated by tenant receiver.
- }
-
- private void removeClusterFromLbContext(String clusterId) {
- Cluster cluster = LoadBalancerContext.getInstance().getCluster(clusterId);
- Service service = TopologyManager.getTopology().getService(cluster.getServiceName());
- if (service.getServiceType() == ServiceType.SingleTenant) {
- // Remove cluster from SingleTenantClusterMap
- for (String hostName : cluster.getHostNames()) {
- if (LoadBalancerContext.getInstance().singleTenantClusterExists(hostName)) {
- LoadBalancerContext.getInstance().removeSingleTenantCluster(hostName);
- if (log.isDebugEnabled()) {
- log.debug(String.format("Cluster removed from single tenant cluster map: [cluster] %s [hostname] %s", cluster.getClusterId(), hostName));
- }
- }
- }
- }
- // MultiTenantClusterMap is updated by tenant receiver.
-
- // Remove cluster from Map<ClusterId,Cluster>
- LoadBalancerContext.getInstance().removeCluster(clusterId);
- }
-
/**
* Terminate load balancer topology receiver thread.
*/
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2fbd0fd3/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/RequestDelegator.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/RequestDelegator.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/RequestDelegator.java
index 9b0a8a5..be09b31 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/RequestDelegator.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/RequestDelegator.java
@@ -23,6 +23,7 @@ import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.stratos.load.balancer.algorithm.AlgorithmContext;
import org.apache.stratos.load.balancer.algorithm.LoadBalanceAlgorithm;
+import org.apache.stratos.load.balancer.conf.LoadBalancerConfiguration;
import org.apache.stratos.load.balancer.context.LoadBalancerContext;
import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.Member;
@@ -116,14 +117,13 @@ public class RequestDelegator {
}
public boolean isTargetHostValid(String hostName) {
- try {
- if (hostName == null)
- return false;
+ if (hostName == null)
+ return false;
- TopologyManager.acquireReadLock();
- return LoadBalancerContext.getInstance().clusterExists(hostName);
- } finally {
- TopologyManager.releaseReadLock();
+ boolean valid = LoadBalancerContext.getInstance().singleTenantClusterExists(hostName);
+ if ((!valid) && (LoadBalancerConfiguration.getInstance().isMultiTenancyEnabled())) {
+ valid = LoadBalancerContext.getInstance().multiTenantClustersExists(hostName);
}
+ return valid;
}
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2fbd0fd3/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
index c94d55f..ae811a0 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/conf/LoadBalancerConfiguration.java
@@ -28,6 +28,7 @@ import org.apache.stratos.load.balancer.conf.structure.Node;
import org.apache.stratos.load.balancer.conf.structure.NodeBuilder;
import org.apache.stratos.load.balancer.conf.util.Constants;
import org.apache.stratos.load.balancer.context.LoadBalancerContext;
+import org.apache.stratos.load.balancer.context.LoadBalancerContextUtil;
import org.apache.stratos.load.balancer.exception.InvalidConfigurationException;
import org.apache.stratos.messaging.domain.topology.*;
import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
@@ -238,7 +239,7 @@ public class LoadBalancerConfiguration {
public LoadBalancerConfiguration readFromFile() {
String configFilePath = System.getProperty("loadbalancer.conf.file");
- if(configFilePath == null){
+ if (configFilePath == null) {
throw new RuntimeException("loadbalancer.conf.file' system property is not set");
}
@@ -302,8 +303,8 @@ public class LoadBalancerConfiguration {
configuration.setMultiTenancyEnabled(Boolean.parseBoolean(multiTenancyEnabled));
}
- // Read mb ip, port, topology service filter and topology cluster filter if topology event listener is enabled
- if (configuration.isTopologyEventListenerEnabled()) {
+ // Read mb ip and port
+ if (configuration.isTopologyEventListenerEnabled() || configuration.isMultiTenancyEnabled()) {
String mbIp = loadBalancerNode.getProperty(Constants.CONF_PROPERTY_MB_IP);
validateRequiredPropertyInNode(Constants.CONF_PROPERTY_MB_IP, mbIp, "loadbalancer");
configuration.setMbIp(mbIp);
@@ -311,7 +312,10 @@ public class LoadBalancerConfiguration {
String mbPort = loadBalancerNode.getProperty(Constants.CONF_PROPERTY_MB_PORT);
validateRequiredPropertyInNode(Constants.CONF_PROPERTY_MB_PORT, mbPort, "loadbalancer");
configuration.setMbPort(Integer.parseInt(mbPort));
+ }
+ // Read topology service filter and topology cluster filter
+ if (configuration.isTopologyEventListenerEnabled()) {
String serviceFilter = loadBalancerNode.getProperty(Constants.CONF_PROPERTY_TOPOLOGY_SERVICE_FILTER);
if (StringUtils.isNotBlank(serviceFilter)) {
configuration.setTopologyServiceFilter(serviceFilter);
@@ -349,8 +353,7 @@ public class LoadBalancerConfiguration {
validateRequiredPropertyInNode(Constants.CONF_PROPERTY_TENANT_IDENTIFIER_REGEX, tenantIdentifierRegex, "loadbalancer");
try {
Pattern.compile(tenantIdentifierRegex);
- }
- catch (Exception e) {
+ } catch (Exception e) {
throw new InvalidConfigurationException(String.format("Invalid tenant identifier regular expression: %s", tenantIdentifierRegex), e);
}
configuration.setTenantIdentifierRegex(tenantIdentifierRegex);
@@ -419,8 +422,8 @@ public class LoadBalancerConfiguration {
}
service.addCluster(cluster);
- // Add cluster to load balancer context Map<Hostname,Cluster>
- LoadBalancerContext.getInstance().addCluster(cluster);
+ // Add cluster to load balancer context
+ LoadBalancerContextUtil.addClusterToLbContext(cluster);
}
// Add service to topology manager
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2fbd0fd3/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContextUtil.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContextUtil.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContextUtil.java
new file mode 100644
index 0000000..f644933
--- /dev/null
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/context/LoadBalancerContextUtil.java
@@ -0,0 +1,73 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.load.balancer.context;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.messaging.domain.topology.Cluster;
+import org.apache.stratos.messaging.domain.topology.Service;
+import org.apache.stratos.messaging.domain.topology.ServiceType;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+
+/**
+ * Load balancer context utility class.
+ */
+public class LoadBalancerContextUtil {
+ private static final Log log = LogFactory.getLog(LoadBalancerContextUtil.class);
+
+ public static void addClusterToLbContext(Cluster cluster) {
+ // Add cluster to Map<ClusterId, Cluster>
+ LoadBalancerContext.getInstance().addCluster(cluster);
+
+ Service service = TopologyManager.getTopology().getService(cluster.getServiceName());
+ if (service.getServiceType() == ServiceType.SingleTenant) {
+ // Add cluster to SingleTenantClusterMap
+ for (String hostName : cluster.getHostNames()) {
+ if (!LoadBalancerContext.getInstance().singleTenantClusterExists((hostName))) {
+ LoadBalancerContext.getInstance().addSingleTenantCluster(hostName, cluster);
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster added to single tenant cluster map: [cluster] %s [hostname] %s", cluster.getClusterId(), hostName));
+ }
+ }
+ }
+ }
+ // MultiTenantClusterMap is updated by tenant receiver.
+ }
+
+ public static void removeClusterFromLbContext(String clusterId) {
+ Cluster cluster = LoadBalancerContext.getInstance().getCluster(clusterId);
+ Service service = TopologyManager.getTopology().getService(cluster.getServiceName());
+ if (service.getServiceType() == ServiceType.SingleTenant) {
+ // Remove cluster from SingleTenantClusterMap
+ for (String hostName : cluster.getHostNames()) {
+ if (LoadBalancerContext.getInstance().singleTenantClusterExists(hostName)) {
+ LoadBalancerContext.getInstance().removeSingleTenantCluster(hostName);
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Cluster removed from single tenant cluster map: [cluster] %s [hostname] %s", cluster.getClusterId(), hostName));
+ }
+ }
+ }
+ }
+ // MultiTenantClusterMap is updated by tenant receiver.
+
+ // Remove cluster from Map<ClusterId,Cluster>
+ LoadBalancerContext.getInstance().removeCluster(clusterId);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2fbd0fd3/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf
index 1a345a9..dac8f0b 100755
--- a/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf
+++ b/components/org.apache.stratos.load.balancer/src/test/resources/sample/configuration/loadbalancer2.conf
@@ -48,8 +48,8 @@ loadbalancer {
# Message broker endpoint
# Provide message broker ip address and port if topology-event-listener or multi-tenancy is set to true.
- # mb-ip: localhost;
- # mb-port: 5677;
+ mb-ip: localhost;
+ mb-port: 5677;
# Topology service filter
# Provide service names in a comma separated list to filter incoming topology events if
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2fbd0fd3/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/tenant/Tenant.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/tenant/Tenant.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/tenant/Tenant.java
index f4e3a0c..bc4244a 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/tenant/Tenant.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/tenant/Tenant.java
@@ -20,6 +20,7 @@
package org.apache.stratos.messaging.domain.tenant;
import java.io.Serializable;
+import java.util.Collection;
import java.util.HashMap;
import java.util.Map;
@@ -52,6 +53,10 @@ public class Tenant implements Serializable{
this.tenantDomain = tenantDomain;
}
+ public Collection<String> getServiceSubscriptions() {
+ return serviceNameMap.keySet();
+ }
+
public boolean isServiceSubscribed(String serviceName) {
return serviceNameMap.containsKey(serviceName);
}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2fbd0fd3/products/load-balancer/modules/distribution/src/main/conf/log4j.properties
----------------------------------------------------------------------
diff --git a/products/load-balancer/modules/distribution/src/main/conf/log4j.properties b/products/load-balancer/modules/distribution/src/main/conf/log4j.properties
index 1942895..3a23f46 100644
--- a/products/load-balancer/modules/distribution/src/main/conf/log4j.properties
+++ b/products/load-balancer/modules/distribution/src/main/conf/log4j.properties
@@ -111,7 +111,7 @@ log4j.appender.CARBON_LOGFILE.File=${carbon.home}/repository/logs/${instance.log
log4j.appender.CARBON_LOGFILE.Append=true
log4j.appender.CARBON_LOGFILE.layout=org.wso2.carbon.utils.logging.TenantAwarePatternLayout
# ConversionPattern will be overridden by the configuration setting in the DB
-log4j.appender.CARBON_LOGFILE.layout.ConversionPattern=TID: [%T] [%S] [%d] %P%5p {%c} - %x %m {%c}%n
+log4j.appender.CARBON_LOGFILE.layout.ConversionPattern=TID: [%T] [%S] [%d] %P%5p {%c} - %x %m %n
log4j.appender.CARBON_LOGFILE.layout.TenantPattern=%U%@%D [%T] [%S]
log4j.appender.CARBON_LOGFILE.threshold=DEBUG