You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by is...@apache.org on 2013/07/01 15:47:25 UTC
[05/12] changing package structure to org.apache.stratos.lb.endpoint
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/753d34ac/components/load-balancer/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/group/mgt/GroupMgtAgentBuilder.java
----------------------------------------------------------------------
diff --git a/components/load-balancer/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/group/mgt/GroupMgtAgentBuilder.java b/components/load-balancer/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/group/mgt/GroupMgtAgentBuilder.java
new file mode 100644
index 0000000..6575f97
--- /dev/null
+++ b/components/load-balancer/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/group/mgt/GroupMgtAgentBuilder.java
@@ -0,0 +1,62 @@
+package org.apache.stratos.lb.endpoint.group.mgt;
+
+import org.apache.axis2.clustering.ClusteringAgent;
+import org.apache.axis2.clustering.management.GroupManagementAgent;
+import org.apache.stratos.lb.endpoint.util.ConfigHolder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.SynapseException;
+import org.apache.stratos.lb.common.group.mgt.SubDomainAwareGroupManagementAgent;
+
+/**
+ * Responsible for building {@link GroupManagementAgent}s.
+ */
+public class GroupMgtAgentBuilder {
+
+ private static final Log log = LogFactory.getLog(GroupMgtAgentBuilder.class);
+
+ /**
+ * Creates a {@link SubDomainAwareGroupManagementAgent} corresponds to the given
+ * parameters, if and only if there's no existing agent.
+ * @param domain clustering domain.
+ * @param subDomain clustering sub domain.
+ */
+ public static void createGroupMgtAgent(String domain, String subDomain) {
+
+ ClusteringAgent clusteringAgent =
+ ConfigHolder.getInstance().getAxisConfiguration().getClusteringAgent();
+
+ if (clusteringAgent == null) {
+ throw new SynapseException("Axis2 Clustering Agent not defined in axis2.xml");
+ }
+
+ // checks the existence.
+ if (clusteringAgent.getGroupManagementAgent(domain, subDomain) == null) {
+
+ clusteringAgent.addGroupManagementAgent(new SubDomainAwareGroupManagementAgent(subDomain),
+ domain, subDomain,-1);
+
+ log.info("Group management agent added to cluster domain: " +
+ domain + " and sub domain: " + subDomain);
+ }
+ }
+
+ public static void resetGroupMgtAgent(String domain, String subDomain) {
+
+ ClusteringAgent clusteringAgent =
+ ConfigHolder.getInstance().getAxisConfiguration().getClusteringAgent();
+
+ if (clusteringAgent == null) {
+ throw new SynapseException("Axis2 Clustering Agent not defined in axis2.xml");
+ }
+
+ // checks the existence.
+ if (clusteringAgent.getGroupManagementAgent(domain, subDomain) != null) {
+
+ clusteringAgent.resetGroupManagementAgent(domain, subDomain);
+
+ log.info("Group management agent of cluster domain: " +
+ domain + " and sub domain: " + subDomain+" is removed.");
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/753d34ac/components/load-balancer/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/internal/LoadBalanceEndpointServiceComponent.java
----------------------------------------------------------------------
diff --git a/components/load-balancer/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/internal/LoadBalanceEndpointServiceComponent.java b/components/load-balancer/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/internal/LoadBalanceEndpointServiceComponent.java
new file mode 100644
index 0000000..ececb66
--- /dev/null
+++ b/components/load-balancer/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/internal/LoadBalanceEndpointServiceComponent.java
@@ -0,0 +1,394 @@
+/*
+ * Copyright (c) 2009, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.stratos.lb.endpoint.internal;
+
+import org.apache.axis2.deployment.DeploymentEngine;
+import org.apache.axis2.engine.AxisConfiguration;
+import org.apache.stratos.lb.endpoint.TenantAwareLoadBalanceEndpointException;
+import org.apache.stratos.lb.endpoint.builder.TopologySyncher;
+import org.apache.stratos.lb.endpoint.cluster.manager.ClusterDomainManagerImpl;
+import org.apache.stratos.lb.endpoint.endpoint.TenantAwareLoadBalanceEndpoint;
+import org.apache.stratos.lb.endpoint.subscriber.TopologySubscriber;
+import org.apache.stratos.lb.endpoint.util.ConfigHolder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.Mediator;
+import org.apache.synapse.config.SynapseConfiguration;
+import org.apache.synapse.config.xml.MultiXMLConfigurationBuilder;
+import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.synapse.deployers.SynapseArtifactDeploymentStore;
+import org.apache.synapse.endpoints.Endpoint;
+import org.apache.synapse.mediators.base.SequenceMediator;
+import org.apache.synapse.mediators.builtin.SendMediator;
+import org.apache.synapse.mediators.filters.InMediator;
+import org.osgi.service.component.ComponentContext;
+import org.wso2.carbon.cartridge.messages.CreateClusterDomainMessage;
+import org.wso2.carbon.mediation.dependency.mgt.services.DependencyManagementService;
+import org.wso2.carbon.mediation.initializer.ServiceBusConstants;
+import org.wso2.carbon.mediation.initializer.ServiceBusUtils;
+import org.wso2.carbon.mediation.initializer.services.SynapseConfigurationService;
+import org.wso2.carbon.mediation.initializer.services.SynapseEnvironmentService;
+import org.wso2.carbon.mediation.initializer.services.SynapseRegistrationsService;
+import org.wso2.carbon.registry.core.exceptions.RegistryException;
+import org.wso2.carbon.registry.core.service.RegistryService;
+import org.wso2.carbon.utils.ConfigurationContextService;
+import org.wso2.carbon.utils.multitenancy.MultitenantConstants;
+import org.wso2.carbon.user.core.service.RealmService;
+import org.apache.stratos.lb.common.service.LoadBalancerConfigurationService;
+import org.apache.stratos.lb.endpoint.EndpointDeployer;
+import org.apache.stratos.lb.endpoint.util.TopologyConstants;
+
+import java.io.File;
+import java.util.Map;
+import java.util.Set;
+
+/**
+ * @scr.component name="org.apache.stratos.load.balancer.endpoint" immediate="true"
+ * @scr.reference name="configuration.context.service"
+ * interface="org.wso2.carbon.utils.ConfigurationContextService" cardinality="1..1"
+ * policy="dynamic" bind="setConfigurationContextService" unbind="unsetConfigurationContextService"
+ * @scr.reference name="synapse.config.service"
+ * interface="org.wso2.carbon.mediation.initializer.services.SynapseConfigurationService"
+ * cardinality="1..1" policy="dynamic" bind="setSynapseConfigurationService"
+ * unbind="unsetSynapseConfigurationService"
+ * @scr.reference name="synapse.env.service"
+ * interface="org.wso2.carbon.mediation.initializer.services.SynapseEnvironmentService"
+ * cardinality="1..n" policy="dynamic" bind="setSynapseEnvironmentService"
+ * unbind="unsetSynapseEnvironmentService"
+ * @scr.reference name="registry.service"
+ * interface="org.wso2.carbon.registry.core.service.RegistryService"
+ * cardinality="1..1" policy="dynamic"
+ * bind="setRegistryService" unbind="unsetRegistryService"
+ * @scr.reference name="dependency.mgt.service"
+ * interface="org.wso2.carbon.mediation.dependency.mgt.services.DependencyManagementService"
+ * cardinality="0..1" policy="dynamic"
+ * bind="setDependencyManager" unbind="unsetDependencyManager"
+ * @scr.reference name="synapse.registrations.service"
+ * interface="org.wso2.carbon.mediation.initializer.services.SynapseRegistrationsService"
+ * cardinality="1..n" policy="dynamic" bind="setSynapseRegistrationsService"
+ * unbind="unsetSynapseRegistrationsService"
+ * @scr.reference name="user.realmservice.default"
+ * interface="org.wso2.carbon.user.core.service.RealmService"
+ * cardinality="1..1" policy="dynamic" bind="setRealmService"
+ * unbind="unsetRealmService"
+ * @scr.reference name="org.apache.stratos.lb.common"
+ * interface="org.apache.stratos.lb.common.service.LoadBalancerConfigurationService"
+ * cardinality="1..1" policy="dynamic" bind="setLoadBalancerConfigurationService"
+ * unbind="unsetLoadBalancerConfigurationService"
+ */
+@SuppressWarnings({"UnusedDeclaration", "JavaDoc"})
+public class LoadBalanceEndpointServiceComponent {
+
+ private static final Log log = LogFactory.getLog(LoadBalanceEndpointServiceComponent.class);
+
+ private boolean activated = false;
+
+ protected void activate(ComponentContext ctxt) {
+ try {
+ SynapseEnvironmentService synEnvService =
+ ConfigHolder.getInstance()
+ .getSynapseEnvironmentService(MultitenantConstants.SUPER_TENANT_ID);
+
+ registerDeployer(ConfigHolder.getInstance().getAxisConfiguration(),
+ synEnvService.getSynapseEnvironment());
+
+ if (ConfigHolder.getInstance().getConfigCtxt() != null) {
+ ConfigHolder
+ .getInstance()
+ .getConfigCtxt()
+ .setNonReplicableProperty(
+ CreateClusterDomainMessage.CLUSTER_DOMAIN_MANAGER,
+ new ClusterDomainManagerImpl());
+ log.debug("Setting property Cluster Domain MANAGER ... ");
+
+ }
+
+ SynapseEnvironment synapseEnv = synEnvService.getSynapseEnvironment();
+
+ /* Registering Tenant Aware Load Balance Endpoint */
+
+ // get the main sequence mediator
+ SequenceMediator mainSequence =
+ (SequenceMediator) synapseEnv.getSynapseConfiguration()
+ .getSequence("main");
+
+ boolean successfullyRegistered = false;
+
+ // iterate through its child mediators
+ for (Mediator child : mainSequence.getList()) {
+
+ // find the InMediator
+ if (child instanceof InMediator) {
+
+ for(Mediator inChild : ((InMediator)child).getList()){
+
+ // find the SendMediator
+ if (inChild instanceof SendMediator) {
+
+ SendMediator sendMediator = (SendMediator) inChild;
+
+ /* add Tenant Aware LB endpoint */
+
+ TenantAwareLoadBalanceEndpoint tenantAwareEp = new TenantAwareLoadBalanceEndpoint();
+
+ tenantAwareEp.init(synapseEnv);
+
+ sendMediator.setEndpoint(tenantAwareEp);
+
+ successfullyRegistered = true;
+
+ if (log.isDebugEnabled()) {
+ log.debug("Added Tenant Aware Endpoint: " +
+ sendMediator.getEndpoint().getName() + "" +
+ " to Send Mediator.");
+ }
+ }
+ }
+ }
+ }
+
+ if(!successfullyRegistered){
+ String msg = "Failed to register Tenant Aware Load Balance Endpoint in Send Mediator.";
+ log.fatal(msg);
+ throw new TenantAwareLoadBalanceEndpointException(msg);
+ }
+
+ if (log.isDebugEnabled()) {
+ log.debug("Endpoint Admin bundle is activated ");
+ }
+
+ if (ConfigHolder.getInstance().getLbConfig().getLoadBalancerConfig().getMbServerUrl() != null) {
+
+ // start consumer
+ // initialize TopologyBuilder Consumer
+ Thread topologyConsumer =
+ new Thread(new TopologySyncher(ConfigHolder.getInstance().getSharedTopologyDiffQueue()));
+ // start consumer
+ topologyConsumer.start();
+
+ TopologySubscriber.subscribe(TopologyConstants.TOPIC_NAME);
+
+ }
+ activated = true;
+ } catch (Throwable e) {
+ log.error("Failed to activate Endpoint Admin bundle ", e);
+ }
+ }
+
+ protected void deactivate(ComponentContext context) {
+ try {
+ Set<Map.Entry<Integer, SynapseEnvironmentService>> entrySet =
+ ConfigHolder.getInstance().getSynapseEnvironmentServices().entrySet();
+ for (Map.Entry<Integer, SynapseEnvironmentService> entry : entrySet) {
+ unregisterDeployer(
+ entry.getValue().getConfigurationContext().getAxisConfiguration(),
+ entry.getValue().getSynapseEnvironment());
+ }
+ } catch (Exception e) {
+ log.warn("Couldn't remove the EndpointDeployer");
+ }
+ }
+
+ /**
+ * Un-registers the Endpoint deployer.
+ *
+ * @param axisConfig AxisConfiguration to which this deployer belongs
+ * @param synapseEnvironment SynapseEnvironment to which this deployer belongs
+ */
+ private void unregisterDeployer(AxisConfiguration axisConfig, SynapseEnvironment synapseEnvironment)
+ throws TenantAwareLoadBalanceEndpointException {
+ if (axisConfig != null) {
+ DeploymentEngine deploymentEngine = (DeploymentEngine) axisConfig.getConfigurator();
+ String synapseConfigPath = ServiceBusUtils.getSynapseConfigAbsPath(
+ synapseEnvironment.getServerContextInformation());
+ String endpointDirPath = synapseConfigPath
+ + File.separator + MultiXMLConfigurationBuilder.ENDPOINTS_DIR;
+ deploymentEngine.removeDeployer(
+ endpointDirPath, ServiceBusConstants.ARTIFACT_EXTENSION);
+ }
+ }
+
+ /**
+ * Registers the Endpoint deployer.
+ *
+ * @param axisConfig AxisConfiguration to which this deployer belongs
+ * @param synapseEnvironment SynapseEnvironment to which this deployer belongs
+ */
+ private void registerDeployer(AxisConfiguration axisConfig, SynapseEnvironment synapseEnvironment)
+ throws TenantAwareLoadBalanceEndpointException {
+ SynapseConfiguration synCfg = synapseEnvironment.getSynapseConfiguration();
+ DeploymentEngine deploymentEngine = (DeploymentEngine) axisConfig.getConfigurator();
+ SynapseArtifactDeploymentStore deploymentStore = synCfg.getArtifactDeploymentStore();
+
+ String synapseConfigPath = ServiceBusUtils.getSynapseConfigAbsPath(
+ synapseEnvironment.getServerContextInformation());
+ String endpointDirPath = synapseConfigPath
+ + File.separator + MultiXMLConfigurationBuilder.ENDPOINTS_DIR;
+
+ for (Endpoint ep : synCfg.getDefinedEndpoints().values()) {
+ if (ep.getFileName() != null) {
+ deploymentStore.addRestoredArtifact(
+ endpointDirPath + File.separator + ep.getFileName());
+ }
+ }
+ deploymentEngine.addDeployer(
+ new EndpointDeployer(), endpointDirPath, ServiceBusConstants.ARTIFACT_EXTENSION);
+ }
+
+ protected void setConfigurationContextService(ConfigurationContextService cfgCtxService) {
+ ConfigHolder.getInstance().setAxisConfiguration(
+ cfgCtxService.getServerConfigContext().getAxisConfiguration());
+ ConfigHolder.getInstance().setConfigCtxt(cfgCtxService.getServerConfigContext());
+ }
+
+ protected void unsetConfigurationContextService(ConfigurationContextService cfgCtxService) {
+ ConfigHolder.getInstance().setAxisConfiguration(null);
+ ConfigHolder.getInstance().setConfigCtxt(null);
+ }
+
+ protected void setSynapseConfigurationService(
+ SynapseConfigurationService synapseConfigurationService) {
+
+ ConfigHolder.getInstance().setSynapseConfiguration(
+ synapseConfigurationService.getSynapseConfiguration());
+ }
+
+ protected void unsetSynapseConfigurationService(
+ SynapseConfigurationService synapseConfigurationService) {
+
+ ConfigHolder.getInstance().setSynapseConfiguration(null);
+ }
+
+ /**
+ * Here we receive an event about the creation of a SynapseEnvironment. If this is
+ * SuperTenant we have to wait until all the other constraints are met and actual
+ * initialization is done in the activate method. Otherwise we have to do the activation here.
+ *
+ * @param synapseEnvironmentService SynapseEnvironmentService which contains information
+ * about the new Synapse Instance
+ */
+ protected void setSynapseEnvironmentService(
+ SynapseEnvironmentService synapseEnvironmentService) {
+ boolean alreadyCreated = ConfigHolder.getInstance().getSynapseEnvironmentServices().
+ containsKey(synapseEnvironmentService.getTenantId());
+
+ ConfigHolder.getInstance().addSynapseEnvironmentService(
+ synapseEnvironmentService.getTenantId(),
+ synapseEnvironmentService);
+ if (activated) {
+ if (!alreadyCreated) {
+ try {
+ registerDeployer(synapseEnvironmentService.getConfigurationContext().getAxisConfiguration(),
+ synapseEnvironmentService.getSynapseEnvironment());
+ if (log.isDebugEnabled()) {
+ log.debug("Endpoint Admin bundle is activated ");
+ }
+ } catch (Throwable e) {
+ log.error("Failed to activate Endpoint Admin bundle ", e);
+ }
+ }
+ }
+ }
+
+ /**
+ * Here we receive an event about Destroying a SynapseEnvironment. This can be the super tenant
+ * destruction or a tenant destruction.
+ *
+ * @param synapseEnvironmentService synapseEnvironment
+ */
+ protected void unsetSynapseEnvironmentService(
+ SynapseEnvironmentService synapseEnvironmentService) {
+ ConfigHolder.getInstance().removeSynapseEnvironmentService(
+ synapseEnvironmentService.getTenantId());
+ }
+
+ protected void setRegistryService(RegistryService regService) {
+ if (log.isDebugEnabled()) {
+ log.debug("RegistryService bound to the endpoint component");
+ }
+ try {
+ ConfigHolder.getInstance().setConfigRegistry(regService.getConfigSystemRegistry());
+ ConfigHolder.getInstance().setGovernanceRegistry(regService.getGovernanceSystemRegistry());
+ } catch (RegistryException e) {
+ log.error("Couldn't retrieve the registry from the registry service");
+ }
+ }
+
+ protected void unsetRegistryService(RegistryService regService) {
+ if (log.isDebugEnabled()) {
+ log.debug("RegistryService unbound from the endpoint component");
+ }
+ ConfigHolder.getInstance().setConfigRegistry(null);
+ }
+
+ protected void setDependencyManager(DependencyManagementService dependencyMgr) {
+ if (log.isDebugEnabled()) {
+ log.debug("Dependency management service bound to the endpoint component");
+ }
+ ConfigHolder.getInstance().setDependencyManager(dependencyMgr);
+ }
+
+ protected void unsetDependencyManager(DependencyManagementService dependencyMgr) {
+ if (log.isDebugEnabled()) {
+ log.debug("Dependency management service unbound from the endpoint component");
+ }
+ ConfigHolder.getInstance().setDependencyManager(null);
+ }
+
+ protected void setSynapseRegistrationsService(
+ SynapseRegistrationsService synapseRegistrationsService) {
+
+ }
+
+ protected void unsetSynapseRegistrationsService(
+ SynapseRegistrationsService synapseRegistrationsService) {
+ int tenantId = synapseRegistrationsService.getTenantId();
+ if (ConfigHolder.getInstance().getSynapseEnvironmentServices().containsKey(tenantId)) {
+ SynapseEnvironment env = ConfigHolder.getInstance().
+ getSynapseEnvironmentService(tenantId).getSynapseEnvironment();
+
+ ConfigHolder.getInstance().removeSynapseEnvironmentService(
+ synapseRegistrationsService.getTenantId());
+
+ AxisConfiguration axisConfig = synapseRegistrationsService.getConfigurationContext().
+ getAxisConfiguration();
+ if (axisConfig != null) {
+ try {
+ unregisterDeployer(axisConfig, env);
+ } catch (Exception e) {
+ log.warn("Couldn't remove the EndpointDeployer");
+ }
+ }
+ }
+ }
+
+ protected void setRealmService(RealmService realmService) {
+ ConfigHolder.getInstance().setRealmService(realmService);
+ }
+
+ protected void unsetRealmService(RealmService realmService) {
+ ConfigHolder.getInstance().setRealmService(null);
+ }
+
+ protected void setLoadBalancerConfigurationService(LoadBalancerConfigurationService lbConfigSer){
+ ConfigHolder.getInstance().setLbConfigService(lbConfigSer);
+ }
+
+ protected void unsetLoadBalancerConfigurationService(LoadBalancerConfigurationService lbConfigSer){
+ ConfigHolder.getInstance().setLbConfigService(null);
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/753d34ac/components/load-balancer/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/internal/RegistryManager.java
----------------------------------------------------------------------
diff --git a/components/load-balancer/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/internal/RegistryManager.java b/components/load-balancer/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/internal/RegistryManager.java
new file mode 100644
index 0000000..b76495b
--- /dev/null
+++ b/components/load-balancer/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/internal/RegistryManager.java
@@ -0,0 +1,51 @@
+/*
+ * Copyright WSO2 Inc.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.stratos.lb.endpoint.internal;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.lb.endpoint.util.ConfigHolder;
+import org.apache.stratos.lb.common.util.DomainMapping;
+import org.wso2.carbon.registry.core.Resource;
+import org.wso2.carbon.registry.core.exceptions.RegistryException;
+import org.wso2.carbon.registry.core.session.UserRegistry;
+
+public class RegistryManager {
+ UserRegistry governanceRegistry = ConfigHolder.getInstance().getGovernanceRegistry();
+ private static final Log log = LogFactory.getLog(RegistryManager.class);
+ /**
+ *
+ */
+ private Resource resource = null;
+ public static final String HOST_INFO = "hostinfo/";
+ public static final String ACTUAL_HOST = "actual.host";
+
+ public DomainMapping getMapping(String hostName) {
+ DomainMapping domainMapping;
+ try {
+ if (governanceRegistry.resourceExists(HOST_INFO + hostName)) {
+ resource = governanceRegistry.get(HOST_INFO + hostName);
+ domainMapping = new DomainMapping(hostName);
+ domainMapping.setActualHost(resource.getProperty(ACTUAL_HOST));
+ return domainMapping;
+ }
+ } catch (RegistryException e) {
+ log.info("Error while getting registry resource");
+ throw new RuntimeException(e);
+ }
+ return null;
+ }
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/753d34ac/components/load-balancer/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/subscriber/TopicHealthChecker.java
----------------------------------------------------------------------
diff --git a/components/load-balancer/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/subscriber/TopicHealthChecker.java b/components/load-balancer/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/subscriber/TopicHealthChecker.java
new file mode 100644
index 0000000..4e7783e
--- /dev/null
+++ b/components/load-balancer/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/subscriber/TopicHealthChecker.java
@@ -0,0 +1,67 @@
+/*
+* Copyright (c) 2005-2011, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+*
+* WSO2 Inc. licenses this file to you under the Apache License,
+* Version 2.0 (the "License"); you may not use this file except
+* in compliance with the License.
+* You may obtain a copy of the License at
+*
+* http://www.apache.org/licenses/LICENSE-2.0
+*
+* Unless required by applicable law or agreed to in writing,
+* software distributed under the License is distributed on an
+* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+* KIND, either express or implied. See the License for the
+* specific language governing permissions and limitations
+* under the License.
+*/
+package org.apache.stratos.lb.endpoint.subscriber;
+
+import javax.jms.TopicSubscriber;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * This health checker runs forever, and is responsible for re-establishing a connection
+ * between ELB and CC.
+ */
+public class TopicHealthChecker implements Runnable{
+
+ private static final Log log = LogFactory.getLog(TopicHealthChecker.class);
+ private String topicName;
+ private TopicSubscriber subscriber;
+
+ public TopicHealthChecker(String topicName, TopicSubscriber subscriber) {
+ this.topicName = topicName;
+ this.subscriber = subscriber;
+ }
+
+ @Override
+ public void run() {
+ log.info("Topic Health Checker is running... ");
+
+ while (true) {
+ try {
+ subscriber.getTopic();
+
+ // health checker runs in every 30s
+ Thread.sleep(30000);
+
+ } catch (Exception e) {
+ // implies connection is not established
+ // sleep for 5s and retry
+ try {
+ log.info("Health checker failed and will retry to establish a connection after a 5s.");
+ Thread.sleep(5000);
+ break;
+ } catch (InterruptedException ignore) {
+ }
+ }
+
+ }
+
+ TopologySubscriber.subscribe(topicName);
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/753d34ac/components/load-balancer/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/subscriber/TopologyListener.java
----------------------------------------------------------------------
diff --git a/components/load-balancer/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/subscriber/TopologyListener.java b/components/load-balancer/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/subscriber/TopologyListener.java
new file mode 100644
index 0000000..186aad7
--- /dev/null
+++ b/components/load-balancer/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/subscriber/TopologyListener.java
@@ -0,0 +1,29 @@
+package org.apache.stratos.lb.endpoint.subscriber;
+
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.TextMessage;
+
+import org.apache.stratos.lb.endpoint.util.ConfigHolder;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class TopologyListener implements MessageListener {
+
+ private static final Log log = LogFactory.getLog(TopologyListener.class);
+
+ @Override
+ public void onMessage(Message message) {
+ TextMessage receivedMessage = (TextMessage) message;
+ try {
+
+ ConfigHolder.getInstance().getSharedTopologyDiffQueue().add(receivedMessage.getText());
+
+ } catch (JMSException e) {
+ log.error(e.getMessage(), e);
+ }
+
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/753d34ac/components/load-balancer/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/subscriber/TopologySubscriber.java
----------------------------------------------------------------------
diff --git a/components/load-balancer/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/subscriber/TopologySubscriber.java b/components/load-balancer/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/subscriber/TopologySubscriber.java
new file mode 100644
index 0000000..129d271
--- /dev/null
+++ b/components/load-balancer/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/subscriber/TopologySubscriber.java
@@ -0,0 +1,77 @@
+package org.apache.stratos.lb.endpoint.subscriber;
+
+import java.util.Properties;
+
+import javax.jms.*;
+import javax.naming.InitialContext;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.lb.endpoint.util.ConfigHolder;
+import org.apache.stratos.lb.endpoint.util.TopologyConstants;
+
+public class TopologySubscriber {
+
+ private static final Log log = LogFactory.getLog(TopologySubscriber.class);
+
+ public static void subscribe(String topicName) {
+ Properties initialContextProperties = new Properties();
+ TopicSubscriber topicSubscriber = null;
+ TopicSession topicSession = null;
+ TopicConnection topicConnection = null;
+ InitialContext initialContext = null;
+
+ initialContextProperties.put("java.naming.factory.initial",
+ "org.wso2.andes.jndi.PropertiesFileInitialContextFactory");
+
+ String mbServerUrl = null;
+ if (ConfigHolder.getInstance().getLbConfig() != null) {
+ mbServerUrl = ConfigHolder.getInstance().getLbConfig().getLoadBalancerConfig().getMbServerUrl();
+ }
+ String connectionString =
+ "amqp://admin:admin@clientID/carbon?brokerlist='tcp://" +
+ (mbServerUrl == null ? TopologyConstants.DEFAULT_MB_SERVER_URL : mbServerUrl) + "'&reconnect='true'";
+ initialContextProperties.put("connectionfactory.qpidConnectionfactory", connectionString);
+
+ try {
+ initialContext = new InitialContext(initialContextProperties);
+ TopicConnectionFactory topicConnectionFactory =
+ (TopicConnectionFactory) initialContext.lookup("qpidConnectionfactory");
+ topicConnection = topicConnectionFactory.createTopicConnection();
+ topicConnection.start();
+ topicSession =
+ topicConnection.createTopicSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Topic topic = topicSession.createTopic(topicName);
+ topicSubscriber =
+ topicSession.createSubscriber(topic);
+
+ topicSubscriber.setMessageListener(new TopologyListener());
+
+ } catch (Exception e) {
+ log.error(e.getMessage(), e);
+
+ try {
+ if (topicSubscriber != null) {
+ topicSubscriber.close();
+ }
+
+ if (topicSession != null) {
+ topicSession.close();
+ }
+
+ if (topicConnection != null) {
+ topicConnection.close();
+ }
+ } catch (JMSException e1) {
+ // ignore
+ }
+
+ }
+ finally {
+ // start the health checker
+ Thread healthChecker = new Thread(new TopicHealthChecker(topicName, topicSubscriber));
+ healthChecker.start();
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/753d34ac/components/load-balancer/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/util/ConfigHolder.java
----------------------------------------------------------------------
diff --git a/components/load-balancer/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/util/ConfigHolder.java b/components/load-balancer/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/util/ConfigHolder.java
new file mode 100644
index 0000000..34551be
--- /dev/null
+++ b/components/load-balancer/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/util/ConfigHolder.java
@@ -0,0 +1,189 @@
+/**
+ * Copyright (c) 2009, WSO2 Inc. (http://www.wso2.org) All Rights Reserved.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.stratos.lb.endpoint.util;
+
+import org.apache.axis2.context.ConfigurationContext;
+import org.apache.axis2.engine.AxisConfiguration;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.config.SynapseConfiguration;
+import org.apache.stratos.lb.common.conf.LoadBalancerConfiguration;
+import org.apache.stratos.lb.common.service.LoadBalancerConfigurationService;
+import org.apache.stratos.lb.endpoint.TenantAwareLoadBalanceEndpointException;
+import org.apache.stratos.lb.endpoint.TenantLoadBalanceMembershipHandler;
+import org.wso2.carbon.mediation.dependency.mgt.services.DependencyManagementService;
+import org.wso2.carbon.mediation.initializer.services.SynapseEnvironmentService;
+import org.wso2.carbon.registry.core.session.UserRegistry;
+import org.wso2.carbon.user.core.service.RealmService;
+
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+/**
+ *
+ */
+public class ConfigHolder {
+
+ private static ConfigHolder instance;
+ private static final Log log = LogFactory.getLog(ConfigHolder.class);
+
+ private SynapseConfiguration synapseConfiguration;
+ private ConfigurationContext configCtxt;
+ private AxisConfiguration axisConfiguration;
+ private UserRegistry configRegistry;
+ private UserRegistry governanceRegistry;
+ private DependencyManagementService dependencyManager;
+ private TenantLoadBalanceMembershipHandler tenantMembershipHandler;
+ private LoadBalancerConfigurationService lbConfigService;
+ private BlockingQueue<String> sharedTopologyQueue = new LinkedBlockingQueue<String>();
+ private String previousMsg;
+
+
+ private Map<Integer, SynapseEnvironmentService> synapseEnvironmentServices =
+ new HashMap<Integer, SynapseEnvironmentService>();
+
+ public RealmService getRealmService() {
+ return realmService;
+ }
+
+ public void setRealmService(RealmService realmService) {
+ this.realmService = realmService;
+ }
+
+ private RealmService realmService;
+
+ private ConfigHolder() {
+ }
+
+ public static ConfigHolder getInstance() {
+ if (instance == null) {
+ instance = new ConfigHolder();
+ }
+ return instance;
+ }
+
+ public SynapseConfiguration getSynapseConfiguration() throws TenantAwareLoadBalanceEndpointException{
+ assertNull("SynapseConfiguration", synapseConfiguration);
+ return synapseConfiguration;
+ }
+
+ public void setSynapseConfiguration(SynapseConfiguration synapseConfiguration) {
+ this.synapseConfiguration = synapseConfiguration;
+ }
+
+ public AxisConfiguration getAxisConfiguration() throws TenantAwareLoadBalanceEndpointException {
+ assertNull("AxisConfiguration", axisConfiguration);
+ return axisConfiguration;
+ }
+
+ public void setAxisConfiguration(AxisConfiguration axisConfiguration) {
+ this.axisConfiguration = axisConfiguration;
+ }
+
+ public UserRegistry getConfigRegistry() throws TenantAwareLoadBalanceEndpointException {
+ assertNull("Registry", configRegistry);
+ return configRegistry;
+ }
+
+ public void setConfigRegistry(UserRegistry configRegistry) {
+ this.configRegistry = configRegistry;
+ }
+
+ public DependencyManagementService getDependencyManager() {
+ return dependencyManager;
+ }
+
+ public void setDependencyManager(DependencyManagementService dependencyManager) {
+ this.dependencyManager = dependencyManager;
+ }
+
+ private void assertNull(String name, Object object) throws TenantAwareLoadBalanceEndpointException {
+ if (object == null) {
+ String message = name + " reference in the proxy admin config holder is null";
+ log.error(message);
+ throw new TenantAwareLoadBalanceEndpointException(message);
+ }
+ }
+
+ public UserRegistry getGovernanceRegistry() {
+ return governanceRegistry;
+ }
+
+ public void setGovernanceRegistry(UserRegistry governanceRegistry) {
+ this.governanceRegistry = governanceRegistry;
+ }
+
+ public SynapseEnvironmentService getSynapseEnvironmentService(int id) {
+ return synapseEnvironmentServices.get(id);
+ }
+
+ public void addSynapseEnvironmentService(int id,
+ SynapseEnvironmentService synapseEnvironmentService) {
+ synapseEnvironmentServices.put(id, synapseEnvironmentService);
+ }
+
+ public void removeSynapseEnvironmentService(int id) {
+ synapseEnvironmentServices.remove(id);
+ }
+
+ public Map<Integer, SynapseEnvironmentService> getSynapseEnvironmentServices() {
+ return synapseEnvironmentServices;
+ }
+
+ public void setTenantLoadBalanceMembershipHandler(TenantLoadBalanceMembershipHandler handler) {
+ tenantMembershipHandler = handler;
+ }
+
+ public TenantLoadBalanceMembershipHandler getTenantLoadBalanceMembershipHandler() {
+ return tenantMembershipHandler;
+ }
+
+ public ConfigurationContext getConfigCtxt() {
+ return configCtxt;
+ }
+
+ public void setConfigCtxt(ConfigurationContext configCtxt) {
+ this.configCtxt = configCtxt;
+ }
+
+ public void setLbConfigService(LoadBalancerConfigurationService lbConfigSer) {
+ this.lbConfigService = lbConfigSer;
+ }
+
+ public LoadBalancerConfiguration getLbConfig() {
+ return (LoadBalancerConfiguration) lbConfigService.getLoadBalancerConfig();
+ }
+
+ public BlockingQueue<String> getSharedTopologyDiffQueue() {
+ return sharedTopologyQueue;
+ }
+
+ public void setSharedTopologyDiffQueue(BlockingQueue<String> sharedTopologyDiffQueue) {
+ this.sharedTopologyQueue = sharedTopologyDiffQueue;
+ }
+
+ public String getPreviousMsg() {
+ return previousMsg;
+ }
+
+ public void setPreviousMsg(String previousMsg) {
+ this.previousMsg = previousMsg;
+ }
+
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/753d34ac/components/load-balancer/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/util/TopologyConstants.java
----------------------------------------------------------------------
diff --git a/components/load-balancer/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/util/TopologyConstants.java b/components/load-balancer/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/util/TopologyConstants.java
new file mode 100644
index 0000000..24e072f
--- /dev/null
+++ b/components/load-balancer/lb-endpoint/org.apache.stratos.lb.endpoint/4.1.3/src/main/java/org/apache/stratos/lb/endpoint/util/TopologyConstants.java
@@ -0,0 +1,13 @@
+package org.apache.stratos.lb.endpoint.util;
+
+public class TopologyConstants {
+
+ public static final String TOPIC_NAME = "cloud-controller-topology";
+ public static final String MB_SERVER_URL = "mb.server.ip";
+ public static final String DEFAULT_MB_SERVER_URL = "localhost:5672";
+
+ public static final String TOPOLOGY_SYNC_CRON = "1 * * * * ? *";
+ public static final String TOPOLOGY_SYNC_TASK_NAME = "TopologySubscriberTaskOfADC";
+ public static final String TOPOLOGY_SYNC_TASK_TYPE = "TOPOLOGY_SUBSCRIBER_TASK";
+
+}