You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by is...@apache.org on 2013/07/05 12:33:05 UTC
[28/34] committing refactored adc components and top level pom in
components
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ca25b1f7/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/EndpointDeployer.java
----------------------------------------------------------------------
diff --git a/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/EndpointDeployer.java b/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/EndpointDeployer.java
new file mode 100644
index 0000000..e4274f5
--- /dev/null
+++ b/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/EndpointDeployer.java
@@ -0,0 +1,71 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.lb.endpoint;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.context.ConfigurationContext;
+import org.wso2.carbon.mediation.initializer.ServiceBusConstants;
+import org.wso2.carbon.mediation.initializer.ServiceBusUtils;
+import org.wso2.carbon.mediation.initializer.persistence.MediationPersistenceManager;
+
+import java.util.Properties;
+
+/**
+ * Synapse endpoint deployer that, after delegating each lifecycle operation to
+ * the stock Synapse deployer, reports the affected endpoint to the
+ * {@link MediationPersistenceManager}.
+ */
+public class EndpointDeployer extends org.apache.synapse.deployers.EndpointDeployer {
+
+    /** Persistence manager looked up from the Axis configuration in {@link #init(ConfigurationContext)}. */
+    MediationPersistenceManager mpm;
+
+    /**
+     * Initialises the parent deployer and then resolves the persistence manager
+     * from the given configuration context.
+     *
+     * @param configCtx Axis2 configuration context supplied by the deployment engine
+     */
+    @Override
+    public void init(ConfigurationContext configCtx) {
+        super.init(configCtx);
+        mpm = ServiceBusUtils.getMediationPersistenceManager(configCtx.getAxisConfiguration());
+    }
+
+    /**
+     * Deploys the endpoint through the parent deployer and saves the resulting
+     * item via the persistence manager.
+     *
+     * @return the endpoint name reported by the parent deployer
+     */
+    @Override
+    public String deploySynapseArtifact(OMElement artifactConfig, String fileName,
+                                        Properties properties) {
+        final String deployedName =
+                super.deploySynapseArtifact(artifactConfig, fileName, properties);
+        mpm.saveItemToRegistry(deployedName, ServiceBusConstants.ITEM_TYPE_ENDPOINT);
+        return deployedName;
+    }
+
+    /**
+     * Updates the endpoint through the parent deployer and saves the resulting
+     * item via the persistence manager.
+     *
+     * @return the endpoint name reported by the parent deployer
+     */
+    @Override
+    public String updateSynapseArtifact(OMElement artifactConfig, String fileName,
+                                        String existingArtifactName, Properties properties) {
+        final String updatedName = super.updateSynapseArtifact(
+                artifactConfig, fileName, existingArtifactName, properties);
+        mpm.saveItemToRegistry(updatedName, ServiceBusConstants.ITEM_TYPE_ENDPOINT);
+        return updatedName;
+    }
+
+    /**
+     * Undeploys the endpoint through the parent deployer and then deletes the
+     * item via the persistence manager.
+     *
+     * @param artifactName name of the endpoint being removed
+     */
+    @Override
+    public void undeploySynapseArtifact(String artifactName) {
+        super.undeploySynapseArtifact(artifactName);
+        mpm.deleteItemFromRegistry(artifactName, ServiceBusConstants.ITEM_TYPE_ENDPOINT);
+    }
+
+    /**
+     * Restores the endpoint through the parent deployer and then saves the
+     * item again via the persistence manager.
+     *
+     * @param artifactName name of the endpoint being restored
+     */
+    @Override
+    public void restoreSynapseArtifact(String artifactName) {
+        super.restoreSynapseArtifact(artifactName);
+        mpm.saveItemToRegistry(artifactName, ServiceBusConstants.ITEM_TYPE_ENDPOINT);
+    }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ca25b1f7/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/TenantAwareLoadBalanceEndpointException.java
----------------------------------------------------------------------
diff --git a/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/TenantAwareLoadBalanceEndpointException.java b/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/TenantAwareLoadBalanceEndpointException.java
new file mode 100644
index 0000000..ac78fdd
--- /dev/null
+++ b/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/TenantAwareLoadBalanceEndpointException.java
@@ -0,0 +1,39 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.lb.endpoint;
+
+/**
+ * Unchecked exception to be thrown from this component.
+ * All constructors simply delegate to the matching {@link RuntimeException}
+ * constructor.
+ */
+public class TenantAwareLoadBalanceEndpointException extends RuntimeException {
+
+ private static final long serialVersionUID = -663839410798538370L;
+
+ /**
+ * Creates an exception carrying only a detail message.
+ *
+ * @param msg the detail message
+ */
+ public TenantAwareLoadBalanceEndpointException(String msg) {
+ super(msg);
+ }
+
+ /**
+ * Creates an exception carrying a detail message and the underlying cause.
+ *
+ * @param msg the detail message
+ * @param cause the underlying cause
+ */
+ public TenantAwareLoadBalanceEndpointException(String msg, Throwable cause) {
+ super(msg, cause);
+ }
+
+ /**
+ * Creates an exception that wraps the underlying cause.
+ *
+ * @param cause the underlying cause
+ */
+ public TenantAwareLoadBalanceEndpointException(Throwable cause) {
+ super(cause);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ca25b1f7/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/TenantLoadBalanceMembershipHandler.java
----------------------------------------------------------------------
diff --git a/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/TenantLoadBalanceMembershipHandler.java b/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/TenantLoadBalanceMembershipHandler.java
new file mode 100644
index 0000000..07208cb
--- /dev/null
+++ b/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/TenantLoadBalanceMembershipHandler.java
@@ -0,0 +1,208 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.lb.endpoint;
+
+
+import org.apache.axis2.clustering.ClusteringAgent;
+import org.apache.axis2.clustering.Member;
+import org.apache.axis2.clustering.management.GroupManagementAgent;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.stratos.lb.endpoint.util.ConfigHolder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.core.LoadBalanceMembershipHandler;
+import org.apache.synapse.endpoints.algorithms.AlgorithmContext;
+import org.apache.synapse.endpoints.algorithms.LoadbalanceAlgorithm;
+import org.apache.stratos.lb.common.conf.util.HostContext;
+import org.wso2.carbon.user.api.UserStoreException;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+/**
+ * Bridge between Axis2 membership notification and Synapse load balancing.
+ *
+ * <p>Host names are mapped to {@link HostContext}s; for a given host and tenant
+ * the handler resolves the cluster domain / sub domain, fetches the members of
+ * the matching group management agent and lets the host's load balance
+ * algorithm pick the next member.
+ */
+public class TenantLoadBalanceMembershipHandler implements LoadBalanceMembershipHandler {
+    private static final Log log = LogFactory.getLog(TenantLoadBalanceMembershipHandler.class);
+
+    private ConfigurationContext configCtx;
+
+    private LoadbalanceAlgorithm lbAlgo;
+
+    /**
+     * Key - Host, Value - HostContext
+     *
+     * NOTE(review): this map is static (shared by every handler instance) and is a
+     * plain, unsynchronized HashMap; confirm that all readers and writers are
+     * properly coordinated.
+     */
+    private static Map<String, HostContext> hostContextsMap =
+            new HashMap<String, HostContext>();
+
+    /** Set by {@link #setConfigurationContext(ConfigurationContext)}; null until then. */
+    private ClusteringAgent clusteringAgent;
+
+    private boolean isClusteringEnabled;
+    private String endpointName;
+
+    /**
+     * Creates the handler and registers every given host context.
+     *
+     * <p>Only the configuration context reference is stored here; the clustering
+     * agent is resolved later by {@link #setConfigurationContext(ConfigurationContext)}.
+     *
+     * @param hostContexts        host contexts to register, keyed by host
+     * @param algorithm           algorithm that is cloned for each registered host
+     * @param configCtx           Axis2 configuration context
+     * @param isClusteringEnabled passed on to every created {@link AlgorithmContext}
+     * @param endpointName        prefix of the per-host algorithm context name
+     */
+    public TenantLoadBalanceMembershipHandler(Map<String, HostContext> hostContexts,
+                                              LoadbalanceAlgorithm algorithm,
+                                              ConfigurationContext configCtx,
+                                              boolean isClusteringEnabled,
+                                              String endpointName) {
+
+        lbAlgo = algorithm;
+        this.isClusteringEnabled = isClusteringEnabled;
+        this.endpointName = endpointName;
+        this.configCtx = configCtx;
+
+        for (HostContext host : hostContexts.values()) {
+            addHostContext(host);
+        }
+    }
+
+    /**
+     * This will be used to add new {@link HostContext}s. The host receives a fresh
+     * clone of the algorithm and a new algorithm context named
+     * {@code <endpointName>.<hostName>}; an existing entry for the same host name
+     * is replaced.
+     *
+     * @param host {@link HostContext}
+     */
+    public void addHostContext(HostContext host) {
+
+        String hostName = host.getHostName();
+
+        AlgorithmContext algorithmContext =
+                new AlgorithmContext(isClusteringEnabled, configCtx, endpointName + "." + hostName);
+
+        host.setAlgorithm(lbAlgo.clone());
+        host.setAlgorithmContext(algorithmContext);
+
+        hostContextsMap.put(hostName, host);
+    }
+
+    /**
+     * This will be used to remove an existing {@link HostContext}.
+     *
+     * @param host host name the context was registered under
+     */
+    public void removeHostContext(String host) {
+        hostContextsMap.remove(host);
+    }
+
+    public void init(Properties props, LoadbalanceAlgorithm algorithm) {
+        // Nothing to do
+    }
+
+    /**
+     * Stores the configuration context and resolves the clustering agent from it.
+     *
+     * @param configCtx Axis2 configuration context
+     * @throws SynapseException if no clustering agent is configured
+     */
+    public void setConfigurationContext(ConfigurationContext configCtx) {
+        this.configCtx = configCtx;
+
+        // The following code does the bridging between Axis2 and Synapse load balancing
+        clusteringAgent = configCtx.getAxisConfiguration().getClusteringAgent();
+        if (clusteringAgent == null) {
+            String msg = "In order to enable load balancing across an Axis2 cluster, " +
+                         "the cluster entry should be enabled in the axis2.xml file";
+            log.error(msg);
+            throw new SynapseException(msg);
+        }
+    }
+
+    public ConfigurationContext getConfigurationContext() {
+        return configCtx;
+    }
+
+    /**
+     * Not supported by this handler: member selection needs a host and a tenant id.
+     *
+     * @param context The AlgorithmContext
+     * @return never returns normally
+     * @throws UnsupportedOperationException always
+     * @deprecated Use {@link #getNextApplicationMember(String, int)}
+     */
+    @Deprecated
+    public Member getNextApplicationMember(AlgorithmContext context) {
+        // Message now names the overload that actually exists in this class.
+        throw new UnsupportedOperationException("This operation is invalid. " +
+                                                "Call getNextApplicationMember(String host, int tenantId)");
+    }
+
+    /**
+     * @param host host name to check; may be null
+     * @return true if a {@link HostContext} can be resolved for the host
+     */
+    public boolean isAValidHostName(String host) {
+        return getHostContext(host) != null;
+    }
+
+    /**
+     * Picks the next member for the given host and tenant.
+     *
+     * @param host     requested host name
+     * @param tenantId tenant id used to look up the domain and sub domain
+     * @return the member chosen by the host's load balance algorithm
+     * @throws SynapseException if the host is unknown, the clustering agent has not
+     *                          been initialized, or no group management agent exists
+     *                          for the resolved domain / sub domain
+     */
+    public Member getNextApplicationMember(String host, int tenantId) {
+        HostContext hostContext = getHostContext(host);
+
+        if (hostContext == null) {
+            String msg = "Invalid host name : " + host;
+            log.error(msg);
+            throw new SynapseException(msg);
+        }
+
+        // the constructor does not resolve the agent, so fail with a clear message
+        // instead of a bare NullPointerException
+        if (clusteringAgent == null) {
+            String msg = "Clustering agent is not initialized. " +
+                         "setConfigurationContext should be called before requesting members.";
+            log.error(msg);
+            throw new SynapseException(msg);
+        }
+
+        // here we have to pass tenant id to get domain from hostContext
+        String domain = hostContext.getDomainFromTenantId(tenantId);
+        String subDomain = hostContext.getSubDomainFromTenantId(tenantId);
+
+        LoadbalanceAlgorithm algorithm = hostContext.getAlgorithm();
+        GroupManagementAgent groupMgtAgent =
+                clusteringAgent.getGroupManagementAgent(domain, subDomain);
+
+        if (groupMgtAgent == null) {
+            String tenantDomain;
+            try {
+                tenantDomain = ConfigHolder.getInstance().getRealmService()
+                                           .getTenantManager().getDomain(tenantId);
+            } catch (UserStoreException ignore) {
+                // the tenant domain is only used in the error message; fall back to the id
+                tenantDomain = "" + tenantId;
+            }
+
+            String msg =
+                    "No Group Management Agent found for the domain: " + domain + ", subDomain: "
+                    + subDomain + ", host: " + host + " and for tenant: "
+                    + tenantDomain;
+            log.error(msg);
+            throw new SynapseException(msg);
+        }
+        algorithm.setApplicationMembers(groupMgtAgent.getMembers());
+        AlgorithmContext context = hostContext.getAlgorithmContext();
+        return algorithm.getNextApplicationMember(context);
+    }
+
+    /**
+     * Resolves the {@link HostContext} for a host name. If there is no entry for the
+     * full name, the left-most label is stripped repeatedly (e.g. {@code a.b.c},
+     * then {@code b.c}, then {@code c}) until a registered host is found.
+     *
+     * @param host host name to resolve; may be null
+     * @return the matching context, or null if none is registered or host is null
+     */
+    public HostContext getHostContext(String host) {
+        String candidate = host;
+        while (candidate != null) {
+            HostContext hostContext = hostContextsMap.get(candidate);
+            if (hostContext != null) {
+                return hostContext;
+            }
+            int indexOfDot = candidate.indexOf('.');
+            candidate = (indexOfDot == -1) ? null : candidate.substring(indexOfDot + 1);
+        }
+        return null;
+    }
+
+    public LoadbalanceAlgorithm getLoadbalanceAlgorithm() {
+        return lbAlgo;
+    }
+
+    public Properties getProperties() {
+        return null;
+    }
+
+    public ClusteringAgent getClusteringAgent() {
+        return clusteringAgent;
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ca25b1f7/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/builder/TopologySyncher.java
----------------------------------------------------------------------
diff --git a/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/builder/TopologySyncher.java b/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/builder/TopologySyncher.java
new file mode 100644
index 0000000..478790e
--- /dev/null
+++ b/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/builder/TopologySyncher.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.lb.endpoint.builder;
+
+import java.util.Iterator;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+
+import org.apache.stratos.lb.endpoint.util.ConfigHolder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.SynapseException;
+import org.apache.stratos.lb.common.conf.LoadBalancerConfiguration;
+import org.apache.stratos.lb.common.conf.LoadBalancerConfiguration.ServiceConfiguration;
+import org.apache.stratos.lb.common.conf.structure.Node;
+import org.apache.stratos.lb.common.conf.structure.NodeBuilder;
+import org.apache.stratos.lb.common.conf.util.HostContext;
+import org.apache.stratos.lb.common.conf.util.TenantDomainContext;
+import org.apache.stratos.lb.endpoint.TenantLoadBalanceMembershipHandler;
+import org.apache.stratos.lb.endpoint.group.mgt.GroupMgtAgentBuilder;
+
+/**
+ * A Thread, which is responsible for making a sense out of a message received for
+ * ELB via topology synchronization.
+ */
+public class TopologySyncher implements Runnable {
+
+ private static final Log log = LogFactory.getLog(TopologySyncher.class);
+
+ /*
+ * This is a reference to sharedTopologyQueue ConfigHolder.
+ */
+ private BlockingQueue<String> sharedQueue;
+
+ public TopologySyncher(BlockingQueue<String> queue) {
+
+ sharedQueue = queue;
+
+ }
+
+ @Override
+ public void run() {
+ // grab the lb configuration instance
+ LoadBalancerConfiguration lbconfig = LoadBalancerConfiguration.getInstance();
+
+ // FIXME Currently there has to be at least one dummy cluster defined in the loadbalancer
+ // conf
+ // in order to proper initialization of TribesClusteringAgent.
+ generateGroupMgtAgents(lbconfig);
+
+ // this thread should run for ever, untill terminated.
+ while (true) {
+ try {
+
+ // grabs a message or waits till the queue is non-empty
+ String message = sharedQueue.take();
+// ConfigHolder data = ConfigHolder.getInstance();
+
+ // this message needs attention only if it's not same as the previous message and
+ // not null of course.
+// if (data.getPreviousMsg() == null || !data.getPreviousMsg().equals(message)) {
+
+ // reset the previous message
+// data.setPreviousMsg(message);
+
+ // build the nginx format of this message, and get the Node object
+ Node topologyNode = NodeBuilder.buildNode(message);
+
+ // reset service configurations
+// lbconfig.resetData();
+ // create new service configurations
+ List<ServiceConfiguration> currentServiceConfigs = lbconfig.createServicesConfig(topologyNode);
+
+ generateGroupMgtAgents(lbconfig);
+
+ removeGroupMgtAgents(lbconfig, currentServiceConfigs);
+
+// }
+
+ } catch (InterruptedException ignore) {
+ }
+ }
+
+ }
+
+ private void removeGroupMgtAgents(LoadBalancerConfiguration lbConfig, List<ServiceConfiguration> currentServiceConfigs) {
+
+ for (Iterator iterator = lbConfig.getServiceConfigurations().values().iterator(); iterator.hasNext();) {
+ Map<String, ServiceConfiguration> valuesMap = (Map<String, ServiceConfiguration>) iterator.next();
+
+ for (Iterator iterator2 = valuesMap.values().iterator(); iterator2.hasNext();) {
+ ServiceConfiguration oldServiceConfig = (ServiceConfiguration) iterator2.next();
+
+ if(!currentServiceConfigs.contains(oldServiceConfig)){
+ // if the ServiceConfiguration is not there any more in the latest topology
+ lbConfig.removeServiceConfiguration(oldServiceConfig.getDomain(), oldServiceConfig.getSubDomain());
+ GroupMgtAgentBuilder.resetGroupMgtAgent(oldServiceConfig.getDomain(), oldServiceConfig.getSubDomain());
+ }
+ }
+ }
+ }
+
+ /**
+ * @param lbconfig
+ */
+ private void generateGroupMgtAgents(LoadBalancerConfiguration lbconfig) {
+ TenantLoadBalanceMembershipHandler handler =
+ ConfigHolder.getInstance()
+ .getTenantLoadBalanceMembershipHandler();
+
+ if (handler == null) {
+ String msg =
+ "TenantLoadBalanceMembershipHandler is null. Thus, We cannot proceed.";
+ log.error(msg);
+ throw new SynapseException(msg);
+ }
+
+ Map<String, HostContext> hostContexts = lbconfig.getHostContextMap();
+
+ // Add the Axis2 GroupManagement agents
+ if (hostContexts != null) {
+ // iterate through each host context
+ for (HostContext hostCtxt : hostContexts.values()) {
+ // each host can has multiple Tenant Contexts, iterate through them
+ for (TenantDomainContext tenantCtxt : hostCtxt
+ .getTenantDomainContexts()) {
+
+ String domain = tenantCtxt.getDomain();
+ String subDomain = tenantCtxt.getSubDomain();
+
+ // creates the group management agent
+ GroupMgtAgentBuilder.createGroupMgtAgent(domain,
+ subDomain);
+ }
+
+ // add to the handler
+ handler.addHostContext(hostCtxt);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ca25b1f7/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/cluster/manager/ClusterDomainManagerImpl.java
----------------------------------------------------------------------
diff --git a/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/cluster/manager/ClusterDomainManagerImpl.java b/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/cluster/manager/ClusterDomainManagerImpl.java
new file mode 100644
index 0000000..c7e1a1d
--- /dev/null
+++ b/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/cluster/manager/ClusterDomainManagerImpl.java
@@ -0,0 +1,228 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.lb.endpoint.cluster.manager;
+
+import java.util.ArrayList;
+import java.util.List;
+import org.apache.axis2.clustering.ClusteringAgent;
+import org.apache.stratos.lb.endpoint.util.ConfigHolder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.SynapseException;
+import org.wso2.carbon.cartridge.messages.ClusterDomainManager;
+import org.wso2.carbon.cartridge.messages.ClusterDomain;
+import org.apache.stratos.lb.common.conf.LoadBalancerConfiguration;
+import org.apache.stratos.lb.common.conf.LoadBalancerConfiguration.ServiceConfiguration;
+import org.apache.stratos.lb.common.conf.util.Constants;
+import org.apache.stratos.lb.common.conf.util.HostContext;
+import org.apache.stratos.lb.common.conf.util.TenantDomainContext;
+import org.apache.stratos.lb.common.group.mgt.SubDomainAwareGroupManagementAgent;
+import org.apache.stratos.lb.endpoint.TenantLoadBalanceMembershipHandler;
+
+/**
+ * Bridge between the ELB and the Stratos2 Agent.
+ *
+ * <p>Adding a cluster domain makes sure a group management agent and a
+ * {@link HostContext} exist for it and then creates or updates the matching
+ * service configuration; removing one drops the host context and the service
+ * configuration again.
+ */
+public class ClusterDomainManagerImpl implements ClusterDomainManager {
+
+    private static final Log log = LogFactory.getLog(ClusterDomainManagerImpl.class);
+
+    /**
+     * Registers the given cluster domain with the load balancer.
+     *
+     * @param cluster description of the cluster domain to add
+     * @throws SynapseException if the cluster description is invalid or a required
+     *                          component (clustering agent, membership handler) is missing
+     */
+    @Override
+    public void addClusterDomain(ClusterDomain cluster) {
+
+        // create group management agent, if one doesn't exist already.
+        HostContext hostCtxt = createGroupMgtAgentIfNotExists(cluster);
+
+        // we should only update if the above step is successful.
+        if (hostCtxt != null) {
+            // create / update Service Configuration
+            createOrUpdateServiceConfig(cluster, hostCtxt);
+        }
+
+    }
+
+    /**
+     * Removes the host context of the given host and the service configuration of
+     * the given domain / sub domain.
+     *
+     * @param domain    cluster domain
+     * @param subDomain cluster sub domain; null or empty means the default sub domain,
+     *                  mirroring {@link #addClusterDomain(ClusterDomain)}
+     * @param hostName  host name whose context should be removed
+     * @throws SynapseException if the membership handler is not available
+     */
+    @Override
+    public void removeClusterDomain(String domain, String subDomain, String hostName) {
+
+        TenantLoadBalanceMembershipHandler handler =
+                ConfigHolder.getInstance().getTenantLoadBalanceMembershipHandler();
+
+        if (handler == null) {
+            String msg = "TenantLoadBalanceMembershipHandler is null. Thus, We cannot proceed.";
+            log.error(msg);
+            throw new SynapseException(msg);
+        }
+
+        handler.removeHostContext(hostName);
+
+        LoadBalancerConfiguration lbConfig = ConfigHolder.getInstance().getLbConfig();
+
+        // the add path stores the configuration under the default sub domain when none
+        // is given, hence the same defaulting has to be applied on removal
+        lbConfig.removeServiceConfiguration(domain, resolveSubDomain(subDomain));
+
+    }
+
+    /**
+     * Creates the service configuration for the cluster's domain / sub domain, or
+     * overrides the attributes of the existing one, and registers it together with
+     * the host name tracker entry and the host context.
+     *
+     * @param cluster description of the cluster domain
+     * @param ctxt    host context to register under the cluster's host name
+     */
+    private void createOrUpdateServiceConfig(ClusterDomain cluster, HostContext ctxt) {
+        LoadBalancerConfiguration lbConfig = ConfigHolder.getInstance().getLbConfig();
+
+        String domain = cluster.getDomain();
+        String subDomain = resolveSubDomain(cluster.getSubDomain());
+
+        String hostName = cluster.getHostName();
+        String tenantRange = cluster.getTenantRange();
+        int minInstances = cluster.getMinInstances();
+        int maxInstances = cluster.getMaxInstances();
+        String service = cluster.getServiceName();
+        int maxRequestsPerSecond = cluster.getMaxRequestsPerSecond();
+        int roundsToAverage = cluster.getRoundsToAverage();
+        double alarmingUpperRate = cluster.getAlarmingUpperRate();
+        double alarmingLowerRate = cluster.getAlarmingLowerRate();
+        double scaleDownFactor = cluster.getScaleDownFactor();
+
+        ServiceConfiguration serviceConfig = lbConfig.getServiceConfig(domain, subDomain);
+        if (serviceConfig == null) {
+            serviceConfig = lbConfig.new ServiceConfiguration();
+        }
+
+        // we simply override the attributes with new values
+        serviceConfig.setDomain(domain);
+        serviceConfig.setSub_domain(subDomain);
+        serviceConfig.setTenant_range(tenantRange);
+        serviceConfig.setHosts(hostName);
+        serviceConfig.setMin_app_instances(minInstances);
+        serviceConfig.setMax_app_instances(maxInstances);
+        serviceConfig.setMax_requests_per_second(maxRequestsPerSecond);
+        serviceConfig.setRounds_to_average(roundsToAverage);
+        serviceConfig.setAlarming_upper_rate(alarmingUpperRate);
+        serviceConfig.setAlarming_lower_rate(alarmingLowerRate);
+        serviceConfig.setScale_down_factor(scaleDownFactor);
+
+        // add to host name tracker
+        lbConfig.addToHostNameTrackerMap(service, serviceConfig.getHosts());
+
+        // add to host contexts
+        lbConfig.addToHostContextMap(hostName, ctxt);
+
+        // finally add this service config
+        lbConfig.addServiceConfiguration(serviceConfig);
+    }
+
+    /**
+     * Validates the cluster description, makes sure a group management agent exists
+     * for its domain / sub domain, and registers a {@link HostContext} (a new one or
+     * the one already known to the membership handler) extended with a tenant domain
+     * context for this cluster.
+     *
+     * @param cluster description of the cluster domain
+     * @return the host context registered with the membership handler
+     * @throws SynapseException if domain, host name or tenant range is null, the tenant
+     *                          range is neither "*" nor an integer, or the clustering
+     *                          agent / membership handler is not available
+     */
+    protected HostContext createGroupMgtAgentIfNotExists(ClusterDomain cluster) {
+
+        String domain = cluster.getDomain();
+        String subDomain = cluster.getSubDomain();
+        String hostName = cluster.getHostName();
+        String tenantRange = cluster.getTenantRange();
+
+        // sub domain can be null, but others can't
+        if (domain == null || hostName == null || tenantRange == null) {
+            String msg =
+                    "Invalid value/s detected - domain: " + domain + "\n host name: " +
+                    hostName + "\n tenant range: " + tenantRange;
+            log.error(msg);
+            throw new SynapseException(msg);
+        }
+
+        subDomain = resolveSubDomain(subDomain);
+
+        // validate the tenant range up front, before anything gets registered, so that
+        // an invalid value cannot leave a half-configured cluster domain behind
+        int tenantId = parseTenantId(tenantRange);
+
+        ClusteringAgent clusteringAgent = null;
+
+        try {
+            clusteringAgent =
+                    ConfigHolder.getInstance().getAxisConfiguration().getClusteringAgent();
+
+        } catch (Exception e) {
+            String msg = "Failed to retrieve Clustering Agent.";
+            log.error(msg, e);
+            throw new SynapseException(msg, e);
+
+        }
+
+        if (clusteringAgent == null) {
+            String msg = "Clustering Agent is null.";
+            log.error(msg);
+            throw new SynapseException(msg);
+        }
+
+        /*
+         * Add Group Management Agent if one is not already present for this domain and sub
+         * domain
+         */
+
+        if (clusteringAgent.getGroupManagementAgent(domain, subDomain) == null) {
+            clusteringAgent.addGroupManagementAgent(
+                    new SubDomainAwareGroupManagementAgent(subDomain), domain, subDomain, -1);
+
+            if (log.isDebugEnabled()) {
+                log.debug("Group management agent added to cluster domain: " + domain +
+                          " and sub domain: " + subDomain);
+            }
+
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("Group management agent is already available for cluster domain: " +
+                          domain + " and sub domain: " + subDomain);
+            }
+        }
+
+        TenantLoadBalanceMembershipHandler handler =
+                ConfigHolder.getInstance().getTenantLoadBalanceMembershipHandler();
+
+        if (handler == null) {
+            String msg = "TenantLoadBalanceMembershipHandler is null. Thus, We cannot proceed.";
+            log.error(msg);
+            throw new SynapseException(msg);
+        }
+
+        // if there's an already registered HostContext use it
+        HostContext hostCtxt = handler.getHostContext(hostName);
+        if (hostCtxt == null) {
+            hostCtxt = new HostContext(hostName);
+        }
+
+        List<TenantDomainContext> ctxts =
+                new ArrayList<TenantDomainContext>(hostCtxt.getTenantDomainContexts());
+
+        ctxts.add(new TenantDomainContext(tenantId, domain, subDomain));
+
+        hostCtxt.addTenantDomainContexts(ctxts);
+
+        handler.addHostContext(hostCtxt);
+
+        return hostCtxt;
+    }
+
+    /**
+     * Returns the given sub domain, or the default sub domain if it is null or empty.
+     */
+    private static String resolveSubDomain(String subDomain) {
+        if (subDomain == null || subDomain.isEmpty()) {
+            // uses default sub domain
+            return Constants.DEFAULT_SUB_DOMAIN;
+        }
+        return subDomain;
+    }
+
+    /**
+     * Converts a tenant range into a tenant id: "*" means super tenant mode, which is
+     * defined by tenant id 0 in this context; anything else must be a single integer.
+     *
+     * @throws SynapseException if the value is neither "*" nor an integer
+     */
+    private static int parseTenantId(String tenantRange) {
+        if ("*".equals(tenantRange)) {
+            return 0;
+        }
+        try {
+            return Integer.parseInt(tenantRange);
+        } catch (NumberFormatException e) {
+            String msg = "Invalid tenant range: " + tenantRange +
+                         ". Expected \"*\" or a single tenant id.";
+            log.error(msg, e);
+            throw new SynapseException(msg, e);
+        }
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ca25b1f7/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/endpoint/TenantAwareLoadBalanceEndpoint.java
----------------------------------------------------------------------
diff --git a/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/endpoint/TenantAwareLoadBalanceEndpoint.java b/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/endpoint/TenantAwareLoadBalanceEndpoint.java
new file mode 100644
index 0000000..2945265
--- /dev/null
+++ b/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/endpoint/TenantAwareLoadBalanceEndpoint.java
@@ -0,0 +1,531 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.lb.endpoint.endpoint;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Set;
+
+import org.apache.axiom.om.OMElement;
+import org.apache.axiom.om.util.AXIOMUtil;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.clustering.ClusteringAgent;
+import org.apache.axis2.clustering.Member;
+import org.apache.axis2.clustering.management.GroupManagementAgent;
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.stratos.lb.endpoint.TenantAwareLoadBalanceEndpointException;
+import org.apache.stratos.lb.endpoint.internal.RegistryManager;
+import org.apache.stratos.lb.endpoint.util.ConfigHolder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.http.protocol.HTTP;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.config.xml.endpoints.utils.LoadbalanceAlgorithmFactory;
+import org.apache.synapse.core.LoadBalanceMembershipHandler;
+import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.synapse.core.axis2.Axis2MessageContext;
+import org.apache.synapse.core.axis2.Axis2SynapseEnvironment;
+import org.apache.synapse.endpoints.DynamicLoadbalanceFaultHandler;
+import org.apache.synapse.endpoints.Endpoint;
+import org.apache.synapse.endpoints.algorithms.LoadbalanceAlgorithm;
+import org.apache.synapse.endpoints.dispatch.HttpSessionDispatcher;
+import org.apache.synapse.endpoints.dispatch.SALSessions;
+import org.apache.synapse.endpoints.dispatch.SessionInformation;
+import org.apache.synapse.transport.nhttp.NhttpConstants;
+import org.wso2.carbon.base.MultitenantConstants;
+import org.apache.stratos.lb.common.conf.LoadBalancerConfiguration;
+import org.apache.stratos.lb.common.conf.util.HostContext;
+import org.apache.stratos.lb.common.conf.util.TenantDomainContext;
+import org.apache.stratos.lb.common.group.mgt.SubDomainAwareGroupManagementAgent;
+import org.apache.stratos.lb.common.util.DomainMapping;
+import org.apache.stratos.lb.common.cache.URLMappingCache;
+import org.apache.stratos.lb.endpoint.TenantLoadBalanceMembershipHandler;
+
+public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints.DynamicLoadbalanceEndpoint implements Serializable {
+
+ private static final long serialVersionUID = 1577351815951789938L;
+ private static final Log log = LogFactory.getLog(TenantAwareLoadBalanceEndpoint.class);
+ /**
+ * Axis2 based membership handler which handles members in multiple clustering domains
+ */
+ private TenantLoadBalanceMembershipHandler tlbMembershipHandler;
+
+ /**
+ * Key - host name
+ * Value - {@link HostContext}
+ */
+ private Map<String, HostContext> hostContexts = new HashMap<String, HostContext>();
+
+ private LoadBalancerConfiguration lbConfig;
+
+ /**
+ * Cache that keeps the host names of URL mappings. It is obtained in
+ * {@code init} using the size held in {@code sizeOfCache}.
+ */
+ private URLMappingCache mappingCache;
+ private RegistryManager registryManager;
+ private int sizeOfCache;
+
+ private boolean initialized;
+
+ private String algorithm;
+ private String configuration;
+ private String failOver;
+
+ /**
+  * Initializes this endpoint from the load balancer configuration held by {@link ConfigHolder}.
+  * <p>
+  * The configuration values (host contexts, URL mapping cache size, session timeout and
+  * fail-over flag) are re-read and the load balance algorithm is re-created on every call.
+  * The remaining work runs only while {@code initialized} is still {@code false}: the super
+  * class is initialized, a group management agent is added for every domain / sub domain pair
+  * that does not have one yet, the {@link TenantLoadBalanceMembershipHandler} is created and
+  * published through {@link ConfigHolder}, and session affinity is switched on with an
+  * {@link HttpSessionDispatcher}. Agents and the membership handler are only set up when the
+  * host context map is not {@code null}.
+  *
+  * @param synapseEnvironment environment this endpoint runs in; it is cast to
+  *                           {@link Axis2SynapseEnvironment} to reach the Axis2 configuration context
+  * @throws TenantAwareLoadBalanceEndpointException if the load balancer configuration cannot be
+  *         read or the configured group management agent class cannot be instantiated
+  * @throws SynapseException if the load balance algorithm cannot be created or no clustering
+  *         agent is defined
+  */
+ @Override
+ public void init(SynapseEnvironment synapseEnvironment) {
+     // Step 1: pull the runtime settings out of the shared load balancer configuration.
+     try {
+         lbConfig = ConfigHolder.getInstance().getLbConfig();
+         hostContexts = lbConfig.getHostContextMap();
+         sizeOfCache = lbConfig.getLoadBalancerConfig().getSizeOfCache();
+         mappingCache = URLMappingCache.getInstance(sizeOfCache);
+         setSessionTimeout(lbConfig.getLoadBalancerConfig().getSessionTimeOut());
+         setFailover(lbConfig.getLoadBalancerConfig().getFailOver());
+     } catch (Exception e) {
+         String msg = "Failed while reading Load Balancer configuration";
+         log.error(msg, e);
+         throw new TenantAwareLoadBalanceEndpointException(msg, e);
+     }
+
+     // Step 2: build the load balance algorithm from the generated configuration element.
+     LoadbalanceAlgorithm lbAlgorithm;
+     try {
+         OMElement payload = AXIOMUtil.stringToOM(generatePayLoad());
+         lbAlgorithm = LoadbalanceAlgorithmFactory.createLoadbalanceAlgorithm(payload, null);
+     } catch (Exception e) {
+         String msg = "Error While creating Load balance algorithm";
+         log.error(msg, e);
+         throw new SynapseException(msg, e);
+     }
+
+     // Step 3: the one-time wiring below must not run again once it has completed.
+     if (initialized) {
+         return;
+     }
+
+     super.init(synapseEnvironment);
+     ConfigurationContext cfgCtx =
+             ((Axis2SynapseEnvironment) synapseEnvironment).getAxis2ConfigurationContext();
+     ClusteringAgent clusteringAgent = cfgCtx.getAxisConfiguration().getClusteringAgent();
+     if (clusteringAgent == null) {
+         throw new SynapseException("Axis2 ClusteringAgent not defined in axis2.xml");
+     }
+
+     // Add the Axis2 GroupManagement agents
+     if (hostContexts != null) {
+         for (HostContext hostCtxt : hostContexts.values()) {
+             // a host can have several tenant contexts; each one names a domain / sub domain pair
+             for (TenantDomainContext tenantCtxt : hostCtxt.getTenantDomainContexts()) {
+                 String domain = tenantCtxt.getDomain();
+                 String subDomain = tenantCtxt.getSubDomain();
+
+                 if (clusteringAgent.getGroupManagementAgent(domain, subDomain) != null) {
+                     // an agent is already registered for this pair
+                     continue;
+                 }
+
+                 String gmAgentClass =
+                         lbConfig.getLoadBalancerConfig().getGroupManagementAgentClass();
+                 GroupManagementAgent groupManagementAgent;
+                 if (gmAgentClass == null) {
+                     groupManagementAgent = new SubDomainAwareGroupManagementAgent(subDomain);
+                 } else {
+                     try {
+                         groupManagementAgent =
+                                 (GroupManagementAgent) Class.forName(gmAgentClass).newInstance();
+                     } catch (Exception e) {
+                         String msg = "Cannot instantiate GroupManagementAgent. Class: " + gmAgentClass;
+                         log.error(msg, e);
+                         throw new TenantAwareLoadBalanceEndpointException(msg, e);
+                     }
+                 }
+
+                 clusteringAgent.addGroupManagementAgent(groupManagementAgent, domain, subDomain, -1);
+                 if (log.isDebugEnabled()) {
+                     log.debug("Group management agent added to cluster domain: " +
+                               domain + " and sub domain: " + subDomain);
+                 }
+             }
+         }
+
+         tlbMembershipHandler = new TenantLoadBalanceMembershipHandler(
+                 hostContexts, lbAlgorithm, cfgCtx, isClusteringEnabled, getName());
+
+         // set TenantLoadBalanceMembershipHandler for future reference
+         ConfigHolder.getInstance().setTenantLoadBalanceMembershipHandler(tlbMembershipHandler);
+     }
+
+     // Initialize the SAL Sessions if they have not been initialized already.
+     SALSessions salSessions = SALSessions.getInstance();
+     if (!salSessions.isInitialized()) {
+         salSessions.initialize(isClusteringEnabled, cfgCtx);
+     }
+     setSessionAffinity(true);
+     setDispatcher(new HttpSessionDispatcher());
+     initialized = true;
+     log.info("Tenant Aware Load Balance Endpoint is initialized.");
+ }
+
+ /**
+  * Stores the given configuration value on this endpoint.
+  *
+  * @param paramEle value to keep in the {@code configuration} field
+  */
+ public void setConfiguration(String paramEle) {
+     this.configuration = paramEle;
+ }
+
+ /**
+  * Stores the given algorithm value on this endpoint.
+  *
+  * @param paramEle value to keep in the {@code algorithm} field
+  */
+ public void setAlgorithm(String paramEle) {
+     algorithm = paramEle;
+ }
+
+ /**
+  * Stores the given fail-over value on this endpoint.
+  *
+  * @param paramEle value to keep in the {@code failOver} field
+  */
+ public void setFailOver(String paramEle) {
+     failOver = paramEle;
+ }
+
+
+ /**
+  * Returns the fixed name of this endpoint.
+  *
+  * @return always {@code "tlbEndpoint"}
+  */
+ public String getName() {
+     final String endpointName = "tlbEndpoint";
+     return endpointName;
+ }
+
+ //TODO remove following hard coded element
+ /**
+  * Builds the hard coded {@code serviceDynamicLoadbalance} element that {@code init} parses
+  * to create the load balance algorithm.
+  *
+  * @return the element as a string, with fail-over enabled and the round robin algorithm class
+  */
+ private String generatePayLoad() {
+     StringBuilder element = new StringBuilder();
+     element.append(" <serviceDynamicLoadbalance failover=\"true\"\n");
+     element.append(" algorithm=\"org.apache.synapse.endpoints.algorithms.RoundRobin\"");
+     // the attribute  configuration="$system:loadbalancer.xml"  is intentionally not emitted
+     element.append("/>");
+     return element.toString();
+ }
+
+ /**
+  * Returns the tenant aware membership handler of this endpoint.
+  *
+  * @return the handler created in {@code init}; {@code null} if it has not been created
+  */
+ public LoadBalanceMembershipHandler getLbMembershipHandler() {
+     return this.tlbMembershipHandler;
+ }
+
+
+ /**
+  * Dispatches the message to an application member chosen for the requested host and tenant.
+  * <p>
+  * The host is taken from the {@code Host} transport header. When it is not a host name known
+  * to the membership handler it is treated as a mapped domain: the mapping is resolved through
+  * the URL mapping cache and the registry, and the {@code Host} header is rewritten to the
+  * actual host. If session affinity is enabled and the message belongs to an existing session,
+  * it is sent to the member bound to that session; otherwise the next application member for
+  * the host and tenant id is requested from the membership handler.
+  *
+  * @param synCtx message to dispatch; must be an {@link Axis2MessageContext}
+  * @throws SynapseException if the {@code Host} header is missing, the host name is neither a
+  *         registered host nor a mapped domain, or no application member is available
+  */
+ public void send(MessageContext synCtx) {
+     /* setCookieHeader(synCtx); */
+     Member currentMember = null;
+     SessionInformation sessionInformation = null;
+     String actualHost = null;
+
+     // Gathering required information for domain mapping
+     org.apache.axis2.context.MessageContext axis2MessageContext =
+             ((Axis2MessageContext) synCtx).getAxis2MessageContext();
+     @SuppressWarnings("unchecked") // the transport headers property holds a String to String map
+     Map<String, String> transportHeaders = (Map<String, String>) axis2MessageContext.
+             getProperty(org.apache.axis2.context.MessageContext.TRANSPORT_HEADERS);
+     String targetHost =
+             (transportHeaders == null) ? null : transportHeaders.get(HTTP.TARGET_HOST);
+     if (targetHost == null) {
+         // without a Host header there is nothing to route on; fail with a clear message
+         String msg = "Cannot dispatch the request, transport header is missing : " + HTTP.TARGET_HOST;
+         log.error(msg);
+         throw new SynapseException(msg);
+     }
+
+     String port = "";
+     boolean containsPort = false;
+     int portSeparator = targetHost.indexOf(':');
+     if (portSeparator >= 0) {
+         containsPort = true;
+         port = targetHost.substring(portSeparator + 1);
+         targetHost = targetHost.substring(0, portSeparator);
+     }
+     // Gathering required information for domain mapping done
+
+     DomainMapping domainMapping = null;
+     if (!tlbMembershipHandler.isAValidHostName(targetHost)) {
+         // not a registered host, so check whether it is a mapped domain
+         domainMapping = lookupDomainMapping(targetHost);
+         if (domainMapping == null) {
+             String msg = "Invalid host name : " + targetHost;
+             log.error(msg);
+             throw new SynapseException(msg);
+         }
+         actualHost = domainMapping.getActualHost();
+         rewriteTargetHostHeader(synCtx, transportHeaders, actualHost, containsPort, port);
+     }
+
+     if (isSessionAffinityBasedLB()) {
+         // first check if this message is associated with a session. if so, get the member
+         // associated with that session.
+         sessionInformation = (SessionInformation) synCtx.getProperty(
+                 SynapseConstants.PROP_SAL_CURRENT_SESSION_INFORMATION);
+
+         currentMember = (Member) synCtx.getProperty(
+                 SynapseConstants.PROP_SAL_ENDPOINT_CURRENT_MEMBER);
+
+         if (sessionInformation == null && currentMember == null) {
+             sessionInformation = dispatcher.getSession(synCtx);
+             if (sessionInformation != null) {
+
+                 if (log.isDebugEnabled()) {
+                     log.debug("Current session id : " + sessionInformation.getId());
+                 }
+
+                 currentMember = sessionInformation.getMember();
+                 synCtx.setProperty(
+                         SynapseConstants.PROP_SAL_ENDPOINT_CURRENT_MEMBER, currentMember);
+                 // Keep the session information on the message so it can be recovered
+                 // if the cleaner removes it while the response is on its way.
+                 // This is cheap as session information is not a heavy data structure.
+                 synCtx.setProperty(
+                         SynapseConstants.PROP_SAL_CURRENT_SESSION_INFORMATION, sessionInformation);
+             }
+         }
+     }
+
+     // Dispatch request to the relevant member
+     ConfigurationContext configCtx =
+             ((Axis2MessageContext) synCtx).getAxis2MessageContext().getConfigurationContext();
+
+     if (tlbMembershipHandler.getConfigurationContext() == null) {
+         tlbMembershipHandler.setConfigurationContext(configCtx);
+     }
+
+     if (tlbMembershipHandler.getClusteringAgent() == null) {
+         tlbMembershipHandler.setConfigurationContext(configCtx);
+     }
+
+     TenantDynamicLoadBalanceFaultHandlerImpl faultHandler = new TenantDynamicLoadBalanceFaultHandlerImpl();
+     if (log.isDebugEnabled()) {
+         log.debug("************* Actual Host: "+actualHost +" ****** Target Host: "+targetHost);
+     }
+     faultHandler.setHost(actualHost != null ? actualHost : targetHost);
+
+     if (sessionInformation != null && currentMember != null) {
+         // send message on current session
+         sessionInformation.updateExpiryTime();
+         sendToApplicationMember(synCtx, currentMember, faultHandler, false);
+         return;
+     }
+
+     // prepare for a new session
+     int tenantId = getTenantId(synCtx);
+
+     // check if this is a valid host name registered in ELB
+     if (tlbMembershipHandler.isAValidHostName(targetHost)) {
+         currentMember = tlbMembershipHandler.getNextApplicationMember(targetHost, tenantId);
+         if (currentMember == null) {
+             String msg = "No application members available";
+             log.error(msg);
+             throw new SynapseException(msg);
+         }
+         sendToApplicationMember(synCtx, currentMember, faultHandler, true);
+         return;
+     }
+
+     if (domainMapping == null) {
+         domainMapping = lookupDomainMapping(targetHost);
+     }
+     if (domainMapping == null) {
+         String msg = "Invalid host name : " + targetHost;
+         log.error(msg);
+         throw new SynapseException(msg);
+     }
+
+     actualHost = domainMapping.getActualHost();
+
+     if (log.isDebugEnabled()) {
+         log.debug("************* Actual Host: "+actualHost +" ****** Target Host: "+targetHost);
+     }
+     faultHandler.setHost(actualHost != null ? actualHost : targetHost);
+
+     rewriteTargetHostHeader(synCtx, transportHeaders, actualHost, containsPort, port);
+
+     currentMember = tlbMembershipHandler.getNextApplicationMember(actualHost, tenantId);
+     if (currentMember == null) {
+         // same guard as for registered hosts: never hand a null member to the sender
+         String msg = "No application members available";
+         log.error(msg);
+         throw new SynapseException(msg);
+     }
+     sendToApplicationMember(synCtx, currentMember, faultHandler, true);
+ }
+
+ /**
+  * Resolves the domain mapping of a host, first from the URL mapping cache and then from the
+  * registry. Only mappings that were actually found are put into the cache.
+  *
+  * @param host host name taken from the request
+  * @return the mapping, or {@code null} when the host is not a mapped domain
+  */
+ private DomainMapping lookupDomainMapping(String host) {
+     DomainMapping mapping = mappingCache.getMapping(host);
+     if (mapping == null) {
+         registryManager = new RegistryManager();
+         mapping = registryManager.getMapping(host);
+         if (mapping != null) {
+             mappingCache.addValidMapping(host, mapping);
+         }
+     }
+     return mapping;
+ }
+
+ /**
+  * Replaces the {@code Host} transport header with the actual host (keeping the original port
+  * when the request carried one) and stores the headers back on the Axis2 message context.
+  */
+ private void rewriteTargetHostHeader(MessageContext synCtx, Map<String, String> transportHeaders,
+                                      String actualHost, boolean containsPort, String port) {
+     if (containsPort) {
+         transportHeaders.put(HTTP.TARGET_HOST, actualHost + ":" + port);
+     } else {
+         transportHeaders.put(HTTP.TARGET_HOST, actualHost);
+     }
+     ((Axis2MessageContext) synCtx).getAxis2MessageContext().setProperty("TRANSPORT_HEADERS" , transportHeaders);
+ }
+
+
+// public List<HostContext> getHostContexts() {
+// return Collections.unmodifiableList(hostContexts);
+// }
+
+ /**
+ * This FaultHandler will try to resend the message to another member if an error occurs
+ * while sending to some member. This is a failover mechanism
+ */
+
+ /**
+ * @param url to url for target
+ * @return tenantID if tenant id available else 0
+ */
+ private int getTenantId(String url) {
+ String servicesPrefix = "/t/";
+ if (url != null && url.contains(servicesPrefix)) {
+ int domainNameStartIndex =
+ url.indexOf(servicesPrefix) + servicesPrefix.length();
+ int domainNameEndIndex = url.indexOf('/', domainNameStartIndex);
+ String domainName = url.substring(domainNameStartIndex,
+ domainNameEndIndex == -1 ? url.length() : domainNameEndIndex);
+
+ // return tenant id if domain name is not null
+ if (domainName != null) {
+ try {
+ return ConfigHolder.getInstance().getRealmService().getTenantManager().getTenantId(domainName);
+ } catch (org.wso2.carbon.user.api.UserStoreException e) {
+ log.error("An error occurred while obtaining the tenant id.", e);
+ }
+ }
+ }
+ // return 0 if the domain name is null
+ return 0;
+ }
+
+ /**
+  * Resolves the tenant id of a message from its {@code To} address.
+  * <p>
+  * The {@code /t/<domain>} form is tried first. When that yields 0 and the address carries a
+  * tenant domain parameter (as in a SAML SSO response, e.g.
+  * {@code https://localhost:9444/acs?tenantDomain=domain}), the value of that parameter is used.
+  * The value ends at the next {@code &} or {@code #}, so further query parameters or a fragment
+  * are not mistaken for part of the domain, and an empty value is ignored.
+  *
+  * @param synCtx message whose {@code To} address is inspected
+  * @return the resolved tenant id, or 0 when no tenant could be determined
+  */
+ private int getTenantId(MessageContext synCtx){
+     String url = synCtx.getTo().toString();
+     int tenantId = getTenantId(url);
+     if (tenantId != 0) {
+         return tenantId;
+     }
+
+     // tenantId = 0 because no domain name was found. May be this is the SSO response
+     String parameterKey = MultitenantConstants.TENANT_DOMAIN + "=";
+     int keyIndex = url.indexOf(parameterKey);
+     if (keyIndex < 0) {
+         return tenantId;
+     }
+
+     // read the parameter value up to the next parameter / fragment separator
+     int valueStart = keyIndex + parameterKey.length();
+     int valueEnd = valueStart;
+     while (valueEnd < url.length()
+            && url.charAt(valueEnd) != '&' && url.charAt(valueEnd) != '#') {
+         valueEnd++;
+     }
+     String domainName = url.substring(valueStart, valueEnd);
+     if (domainName.length() == 0) {
+         return tenantId;
+     }
+
+     try {
+         return ConfigHolder.getInstance().getRealmService().getTenantManager().getTenantId(domainName);
+     } catch (org.wso2.carbon.user.api.UserStoreException e) {
+         log.error("An error occurred while obtaining the tenant id.", e);
+     }
+     return tenantId;
+ }
+
+
+ /**
+ * This FaultHandler will try to resend the message to another member if an error occurs
+ * while sending to some member. This is a failover mechanism
+ */
+ private class TenantDynamicLoadBalanceFaultHandlerImpl extends DynamicLoadbalanceFaultHandler {
+
+ private EndpointReference to;
+ private Member currentMember;
+ private Endpoint currentEp;
+ private String host;
+
+ private static final int MAX_RETRY_COUNT = 5;
+
+ // ThreadLocal variable to keep track of how many times this fault handler has been
+ // called
+ private ThreadLocal<Integer> callCount = new ThreadLocal<Integer>() {
+ protected Integer initialValue() {
+ return 0;
+ }
+ };
+
+ public void setHost(String host) {
+ log.debug("Setting host name: "+host);
+ this.host = host;
+ }
+
+ public void setCurrentMember(Member currentMember) {
+ this.currentMember = currentMember;
+ }
+
+ public void setTo(EndpointReference to) {
+ this.to = to;
+ }
+
+ private TenantDynamicLoadBalanceFaultHandlerImpl() {
+ }
+
+ public void onFault(MessageContext synCtx) {
+ if (currentMember == null || to == null) {
+ return;
+ }
+
+ // Prevent infinite retrying to failed members
+ callCount.set(callCount.get() + 1);
+ if (callCount.get() >= MAX_RETRY_COUNT) {
+ log.debug("Retrying to a failed member has stopped.");
+ return;
+ }
+
+ //cleanup endpoint if exists
+ if (currentEp != null) {
+ currentEp.destroy();
+ }
+ Integer errorCode = (Integer) synCtx.getProperty(SynapseConstants.ERROR_CODE);
+ if (errorCode != null) {
+ if (errorCode.equals(NhttpConstants.CONNECTION_FAILED)) {
+ currentMember.suspend(10000); // TODO: Make this configurable.
+ log.info("Suspended member " + currentMember + " for 10s due to connection failure to that member");
+ }
+ if (errorCode.equals(NhttpConstants.CONNECTION_FAILED) ||
+ errorCode.equals(NhttpConstants.CONNECT_CANCEL) ||
+ errorCode.equals(NhttpConstants.CONNECT_TIMEOUT)) {
+
+ if (!synCtx.getFaultStack().isEmpty()) {
+ synCtx.getFaultStack().pop();
+ }
+ // Try to resend to another member
+ Member newMember = tlbMembershipHandler.getNextApplicationMember(host, getTenantId(synCtx.toString()));
+ if (newMember == null || newMember.isSuspended()) {
+ String msg = "No application members available having host name : "+host+
+ " and tenant id : "+getTenantId(synCtx.toString()+" and which is not suspended.");
+ log.error(msg);
+ throw new SynapseException(msg);
+ }
+ synCtx.setTo(to);
+ if (isSessionAffinityBasedLB()) {
+ // We are sending this message on a new session,
+ // hence we need to remove previous session information
+ Set pros = synCtx.getPropertyKeySet();
+ if (pros != null) {
+ pros.remove(SynapseConstants.PROP_SAL_CURRENT_SESSION_INFORMATION);
+ }
+ }
+ try {
+ Thread.sleep(1000); // Sleep for sometime before retrying
+ } catch (InterruptedException ignored) {
+ }
+
+ if(synCtx == null || to == null) {
+ return;
+ }
+ log.info("Failed over to " + newMember);
+ sendToApplicationMember(synCtx, newMember, this, true);
+ } else if (errorCode.equals(NhttpConstants.SND_IO_ERROR_SENDING) ||
+ errorCode.equals(NhttpConstants.CONNECTION_CLOSED)) {
+ // TODO: Envelope is consumed
+ String msg = "Error sending request! Connection to host "+host+
+ " might be closed. Error code: "+errorCode;
+ log.error(msg);
+ throw new SynapseException(msg);
+ }
+ }
+ // We cannot failover since we are using binary relay
+ }
+
+ public void setCurrentEp(Endpoint currentEp) {
+ this.currentEp = currentEp;
+ }
+ }
+}
+
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ca25b1f7/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/group/mgt/GroupMgtAgentBuilder.java
----------------------------------------------------------------------
diff --git a/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/group/mgt/GroupMgtAgentBuilder.java b/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/group/mgt/GroupMgtAgentBuilder.java
new file mode 100644
index 0000000..cfd2bf8
--- /dev/null
+++ b/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/group/mgt/GroupMgtAgentBuilder.java
@@ -0,0 +1,80 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.lb.endpoint.group.mgt;
+
+import org.apache.axis2.clustering.ClusteringAgent;
+import org.apache.axis2.clustering.management.GroupManagementAgent;
+import org.apache.stratos.lb.endpoint.util.ConfigHolder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.SynapseException;
+import org.apache.stratos.lb.common.group.mgt.SubDomainAwareGroupManagementAgent;
+
+/**
+ * Responsible for building {@link GroupManagementAgent}s.
+ */
+public class GroupMgtAgentBuilder {
+
+ private static final Log log = LogFactory.getLog(GroupMgtAgentBuilder.class);
+
+ /**
+ * Creates a {@link SubDomainAwareGroupManagementAgent} corresponds to the given
+ * parameters, if and only if there's no existing agent.
+ * @param domain clustering domain.
+ * @param subDomain clustering sub domain.
+ */
+ public static void createGroupMgtAgent(String domain, String subDomain) {
+
+ ClusteringAgent clusteringAgent =
+ ConfigHolder.getInstance().getAxisConfiguration().getClusteringAgent();
+
+ if (clusteringAgent == null) {
+ throw new SynapseException("Axis2 Clustering Agent not defined in axis2.xml");
+ }
+
+ // checks the existence.
+ if (clusteringAgent.getGroupManagementAgent(domain, subDomain) == null) {
+
+ clusteringAgent.addGroupManagementAgent(new SubDomainAwareGroupManagementAgent(subDomain),
+ domain, subDomain,-1);
+
+ log.info("Group management agent added to cluster domain: " +
+ domain + " and sub domain: " + subDomain);
+ }
+ }
+
+ public static void resetGroupMgtAgent(String domain, String subDomain) {
+
+ ClusteringAgent clusteringAgent =
+ ConfigHolder.getInstance().getAxisConfiguration().getClusteringAgent();
+
+ if (clusteringAgent == null) {
+ throw new SynapseException("Axis2 Clustering Agent not defined in axis2.xml");
+ }
+
+ // checks the existence.
+ if (clusteringAgent.getGroupManagementAgent(domain, subDomain) != null) {
+
+ clusteringAgent.resetGroupManagementAgent(domain, subDomain);
+
+ log.info("Group management agent of cluster domain: " +
+ domain + " and sub domain: " + subDomain+" is removed.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ca25b1f7/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/internal/LoadBalanceEndpointServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/internal/LoadBalanceEndpointServiceComponent.java b/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/internal/LoadBalanceEndpointServiceComponent.java
new file mode 100644
index 0000000..2763dab
--- /dev/null
+++ b/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/internal/LoadBalanceEndpointServiceComponent.java
@@ -0,0 +1,397 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.lb.endpoint.internal;
+
+import org.apache.axis2.deployment.DeploymentEngine;
+import org.apache.axis2.engine.AxisConfiguration;
+import org.apache.stratos.lb.endpoint.TenantAwareLoadBalanceEndpointException;
+import org.apache.stratos.lb.endpoint.builder.TopologySyncher;
+import org.apache.stratos.lb.endpoint.cluster.manager.ClusterDomainManagerImpl;
+import org.apache.stratos.lb.endpoint.endpoint.TenantAwareLoadBalanceEndpoint;
+import org.apache.stratos.lb.endpoint.subscriber.TopologySubscriber;
+import org.apache.stratos.lb.endpoint.util.ConfigHolder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.config.SynapseConfiguration;
+import org.apache.synapse.config.xml.MultiXMLConfigurationBuilder;
+import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.synapse.deployers.SynapseArtifactDeploymentStore;
+import org.apache.synapse.endpoints.Endpoint;
+import org.apache.synapse.mediators.base.SequenceMediator;
+import org.apache.synapse.mediators.builtin.SendMediator;
+import org.apache.synapse.mediators.filters.InMediator;
+import org.osgi.service.component.ComponentContext;
+import org.wso2.carbon.cartridge.messages.CreateClusterDomainMessage;
+import org.wso2.carbon.mediation.dependency.mgt.services.DependencyManagementService;
+import org.wso2.carbon.mediation.initializer.ServiceBusConstants;
+import org.wso2.carbon.mediation.initializer.ServiceBusUtils;
+import org.wso2.carbon.mediation.initializer.services.SynapseConfigurationService;
+import org.wso2.carbon.mediation.initializer.services.SynapseEnvironmentService;
+import org.wso2.carbon.mediation.initializer.services.SynapseRegistrationsService;
+import org.wso2.carbon.registry.core.exceptions.RegistryException;
+import org.wso2.carbon.registry.core.service.RegistryService;
+import org.wso2.carbon.utils.ConfigurationContextService;
+import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
+import org.wso2.carbon.user.core.service.RealmService;
+import org.apache.stratos.lb.common.service.LoadBalancerConfigurationService;
+import org.apache.stratos.lb.endpoint.EndpointDeployer;
+import org.apache.stratos.lb.endpoint.util.TopologyConstants;
+
+import java.io.File;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * @scr.component name="org.apache.stratos.load.balancer.endpoint" immediate="true"
+ * @scr.reference name="configuration.context.service"
+ * interface="org.wso2.carbon.utils.ConfigurationContextService" cardinality="1..1"
+ * policy="dynamic" bind="setConfigurationContextService" unbind="unsetConfigurationContextService"
+ * @scr.reference name="synapse.config.service"
+ * interface="org.wso2.carbon.mediation.initializer.services.SynapseConfigurationService"
+ * cardinality="1..1" policy="dynamic" bind="setSynapseConfigurationService"
+ * unbind="unsetSynapseConfigurationService"
+ * @scr.reference name="synapse.env.service"
+ * interface="org.wso2.carbon.mediation.initializer.services.SynapseEnvironmentService"
+ * cardinality="1..n" policy="dynamic" bind="setSynapseEnvironmentService"
+ * unbind="unsetSynapseEnvironmentService"
+ * @scr.reference name="registry.service"
+ * interface="org.wso2.carbon.registry.core.service.RegistryService"
+ * cardinality="1..1" policy="dynamic"
+ * bind="setRegistryService" unbind="unsetRegistryService"
+ * @scr.reference name="dependency.mgt.service"
+ * interface="org.wso2.carbon.mediation.dependency.mgt.services.DependencyManagementService"
+ * cardinality="0..1" policy="dynamic"
+ * bind="setDependencyManager" unbind="unsetDependencyManager"
+ * @scr.reference name="synapse.registrations.service"
+ * interface="org.wso2.carbon.mediation.initializer.services.SynapseRegistrationsService"
+ * cardinality="1..n" policy="dynamic" bind="setSynapseRegistrationsService"
+ * unbind="unsetSynapseRegistrationsService"
+ * @scr.reference name="user.realmservice.default"
+ * interface="org.wso2.carbon.user.core.service.RealmService"
+ * cardinality="1..1" policy="dynamic" bind="setRealmService"
+ * unbind="unsetRealmService"
+ * @scr.reference name="org.apache.stratos.lb.common"
+ * interface="org.apache.stratos.lb.common.service.LoadBalancerConfigurationService"
+ * cardinality="1..1" policy="dynamic" bind="setLoadBalancerConfigurationService"
+ * unbind="unsetLoadBalancerConfigurationService"
+ */
+@SuppressWarnings({"UnusedDeclaration", "JavaDoc"})
+public class LoadBalanceEndpointServiceComponent {
+
+ private static final Log log = LogFactory.getLog(LoadBalanceEndpointServiceComponent.class);
+
+ private boolean activated = false;
+
+ /**
+  * Activates the component. It registers the endpoint deployer for the super tenant, publishes
+  * a {@link ClusterDomainManagerImpl} on the configuration context when one is available, and
+  * installs a {@link TenantAwareLoadBalanceEndpoint} on every send mediator found inside an in
+  * mediator of the "main" sequence. When a message broker URL is configured, the topology
+  * consumer thread is started and the topology topic is subscribed to.
+  * <p>
+  * Any failure is logged and not propagated; {@code activated} is only set on success.
+  *
+  * @param ctxt component context supplied by the OSGi runtime (not used)
+  */
+ protected void activate(ComponentContext ctxt) {
+     try {
+         SynapseEnvironmentService envService = ConfigHolder.getInstance()
+                 .getSynapseEnvironmentService(MultitenantConstants.SUPER_TENANT_ID);
+
+         registerDeployer(ConfigHolder.getInstance().getAxisConfiguration(),
+                          envService.getSynapseEnvironment());
+
+         if (ConfigHolder.getInstance().getConfigCtxt() != null) {
+             ConfigHolder.getInstance().getConfigCtxt().setNonReplicableProperty(
+                     CreateClusterDomainMessage.CLUSTER_DOMAIN_MANAGER,
+                     new ClusterDomainManagerImpl());
+             log.debug("Setting property Cluster Domain MANAGER ... ");
+         }
+
+         SynapseEnvironment environment = envService.getSynapseEnvironment();
+
+         /* Registering Tenant Aware Load Balance Endpoint */
+
+         // the endpoint goes onto the send mediators of the main sequence
+         SequenceMediator mainSequence =
+                 (SequenceMediator) environment.getSynapseConfiguration().getSequence("main");
+
+         boolean endpointRegistered = false;
+
+         for (Mediator topLevel : mainSequence.getList()) {
+             // only the InMediator is of interest
+             if (!(topLevel instanceof InMediator)) {
+                 continue;
+             }
+
+             for (Mediator nested : ((InMediator) topLevel).getList()) {
+                 // only the SendMediator is of interest
+                 if (!(nested instanceof SendMediator)) {
+                     continue;
+                 }
+
+                 SendMediator sendMediator = (SendMediator) nested;
+
+                 /* add Tenant Aware LB endpoint */
+                 TenantAwareLoadBalanceEndpoint tenantAwareEp = new TenantAwareLoadBalanceEndpoint();
+                 tenantAwareEp.init(environment);
+                 sendMediator.setEndpoint(tenantAwareEp);
+
+                 endpointRegistered = true;
+
+                 if (log.isDebugEnabled()) {
+                     log.debug("Added Tenant Aware Endpoint: " +
+                               sendMediator.getEndpoint().getName() +
+                               " to Send Mediator.");
+                 }
+             }
+         }
+
+         if (!endpointRegistered) {
+             String msg = "Failed to register Tenant Aware Load Balance Endpoint in Send Mediator.";
+             log.fatal(msg);
+             throw new TenantAwareLoadBalanceEndpointException(msg);
+         }
+
+         if (log.isDebugEnabled()) {
+             log.debug("Endpoint Admin bundle is activated ");
+         }
+
+         boolean messageBrokerConfigured =
+                 ConfigHolder.getInstance().getLbConfig().getLoadBalancerConfig().getMbServerUrl() != null;
+         if (messageBrokerConfigured) {
+             // initialize and start the TopologyBuilder consumer
+             Thread topologyConsumer = new Thread(
+                     new TopologySyncher(ConfigHolder.getInstance().getSharedTopologyDiffQueue()));
+             topologyConsumer.start();
+
+             TopologySubscriber.subscribe(TopologyConstants.TOPIC_NAME);
+         }
+
+         activated = true;
+     } catch (Throwable e) {
+         log.error("Failed to activate Endpoint Admin bundle ", e);
+     }
+ }
+
+ /**
+  * Deactivates the component by un-registering the endpoint deployer of every Synapse
+  * environment service known to {@link ConfigHolder}.
+  * <p>
+  * This is best effort: a failure is logged as a warning, together with its cause, and is not
+  * propagated.
+  *
+  * @param context component context supplied by the OSGi runtime (not used)
+  */
+ protected void deactivate(ComponentContext context) {
+     try {
+         for (SynapseEnvironmentService envService :
+                 ConfigHolder.getInstance().getSynapseEnvironmentServices().values()) {
+             unregisterDeployer(
+                     envService.getConfigurationContext().getAxisConfiguration(),
+                     envService.getSynapseEnvironment());
+         }
+     } catch (Exception e) {
+         // keep the cause in the log so a failed removal can be diagnosed
+         log.warn("Couldn't remove the EndpointDeployer", e);
+     }
+ }
+
+ /**
+  * Removes the Endpoint deployer from the deployment engine behind the given
+  * AxisConfiguration. Does nothing when no AxisConfiguration is supplied.
+  *
+  * @param axisConfig AxisConfiguration to which this deployer belongs; may be null
+  * @param synapseEnvironment SynapseEnvironment to which this deployer belongs
+  */
+ private void unregisterDeployer(AxisConfiguration axisConfig, SynapseEnvironment synapseEnvironment)
+         throws TenantAwareLoadBalanceEndpointException {
+     if (axisConfig == null) {
+         return;
+     }
+     DeploymentEngine engine = (DeploymentEngine) axisConfig.getConfigurator();
+     // The deployer is keyed by the endpoints directory under the Synapse configuration path.
+     String endpointsDir = ServiceBusUtils.getSynapseConfigAbsPath(
+             synapseEnvironment.getServerContextInformation())
+             + File.separator + MultiXMLConfigurationBuilder.ENDPOINTS_DIR;
+     engine.removeDeployer(endpointsDir, ServiceBusConstants.ARTIFACT_EXTENSION);
+ }
+
+ /**
+  * Adds an EndpointDeployer to the deployment engine behind the given AxisConfiguration.
+  * Before doing so, every already-defined endpoint that has a file name is recorded in the
+  * artifact deployment store as a restored artifact.
+  *
+  * @param axisConfig AxisConfiguration to which this deployer belongs
+  * @param synapseEnvironment SynapseEnvironment to which this deployer belongs
+  */
+ private void registerDeployer(AxisConfiguration axisConfig, SynapseEnvironment synapseEnvironment)
+         throws TenantAwareLoadBalanceEndpointException {
+     SynapseConfiguration synapseConfig = synapseEnvironment.getSynapseConfiguration();
+     DeploymentEngine engine = (DeploymentEngine) axisConfig.getConfigurator();
+     SynapseArtifactDeploymentStore store = synapseConfig.getArtifactDeploymentStore();
+
+     String endpointsDir = ServiceBusUtils.getSynapseConfigAbsPath(
+             synapseEnvironment.getServerContextInformation())
+             + File.separator + MultiXMLConfigurationBuilder.ENDPOINTS_DIR;
+
+     for (Endpoint endpoint : synapseConfig.getDefinedEndpoints().values()) {
+         if (endpoint.getFileName() == null) {
+             // Endpoints without a backing file have nothing to restore.
+             continue;
+         }
+         store.addRestoredArtifact(endpointsDir + File.separator + endpoint.getFileName());
+     }
+
+     engine.addDeployer(
+             new EndpointDeployer(), endpointsDir, ServiceBusConstants.ARTIFACT_EXTENSION);
+ }
+
+ /**
+  * Stores the server AxisConfiguration and the server ConfigurationContext of the bound
+  * service in the ConfigHolder.
+  *
+  * @param cfgCtxService the ConfigurationContextService being bound
+  */
+ protected void setConfigurationContextService(ConfigurationContextService cfgCtxService) {
+     ConfigHolder holder = ConfigHolder.getInstance();
+     holder.setAxisConfiguration(cfgCtxService.getServerConfigContext().getAxisConfiguration());
+     holder.setConfigCtxt(cfgCtxService.getServerConfigContext());
+ }
+
+ /**
+  * Clears the AxisConfiguration and ConfigurationContext held by the ConfigHolder.
+  *
+  * @param cfgCtxService the ConfigurationContextService being unbound; not used here
+  */
+ protected void unsetConfigurationContextService(ConfigurationContextService cfgCtxService) {
+     ConfigHolder holder = ConfigHolder.getInstance();
+     holder.setAxisConfiguration(null);
+     holder.setConfigCtxt(null);
+ }
+
+ /**
+  * Stores the SynapseConfiguration exposed by the bound service in the ConfigHolder.
+  *
+  * @param synapseConfigurationService the SynapseConfigurationService being bound
+  */
+ protected void setSynapseConfigurationService(
+         SynapseConfigurationService synapseConfigurationService) {
+     ConfigHolder holder = ConfigHolder.getInstance();
+     holder.setSynapseConfiguration(synapseConfigurationService.getSynapseConfiguration());
+ }
+
+ /**
+  * Clears the SynapseConfiguration held by the ConfigHolder.
+  *
+  * @param synapseConfigurationService the SynapseConfigurationService being unbound; not used here
+  */
+ protected void unsetSynapseConfigurationService(
+         SynapseConfigurationService synapseConfigurationService) {
+     ConfigHolder holder = ConfigHolder.getInstance();
+     holder.setSynapseConfiguration(null);
+ }
+
+ /**
+  * Receives the event announcing a newly created SynapseEnvironment and records the service
+  * in the ConfigHolder under its tenant id. For the SuperTenant the actual initialization
+  * happens in the activate method once all other constraints are met; when this component is
+  * already activated and the tenant was not known before, the deployer is registered here.
+  *
+  * @param synapseEnvironmentService SynapseEnvironmentService which contains information
+  *                                  about the new Synapse Instance
+  */
+ protected void setSynapseEnvironmentService(
+         SynapseEnvironmentService synapseEnvironmentService) {
+     ConfigHolder holder = ConfigHolder.getInstance();
+
+     // Must be evaluated before the service is added below.
+     boolean knownTenant = holder.getSynapseEnvironmentServices().
+             containsKey(synapseEnvironmentService.getTenantId());
+
+     holder.addSynapseEnvironmentService(
+             synapseEnvironmentService.getTenantId(),
+             synapseEnvironmentService);
+
+     if (!activated || knownTenant) {
+         return;
+     }
+
+     try {
+         registerDeployer(synapseEnvironmentService.getConfigurationContext().getAxisConfiguration(),
+                 synapseEnvironmentService.getSynapseEnvironment());
+         if (log.isDebugEnabled()) {
+             log.debug("Endpoint Admin bundle is activated ");
+         }
+     } catch (Throwable e) {
+         log.error("Failed to activate Endpoint Admin bundle ", e);
+     }
+ }
+
+ /**
+  * Receives the event announcing the destruction of a SynapseEnvironment (super tenant or
+  * tenant) and drops the matching entry from the ConfigHolder.
+  *
+  * @param synapseEnvironmentService the service whose tenant entry is removed
+  */
+ protected void unsetSynapseEnvironmentService(
+         SynapseEnvironmentService synapseEnvironmentService) {
+     ConfigHolder holder = ConfigHolder.getInstance();
+     holder.removeSynapseEnvironmentService(synapseEnvironmentService.getTenantId());
+ }
+
+ /**
+  * Stores the config system registry and the governance system registry obtained from the
+  * bound RegistryService in the ConfigHolder. A failure to obtain either registry is logged
+  * together with its cause and not propagated.
+  *
+  * @param regService the RegistryService being bound
+  */
+ protected void setRegistryService(RegistryService regService) {
+     if (log.isDebugEnabled()) {
+         log.debug("RegistryService bound to the endpoint component");
+     }
+     try {
+         ConfigHolder.getInstance().setConfigRegistry(regService.getConfigSystemRegistry());
+         ConfigHolder.getInstance().setGovernanceRegistry(regService.getGovernanceSystemRegistry());
+     } catch (RegistryException e) {
+         // Include the exception so the underlying registry failure is visible in the log.
+         log.error("Couldn't retrieve the registry from the registry service", e);
+     }
+ }
+
+ /**
+  * Clears both registries that setRegistryService stored in the ConfigHolder.
+  *
+  * @param regService the RegistryService being unbound; not used here
+  */
+ protected void unsetRegistryService(RegistryService regService) {
+     if (log.isDebugEnabled()) {
+         log.debug("RegistryService unbound from the endpoint component");
+     }
+     ConfigHolder.getInstance().setConfigRegistry(null);
+     // The governance registry is set on bind as well, so release it too instead of
+     // leaving a stale reference behind after the service is gone.
+     ConfigHolder.getInstance().setGovernanceRegistry(null);
+ }
+
+ /**
+  * Stores the bound DependencyManagementService in the ConfigHolder.
+  *
+  * @param dependencyMgr the DependencyManagementService being bound
+  */
+ protected void setDependencyManager(DependencyManagementService dependencyMgr) {
+     if (log.isDebugEnabled()) {
+         log.debug("Dependency management service bound to the endpoint component");
+     }
+     ConfigHolder holder = ConfigHolder.getInstance();
+     holder.setDependencyManager(dependencyMgr);
+ }
+
+ /**
+  * Clears the DependencyManagementService held by the ConfigHolder.
+  *
+  * @param dependencyMgr the DependencyManagementService being unbound; not used here
+  */
+ protected void unsetDependencyManager(DependencyManagementService dependencyMgr) {
+     if (log.isDebugEnabled()) {
+         log.debug("Dependency management service unbound from the endpoint component");
+     }
+     ConfigHolder holder = ConfigHolder.getInstance();
+     holder.setDependencyManager(null);
+ }
+
+ /**
+  * Bind callback for SynapseRegistrationsService. No action is taken on bind.
+  * NOTE(review): presumably the reference exists only so that the matching unset
+  * callback is invoked - confirm against the component descriptor.
+  *
+  * @param synapseRegistrationsService the service being bound; not used here
+  */
+ protected void setSynapseRegistrationsService(
+ SynapseRegistrationsService synapseRegistrationsService) {
+
+ }
+
+ /**
+  * Unbind callback for SynapseRegistrationsService. When a SynapseEnvironmentService is
+  * registered for the service's tenant, that entry is removed from the ConfigHolder and the
+  * Endpoint deployer is un-registered from the tenant's AxisConfiguration. A failure while
+  * un-registering is logged with its cause and not propagated.
+  *
+  * @param synapseRegistrationsService the service being unbound
+  */
+ protected void unsetSynapseRegistrationsService(
+         SynapseRegistrationsService synapseRegistrationsService) {
+     int tenantId = synapseRegistrationsService.getTenantId();
+     if (ConfigHolder.getInstance().getSynapseEnvironmentServices().containsKey(tenantId)) {
+         // Capture the environment before the entry is removed; it is needed below.
+         SynapseEnvironment env = ConfigHolder.getInstance().
+                 getSynapseEnvironmentService(tenantId).getSynapseEnvironment();
+
+         ConfigHolder.getInstance().removeSynapseEnvironmentService(tenantId);
+
+         AxisConfiguration axisConfig = synapseRegistrationsService.getConfigurationContext().
+                 getAxisConfiguration();
+         if (axisConfig != null) {
+             try {
+                 unregisterDeployer(axisConfig, env);
+             } catch (Exception e) {
+                 // Keep the cause in the log instead of silently dropping it.
+                 log.warn("Couldn't remove the EndpointDeployer", e);
+             }
+         }
+     }
+ }
+
+ /**
+  * Stores the bound RealmService in the ConfigHolder.
+  *
+  * @param realmService the RealmService being bound
+  */
+ protected void setRealmService(RealmService realmService) {
+     ConfigHolder holder = ConfigHolder.getInstance();
+     holder.setRealmService(realmService);
+ }
+
+ /**
+  * Clears the RealmService held by the ConfigHolder.
+  *
+  * @param realmService the RealmService being unbound; not used here
+  */
+ protected void unsetRealmService(RealmService realmService) {
+     ConfigHolder holder = ConfigHolder.getInstance();
+     holder.setRealmService(null);
+ }
+
+ /**
+  * Stores the bound LoadBalancerConfigurationService in the ConfigHolder.
+  *
+  * @param lbConfigSer the LoadBalancerConfigurationService being bound
+  */
+ protected void setLoadBalancerConfigurationService(LoadBalancerConfigurationService lbConfigSer) {
+     ConfigHolder holder = ConfigHolder.getInstance();
+     holder.setLbConfigService(lbConfigSer);
+ }
+
+ /**
+  * Clears the LoadBalancerConfigurationService held by the ConfigHolder.
+  *
+  * @param lbConfigSer the LoadBalancerConfigurationService being unbound; not used here
+  */
+ protected void unsetLoadBalancerConfigurationService(LoadBalancerConfigurationService lbConfigSer) {
+     ConfigHolder holder = ConfigHolder.getInstance();
+     holder.setLbConfigService(null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ca25b1f7/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/internal/RegistryManager.java
----------------------------------------------------------------------
diff --git a/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/internal/RegistryManager.java b/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/internal/RegistryManager.java
new file mode 100644
index 0000000..2f91002
--- /dev/null
+++ b/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/internal/RegistryManager.java
@@ -0,0 +1,54 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.lb.endpoint.internal;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.lb.endpoint.util.ConfigHolder;
+import org.apache.stratos.lb.common.util.DomainMapping;
+import org.wso2.carbon.registry.core.Resource;
+import org.wso2.carbon.registry.core.exceptions.RegistryException;
+import org.wso2.carbon.registry.core.session.UserRegistry;
+
+public class RegistryManager {
+ UserRegistry governanceRegistry = ConfigHolder.getInstance().getGovernanceRegistry();
+ private static final Log log = LogFactory.getLog(RegistryManager.class);
+ /**
+ *
+ */
+ private Resource resource = null;
+ public static final String HOST_INFO = "hostinfo/";
+ public static final String ACTUAL_HOST = "actual.host";
+
+ public DomainMapping getMapping(String hostName) {
+ DomainMapping domainMapping;
+ try {
+ if (governanceRegistry.resourceExists(HOST_INFO + hostName)) {
+ resource = governanceRegistry.get(HOST_INFO + hostName);
+ domainMapping = new DomainMapping(hostName);
+ domainMapping.setActualHost(resource.getProperty(ACTUAL_HOST));
+ return domainMapping;
+ }
+ } catch (RegistryException e) {
+ log.info("Error while getting registry resource");
+ throw new RuntimeException(e);
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ca25b1f7/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/subscriber/TopicHealthChecker.java
----------------------------------------------------------------------
diff --git a/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/subscriber/TopicHealthChecker.java b/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/subscriber/TopicHealthChecker.java
new file mode 100644
index 0000000..e9da865
--- /dev/null
+++ b/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/subscriber/TopicHealthChecker.java
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.lb.endpoint.subscriber;
+
+import javax.jms.TopicSubscriber;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * This health checker runs until the subscriber check fails or the thread is interrupted,
+ * and is responsible for re-establishing a connection between ELB and CC.
+ */
+public class TopicHealthChecker implements Runnable {
+
+    private static final Log log = LogFactory.getLog(TopicHealthChecker.class);
+    private final String topicName;
+    private final TopicSubscriber subscriber;
+
+    /**
+     * @param topicName  name of the topic to re-subscribe to once the check fails
+     * @param subscriber subscriber whose topic is polled as the health check
+     */
+    public TopicHealthChecker(String topicName, TopicSubscriber subscriber) {
+        this.topicName = topicName;
+        this.subscriber = subscriber;
+    }
+
+    /**
+     * Polls the subscriber every 30s. On the first failed poll it waits 5s and then
+     * re-subscribes to the topic. If the thread is interrupted while waiting, the interrupt
+     * status is restored and the checker stops without re-subscribing.
+     */
+    @Override
+    public void run() {
+        log.info("Topic Health Checker is running... ");
+
+        try {
+            while (true) {
+                try {
+                    subscriber.getTopic();
+                } catch (Exception e) {
+                    // implies connection is not established
+                    // sleep for 5s and retry
+                    log.info("Health checker failed and will retry to establish a connection after a 5s.", e);
+                    Thread.sleep(5000);
+                    break;
+                }
+
+                // health checker runs in every 30s
+                Thread.sleep(30000);
+            }
+        } catch (InterruptedException e) {
+            // An interrupt is a request to stop, not a connection failure: restore the
+            // flag for the owner of this thread and exit instead of swallowing it.
+            Thread.currentThread().interrupt();
+            return;
+        }
+
+        TopologySubscriber.subscribe(topicName);
+    }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/ca25b1f7/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/subscriber/TopologyListener.java
----------------------------------------------------------------------
diff --git a/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/subscriber/TopologyListener.java b/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/subscriber/TopologyListener.java
new file mode 100644
index 0000000..ee7a3ca
--- /dev/null
+++ b/components/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/subscriber/TopologyListener.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+
+ * http://www.apache.org/licenses/LICENSE-2.0
+
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.stratos.lb.endpoint.subscriber;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.TextMessage;
+
+import org.apache.stratos.lb.endpoint.util.ConfigHolder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * JMS listener that forwards the text of each received message to the shared topology
+ * diff queue held by the ConfigHolder.
+ */
+public class TopologyListener implements MessageListener {
+
+    private static final Log log = LogFactory.getLog(TopologyListener.class);
+
+    /**
+     * Adds the text payload of the message to the shared topology diff queue.
+     * Messages that are not text messages are logged and ignored; a failure to read
+     * the text is logged and not propagated.
+     *
+     * @param message the received JMS message
+     */
+    @Override
+    public void onMessage(Message message) {
+        // Check the type first: a blind cast would throw ClassCastException out of the
+        // listener for any non-text message.
+        if (!(message instanceof TextMessage)) {
+            log.warn("Ignoring message that is not a TextMessage: " + message);
+            return;
+        }
+
+        TextMessage receivedMessage = (TextMessage) message;
+        try {
+            ConfigHolder.getInstance().getSharedTopologyDiffQueue().add(receivedMessage.getText());
+        } catch (JMSException e) {
+            log.error(e.getMessage(), e);
+        }
+    }
+
+}