You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/05/02 12:43:57 UTC
git commit: Fixed default port issue and used axis2 member http/https
port properties
Repository: incubator-stratos
Updated Branches:
refs/heads/master ace6d562d -> 2dbaae431
Fixed default port issue and used axis2 member http/https port properties
Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/2dbaae43
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/2dbaae43
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/2dbaae43
Branch: refs/heads/master
Commit: 2dbaae431887aee8ebd1349c5366ed386815883d
Parents: ace6d56
Author: Imesh Gunaratne <im...@apache.org>
Authored: Fri May 2 16:13:23 2014 +0530
Committer: Imesh Gunaratne <im...@apache.org>
Committed: Fri May 2 16:13:23 2014 +0530
----------------------------------------------------------------------
.../TenantAwareLoadBalanceEndpoint.java | 291 ++++++++++++-------
.../stratos/load/balancer/util/Constants.java | 9 +-
2 files changed, 191 insertions(+), 109 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2dbaae43/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
index 709ad10..1ffdb50 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
@@ -28,14 +28,17 @@ import org.apache.stratos.load.balancer.algorithm.LoadBalanceAlgorithmFactory;
import org.apache.stratos.load.balancer.conf.LoadBalancerConfiguration;
import org.apache.stratos.load.balancer.conf.domain.MemberIpType;
import org.apache.stratos.load.balancer.conf.domain.TenantIdentifier;
+import org.apache.stratos.load.balancer.context.LoadBalancerContext;
import org.apache.stratos.load.balancer.statistics.InFlightRequestDecrementCallable;
import org.apache.stratos.load.balancer.statistics.InFlightRequestIncrementCallable;
import org.apache.stratos.load.balancer.statistics.LoadBalancerStatisticsExecutor;
import org.apache.stratos.load.balancer.util.Constants;
import org.apache.stratos.messaging.domain.tenant.Tenant;
+import org.apache.stratos.messaging.domain.topology.Cluster;
import org.apache.stratos.messaging.domain.topology.Member;
import org.apache.stratos.messaging.domain.topology.Port;
import org.apache.stratos.messaging.message.receiver.tenant.TenantManager;
+import org.apache.stratos.messaging.message.receiver.topology.TopologyManager;
import org.apache.synapse.MessageContext;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.SynapseException;
@@ -87,7 +90,6 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
@Override
public void send(MessageContext synCtx) {
-
SessionInformation sessionInformation = null;
org.apache.axis2.clustering.Member currentMember = null;
if (isSessionAffinityBasedLB()) {
@@ -118,6 +120,8 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
TenantAwareLoadBalanceFaultHandler faultHandler = new TenantAwareLoadBalanceFaultHandler();
if (sessionInformation != null && currentMember != null) {
+ // Update axis2 member ports
+ updateAxis2MemberPorts(synCtx, currentMember);
// Send request to the member with the existing session
sessionInformation.updateExpiryTime();
sendToApplicationMember(synCtx, currentMember, faultHandler, false);
@@ -199,90 +203,98 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
}
private org.apache.axis2.clustering.Member findNextMember(MessageContext synCtx) {
- try {
- String targetHost = extractTargetHost(synCtx);
- if (!requestDelegator.isTargetHostValid(targetHost)) {
- throwSynapseException(synCtx, 404, String.format("Unknown host name %s", targetHost));
- }
+ String targetHost = extractTargetHost(synCtx);
+ if (!requestDelegator.isTargetHostValid(targetHost)) {
+ throwSynapseException(synCtx, 404, String.format("Unknown host name %s", targetHost));
+ }
- Member member = null;
- if (LoadBalancerConfiguration.getInstance().isMultiTenancyEnabled()) {
- // Try to find next member from multi-tenant cluster map
- if (log.isDebugEnabled()) {
- log.debug("Multi-tenancy enabled, scanning URL for tenant...");
- }
- String url = extractUrl(synCtx);
- int tenantId = scanUrlForTenantId(url);
- if (tenantExists(tenantId)) {
- // Tenant found, find member from hostname and tenant id
- member = requestDelegator.findNextMemberFromTenantId(targetHost, tenantId);
- } else {
- // Tenant id not found in URL, find member from host name
- member = requestDelegator.findNextMemberFromHostName(targetHost);
- }
+ Member member = null;
+ if (LoadBalancerConfiguration.getInstance().isMultiTenancyEnabled()) {
+ // Try to find next member from multi-tenant cluster map
+ if (log.isDebugEnabled()) {
+ log.debug("Multi-tenancy enabled, scanning URL for tenant...");
+ }
+ String url = extractUrl(synCtx);
+ int tenantId = scanUrlForTenantId(url);
+ if (tenantExists(tenantId)) {
+ // Tenant found, find member from hostname and tenant id
+ member = requestDelegator.findNextMemberFromTenantId(targetHost, tenantId);
} else {
- // Find next member from host name
+ // Tenant id not found in URL, find member from host name
member = requestDelegator.findNextMemberFromHostName(targetHost);
}
+ } else {
+ // Find next member from host name
+ member = requestDelegator.findNextMemberFromHostName(targetHost);
+ }
+
+ if (member == null)
+ return null;
+
+ // Create Axi2 member object
+ org.apache.axis2.clustering.Member axis2Member = new org.apache.axis2.clustering.Member(
+ getMemberIp(synCtx, member), -1);
+ axis2Member.setDomain(member.getClusterId());
+ axis2Member.setActive(member.isActive());
+ // Set cluster id and member id in member properties
+ axis2Member.getProperties().setProperty(Constants.CLUSTER_ID, member.getClusterId());
+ axis2Member.getProperties().setProperty(Constants.MEMBER_ID, member.getMemberId());
+ // Update axis2 member ports
+ updateAxis2MemberPorts(synCtx, axis2Member);
+ return axis2Member;
+ }
- if (member == null)
- return null;
-
- // Find mapping outgoing port for incoming port
- int incomingPort = findIncomingPort(synCtx);
- Port outgoingPort = findOutgoingPort(member, incomingPort);
- if (outgoingPort == null) {
- if (log.isErrorEnabled()) {
- log.error(String.format("Could not find the port for proxy port %d in member %s", incomingPort,
- member.getMemberId()));
- }
- throwSynapseException(synCtx, 500, "Internal server error");
- }
-
- // Create Axi2 member object
- org.apache.axis2.clustering.Member axis2Member = new org.apache.axis2.clustering.Member(
- getMemberIp(synCtx, member), outgoingPort.getValue());
- axis2Member.setDomain(member.getClusterId());
- axis2Member.setActive(member.isActive());
-
- // Set cluster id and partition id in message context
- axis2Member.getProperties().setProperty(Constants.CLUSTER_ID, member.getClusterId());
- return axis2Member;
+ /**
+ * Update http/https port in axis2 member according to incoming request port.
+ *
+ * @param synCtx
+ * @param axis2Member
+ */
+ private void updateAxis2MemberPorts(MessageContext synCtx, org.apache.axis2.clustering.Member axis2Member) {
+ if(log.isDebugEnabled()) {
+ log.debug("Updating axis2 member port");
}
- catch (Exception e) {
- if(log.isErrorEnabled()) {
- log.error("Could not find a member to serve the request");
+
+ // Find mapping outgoing port for incoming port
+ int incomingPort = findIncomingPort(synCtx);
+ String transport = extractTransport(synCtx);
+ Port outgoingPort = findOutgoingPort(synCtx, axis2Member, transport, incomingPort);
+ if (outgoingPort == null) {
+ if (log.isErrorEnabled()) {
+ log.error(String.format("Could not find the port for proxy port %d in member %s", incomingPort,
+ axis2Member.getProperties().getProperty(Constants.MEMBER_ID)));
}
throwSynapseException(synCtx, 500, "Internal server error");
}
- return null;
+ if (Constants.HTTP.equals(transport)) {
+ axis2Member.setHttpPort(outgoingPort.getValue());
+ } else if (Constants.HTTPS.equals(transport)) {
+ axis2Member.setHttpsPort(outgoingPort.getValue());
+ }
}
- /***
+ /**
* Find incoming port from request URL.
+ *
* @param synCtx
* @return
* @throws MalformedURLException
*/
- private int findIncomingPort(MessageContext synCtx) throws MalformedURLException {
+ private int findIncomingPort(MessageContext synCtx) {
org.apache.axis2.context.MessageContext msgCtx =
((Axis2MessageContext) synCtx).getAxis2MessageContext();
try {
- Map headerMap = (Map) msgCtx.getProperty(org.apache.axis2.context.MessageContext.TRANSPORT_HEADERS);
- if (headerMap != null) {
- String hostHeader = (String) headerMap.get(HTTP.TARGET_HOST);
- int index = hostHeader.indexOf(':');
- if (index != -1) {
- int port = Integer.parseInt(hostHeader.trim().substring(index + 1));
- if (log.isDebugEnabled()) {
- log.debug("Incoming request port found: " + port);
- }
- return port;
+ String servicePrefix = (String) msgCtx.getProperty(Constants.AXIS2_MSG_CTX_SERVICE_PREFIX);
+ if (servicePrefix == null) {
+ if (log.isErrorEnabled()) {
+ log.error(String.format("%s property not found in axis2 message context", Constants.AXIS2_MSG_CTX_SERVICE_PREFIX));
}
+ throwSynapseException(synCtx, 500, "Internal server error");
}
- }
- catch (Exception e) {
- if(log.isErrorEnabled()) {
+ URL servicePrefixUrl = new URL(servicePrefix);
+ return servicePrefixUrl.getPort();
+ } catch (MalformedURLException e) {
+ if (log.isErrorEnabled()) {
log.error("Could not find incoming request port");
}
throwSynapseException(synCtx, 500, "Internal server error");
@@ -290,17 +302,30 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
return -1;
}
- /***
+ /**
* Find mapping outgoing port for incoming port.
- * @param member
+ *
+ * @param synCtx
+ * @param axis2Member
+ * @param transport
* @param incomingPort
* @return
* @throws MalformedURLException
*/
- private Port findOutgoingPort(Member member, int incomingPort) throws MalformedURLException {
- if((member != null) && (member.getPorts() != null)) {
+ private Port findOutgoingPort(MessageContext synCtx, org.apache.axis2.clustering.Member axis2Member, String transport, int incomingPort) {
+ Member member = findMemberFromAxis2Member(synCtx, axis2Member);
+ if ((member != null) && (member.getPorts() != null)) {
Port outgoingPort = member.getPort(incomingPort);
- if(outgoingPort != null) {
+ if (outgoingPort != null) {
+ if (!transport.equals(outgoingPort.getProtocol())) {
+ if (log.isErrorEnabled()) {
+ String message = String.format("Transport %s is not valid for port %d", transport, incomingPort);
+ if (log.isErrorEnabled()) {
+ log.error(message);
+ }
+ throwSynapseException(synCtx, 500, message);
+ }
+ }
if (log.isDebugEnabled()) {
log.debug("Outgoing request port found: " + outgoingPort.getValue());
}
@@ -310,47 +335,99 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
return null;
}
- /***
+ /**
+ * Find topology member from axis2 member using cluster id and member id defined in axis2 member properties.
+ *
+ * @param synCtx
+ * @param axis2Member
+ * @return
+ */
+ private Member findMemberFromAxis2Member(MessageContext synCtx, org.apache.axis2.clustering.Member axis2Member) {
+ String clusterId = axis2Member.getProperties().getProperty(Constants.CLUSTER_ID);
+ String memberId = axis2Member.getProperties().getProperty(Constants.MEMBER_ID);
+ if (StringUtils.isBlank(clusterId) || StringUtils.isBlank(memberId)) {
+ if (log.isErrorEnabled()) {
+ log.error(String.format("Could not find cluster id and/or member id properties in axis2 member: [cluster-id] %s " +
+ "[member-id] %s", clusterId, memberId));
+ }
+ throwSynapseException(synCtx, 500, "Internal server error");
+ }
+ try {
+ TopologyManager.acquireReadLock();
+ Cluster cluster = LoadBalancerContext.getInstance().getClusterIdClusterMap().getCluster(clusterId);
+ if (cluster == null) {
+ if (log.isErrorEnabled()) {
+ log.error(String.format("Cluster not found in load balancer context: [cluster-id] %s ", clusterId));
+ }
+ throwSynapseException(synCtx, 500, "Internal server error");
+ }
+ Member member = cluster.getMember(memberId);
+ if (member == null) {
+ if (log.isErrorEnabled()) {
+ log.error(String.format("Member not found in load balancer context: [cluster-id] %s [member-id] %s", clusterId, memberId));
+ }
+ throwSynapseException(synCtx, 500, "Internal server error");
+ }
+ return member;
+ } finally {
+ TopologyManager.releaseReadLock();
+ }
+ }
+
+ /**
* Get members private or public ip according to load balancer configuration.
+ *
* @param synCtx
* @param member
* @return
*/
private String getMemberIp(MessageContext synCtx, Member member) {
- if(LoadBalancerConfiguration.getInstance().isTopologyEventListenerEnabled()) {
- if(LoadBalancerConfiguration.getInstance().getTopologyMemberIpType() == MemberIpType.Public) {
+ if (LoadBalancerConfiguration.getInstance().isTopologyEventListenerEnabled()) {
+ if (LoadBalancerConfiguration.getInstance().getTopologyMemberIpType() == MemberIpType.Public) {
// Return member's public IP address
- if(StringUtils.isBlank(member.getMemberPublicIp())) {
+ if (StringUtils.isBlank(member.getMemberPublicIp())) {
if (log.isErrorEnabled()) {
log.error(String.format("Member public IP address not found: [member] %s", member.getMemberId()));
}
throwSynapseException(synCtx, 500, "Internal server error");
}
- if(log.isDebugEnabled()) {
+ if (log.isDebugEnabled()) {
log.debug(String.format("Using member public IP address: [member] %s [ip] %s", member.getMemberId(), member.getMemberPublicIp()));
}
return member.getMemberPublicIp();
}
}
// Return member's private IP address
- if(StringUtils.isBlank(member.getMemberIp())) {
+ if (StringUtils.isBlank(member.getMemberIp())) {
if (log.isErrorEnabled()) {
log.error(String.format("Member IP address not found: [member] %s", member.getMemberId()));
}
throwSynapseException(synCtx, 500, "Internal server error");
}
- if(log.isDebugEnabled()) {
+ if (log.isDebugEnabled()) {
log.debug(String.format("Using member IP address: [member] %s [ip] %s", member.getMemberId(), member.getMemberIp()));
}
return member.getMemberIp();
}
+ /**
+ * Extract incoming request URL from message context.
+ *
+ * @param synCtx
+ * @return
+ */
private String extractUrl(MessageContext synCtx) {
Axis2MessageContext axis2smc = (Axis2MessageContext) synCtx;
org.apache.axis2.context.MessageContext axis2MessageCtx = axis2smc.getAxis2MessageContext();
return (String) axis2MessageCtx.getProperty(Constants.AXIS2_MSG_CTX_TRANSPORT_IN_URL);
}
+ /**
+ * Scan given URL for tenant id.
+ *
+ * @param url
+ * @return
+ */
private int scanUrlForTenantId(String url) {
int tenantId = -1;
String regex = LoadBalancerConfiguration.getInstance().getTenantIdentifierRegex();
@@ -456,22 +533,22 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
return endpoint;
}
- private EndpointReference getEndpointReferenceAfterURLRewrite(org.apache.axis2.clustering.Member currentMember,
+ private EndpointReference getEndpointReferenceAfterURLRewrite(MessageContext synCtx, org.apache.axis2.clustering.Member currentMember,
String transport,
String address) {
+ try {
+ if (transport.startsWith(Constants.HTTPS)) {
+ transport = Constants.HTTPS;
+ } else if (transport.startsWith(Constants.HTTP)) {
+ transport = Constants.HTTP;
+ } else {
+ String msg = "Cannot load balance for non-HTTP/S transport " + transport;
+ log.error(msg);
+ throwSynapseException(synCtx, 500, msg);
+ }
- if (transport.startsWith("https")) {
- transport = "https";
- } else if (transport.startsWith("http")) {
- transport = "http";
- } else {
- String msg = "Cannot load balance for non-HTTP/S transport " + transport;
- log.error(msg);
- throw new SynapseException(msg);
- }
- // URL Rewrite
- if (transport.startsWith("http") || transport.startsWith("https")) {
- if (address.startsWith("http://") || address.startsWith("https://")) {
+ // URL Rewrite
+ if (address.startsWith(Constants.HTTP + "://") || address.startsWith(Constants.HTTPS + "://")) {
try {
String _address = address.indexOf("?") > 0 ? address.substring(address.indexOf("?"), address.length()) : "";
address = new URL(address).getPath() + _address;
@@ -483,13 +560,15 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
}
String hostName = currentMember.getHostName();
- int port = currentMember.getPort();
- return new EndpointReference(transport + "://" + hostName +
- ":" + port + address);
- } else {
- String msg = "Cannot load balance for non-HTTP/S transport " + transport;
- log.error(msg);
- throw new SynapseException(msg);
+ int port = (transport.startsWith(Constants.HTTPS)) ? currentMember.getHttpsPort() : currentMember.getHttpPort();
+ return new EndpointReference(new URL(transport, hostName, port, address).toString());
+
+ } catch (MalformedURLException e) {
+ if (log.isErrorEnabled()) {
+ log.error("Could not create endpoint reference", e);
+ }
+ throwSynapseException(synCtx, 500, "Internal server error");
+ return null;
}
}
@@ -539,7 +618,7 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
String transport = axis2MsgCtx.getTransportIn().getName();
String address = synCtx.getTo().getAddress();
- EndpointReference to = getEndpointReferenceAfterURLRewrite(currentMember, transport, address);
+ EndpointReference to = getEndpointReferenceAfterURLRewrite(synCtx, currentMember, transport, address);
synCtx.setTo(to);
Endpoint endpoint = getEndpoint(to, currentMember, synCtx);
@@ -595,14 +674,13 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
private void incrementInFlightRequestCount(MessageContext messageContext) {
try {
String clusterId = (String) messageContext.getProperty(Constants.CLUSTER_ID);
- if(StringUtils.isBlank(clusterId)) {
+ if (StringUtils.isBlank(clusterId)) {
throw new RuntimeException("Cluster id not found in message context");
}
FutureTask<Object> task = new FutureTask<Object>(new InFlightRequestIncrementCallable(clusterId));
LoadBalancerStatisticsExecutor.getInstance().getService().submit(task);
- }
- catch (Exception e) {
- if(log.isDebugEnabled()) {
+ } catch (Exception e) {
+ if (log.isDebugEnabled()) {
log.debug("Could not increment in-flight request count", e);
}
}
@@ -611,14 +689,13 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
private void decrementInFlightRequestCount(MessageContext messageContext) {
try {
String clusterId = (String) messageContext.getProperty(Constants.CLUSTER_ID);
- if(StringUtils.isBlank(clusterId)) {
+ if (StringUtils.isBlank(clusterId)) {
throw new RuntimeException("Cluster id not found in message context");
}
FutureTask<Object> task = new FutureTask<Object>(new InFlightRequestDecrementCallable(clusterId));
LoadBalancerStatisticsExecutor.getInstance().getService().submit(task);
- }
- catch (Exception e) {
- if(log.isDebugEnabled()) {
+ } catch (Exception e) {
+ if (log.isDebugEnabled()) {
log.debug("Could not decrement in-flight request count", e);
}
}
@@ -691,7 +768,7 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
decrementInFlightRequestCount(synCtx);
if (isFailover()) {
- if(log.isDebugEnabled()) {
+ if (log.isDebugEnabled()) {
log.debug("Fail-over enabled, trying to send the message to the next available member");
}
@@ -700,7 +777,7 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
currentEp.destroy();
}
if (currentMember == null) {
- if(log.isErrorEnabled()) {
+ if (log.isErrorEnabled()) {
log.error("Current member is null, could not fail-over");
}
return;
http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/2dbaae43/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java
index ff64d20..a29b980 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/util/Constants.java
@@ -22,12 +22,17 @@ package org.apache.stratos.load.balancer.util;
public class Constants {
public static final String CLUSTER_ID = "cluster_id";
+ public static final String MEMBER_ID = "member_id";
+
+ public static final String HTTP = "http";
+ public static final String HTTPS = "https";
+
+ public static final String AXIS2_MSG_CTX_TRANSPORT_IN_URL = "TransportInURL";
+ public static final String AXIS2_MSG_CTX_SERVICE_PREFIX = "SERVICE_PREFIX";
public static final String LB_HOST_NAME = "LB_HOST_NAME";
public static final String LB_HTTP_PORT = "LB_HTTP_PORT";
public static final String LB_HTTPS_PORT = "LB_HTTPS_PORT";
public static final String ALGORITHM_CONTEXT_CACHE = "algorithm.context.cache";
-
- public static final String AXIS2_MSG_CTX_TRANSPORT_IN_URL = "TransportInURL";
}