You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by ma...@apache.org on 2014/07/08 09:41:55 UTC
[05/14] Clustering changes for stratos
http://git-wip-us.apache.org/repos/asf/stratos/blob/7b35e29e/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
index 3d71a0a..884d5f6 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
@@ -28,14 +28,17 @@ import org.apache.stratos.load.balancer.algorithm.LoadBalanceAlgorithmFactory;
import org.apache.stratos.load.balancer.conf.LoadBalancerConfiguration;
import org.apache.stratos.load.balancer.conf.domain.MemberIpType;
import org.apache.stratos.load.balancer.conf.domain.TenantIdentifier;
+import org.apache.stratos.load.balancer.context.LoadBalancerContext;
import org.apache.stratos.load.balancer.statistics.InFlightRequestDecrementCallable;
import org.apache.stratos.load.balancer.statistics.InFlightRequestIncrementCallable;
import org.apache.stratos.load.balancer.statistics.LoadBalancerStatisticsExecutor;
import org.apache.stratos.load.balancer.util.Constants;
import org.apache.stratos.messaging.domain.tenant.Tenant;
+import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.Member;
import org.apache.stratos.messaging.domain.topology.Port;
import org.apache.stratos.messaging.message.receiver.tenant.TenantManager;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.SynapseException;
@@ -59,7 +62,7 @@ import java.util.regex.Pattern;
public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints.LoadbalanceEndpoint implements Serializable {
- private static final String PORT_MAPPING_PREFIX = "port.mapping.";
+ private static final long serialVersionUID = -6612900240087164008L;
/* Request delegator identifies the next member */
private RequestDelegator requestDelegator;
@@ -75,6 +78,8 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
/* Sessions time out interval */
private long sessionTimeout = -1;
+
+ private final static Pattern LAST_INT_PATTERN = Pattern.compile("[^0-9]+([0-9]+)$");
@Override
public void init(SynapseEnvironment synapseEnvironment) {
@@ -118,6 +123,8 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
TenantAwareLoadBalanceFaultHandler faultHandler = new TenantAwareLoadBalanceFaultHandler();
if (sessionInformation != null && currentMember != null) {
+ // Update axis2 member ports
+ updateAxis2MemberPorts(synCtx, currentMember);
// Send request to the member with the existing session
sessionInformation.updateExpiryTime();
sendToApplicationMember(synCtx, currentMember, faultHandler, false);
@@ -150,8 +157,17 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
private void setupLoadBalancerContextProperties(MessageContext synCtx, org.apache.axis2.clustering.Member currentMember) {
String lbHostName = extractTargetHost(synCtx);
org.apache.axis2.context.MessageContext axis2MsgCtx = ((Axis2MessageContext) synCtx).getAxis2MessageContext();
- TransportInDescription httpTransportIn = axis2MsgCtx.getConfigurationContext().getAxisConfiguration().getTransportIn("http");
- TransportInDescription httpsTransportIn = axis2MsgCtx.getConfigurationContext().getAxisConfiguration().getTransportIn("https");
+
+ String httpTransportName = "http", httpsTransportName = "https";
+ String transportId = getTransportId(extractIncomingTransport(synCtx));
+
+ if (transportId != null) {
+ httpsTransportName = httpsTransportName.concat(transportId);
+ httpTransportName = httpTransportName.concat(transportId);
+ }
+
+ TransportInDescription httpTransportIn = axis2MsgCtx.getConfigurationContext().getAxisConfiguration().getTransportIn(httpTransportName);
+ TransportInDescription httpsTransportIn = axis2MsgCtx.getConfigurationContext().getAxisConfiguration().getTransportIn(httpsTransportName);
String lbHttpPort = (String) httpTransportIn.getParameter("port").getValue();
String lbHttpsPort = (String) httpsTransportIn.getParameter("port").getValue();
String clusterId = currentMember.getProperties().getProperty(Constants.CLUSTER_ID);
@@ -161,6 +177,16 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
synCtx.setProperty(Constants.LB_HTTPS_PORT, lbHttpsPort);
synCtx.setProperty(Constants.CLUSTER_ID, clusterId);
}
+
+ protected String getTransportId(String incomingTransportName) {
+ // pattern match and find the transport id.
+ Matcher matcher = LAST_INT_PATTERN.matcher(incomingTransportName);
+ if (matcher.find()) {
+ return matcher.group(1);
+ }
+
+ return null;
+ }
/**
@@ -212,112 +238,255 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
}
String url = extractUrl(synCtx);
int tenantId = scanUrlForTenantId(url);
- if (tenantExists(tenantId)) {
+ if(tenantId == -1) {
+ // If there is no tenant involves in the URL, Find next member from host name
+ member = requestDelegator.findNextMemberFromHostName(targetHost);
+ } else if (tenantExists(tenantId)) {
+ // Tenant found, find member from hostname and tenant id
member = requestDelegator.findNextMemberFromTenantId(targetHost, tenantId);
} else {
- // Multi-tenant cluster not found, try single tenant
- member = requestDelegator.findNextMemberFromHostName(targetHost);
+ // Tenant id not found in the subscription for the URL which has tenant domain.
+ throwSynapseException(synCtx, 403, String.format("You are unauthorized to access"));
}
} else {
- // Find next member from single tenant cluster map
- member = requestDelegator.findNextMemberFromHostName(targetHost);
+
}
if (member == null)
return null;
// Create Axi2 member object
+ org.apache.axis2.clustering.Member axis2Member = new org.apache.axis2.clustering.Member(
+ getMemberIp(synCtx, member), -1);
+ axis2Member.setDomain(member.getClusterId());
+ axis2Member.setActive(member.isActive());
+ // Set cluster id and member id in member properties
+ axis2Member.getProperties().setProperty(Constants.CLUSTER_ID, member.getClusterId());
+ axis2Member.getProperties().setProperty(Constants.MEMBER_ID, member.getMemberId());
+ // Update axis2 member ports
+ updateAxis2MemberPorts(synCtx, axis2Member);
+ return axis2Member;
+ }
+
+ /**
+ * Update http/https port in axis2 member according to incoming request port.
+ *
+ * @param synCtx
+ * @param axis2Member
+ */
+ private void updateAxis2MemberPorts(MessageContext synCtx, org.apache.axis2.clustering.Member axis2Member) {
+ if(log.isDebugEnabled()) {
+ log.debug("Updating axis2 member port");
+ }
+
+ // Find mapping outgoing port for incoming port
+ int incomingPort = findIncomingPort(synCtx);
String transport = extractTransport(synCtx);
- Port transportPort = member.getPort(transport);
- if (transportPort == null) {
+ Port outgoingPort = findOutgoingPort(synCtx, axis2Member, transport, incomingPort);
+ if (outgoingPort == null) {
if (log.isErrorEnabled()) {
- log.error(String.format("Port not found for transport %s in member %s", transport, member.getMemberId()));
+ log.error(String.format("Could not find the port for proxy port %d in member %s", incomingPort,
+ axis2Member.getProperties().getProperty(Constants.MEMBER_ID)));
}
throwSynapseException(synCtx, 500, "Internal server error");
}
+ if (Constants.HTTP.equals(transport)) {
+ axis2Member.setHttpPort(outgoingPort.getValue());
+ } else if (Constants.HTTPS.equals(transport)) {
+ axis2Member.setHttpsPort(outgoingPort.getValue());
+ }
+ }
- int memberPort = transportPort.getValue();
- org.apache.axis2.clustering.Member axis2Member = new org.apache.axis2.clustering.Member(getMemberIp(synCtx, member), memberPort);
- axis2Member.setDomain(member.getClusterId());
- Port httpPort = member.getPort("http");
- if (httpPort != null)
- axis2Member.setHttpPort(httpPort.getValue());
- Port httpsPort = member.getPort("https");
- if (httpsPort != null)
- axis2Member.setHttpsPort(httpsPort.getValue());
- axis2Member.setActive(member.isActive());
- // Set cluster id and partition id in message context
- axis2Member.getProperties().setProperty(Constants.CLUSTER_ID, member.getClusterId());
- return axis2Member;
+ /**
+ * Find incoming port from request URL.
+ *
+ * @param synCtx
+ * @return
+ * @throws MalformedURLException
+ */
+ private int findIncomingPort(MessageContext synCtx) {
+ org.apache.axis2.context.MessageContext msgCtx =
+ ((Axis2MessageContext) synCtx).getAxis2MessageContext();
+ try {
+ String servicePrefix = (String) msgCtx.getProperty(Constants.AXIS2_MSG_CTX_SERVICE_PREFIX);
+ if (servicePrefix == null) {
+ if (log.isErrorEnabled()) {
+ log.error(String.format("%s property not found in axis2 message context", Constants.AXIS2_MSG_CTX_SERVICE_PREFIX));
+ }
+ throwSynapseException(synCtx, 500, "Internal server error");
+ }
+ URL servicePrefixUrl = new URL(servicePrefix);
+ return servicePrefixUrl.getPort();
+ } catch (MalformedURLException e) {
+ if (log.isErrorEnabled()) {
+ log.error("Could not find incoming request port");
+ }
+ throwSynapseException(synCtx, 500, "Internal server error");
+ }
+ return -1;
}
+ /**
+ * Find mapping outgoing port for incoming port.
+ *
+ * @param synCtx
+ * @param axis2Member
+ * @param transport
+ * @param incomingPort
+ * @return
+ * @throws MalformedURLException
+ */
+ private Port findOutgoingPort(MessageContext synCtx, org.apache.axis2.clustering.Member axis2Member, String transport, int incomingPort) {
+ Member member = findMemberFromAxis2Member(synCtx, axis2Member);
+ if ((member != null) && (member.getPorts() != null)) {
+ Port outgoingPort = member.getPort(incomingPort);
+ if (outgoingPort != null) {
+ if (!transport.startsWith(outgoingPort.getProtocol())) {
+ if (log.isErrorEnabled()) {
+ String message = String.format("Transport %s is not valid for port %d", transport, incomingPort);
+ if (log.isErrorEnabled()) {
+ log.error(message);
+ }
+ throwSynapseException(synCtx, 500, message);
+ }
+ }
+ if (log.isDebugEnabled()) {
+ log.debug("Outgoing request port found: " + outgoingPort.getValue());
+ }
+ return outgoingPort;
+ }
+ }
+ return null;
+ }
+
+ /**
+ * Find topology member from axis2 member using cluster id and member id defined in axis2 member properties.
+ *
+ * @param synCtx
+ * @param axis2Member
+ * @return
+ */
+ private Member findMemberFromAxis2Member(MessageContext synCtx, org.apache.axis2.clustering.Member axis2Member) {
+ String clusterId = axis2Member.getProperties().getProperty(Constants.CLUSTER_ID);
+ String memberId = axis2Member.getProperties().getProperty(Constants.MEMBER_ID);
+ if (StringUtils.isBlank(clusterId) || StringUtils.isBlank(memberId)) {
+ if (log.isErrorEnabled()) {
+ log.error(String.format("Could not find cluster id and/or member id properties in axis2 member: [cluster-id] %s " +
+ "[member-id] %s", clusterId, memberId));
+ }
+ throwSynapseException(synCtx, 500, "Internal server error");
+ }
+ try {
+ TopologyManager.acquireReadLock();
+ Cluster cluster = LoadBalancerContext.getInstance().getClusterIdClusterMap().getCluster(clusterId);
+ if (cluster == null) {
+ if (log.isErrorEnabled()) {
+ log.error(String.format("Cluster not found in load balancer context: [cluster-id] %s ", clusterId));
+ }
+ throwSynapseException(synCtx, 500, "Internal server error");
+ }
+ Member member = cluster.getMember(memberId);
+ if (member == null) {
+ if (log.isErrorEnabled()) {
+ log.error(String.format("Member not found in load balancer context: [cluster-id] %s [member-id] %s", clusterId, memberId));
+ }
+ throwSynapseException(synCtx, 500, "Internal server error");
+ }
+ return member;
+ } finally {
+ TopologyManager.releaseReadLock();
+ }
+ }
+
+ /**
+ * Get members private or public ip according to load balancer configuration.
+ *
+ * @param synCtx
+ * @param member
+ * @return
+ */
private String getMemberIp(MessageContext synCtx, Member member) {
- if(LoadBalancerConfiguration.getInstance().isTopologyEventListenerEnabled()) {
- if(LoadBalancerConfiguration.getInstance().getTopologyMemberIpType() == MemberIpType.Public) {
+ if (LoadBalancerConfiguration.getInstance().isTopologyEventListenerEnabled()) {
+ if (LoadBalancerConfiguration.getInstance().getTopologyMemberIpType() == MemberIpType.Public) {
// Return member's public IP address
- if(StringUtils.isBlank(member.getMemberPublicIp())) {
+ if (StringUtils.isBlank(member.getMemberPublicIp())) {
if (log.isErrorEnabled()) {
log.error(String.format("Member public IP address not found: [member] %s", member.getMemberId()));
}
throwSynapseException(synCtx, 500, "Internal server error");
}
- if(log.isDebugEnabled()) {
+ if (log.isDebugEnabled()) {
log.debug(String.format("Using member public IP address: [member] %s [ip] %s", member.getMemberId(), member.getMemberPublicIp()));
}
return member.getMemberPublicIp();
}
}
// Return member's private IP address
- if(StringUtils.isBlank(member.getMemberIp())) {
+ if (StringUtils.isBlank(member.getMemberIp())) {
if (log.isErrorEnabled()) {
log.error(String.format("Member IP address not found: [member] %s", member.getMemberId()));
}
throwSynapseException(synCtx, 500, "Internal server error");
}
- if(log.isDebugEnabled()) {
+ if (log.isDebugEnabled()) {
log.debug(String.format("Using member IP address: [member] %s [ip] %s", member.getMemberId(), member.getMemberIp()));
}
return member.getMemberIp();
}
+ /**
+ * Extract incoming request URL from message context.
+ *
+ * @param synCtx
+ * @return
+ */
private String extractUrl(MessageContext synCtx) {
Axis2MessageContext axis2smc = (Axis2MessageContext) synCtx;
org.apache.axis2.context.MessageContext axis2MessageCtx = axis2smc.getAxis2MessageContext();
return (String) axis2MessageCtx.getProperty(Constants.AXIS2_MSG_CTX_TRANSPORT_IN_URL);
}
+ /**
+ * Scan given URL for tenant id.
+ *
+ * @param url
+ * @return
+ */
private int scanUrlForTenantId(String url) {
int tenantId = -1;
- String regex = LoadBalancerConfiguration.getInstance().getTenantIdentifierRegex();
- if (log.isDebugEnabled()) {
- log.debug(String.format("Request URL: %s ", url));
- log.debug(String.format("Tenant identifier regex: %s ", regex));
- }
- Pattern pattern = Pattern.compile(regex);
- Matcher matcher = pattern.matcher(url);
- if (matcher.find()) {
- if (LoadBalancerConfiguration.getInstance().getTenantIdentifier() == TenantIdentifier.TenantId) {
- if (log.isDebugEnabled()) {
- log.debug("Identifying tenant using tenant id...");
- }
- tenantId = Integer.parseInt(matcher.group(1));
- if (log.isDebugEnabled()) {
- log.debug(String.format("Tenant identifier found: [tenant-id] %d", tenantId));
- }
- } else if (LoadBalancerConfiguration.getInstance().getTenantIdentifier() == TenantIdentifier.TenantDomain) {
- if (log.isDebugEnabled()) {
- log.debug("Identifying tenant using tenant domain...");
+ List<String> regexList = LoadBalancerConfiguration.getInstance().getTenantIdentifierRegexList();
+ for(String regex : regexList) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Request URL: %s ", url));
+ log.debug(String.format("Tenant identifier regex: %s ", regex));
+ }
+ Pattern pattern = Pattern.compile(regex);
+ Matcher matcher = pattern.matcher(url);
+ if (matcher.find()) {
+ if (LoadBalancerConfiguration.getInstance().getTenantIdentifier() == TenantIdentifier.TenantId) {
+ if (log.isDebugEnabled()) {
+ log.debug("Identifying tenant using tenant id...");
+ }
+ tenantId = Integer.parseInt(matcher.group(1));
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Tenant identifier found: [tenant-id] %d", tenantId));
+ }
+ } else if (LoadBalancerConfiguration.getInstance().getTenantIdentifier() == TenantIdentifier.TenantDomain) {
+ if (log.isDebugEnabled()) {
+ log.debug("Identifying tenant using tenant domain...");
+ }
+ String tenantDomain = matcher.group(1);
+ tenantId = findTenantIdFromTenantDomain(tenantDomain);
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Tenant identifier found: [tenant-domain] %s [tenant-id] %d", tenantDomain, tenantId));
+ }
}
- String tenantDomain = matcher.group(1);
- tenantId = findTenantIdFromTenantDomain(tenantDomain);
+ break;
+ } else {
if (log.isDebugEnabled()) {
- log.debug(String.format("Tenant identifier found: [tenant-domain] %s [tenant-id] %d", tenantDomain, tenantId));
+ log.debug("Tenant identifier not found in URL");
}
}
- } else {
- if (log.isDebugEnabled()) {
- log.debug("Tenant identifier not found in URL");
- }
}
return tenantId;
}
@@ -386,6 +555,11 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
org.apache.axis2.context.MessageContext axis2MessageContext = ((Axis2MessageContext) synCtx).getAxis2MessageContext();
return axis2MessageContext.getTransportIn().getName();
}
+
+ private String extractIncomingTransport(MessageContext synCtx) {
+ org.apache.axis2.context.MessageContext axis2MessageContext = ((Axis2MessageContext) synCtx).getAxis2MessageContext();
+ return axis2MessageContext.getIncomingTransportName();
+ }
/**
* @param to get an endpoint to send the information
@@ -413,53 +587,71 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
return endpoint;
}
- private EndpointReference getEndpointReferenceAfterURLRewrite(org.apache.axis2.clustering.Member currentMember,
- String transport,
- String address,
- int incomingPort) {
+ private EndpointReference getEndpointReferenceAfterURLRewrite(MessageContext synCtx, org.apache.axis2.clustering.Member currentMember,
+ String transport) {
+ try {
+ if (transport.startsWith(Constants.HTTPS)) {
+ transport = Constants.HTTPS;
+ } else if (transport.startsWith(Constants.HTTP)) {
+ transport = Constants.HTTP;
+ } else {
+ String msg = "Cannot load balance for non-HTTP/S transport " + transport;
+ log.error(msg);
+ throwSynapseException(synCtx, 500, msg);
+ }
- if (transport.startsWith("https")) {
- transport = "https";
- } else if (transport.startsWith("http")) {
- transport = "http";
- } else {
- String msg = "Cannot load balance for non-HTTP/S transport " + transport;
- log.error(msg);
- throw new SynapseException(msg);
- }
- // URL Rewrite
- if (transport.startsWith("http") || transport.startsWith("https")) {
- if (address.startsWith("http://") || address.startsWith("https://")) {
+ String address = synCtx.getTo().getAddress();
+ if (address.startsWith(Constants.HTTP + "://") || address.startsWith(Constants.HTTPS + "://")) {
+ // Remove protocol, hostname and port found in address
try {
- String _address = address.indexOf("?") > 0 ? address.substring(address.indexOf("?"), address.length()) : "";
- address = new URL(address).getPath() + _address;
+ URL addressUrl = new URL(address);
+ address = addressUrl.getPath() + (StringUtils.isNotBlank(addressUrl.getQuery()) ?
+ "?" + addressUrl.getQuery() : "");
} catch (MalformedURLException e) {
- String msg = "URL " + address + " is malformed";
+ String msg = String.format("URL is malformed: %s", address);
log.error(msg, e);
throw new SynapseException(msg, e);
}
}
- int port;
- Properties memberProperties = currentMember.getProperties();
- String mappedPort = memberProperties.getProperty(PORT_MAPPING_PREFIX + incomingPort);
- if (mappedPort != null) {
- port = Integer.parseInt(mappedPort);
- } else if (transport.startsWith("https")) {
- port = currentMember.getHttpsPort();
- } else {
- port = currentMember.getHttpPort();
+ String hostName = extractTargetHost(synCtx);
+ if (LoadBalancerContext.getInstance().getHostNameAppContextMap().contains(hostName)) {
+ String appContext = LoadBalancerContext.getInstance().getHostNameAppContextMap().getAppContext(hostName);
+ if (StringUtils.isNotBlank(appContext)) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Domain mapping found with application context: [domain-name] %s [app-context] %s", hostName, appContext));
+ log.debug(String.format("Incoming request address: %s", address));
+ }
+ address = "/" + cleanURLPath(appContext) + "/" + cleanURLPath(address);
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Outgoing request address: %s", address));
+ }
+ }
}
- String remoteHost = memberProperties.getProperty("remoteHost");
- String hostName = (remoteHost == null) ? currentMember.getHostName() : remoteHost;
- return new EndpointReference(transport + "://" + hostName +
- ":" + port + address);
- } else {
- String msg = "Cannot load balance for non-HTTP/S transport " + transport;
- log.error(msg);
- throw new SynapseException(msg);
+ String memberHostName = currentMember.getHostName();
+ int memberPort = (transport.startsWith(Constants.HTTPS)) ? currentMember.getHttpsPort() : currentMember.getHttpPort();
+ return new EndpointReference(new URL(transport, memberHostName, memberPort, address).toString());
+
+ } catch (MalformedURLException e) {
+ if (log.isErrorEnabled()) {
+ log.error("Could not create endpoint reference", e);
+ }
+ throwSynapseException(synCtx, 500, "Internal server error");
+ return null;
+ }
+ }
+
+ private String cleanURLPath(String path) {
+ if (StringUtils.isNotBlank(path)) {
+ if (path.startsWith("/")) {
+ path = path.replaceFirst("/", "");
+ }
+ if (path.endsWith("/")) {
+ path = path.substring(0, path.length() - 2);
+ }
}
+ return path;
}
/*
@@ -507,9 +699,7 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
axis2MsgCtx.removeProperty(NhttpConstants.REST_URL_POSTFIX);
String transport = axis2MsgCtx.getTransportIn().getName();
- String address = synCtx.getTo().getAddress();
- int incomingPort = extractPort(synCtx, transport);
- EndpointReference to = getEndpointReferenceAfterURLRewrite(currentMember, transport, address, incomingPort);
+ EndpointReference to = getEndpointReferenceAfterURLRewrite(synCtx, currentMember, transport);
synCtx.setTo(to);
Endpoint endpoint = getEndpoint(to, currentMember, synCtx);
@@ -565,14 +755,13 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
private void incrementInFlightRequestCount(MessageContext messageContext) {
try {
String clusterId = (String) messageContext.getProperty(Constants.CLUSTER_ID);
- if(StringUtils.isBlank(clusterId)) {
+ if (StringUtils.isBlank(clusterId)) {
throw new RuntimeException("Cluster id not found in message context");
}
FutureTask<Object> task = new FutureTask<Object>(new InFlightRequestIncrementCallable(clusterId));
LoadBalancerStatisticsExecutor.getInstance().getService().submit(task);
- }
- catch (Exception e) {
- if(log.isDebugEnabled()) {
+ } catch (Exception e) {
+ if (log.isDebugEnabled()) {
log.debug("Could not increment in-flight request count", e);
}
}
@@ -581,14 +770,13 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
private void decrementInFlightRequestCount(MessageContext messageContext) {
try {
String clusterId = (String) messageContext.getProperty(Constants.CLUSTER_ID);
- if(StringUtils.isBlank(clusterId)) {
+ if (StringUtils.isBlank(clusterId)) {
throw new RuntimeException("Cluster id not found in message context");
}
FutureTask<Object> task = new FutureTask<Object>(new InFlightRequestDecrementCallable(clusterId));
LoadBalancerStatisticsExecutor.getInstance().getService().submit(task);
- }
- catch (Exception e) {
- if(log.isDebugEnabled()) {
+ } catch (Exception e) {
+ if (log.isDebugEnabled()) {
log.debug("Could not decrement in-flight request count", e);
}
}
@@ -661,7 +849,7 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
decrementInFlightRequestCount(synCtx);
if (isFailover()) {
- if(log.isDebugEnabled()) {
+ if (log.isDebugEnabled()) {
log.debug("Fail-over enabled, trying to send the message to the next available member");
}
@@ -670,7 +858,7 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
currentEp.destroy();
}
if (currentMember == null) {
- if(log.isErrorEnabled()) {
+ if (log.isErrorEnabled()) {
log.error("Current member is null, could not fail-over");
}
return;
http://git-wip-us.apache.org/repos/asf/stratos/blob/7b35e29e/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java
index ff64d20..a29b980 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java
@@ -22,12 +22,17 @@ package org.apache.stratos.load.balancer.util;
public class Constants {
public static final String CLUSTER_ID = "cluster_id";
+ public static final String MEMBER_ID = "member_id";
+
+ public static final String HTTP = "http";
+ public static final String HTTPS = "https";
+
+ public static final String AXIS2_MSG_CTX_TRANSPORT_IN_URL = "TransportInURL";
+ public static final String AXIS2_MSG_CTX_SERVICE_PREFIX = "SERVICE_PREFIX";
public static final String LB_HOST_NAME = "LB_HOST_NAME";
public static final String LB_HTTP_PORT = "LB_HTTP_PORT";
public static final String LB_HTTPS_PORT = "LB_HTTPS_PORT";
public static final String ALGORITHM_CONTEXT_CACHE = "algorithm.context.cache";
-
- public static final String AXIS2_MSG_CTX_TRANSPORT_IN_URL = "TransportInURL";
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/7b35e29e/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/LoadBalancerConfigurationTest.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/LoadBalancerConfigurationTest.java b/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/LoadBalancerConfigurationTest.java
index 8256b80..6b5b97f 100755
--- a/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/LoadBalancerConfigurationTest.java
+++ b/components/org.apache.stratos.load.balancer/src/test/java/org/apache/stratos/load/balancer/test/LoadBalancerConfigurationTest.java
@@ -18,18 +18,18 @@
*/
package org.apache.stratos.load.balancer.test;
-import java.io.File;
-import java.net.URL;
-
+import org.apache.stratos.load.balancer.conf.LoadBalancerConfiguration;
import org.apache.stratos.load.balancer.conf.domain.TenantIdentifier;
import org.apache.stratos.messaging.domain.topology.*;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.junit.Assert;
import org.junit.Test;
import org.junit.runner.RunWith;
import org.junit.runners.JUnit4;
import org.apache.stratos.load.balancer.conf.LoadBalancerConfiguration;
-import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
+import java.io.File;
+import java.net.URL;
/**
* Test sample load balancer configurations.
@@ -84,7 +84,7 @@ public class LoadBalancerConfigurationTest {
Assert.assertEquals(String.format("%s, network partition id is not valid", validationError), "network-partition-1", configuration.getNetworkPartitionId());
Assert.assertTrue(String.format("%s, multi-tenancy is not true", validationError), configuration.isMultiTenancyEnabled());
Assert.assertEquals(String.format("%s, tenant-identifier is not valid", validationError), TenantIdentifier.TenantDomain, configuration.getTenantIdentifier());
- Assert.assertEquals(String.format("%s, tenant-identifier-regex is not valid", validationError), "t/([^/]*)/", configuration.getTenantIdentifierRegex());
+ Assert.assertEquals(String.format("%s, tenant-identifier-regex is not valid", validationError), "t/([^/]*)/", configuration.getTenantIdentifierRegexList().get(0));
} finally {
LoadBalancerConfiguration.clear();
}
@@ -131,11 +131,11 @@ public class LoadBalancerConfigurationTest {
Assert.assertNotNull(String.format("%s, member not found: [member] %s", validationError, memberId), m1);
Assert.assertEquals(String.format("%s, member ip not valid", validationError), "10.0.0.10", m1.getMemberIp());
- String portName = "http";
- Port m1Http = m1.getPort(portName);
- Assert.assertNotNull(String.format("%s, port not found: [member] %s [port] %s", validationError, memberId, portName), m1Http);
- Assert.assertEquals(String.format("%s, port value not valid: [member] %s [port] %s", validationError, memberId, portName), 8080, m1Http.getValue());
- Assert.assertEquals(String.format("%s, port proxy not valid: [member] %s [port] %s", validationError, memberId, portName), 80, m1Http.getProxy());
+ int proxyPort = 80;
+ Port m1Http = m1.getPort(proxyPort);
+ Assert.assertNotNull(String.format("%s, port not found: [member] %s [proxy-port] %d", validationError, memberId, proxyPort), m1Http);
+ Assert.assertEquals(String.format("%s, port value not valid: [member] %s [proxy-port] %d", validationError, memberId, proxyPort), 8080, m1Http.getValue());
+ Assert.assertEquals(String.format("%s, port proxy not valid: [member] %s [proxy-port] %d", validationError, memberId, proxyPort), 80, m1Http.getProxy());
} finally {
TopologyManager.releaseReadLock();
http://git-wip-us.apache.org/repos/asf/stratos/blob/7b35e29e/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/behaviour/CartridgeMgtBehaviour.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/behaviour/CartridgeMgtBehaviour.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/behaviour/CartridgeMgtBehaviour.java
index 56f7e10..352c5b0 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/behaviour/CartridgeMgtBehaviour.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/behaviour/CartridgeMgtBehaviour.java
@@ -87,6 +87,7 @@ public abstract class CartridgeMgtBehaviour implements Serializable {
PayloadData payloadData = PayloadFactory.getPayloadDataInstance(cartridgeInfo.getProvider(),
cartridgeInfo.getType(), basicPayloadData);
+ boolean isDeploymentParam = false;
// get the payload parameters defined in the cartridge definition file for this cartridge type
if (cartridgeInfo.getProperties() != null && cartridgeInfo.getProperties().length != 0) {
@@ -97,11 +98,21 @@ public abstract class CartridgeMgtBehaviour implements Serializable {
if (property.getName()
.startsWith(CartridgeConstants.CUSTOM_PAYLOAD_PARAM_NAME_PREFIX)) {
String payloadParamName = property.getName();
- payloadData.add(payloadParamName.substring(payloadParamName.indexOf(".") + 1), property.getValue());
+ String payloadParamSubstring = payloadParamName.substring(payloadParamName.indexOf(".") + 1);
+ if("DEPLOYMENT".equals(payloadParamSubstring)) {
+ isDeploymentParam = true;
+ }
+ payloadData.add(payloadParamSubstring, property.getValue());
}
}
}
+ // DEPLOYMENT payload param must be set because its used by puppet agent
+ // to generate the hostname. Therefore, if DEPLOYMENT is not set in cartridge properties,
+ // adding the DEPLOYMENT="default" param
+ if(!isDeploymentParam) {
+ payloadData.add("DEPLOYMENT", "default");
+ }
//check if there are any custom payload entries defined
if (customPayloadEntries != null) {
//add them to the payload
http://git-wip-us.apache.org/repos/asf/stratos/blob/7b35e29e/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/dao/Cluster.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/dao/Cluster.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/dao/Cluster.java
index bacb4d8..c515461 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/dao/Cluster.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/dao/Cluster.java
@@ -75,9 +75,9 @@ public class Cluster implements Serializable {
this.hostName = hostName;
}
- public int getId() {
- return id;
- }
+ //public int getId() {
+ // return id;
+ //}
public void setId(int id) {
this.id = id;
http://git-wip-us.apache.org/repos/asf/stratos/blob/7b35e29e/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/deploy/service/Service.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/deploy/service/Service.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/deploy/service/Service.java
index adfe60f..6281ec3 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/deploy/service/Service.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/deploy/service/Service.java
@@ -62,7 +62,7 @@ public abstract class Service extends CartridgeMgtBehaviour {
setClusterId(generateClusterId(null, type));
//host name is the hostname defined in cartridge definition
- setHostName(cartridgeInfo.getHostName());
+ setHostName(generateHostName(null, cartridgeInfo.getHostName()));
// create and set PayloadData instance
setPayloadData(createPayload(cartridgeInfo, subscriptionKey, null, cluster, null, null, null));
@@ -70,7 +70,7 @@ public abstract class Service extends CartridgeMgtBehaviour {
protected String generateClusterId (String alias, String cartridgeType) {
- String clusterId = cartridgeType + cartridgeInfo.getHostName() + ".domain";
+ String clusterId = cartridgeType + "." + cartridgeInfo.getHostName() + ".domain";
// limit the cartridge alias to 30 characters in length
if (clusterId.length() > 30) {
clusterId = CartridgeSubscriptionUtils.limitLengthOfString(clusterId, 30);
@@ -79,6 +79,11 @@ public abstract class Service extends CartridgeMgtBehaviour {
return clusterId;
}
+ protected String generateHostName (String alias, String cartridgeDefinitionHostName) {
+
+ return cartridgeDefinitionHostName;
+ }
+
public void deploy (Properties properties) throws ADCException, UnregisteredCartridgeException {
register(getCartridgeInfo(), getCluster(), getPayloadData(), getAutoscalingPolicyName(), getDeploymentPolicyName(), properties);
http://git-wip-us.apache.org/repos/asf/stratos/blob/7b35e29e/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/deploy/service/ServiceDeploymentManager.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/deploy/service/ServiceDeploymentManager.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/deploy/service/ServiceDeploymentManager.java
index 2c3045e..ed99d14 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/deploy/service/ServiceDeploymentManager.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/deploy/service/ServiceDeploymentManager.java
@@ -28,7 +28,10 @@ import org.apache.stratos.cloud.controller.stub.pojo.Properties;
import org.apache.stratos.cloud.controller.stub.pojo.Property;
import org.apache.stratos.manager.client.CloudControllerServiceClient;
import org.apache.stratos.manager.deploy.service.multitenant.MultiTenantService;
-import org.apache.stratos.manager.deploy.service.multitenant.lb.MultiTenantLBService;
+import org.apache.stratos.manager.deploy.service.multitenant.lb.DefaultLBService;
+import org.apache.stratos.manager.deploy.service.multitenant.lb.ExistingLBService;
+import org.apache.stratos.manager.deploy.service.multitenant.lb.LBService;
+import org.apache.stratos.manager.deploy.service.multitenant.lb.ServiceAwareLBService;
import org.apache.stratos.manager.exception.*;
import org.apache.stratos.manager.lb.category.*;
import org.apache.stratos.manager.retriever.DataInsertionAndRetrievalManager;
@@ -185,7 +188,7 @@ public class ServiceDeploymentManager {
// if (!AutoscalerServiceClient.getServiceClient().checkDefaultLBExistenceAgainstPolicy(deploymentPolicyName)) {
//
// // if lb cluster doesn't exist
-// lbService = new MultiTenantLBService(lbCartridgeType,
+// lbService = new LBService(lbCartridgeType,
// lbCartridgeInfo.getDefaultAutoscalingPolicy(),
// deploymentPolicyName, tenantId,
// lbCartridgeInfo,
@@ -261,7 +264,7 @@ public class ServiceDeploymentManager {
// lbCartridgeInfo.addProperties(property);
// lbCartridgeInfo.addProperties(loadBalancedServiceTypeProperty);
//
-// lbService = new MultiTenantLBService(lbCartridgeType,
+// lbService = new LBService(lbCartridgeType,
// lbCartridgeInfo.getDefaultAutoscalingPolicy(),
// deploymentPolicyName, tenantId,
// lbCartridgeInfo,
@@ -319,9 +322,11 @@ public class ServiceDeploymentManager {
Service service = new MultiTenantService(type, autoscalingPolicyName, deploymentPolicyName, tenantId, cartridgeInfo, tenantRange);
Properties serviceClusterProperties = null;
- if (lbDataCtxt.getLoadBalancedServiceProperties() != null && !lbDataCtxt.getLoadBalancedServiceProperties().isEmpty()) {
- serviceClusterProperties = new Properties();
- serviceClusterProperties.setProperties(lbDataCtxt.getLoadBalancedServiceProperties().toArray(new Property[0]));
+ if (lbDataCtxt != null) {
+ if (lbDataCtxt.getLoadBalancedServiceProperties() != null && !lbDataCtxt.getLoadBalancedServiceProperties().isEmpty()) {
+ serviceClusterProperties = new Properties();
+ serviceClusterProperties.setProperties(lbDataCtxt.getLoadBalancedServiceProperties().toArray(new Property[0]));
+ }
}
// create
@@ -343,35 +348,37 @@ public class ServiceDeploymentManager {
return;
}
- LoadBalancerCategory loadBalancerCategory = null;
+ LBService lbService = null;
if (lbDataCtxt.getLbCategory().equals(Constants.EXISTING_LOAD_BALANCERS)) {
- loadBalancerCategory = new ExistingLoadBalancerCategory();
+ lbService = new ExistingLBService(lbDataCtxt.getLbCartridgeInfo().getType(), lbDataCtxt.getAutoscalePolicy(),
+ lbDataCtxt.getDeploymentPolicy(), -1234, lbDataCtxt.getLbCartridgeInfo(),
+ tenantRange);
} else if (lbDataCtxt.getLbCategory().equals(Constants.DEFAULT_LOAD_BALANCER)) {
- loadBalancerCategory = new DefaultLoadBalancerCategory();
+ lbService = new DefaultLBService(lbDataCtxt.getLbCartridgeInfo().getType(), lbDataCtxt.getAutoscalePolicy(),
+ lbDataCtxt.getDeploymentPolicy(), -1234, lbDataCtxt.getLbCartridgeInfo(),
+ tenantRange);
} else if (lbDataCtxt.getLbCategory().equals(Constants.SERVICE_AWARE_LOAD_BALANCER)) {
- loadBalancerCategory = new ServiceLevelLoadBalancerCategory();
+ lbService = new ServiceAwareLBService(lbDataCtxt.getLbCartridgeInfo().getType(), lbDataCtxt.getAutoscalePolicy(),
+ lbDataCtxt.getDeploymentPolicy(), -1234, lbDataCtxt.getLbCartridgeInfo(),
+ tenantRange);
}
- if (loadBalancerCategory == null) {
+ if (lbService == null) {
throw new ADCException("The given Load Balancer category " + lbDataCtxt.getLbCategory() + " not found");
}
// Set the load balanced service type
- loadBalancerCategory.setLoadBalancedServiceType(loadBalancedService);
+ lbService.setLoadBalancedServiceType(loadBalancedService);
// Set if the load balanced service is multi tenant or not
- loadBalancerCategory.setLoadBalancedServiceMultiTenant(true); // TODO --- temp hack
+ //lbService.setLoadBalancedServiceMultiTenant(true); // TODO --- temp hack
// set the relevant deployment policy
- log.info(" ******* Setting Deployment Policy name : ------> " + lbDataCtxt.getDeploymentPolicy());
- loadBalancerCategory.setDeploymentPolicyName(lbDataCtxt.getDeploymentPolicy());
-
- Service lbService = new MultiTenantLBService(lbDataCtxt.getLbCartridgeInfo().getType(), lbDataCtxt.getAutoscalePolicy(),
- lbDataCtxt.getDeploymentPolicy(), -1234, lbDataCtxt.getLbCartridgeInfo(),
- tenantRange, loadBalancerCategory);
+ //log.info(" ******* Setting Deployment Policy name : ------> " + lbDataCtxt.getDeploymentPolicy());
+ //loadBalancerCategory.setDeploymentPolicyName(lbDataCtxt.getDeploymentPolicy());
Properties lbProperties = null;
if (lbDataCtxt.getLbProperperties() != null && !lbDataCtxt.getLbProperperties().isEmpty()) {
@@ -383,7 +390,9 @@ public class ServiceDeploymentManager {
lbService.create();
// add LB category to the payload
- lbService.getPayloadData().add(CartridgeConstants.LB_CATEGORY, lbDataCtxt.getLbCategory());
+ if (lbService.getPayloadData() != null) {
+ lbService.getPayloadData().add(CartridgeConstants.LB_CATEGORY, lbDataCtxt.getLbCategory());
+ }
// delpoy
lbService.deploy(lbProperties);
http://git-wip-us.apache.org/repos/asf/stratos/blob/7b35e29e/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/deploy/service/multitenant/lb/DefaultLBService.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/deploy/service/multitenant/lb/DefaultLBService.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/deploy/service/multitenant/lb/DefaultLBService.java
new file mode 100644
index 0000000..580186e
--- /dev/null
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/deploy/service/multitenant/lb/DefaultLBService.java
@@ -0,0 +1,110 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.manager.deploy.service.multitenant.lb;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.stub.pojo.CartridgeInfo;
+import org.apache.stratos.cloud.controller.stub.pojo.ClusterContext;
+import org.apache.stratos.cloud.controller.stub.pojo.Properties;
+import org.apache.stratos.manager.client.AutoscalerServiceClient;
+import org.apache.stratos.manager.client.CloudControllerServiceClient;
+import org.apache.stratos.manager.dao.Cluster;
+import org.apache.stratos.manager.exception.ADCException;
+import org.apache.stratos.manager.exception.AlreadySubscribedException;
+import org.apache.stratos.manager.exception.UnregisteredCartridgeException;
+import org.apache.stratos.manager.payload.PayloadData;
+import org.apache.stratos.manager.repository.Repository;
+import org.apache.stratos.manager.subscriber.Subscriber;
+
+import java.rmi.RemoteException;
+import java.util.Map;
+
+public class DefaultLBService extends LBService {
+
+ private static Log log = LogFactory.getLog(DefaultLBService.class);
+ private boolean defaultLBServiceExists = false;
+
+ public DefaultLBService (String type, String autoscalingPolicyName, String deploymentPolicyName, int tenantId,
+ CartridgeInfo cartridgeInfo, String tenantRange) {
+
+ super(type, autoscalingPolicyName, deploymentPolicyName, tenantId, cartridgeInfo, tenantRange);
+ }
+
+ public PayloadData create(String alias, Cluster cluster, Subscriber subscriber, Repository repository, CartridgeInfo cartridgeInfo,
+ String subscriptionKey, Map<String, String> customPayloadEntries)
+ throws ADCException, AlreadySubscribedException {
+
+ // call the relevant method to get the cluster id, using deployment policy
+ String clusterId = null;
+ try {
+ clusterId = AutoscalerServiceClient.getServiceClient().getDefaultLBClusterId(getDeploymentPolicyName());
+ } catch (Exception e) {
+ log.error("Error occurred in retrieving default LB cluster id" + e.getMessage());
+ throw new ADCException(e);
+ }
+
+ if (clusterId != null) {
+ //set the cluster id to Cluster object
+ cluster.setClusterDomain(clusterId);
+ if (log.isDebugEnabled()) {
+ log.debug("Set existing default LB cluster id " + clusterId);
+ }
+ defaultLBServiceExists = true;
+
+ //get the hostname for this cluster and set it
+ ClusterContext clusterContext;
+ try {
+ clusterContext = CloudControllerServiceClient.getServiceClient().getClusterContext(clusterId);
+
+ } catch (RemoteException e) {
+ log.error("Error occurred in retrieving Cluster Context for default LB ", e);
+ throw new ADCException(e);
+ }
+
+ if (clusterContext != null) {
+ cluster.setHostName(clusterContext.getHostName());
+ if (log.isDebugEnabled()) {
+ log.debug("Set existing default LB hostname " + clusterContext.getHostName());
+ }
+ }
+
+ return null;
+
+ } else {
+ // set cluster domain
+ cluster.setClusterDomain(generateClusterId(null, cartridgeInfo.getType()));
+ // set hostname
+ cluster.setHostName(generateHostName(null, cartridgeInfo.getHostName()));
+
+ return createPayload(cartridgeInfo, subscriptionKey, subscriber, cluster, repository, alias, customPayloadEntries);
+ }
+ }
+
+ public void register(CartridgeInfo cartridgeInfo, Cluster cluster, PayloadData payloadData, String autoscalePolicyName, String deploymentPolicyName, Properties properties) throws ADCException, UnregisteredCartridgeException {
+
+ //log.info("Register service with payload data ["+payloadData+"] ");
+ if (!defaultLBServiceExists) {
+ super.register(cartridgeInfo, cluster, payloadData, autoscalePolicyName, deploymentPolicyName, properties);
+ }else {
+ log.info("Default LB already exists for deployment policy: " + getDeploymentPolicyName());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/7b35e29e/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/deploy/service/multitenant/lb/ExistingLBService.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/deploy/service/multitenant/lb/ExistingLBService.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/deploy/service/multitenant/lb/ExistingLBService.java
new file mode 100644
index 0000000..5475e42
--- /dev/null
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/deploy/service/multitenant/lb/ExistingLBService.java
@@ -0,0 +1,52 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.manager.deploy.service.multitenant.lb;
+
+import org.apache.stratos.cloud.controller.stub.pojo.CartridgeInfo;
+import org.apache.stratos.cloud.controller.stub.pojo.Properties;
+import org.apache.stratos.manager.dao.Cluster;
+import org.apache.stratos.manager.exception.ADCException;
+import org.apache.stratos.manager.exception.AlreadySubscribedException;
+import org.apache.stratos.manager.exception.UnregisteredCartridgeException;
+import org.apache.stratos.manager.payload.PayloadData;
+import org.apache.stratos.manager.repository.Repository;
+import org.apache.stratos.manager.subscriber.Subscriber;
+
+import java.util.Map;
+
+public class ExistingLBService extends LBService {
+
+ public ExistingLBService(String type, String autoscalingPolicyName, String deploymentPolicyName, int tenantId, CartridgeInfo cartridgeInfo, String tenantRange) {
+ super(type, autoscalingPolicyName, deploymentPolicyName, tenantId, cartridgeInfo, tenantRange);
+ }
+
+ public PayloadData create(String alias, Cluster cluster, Subscriber subscriber, Repository repository, CartridgeInfo cartridgeInfo,
+ String subscriptionKey, Map<String, String> customPayloadEntries)
+ throws ADCException, AlreadySubscribedException {
+
+ // TODO
+ return null;
+ }
+
+ public void register(CartridgeInfo cartridgeInfo, Cluster cluster, PayloadData payloadData, String autoscalePolicyName, String deploymentPolicyName, Properties properties) throws ADCException, UnregisteredCartridgeException {
+
+ // TODO
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/7b35e29e/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/deploy/service/multitenant/lb/LBService.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/deploy/service/multitenant/lb/LBService.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/deploy/service/multitenant/lb/LBService.java
new file mode 100644
index 0000000..52c33d3
--- /dev/null
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/deploy/service/multitenant/lb/LBService.java
@@ -0,0 +1,69 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.manager.deploy.service.multitenant.lb;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.stub.pojo.CartridgeInfo;
+import org.apache.stratos.cloud.controller.stub.pojo.Properties;
+import org.apache.stratos.manager.deploy.service.Service;
+import org.apache.stratos.manager.exception.ADCException;
+import org.apache.stratos.manager.exception.AlreadySubscribedException;
+import org.apache.stratos.manager.exception.UnregisteredCartridgeException;
+import org.apache.stratos.manager.lb.category.LoadBalancerCategory;
+
+public class LBService extends Service {
+
+ private static Log log = LogFactory.getLog(LBService.class);
+
+ private String loadBalancedServiceType;
+
+ public LBService(String type, String autoscalingPolicyName, String deploymentPolicyName, int tenantId,
+ CartridgeInfo cartridgeInfo, String tenantRange) {
+
+ super(type, autoscalingPolicyName, deploymentPolicyName, tenantId, cartridgeInfo, tenantRange);
+ }
+
+ public void create () throws ADCException {
+
+ try {
+ setPayloadData(create(null, getCluster(), null, null, getCartridgeInfo(), getSubscriptionKey(), null));
+
+ } catch (AlreadySubscribedException e) {
+ throw new ADCException(e);
+ }
+ }
+
+ @Override
+ public void deploy(Properties properties) throws ADCException, UnregisteredCartridgeException {
+
+ //register the service
+ register(getCartridgeInfo(), getCluster(), getPayloadData(), getAutoscalingPolicyName(), getDeploymentPolicyName(),
+ properties);
+ }
+
+ public String getLoadBalancedServiceType() {
+ return loadBalancedServiceType;
+ }
+
+ public void setLoadBalancedServiceType(String loadBalancedServiceType) {
+ this.loadBalancedServiceType = loadBalancedServiceType;
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/7b35e29e/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/deploy/service/multitenant/lb/ServiceAwareLBService.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/deploy/service/multitenant/lb/ServiceAwareLBService.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/deploy/service/multitenant/lb/ServiceAwareLBService.java
new file mode 100644
index 0000000..282cbf3
--- /dev/null
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/deploy/service/multitenant/lb/ServiceAwareLBService.java
@@ -0,0 +1,137 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.stratos.manager.deploy.service.multitenant.lb;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.stratos.cloud.controller.stub.pojo.CartridgeInfo;
+import org.apache.stratos.cloud.controller.stub.pojo.ClusterContext;
+import org.apache.stratos.cloud.controller.stub.pojo.Properties;
+import org.apache.stratos.manager.client.AutoscalerServiceClient;
+import org.apache.stratos.manager.client.CloudControllerServiceClient;
+import org.apache.stratos.manager.dao.Cluster;
+import org.apache.stratos.manager.exception.ADCException;
+import org.apache.stratos.manager.exception.AlreadySubscribedException;
+import org.apache.stratos.manager.exception.UnregisteredCartridgeException;
+import org.apache.stratos.manager.payload.PayloadData;
+import org.apache.stratos.manager.repository.Repository;
+import org.apache.stratos.manager.subscriber.Subscriber;
+import org.apache.stratos.manager.subscription.utils.CartridgeSubscriptionUtils;
+import org.apache.stratos.manager.utils.CartridgeConstants;
+
+import java.rmi.RemoteException;
+import java.util.Map;
+
+public class ServiceAwareLBService extends LBService {
+
+ public ServiceAwareLBService(String type, String autoscalingPolicyName, String deploymentPolicyName, int tenantId,
+ CartridgeInfo cartridgeInfo, String tenantRange) {
+
+ super(type, autoscalingPolicyName, deploymentPolicyName, tenantId, cartridgeInfo, tenantRange);
+ }
+
+ private static Log log = LogFactory.getLog(ServiceAwareLBService.class);
+
+ private boolean serviceAwareLBExists;
+
+ public PayloadData create (String alias, Cluster cluster, Subscriber subscriber, Repository repository, CartridgeInfo cartridgeInfo,
+ String subscriptionKey, Map<String, String> customPayloadEntries) throws ADCException, AlreadySubscribedException {
+
+ // call the relevant method to get the cluster id, using deployment policy and type
+ String clusterId = null;
+
+ try {
+ clusterId = AutoscalerServiceClient.getServiceClient().getServiceLBClusterId(getLoadBalancedServiceType(), getDeploymentPolicyName());
+
+ } catch (Exception e) {
+ log.error("Error occurred in retrieving Service LB cluster id" + e.getMessage());
+ throw new ADCException(e);
+ }
+
+ if (clusterId != null) {
+
+ //set the cluster id to Cluster object
+ cluster.setClusterDomain(clusterId);
+ if (log.isDebugEnabled()) {
+ log.debug("Set existing Service LB cluster id " + clusterId);
+ }
+ serviceAwareLBExists = true;
+
+ //get the hostname for this cluster and set it
+ ClusterContext clusterContext;
+ try {
+ clusterContext = CloudControllerServiceClient.getServiceClient().getClusterContext(clusterId);
+
+ } catch (RemoteException e) {
+ log.error("Error occurred in retrieving Cluster Context for Service LB ", e);
+ throw new ADCException(e);
+ }
+
+ if (clusterContext != null) {
+ cluster.setHostName(clusterContext.getHostName());
+ if (log.isDebugEnabled()) {
+ log.debug("Set existing Service LB hostname " + clusterContext.getHostName());
+ }
+ }
+
+ return null;
+
+ } else {
+
+ // set cluster domain
+ cluster.setClusterDomain(generateClusterId(getLoadBalancedServiceType(), cartridgeInfo.getType()));
+ // set hostname
+ cluster.setHostName(generateHostName(getLoadBalancedServiceType(), cartridgeInfo.getHostName()));
+
+ PayloadData serviceLevelLbPayloadData = createPayload(cartridgeInfo, subscriptionKey, subscriber, cluster,
+ repository, alias, customPayloadEntries);
+
+ // add payload entry for load balanced service type
+ serviceLevelLbPayloadData.add(CartridgeConstants.LOAD_BALANCED_SERVICE_TYPE, getLoadBalancedServiceType());
+ return serviceLevelLbPayloadData;
+ }
+ }
+
+ protected String generateClusterId (String loadBalancedServiceType, String cartridgeType) {
+
+ String clusterId = cartridgeType + "." + loadBalancedServiceType + "." + getCartridgeInfo().getHostName() + ".domain";
+ // limit the cartridge alias to 30 characters in length
+ if (clusterId.length() > 30) {
+ clusterId = CartridgeSubscriptionUtils.limitLengthOfString(clusterId, 30);
+ }
+
+ return clusterId;
+ }
+
+ protected String generateHostName (String loadBalancedServiceType, String cartridgeDefinitionHostName) {
+
+ return getCartridgeInfo().getType() + "." + loadBalancedServiceType + "." + cartridgeDefinitionHostName;
+ }
+
+ public void register(CartridgeInfo cartridgeInfo, Cluster cluster, PayloadData payloadData, String autoscalePolicyName, String deploymentPolicyName, Properties properties) throws ADCException, UnregisteredCartridgeException {
+
+ if (!serviceAwareLBExists) {
+ super.register(cartridgeInfo, cluster, payloadData, autoscalePolicyName, deploymentPolicyName, properties);
+
+ }else {
+ log.info("Service Aware LB already exists for cartridge type: " + getLoadBalancedServiceType() + ", deployment policy: " + getDeploymentPolicyName());
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/stratos/blob/7b35e29e/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/lb/category/DefaultLoadBalancerCategory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/lb/category/DefaultLoadBalancerCategory.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/lb/category/DefaultLoadBalancerCategory.java
index e49dbde..8470cb5 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/lb/category/DefaultLoadBalancerCategory.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/lb/category/DefaultLoadBalancerCategory.java
@@ -89,20 +89,19 @@ public class DefaultLoadBalancerCategory extends LoadBalancerCategory {
// set hostname
cluster.setHostName(generateHostName(alias, cartridgeInfo.getHostName()));
- return createPayload(cartridgeInfo, subscriptionKey, subscriber,
- cluster, repository, alias, customPayloadEntries);
+ return createPayload(cartridgeInfo, subscriptionKey, subscriber, cluster, repository, alias, customPayloadEntries);
}
}
public void register(CartridgeInfo cartridgeInfo, Cluster cluster, PayloadData payloadData, String autoscalePolicyName, String deploymentPolicyName, Properties properties) throws ADCException, UnregisteredCartridgeException {
- log.info("Register service with payload data ["+payloadData+"] ");
+ //log.info("Register service with payload data ["+payloadData+"] ");
if (!isDefaultLBExists()) {
- if(payloadData != null) {
- log.info("Payload: " + payloadData.getCompletePayloadData().toString());
- }
+ //if(payloadData != null) {
+ //log.info("Payload: " + payloadData.getCompletePayloadData().toString());
+ //}
super.register(cartridgeInfo, cluster, payloadData, autoscalePolicyName, deploymentPolicyName, properties);
}else {
- log.info(" Default LB exists... Not registering...");
+ log.info("Default LB already exists for deployment policy: " + getDeploymentPolicyName());
}
}
http://git-wip-us.apache.org/repos/asf/stratos/blob/7b35e29e/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/lb/category/ServiceLevelLoadBalancerCategory.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/lb/category/ServiceLevelLoadBalancerCategory.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/lb/category/ServiceLevelLoadBalancerCategory.java
index fc47261..131b4dc 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/lb/category/ServiceLevelLoadBalancerCategory.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/lb/category/ServiceLevelLoadBalancerCategory.java
@@ -106,9 +106,9 @@ public class ServiceLevelLoadBalancerCategory extends LoadBalancerCategory {
if (!serviceLbExists) {
- if(payloadData != null) {
- log.info("Payload: " + payloadData.getCompletePayloadData().toString());
- }
+ // if(payloadData != null) {
+ // log.info("Payload: " + payloadData.getCompletePayloadData().toString());
+ // }
super.register(cartridgeInfo, cluster, payloadData, autoscalePolicyName, deploymentPolicyName, properties);
}else {
http://git-wip-us.apache.org/repos/asf/stratos/blob/7b35e29e/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java
index e7058f5..ce04409 100644
--- a/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java
+++ b/components/org.apache.stratos.manager/src/main/java/org/apache/stratos/manager/manager/CartridgeSubscriptionManager.java
@@ -21,6 +21,7 @@ package org.apache.stratos.manager.manager;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.openjpa.util.java$util$ArrayList$proxy;
import org.apache.stratos.cloud.controller.stub.CloudControllerServiceUnregisteredCartridgeExceptionException;
import org.apache.stratos.cloud.controller.stub.pojo.CartridgeInfo;
import org.apache.stratos.cloud.controller.stub.pojo.LoadbalancerConfig;
@@ -28,8 +29,10 @@ import org.apache.stratos.cloud.controller.stub.pojo.Properties;
import org.apache.stratos.cloud.controller.stub.pojo.Property;
import org.apache.stratos.manager.client.CloudControllerServiceClient;
import org.apache.stratos.manager.dao.CartridgeSubscriptionInfo;
+import org.apache.stratos.manager.deploy.service.Service;
import org.apache.stratos.manager.dto.SubscriptionInfo;
import org.apache.stratos.manager.exception.*;
+import org.apache.stratos.manager.internal.DataHolder;
import org.apache.stratos.manager.lb.category.*;
import org.apache.stratos.manager.repository.Repository;
import org.apache.stratos.manager.retriever.DataInsertionAndRetrievalManager;
@@ -37,6 +40,7 @@ import org.apache.stratos.manager.subscriber.Subscriber;
import org.apache.stratos.manager.subscription.CartridgeSubscription;
import org.apache.stratos.manager.subscription.PersistenceContext;
import org.apache.stratos.manager.subscription.SubscriptionData;
+import org.apache.stratos.manager.subscription.SubscriptionDomain;
import org.apache.stratos.manager.subscription.factory.CartridgeSubscriptionFactory;
import org.apache.stratos.manager.subscription.tenancy.SubscriptionMultiTenantBehaviour;
import org.apache.stratos.manager.subscription.tenancy.SubscriptionSingleTenantBehaviour;
@@ -46,14 +50,17 @@ import org.apache.stratos.manager.topology.model.TopologyClusterInformationModel
import org.apache.stratos.manager.utils.ApplicationManagementUtil;
import org.apache.stratos.manager.utils.CartridgeConstants;
import org.apache.stratos.manager.utils.RepoPasswordMgtUtil;
+import org.apache.stratos.messaging.broker.publish.EventPublisher;
+import org.apache.stratos.messaging.broker.publish.EventPublisherPool;
import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.Member;
+import org.apache.stratos.messaging.event.tenant.SubscriptionDomainAddedEvent;
+import org.apache.stratos.messaging.event.tenant.SubscriptionDomainRemovedEvent;
import org.apache.stratos.messaging.util.Constants;
import org.wso2.carbon.context.CarbonContext;
import org.apache.stratos.manager.publisher.CartridgeSubscriptionDataPublisher;
-import java.util.Collection;
-import java.util.Random;
+import java.util.*;
/**
* Manager class for the purpose of managing CartridgeSubscriptionInfo subscriptions, groupings, etc.
@@ -290,25 +297,64 @@ public class CartridgeSubscriptionManager {
// Create the CartridgeSubscription instance
CartridgeSubscription cartridgeSubscription = CartridgeSubscriptionFactory.getCartridgeSubscriptionInstance(cartridgeInfo, tenancyBehaviour);
- // Generate and set the key
- String subscriptionKey = CartridgeSubscriptionUtils.generateSubscriptionKey();
- cartridgeSubscription.setSubscriptionKey(subscriptionKey);
- String encryptedRepoPassword;
- String repositoryPassword = subscriptionData.getRepositoryPassword();
- if(repositoryPassword != null && !repositoryPassword.isEmpty()) {
- encryptedRepoPassword = RepoPasswordMgtUtil.encryptPassword(repositoryPassword, subscriptionKey);
- } else {
- encryptedRepoPassword = "";
+ // For MT cartridges subscription key should not be generated for every subscription,
+ // instead use the already generated key at the time of service deployment
+ String subscriptionKey = null;
+ if(cartridgeInfo.getMultiTenant()) {
+ try {
+ Service service = new DataInsertionAndRetrievalManager().getService(subscriptionData.getCartridgeType());
+ if(service != null) {
+ subscriptionKey = service.getSubscriptionKey();
+ }else {
+ String msg = "Could not find service for cartridge type [" + subscriptionData.getCartridgeType() + "] " ;
+ log.error(msg);
+ throw new ADCException(msg);
+ }
+ } catch (Exception e) {
+ String msg = "Exception has occurred in get service for cartridge type [" + subscriptionData.getCartridgeType() + "] " ;
+ log.error(msg);
+ throw new ADCException(msg, e);
+ }
+ }else {
+ // Generate and set the key
+ subscriptionKey = CartridgeSubscriptionUtils.generateSubscriptionKey();
+ }
+
+ cartridgeSubscription.setSubscriptionKey(subscriptionKey);
+
+ if(log.isDebugEnabled()) {
+ log.debug("Repository with url: " + subscriptionData.getRepositoryURL() +
+ " username: " + subscriptionData.getRepositoryUsername() +
+ " Type: " + subscriptionData.getRepositoryType());
}
+
+ // Create subscriber
+ Subscriber subscriber = new Subscriber(subscriptionData.getTenantAdminUsername(), subscriptionData.getTenantId(), subscriptionData.getTenantDomain());
+ cartridgeSubscription.setSubscriber(subscriber);
+ cartridgeSubscription.setAlias(subscriptionData.getCartridgeAlias());
// Create repository
Repository repository = cartridgeSubscription.manageRepository(subscriptionData.getRepositoryURL(), subscriptionData.getRepositoryUsername(),
- encryptedRepoPassword,
+ subscriptionData.getRepositoryPassword(),
subscriptionData.isPrivateRepository());
- // Create subscriber
- Subscriber subscriber = new Subscriber(subscriptionData.getTenantAdminUsername(), subscriptionData.getTenantId(), subscriptionData.getTenantDomain());
+ // Update repository attributes
+ if(repository != null) {
+
+ repository.setCommitEnabled(subscriptionData.isCommitsEnabled());
+
+ // Encrypt repository password
+ String encryptedRepoPassword;
+ String repositoryPassword = repository.getPassword();
+ if(repositoryPassword != null && !repositoryPassword.isEmpty()) {
+ encryptedRepoPassword = RepoPasswordMgtUtil.encryptPassword(repositoryPassword, subscriptionKey);
+ } else {
+ encryptedRepoPassword = "";
+ }
+ repository.setPassword(encryptedRepoPassword);
+
+ }
// set the LB cluster id relevant to this service cluster
cartridgeSubscription.setLbClusterId(lbClusterId);
@@ -375,13 +421,154 @@ public class CartridgeSubscriptionManager {
log.info("Successful Subscription: " + cartridgeSubscription.toString());
// Publish tenant subscribed event to message broker
+ Set<String> clusterIds = new HashSet<String>();
+ clusterIds.add(cartridgeSubscription.getCluster().getClusterDomain());
CartridgeSubscriptionUtils.publishTenantSubscribedEvent(cartridgeSubscription.getSubscriber().getTenantId(),
- cartridgeSubscription.getCartridgeInfo().getType());
+ cartridgeSubscription.getCartridgeInfo().getType(), clusterIds);
return ApplicationManagementUtil.
createSubscriptionResponse(cartridgeSubscriptionInfo, cartridgeSubscription.getRepository());
}
+ public void addSubscriptionDomain(int tenantId, String subscriptionAlias, String domainName, String applicationContext)
+ throws ADCException {
+
+ CartridgeSubscription cartridgeSubscription;
+ try {
+ cartridgeSubscription = getCartridgeSubscription(tenantId, subscriptionAlias);
+ if(cartridgeSubscription == null) {
+ throw new ADCException("Cartridge subscription not found");
+ }
+
+ if(!isSubscriptionDomainValid(domainName)) {
+ throw new ADCException(String.format("Domain name %s already registered", domainName));
+ }
+
+ cartridgeSubscription.addSubscriptionDomain(new SubscriptionDomain(domainName, applicationContext));
+ new DataInsertionAndRetrievalManager().cacheAndUpdateSubscription(cartridgeSubscription);
+ } catch (PersistenceManagerException e) {
+ String errorMsg = "Could not add domain to cartridge subscription: [tenant-id] " + tenantId + " [subscription-alias] " + subscriptionAlias +
+ " [domain-name] " + domainName + " [application-context] " + applicationContext;
+ log.error(errorMsg);
+ throw new ADCException(errorMsg, e);
+ }
+
+ log.info("Successfully added domains to cartridge subscription: [tenant-id] " + tenantId + " [subscription-alias] " + subscriptionAlias +
+ " [domain-name] " + domainName + " [application-context] " +applicationContext);
+
+ EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
+ Set<String> clusterIds = new HashSet<String>();
+ clusterIds.add(cartridgeSubscription.getCluster().getClusterDomain());
+ SubscriptionDomainAddedEvent event = new SubscriptionDomainAddedEvent(tenantId, cartridgeSubscription.getType(),
+ clusterIds, domainName, applicationContext);
+ eventPublisher.publish(event);
+ }
+
+ public void removeSubscriptionDomain(int tenantId, String subscriptionAlias, String domainName)
+ throws ADCException {
+
+ CartridgeSubscription cartridgeSubscription;
+ try {
+ cartridgeSubscription = getCartridgeSubscription(tenantId, subscriptionAlias);
+ if(cartridgeSubscription == null) {
+ throw new ADCException("Cartridge subscription not found");
+ }
+ cartridgeSubscription.removeSubscriptionDomain(domainName);
+ new DataInsertionAndRetrievalManager().cacheAndUpdateSubscription(cartridgeSubscription);
+ } catch (PersistenceManagerException e) {
+ String errorMsg = "Could not remove domain from cartridge subscription: [tenant-id] " + tenantId + " [subscription-alias] " + subscriptionAlias +
+ " [domain-name] " + domainName;
+ log.error(errorMsg);
+ throw new ADCException(errorMsg, e);
+ }
+
+ log.info("Successfully removed domain from cartridge subscription: [tenant-id] " + tenantId + " [subscription-alias] " + subscriptionAlias +
+ " [domain-name] " + domainName);
+
+ EventPublisher eventPublisher = EventPublisherPool.getPublisher(Constants.TENANT_TOPIC);
+ Set<String> clusterIds = new HashSet<String>();
+ clusterIds.add(cartridgeSubscription.getCluster().getClusterDomain());
+ SubscriptionDomainRemovedEvent event = new SubscriptionDomainRemovedEvent(tenantId, cartridgeSubscription.getType(),
+ clusterIds, domainName);
+ eventPublisher.publish(event);
+ }
+
+ public List<SubscriptionDomain> getSubscriptionDomains(int tenantId, String subscriptionAlias)
+ throws ADCException {
+
+ try {
+ CartridgeSubscription cartridgeSubscription = getCartridgeSubscription(tenantId, subscriptionAlias);
+ if(cartridgeSubscription == null) {
+ throw new ADCException("Cartridge subscription not found");
+ }
+
+ //return (List<SubscriptionDomain>) cartridgeSubscription.getSubscriptionDomains();
+ return new ArrayList<SubscriptionDomain>(cartridgeSubscription.getSubscriptionDomains());
+ } catch (Exception e) {
+ String errorMsg = "Could not get domains of cartridge subscription: [tenant-id] " + tenantId + " [subscription-alias] " + subscriptionAlias;
+ log.error(errorMsg);
+ throw new ADCException(errorMsg, e);
+ }
+ }
+
+ public SubscriptionDomain getSubscriptionDomain(int tenantId, String subscriptionAlias, String domain)
+ throws ADCException {
+
+ try {
+ CartridgeSubscription cartridgeSubscription = getCartridgeSubscription(tenantId, subscriptionAlias);
+ if(cartridgeSubscription == null) {
+ throw new ADCException("Cartridge subscription not found");
+ }
+
+ return cartridgeSubscription.getSubscriptionDomain(domain);
+ } catch (Exception e) {
+ String errorMsg = "Could not check [domain] "+domain+" against cartridge subscription: [tenant-id] "
+ + tenantId + " [subscription-alias] " + subscriptionAlias;
+ log.error(errorMsg);
+ throw new ADCException(errorMsg, e);
+ }
+ }
+
+ public boolean isSubscriptionDomainValid(String domainName) throws ADCException {
+ try {
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Validating domain: %s", domainName));
+ }
+ org.wso2.carbon.user.core.tenant.TenantManager tenantManager = DataHolder.getRealmService().getTenantManager();
+ org.wso2.carbon.user.api.Tenant[] tenants = tenantManager.getAllTenants();
+ if((tenants != null) && (tenants.length > 0)) {
+ DataInsertionAndRetrievalManager manager = new DataInsertionAndRetrievalManager();
+ for (org.wso2.carbon.user.api.Tenant tenant : tenants) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Reading subscriptions for tenant: [tenant-id] %d [tenant-domain] %s",
+ tenant.getId(), tenant.getDomain()));
+ }
+ Collection<CartridgeSubscription> subscriptions = manager.getCartridgeSubscriptions(tenant.getId());
+ if (subscriptions == null) {
+ continue;
+ }
+ for (CartridgeSubscription subscription : subscriptions) {
+ if (log.isDebugEnabled()) {
+ log.debug(String.format("Reading domain names in subscription: [alias] %s [domain-names] %s",
+ subscription.getAlias(), subscription.getSubscriptionDomains()));
+ }
+ if (subscription.subscriptionDomainExists(domainName)) {
+ return false;
+ }
+ }
+ }
+ }
+ if(log.isDebugEnabled()) {
+ log.debug(String.format("Domain name %s is valid", domainName));
+ }
+ return true;
+ } catch (Exception e) {
+ String errorMsg = "Could not validate domain: " + domainName;
+ log.error(errorMsg);
+ throw new ADCException(errorMsg, e);
+ }
+ }
+
public Collection<CartridgeSubscription> getCartridgeSubscriptions (int tenantId, String type) throws ADCException {
if (type == null || type.isEmpty()) {
@@ -430,8 +617,11 @@ public class CartridgeSubscriptionManager {
}
// Publish tenant un-subscribed event to message broker
- CartridgeSubscriptionUtils.publishTenantUnSubscribedEvent(cartridgeSubscription.getSubscriber().getTenantId(),
- cartridgeSubscription.getCartridgeInfo().getType());
+ Set<String> clusterIds = new HashSet<String>();
+ clusterIds.add(cartridgeSubscription.getCluster().getClusterDomain());
+ CartridgeSubscriptionUtils.publishTenantUnSubscribedEvent(
+ cartridgeSubscription.getSubscriber().getTenantId(),
+ cartridgeSubscription.getCartridgeInfo().getType(), clusterIds);
// publishing to the unsubscribed event details to bam
CartridgeSubscriptionDataPublisher.publish(cartridgeSubscription