You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@stratos.apache.org by im...@apache.org on 2014/05/01 07:09:07 UTC

[1/2] git commit: Added ports activated info log in cartridge agent

Repository: incubator-stratos
Updated Branches:
  refs/heads/master b24c71ef6 -> e4b15b6f5


Added ports activated info log in cartridge agent


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/a8e3181b
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/a8e3181b
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/a8e3181b

Branch: refs/heads/master
Commit: a8e3181b1ba4228d8f18bc2ed2864d2cd49c5078
Parents: b24c71e
Author: Imesh Gunaratne <im...@wso2.com>
Authored: Wed Apr 30 22:18:00 2014 +0530
Committer: Imesh Gunaratne <im...@wso2.com>
Committed: Wed Apr 30 22:18:00 2014 +0530

----------------------------------------------------------------------
 .../stratos/cartridge/agent/util/CartridgeAgentUtils.java       | 5 ++++-
 1 file changed, 4 insertions(+), 1 deletion(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/a8e3181b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentUtils.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentUtils.java b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentUtils.java
index 8abb285..010462d 100644
--- a/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentUtils.java
+++ b/components/org.apache.stratos.cartridge.agent/src/main/java/org/apache/stratos/cartridge/agent/util/CartridgeAgentUtils.java
@@ -94,7 +94,7 @@ public class CartridgeAgentUtils {
         boolean active = false;
         while (!active) {
             if(log.isInfoEnabled()) {
-                log.info("Waiting for ports to be active: [IP] "+ipAddress+" [Ports] "+ports);
+                log.info("Waiting for ports to be active: [ip] "+ipAddress+" [ports] "+ports);
             }
             active = checkPortsActive(ipAddress,  ports);
             long endTime = System.currentTimeMillis();
@@ -107,6 +107,9 @@ public class CartridgeAgentUtils {
             } catch (InterruptedException e) {
             }
         }
+        if(log.isInfoEnabled()) {
+            log.info("Ports activated: [ip] " + ipAddress + " [ports] "+ports);
+        }
     }
 
     public static boolean checkPortsActive(String ipAddress, List<Integer> ports) {


[2/2] git commit: Updated load balancer port mapping logic to find outgoing port according to the incoming port rather than looking at the transport

Posted by im...@apache.org.
Updated load balancer port mapping logic to find outgoing port according to the incoming port rather than looking at the transport


Project: http://git-wip-us.apache.org/repos/asf/incubator-stratos/repo
Commit: http://git-wip-us.apache.org/repos/asf/incubator-stratos/commit/e4b15b6f
Tree: http://git-wip-us.apache.org/repos/asf/incubator-stratos/tree/e4b15b6f
Diff: http://git-wip-us.apache.org/repos/asf/incubator-stratos/diff/e4b15b6f

Branch: refs/heads/master
Commit: e4b15b6f52253b804c5e716fbcf1c1db47111dc2
Parents: a8e3181
Author: Imesh Gunaratne <im...@wso2.com>
Authored: Thu May 1 10:38:21 2014 +0530
Committer: Imesh Gunaratne <im...@wso2.com>
Committed: Thu May 1 10:38:21 2014 +0530

----------------------------------------------------------------------
 .../TenantAwareLoadBalanceEndpoint.java         | 181 ++++++++++---------
 .../messaging/domain/topology/Member.java       |  73 ++++----
 .../messaging/domain/topology/Service.java      |  36 ++--
 .../event/topology/MemberActivatedEvent.java    |  30 ++-
 .../event/topology/ServiceCreatedEvent.java     |  29 ++-
 5 files changed, 171 insertions(+), 178 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e4b15b6f/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
index 364e869..c426a1b 100644
--- a/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
+++ b/components/org.apache.stratos.load.balancer/src/main/java/org/apache/stratos/load/balancer/endpoint/TenantAwareLoadBalanceEndpoint.java
@@ -59,7 +59,7 @@ import java.util.regex.Pattern;
 
 
 public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints.LoadbalanceEndpoint implements Serializable {
-    private static final String PORT_MAPPING_PREFIX = "port.mapping.";
+    private static final long serialVersionUID = -6612900240087164008L;
 
     /* Request delegator identifies the next member */
     private RequestDelegator requestDelegator;
@@ -199,59 +199,111 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
     }
 
     private org.apache.axis2.clustering.Member findNextMember(MessageContext synCtx) {
-        String targetHost = extractTargetHost(synCtx);
-        if (!requestDelegator.isTargetHostValid(targetHost)) {
-            throwSynapseException(synCtx, 404, String.format("Unknown host name %s", targetHost));
-        }
-
-        Member member = null;
-        if (LoadBalancerConfiguration.getInstance().isMultiTenancyEnabled()) {
-            // Try to find next member from multi-tenant cluster map
-            if (log.isDebugEnabled()) {
-                log.debug("Multi-tenancy enabled, scanning URL for tenant...");
+        try {
+            String targetHost = extractTargetHost(synCtx);
+            if (!requestDelegator.isTargetHostValid(targetHost)) {
+                throwSynapseException(synCtx, 404, String.format("Unknown host name %s", targetHost));
             }
-            String url = extractUrl(synCtx);
-            int tenantId = scanUrlForTenantId(url);
-            if (tenantExists(tenantId)) {
-                // Tenant found, find member from hostname and tenant id
-                member = requestDelegator.findNextMemberFromTenantId(targetHost, tenantId);
+
+            Member member = null;
+            if (LoadBalancerConfiguration.getInstance().isMultiTenancyEnabled()) {
+                // Try to find next member from multi-tenant cluster map
+                if (log.isDebugEnabled()) {
+                    log.debug("Multi-tenancy enabled, scanning URL for tenant...");
+                }
+                String url = extractUrl(synCtx);
+                int tenantId = scanUrlForTenantId(url);
+                if (tenantExists(tenantId)) {
+                    // Tenant found, find member from hostname and tenant id
+                    member = requestDelegator.findNextMemberFromTenantId(targetHost, tenantId);
+                } else {
+                    // Tenant id not found in URL, find member from host name
+                    member = requestDelegator.findNextMemberFromHostName(targetHost);
+                }
             } else {
-                // Tenant id not found in URL, find member from host name
+                // Find next member from host name
                 member = requestDelegator.findNextMemberFromHostName(targetHost);
             }
-        } else {
-            // Find next member from host name
-            member = requestDelegator.findNextMemberFromHostName(targetHost);
-        }
 
-        if (member == null)
-            return null;
+            if (member == null)
+                return null;
 
-        // Create Axi2 member object
-        String transport = extractTransport(synCtx);
-        Port transportPort = member.getPort(transport);
-        if (transportPort == null) {
-            if (log.isErrorEnabled()) {
-                log.error(String.format("Port not found for transport %s in member %s", transport, member.getMemberId()));
+            // Find mapping outgoing port for incoming port
+            int incomingPort = findIncomingPort(synCtx);
+            Port outgoingPort = findOutgoingPort(member, incomingPort);
+            if (outgoingPort == null) {
+                if (log.isErrorEnabled()) {
+                    log.error(String.format("Could not find port for proxy port %d in member %s", incomingPort,
+                            member.getMemberId()));
+                }
+                throwSynapseException(synCtx, 500, "Internal server error");
+            }
+
+            // Create Axi2 member object
+            org.apache.axis2.clustering.Member axis2Member = new org.apache.axis2.clustering.Member(
+                    getMemberIp(synCtx, member), outgoingPort.getValue());
+            axis2Member.setDomain(member.getClusterId());
+            axis2Member.setActive(member.isActive());
+
+            // Set cluster id and partition id in message context
+            axis2Member.getProperties().setProperty(Constants.CLUSTER_ID, member.getClusterId());
+            return axis2Member;
+        }
+        catch (Exception e) {
+            if(log.isErrorEnabled()) {
+                log.error("Could not find a member to serve the request");
             }
             throwSynapseException(synCtx, 500, "Internal server error");
         }
+        return null;
+    }
+
+    /***
+     * Find incoming port from request URL.
+     * @param synCtx
+     * @return
+     * @throws MalformedURLException
+     */
+    private int findIncomingPort(MessageContext synCtx) throws MalformedURLException {
+        try {
+            URL url = new URL(extractUrl(synCtx));
+            if(log.isDebugEnabled()) {
+                log.debug("Incoming request port found: " + url.getPort());
+            }
+            return url.getPort();
+        }
+        catch (MalformedURLException e) {
+            if(log.isErrorEnabled()) {
+                log.error("Could not extract port from incoming request", e);
+            }
+            throw e;
+        }
+    }
 
-        int memberPort = transportPort.getValue();
-        org.apache.axis2.clustering.Member axis2Member = new org.apache.axis2.clustering.Member(getMemberIp(synCtx, member), memberPort);
-        axis2Member.setDomain(member.getClusterId());
-        Port httpPort = member.getPort("http");
-        if (httpPort != null)
-            axis2Member.setHttpPort(httpPort.getValue());
-        Port httpsPort = member.getPort("https");
-        if (httpsPort != null)
-            axis2Member.setHttpsPort(httpsPort.getValue());
-        axis2Member.setActive(member.isActive());
-        // Set cluster id and partition id in message context
-        axis2Member.getProperties().setProperty(Constants.CLUSTER_ID, member.getClusterId());
-        return axis2Member;
+    /***
+     * Find mapping outgoing port for incoming port.
+     * @param member
+     * @param incomingPort
+     * @return
+     * @throws MalformedURLException
+     */
+    private Port findOutgoingPort(Member member, int incomingPort) throws MalformedURLException {
+        if((member != null) && (member.getPorts() != null)) {
+            Port outgoingPort = member.getPort(incomingPort);
+            if(log.isDebugEnabled()) {
+                log.debug("Outgoing request port found: " + outgoingPort.getValue());
+            }
+            return outgoingPort;
+        }
+        return null;
     }
 
+    /***
+     * Get members private or public ip according to load balancer configuration.
+     * @param synCtx
+     * @param member
+     * @return
+     */
     private String getMemberIp(MessageContext synCtx, Member member) {
         if(LoadBalancerConfiguration.getInstance().isTopologyEventListenerEnabled()) {
             if(LoadBalancerConfiguration.getInstance().getTopologyMemberIpType() == MemberIpType.Public) {
@@ -361,28 +413,6 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
         return hostName;
     }
 
-    private int extractPort(MessageContext synCtx, String transport) {
-        org.apache.axis2.context.MessageContext msgCtx =
-                ((Axis2MessageContext) synCtx).getAxis2MessageContext();
-
-        Map headerMap = (Map) msgCtx.getProperty(org.apache.axis2.context.MessageContext.TRANSPORT_HEADERS);
-        int port = -1;
-        if (headerMap != null) {
-            String hostHeader = (String) headerMap.get(HTTP.TARGET_HOST);
-            int index = hostHeader.indexOf(':');
-            if (index != -1) {
-                port = Integer.parseInt(hostHeader.trim().substring(index + 1));
-            } else {
-                if ("http".equals(transport)) {
-                    port = 80;
-                } else if ("https".equals(transport)) {
-                    port = 443;
-                }
-            }
-        }
-        return port;
-    }
-
     private String extractTransport(MessageContext synCtx) {
         org.apache.axis2.context.MessageContext axis2MessageContext = ((Axis2MessageContext) synCtx).getAxis2MessageContext();
         return axis2MessageContext.getTransportIn().getName();
@@ -416,8 +446,7 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
 
     private EndpointReference getEndpointReferenceAfterURLRewrite(org.apache.axis2.clustering.Member currentMember,
                                                                   String transport,
-                                                                  String address,
-                                                                  int incomingPort) {
+                                                                  String address) {
 
         if (transport.startsWith("https")) {
             transport = "https";
@@ -435,25 +464,14 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
                     String _address = address.indexOf("?") > 0 ? address.substring(address.indexOf("?"), address.length()) : "";
                     address = new URL(address).getPath() + _address;
                 } catch (MalformedURLException e) {
-                    String msg = "URL " + address + " is malformed";
+                    String msg = String.format("URL is malformed: %s", address);
                     log.error(msg, e);
                     throw new SynapseException(msg, e);
                 }
             }
 
-            int port;
-            Properties memberProperties = currentMember.getProperties();
-            String mappedPort = memberProperties.getProperty(PORT_MAPPING_PREFIX + incomingPort);
-            if (mappedPort != null) {
-                port = Integer.parseInt(mappedPort);
-            } else if (transport.startsWith("https")) {
-                port = currentMember.getHttpsPort();
-            } else {
-                port = currentMember.getHttpPort();
-            }
-
-            String remoteHost = memberProperties.getProperty("remoteHost");
-            String hostName = (remoteHost == null) ? currentMember.getHostName() : remoteHost;
+            String hostName = currentMember.getHostName();
+            int port = currentMember.getPort();
             return new EndpointReference(transport + "://" + hostName +
                     ":" + port + address);
         } else {
@@ -509,8 +527,7 @@ public class TenantAwareLoadBalanceEndpoint extends org.apache.synapse.endpoints
 
         String transport = axis2MsgCtx.getTransportIn().getName();
         String address = synCtx.getTo().getAddress();
-        int incomingPort = extractPort(synCtx, transport);
-        EndpointReference to = getEndpointReferenceAfterURLRewrite(currentMember, transport, address, incomingPort);
+        EndpointReference to = getEndpointReferenceAfterURLRewrite(currentMember, transport, address);
         synCtx.setTo(to);
 
         Endpoint endpoint = getEndpoint(to, currentMember, synCtx);

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e4b15b6f/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Member.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Member.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Member.java
index b942e2e..09d38a7 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Member.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Member.java
@@ -24,10 +24,7 @@ import org.apache.stratos.messaging.util.bean.type.map.MapAdapter;
 import javax.xml.bind.annotation.XmlRootElement;
 import javax.xml.bind.annotation.adapters.XmlJavaTypeAdapter;
 import java.io.Serializable;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
 
 /**
  * Defines a member node in a cluster.
@@ -42,13 +39,12 @@ public class Member implements Serializable {
     private final String networkPartitionId;
     private final String partitionId;
     private final String memberId;
-    
+    @XmlJavaTypeAdapter(MapAdapter.class)
+    private final Map<Integer, Port> portMap;
     private String memberPublicIp;
     private MemberStatus status;
     private String memberIp;
     @XmlJavaTypeAdapter(MapAdapter.class)
-    private final Map<String, Port> portMap;
-    @XmlJavaTypeAdapter(MapAdapter.class)
     private Properties properties;
     private String lbClusterId;
 
@@ -58,7 +54,7 @@ public class Member implements Serializable {
         this.networkPartitionId = networkPartitionId;
         this.partitionId = partitionId;
         this.memberId = memberId;
-        this.portMap = new HashMap<String, Port>();
+        this.portMap = new HashMap<Integer, Port>();
     }
 
     public String getServiceName() {
@@ -85,34 +81,31 @@ public class Member implements Serializable {
         return (this.status == MemberStatus.Activated);
     }
 
-    public Collection<Port> getPorts() {
-        return portMap.values();
+    public Port getPort(int proxy) {
+        if(portMap.containsKey(proxy)) {
+            return portMap.get(proxy);
+        }
+        return null;
     }
 
-    public void addPort(Port port) {
-        this.portMap.put(port.getProtocol(), port);
+    public Map<Integer, Port> getPorts() {
+        return Collections.unmodifiableMap(portMap);
     }
 
-    public void addPorts(Collection<Port> ports) {
-        for(Port port: ports) {
-            addPort(port);
-        }
+    public void addPort(Port port) {
+        this.portMap.put(port.getProxy(), port);
     }
 
-    public void removePort(Port port) {
-        this.portMap.remove(port.getProtocol());
+    public void addPorts(Map<Integer, Port> portMap) {
+        this.portMap.putAll(portMap);
     }
 
-    public void removePort(String protocol) {
-        this.portMap.remove(protocol);
+    public void removePort(Port port) {
+        this.portMap.remove(port.getProxy());
     }
 
     public boolean portExists(Port port) {
-        return this.portMap.containsKey(port.getProtocol());
-    }
-
-    public Port getPort(String protocol) {
-        return this.portMap.get(protocol);
+        return this.portMap.containsKey(port.getProxy());
     }
 
     public Properties getProperties() {
@@ -123,37 +116,37 @@ public class Member implements Serializable {
         this.properties = properties;
     }
 
-	public String getMemberIp() {
-	    return memberIp;
+    public String getMemberIp() {
+        return memberIp;
     }
 
-	public void setMemberIp(String memberIp) {
-	    this.memberIp = memberIp;
+    public void setMemberIp(String memberIp) {
+        this.memberIp = memberIp;
     }
 
     public String getPartitionId() {
         return partitionId;
     }
 
-    public void setLbClusterId(String lbClusterId) {
-        this.lbClusterId = lbClusterId;
-    }
-
     public String getLbClusterId() {
         return lbClusterId;
     }
 
+    public void setLbClusterId(String lbClusterId) {
+        this.lbClusterId = lbClusterId;
+    }
+
     public String getNetworkPartitionId() {
         return networkPartitionId;
     }
 
-	public String getMemberPublicIp() {
-		return memberPublicIp;
-	}
+    public String getMemberPublicIp() {
+        return memberPublicIp;
+    }
+
+    public void setMemberPublicIp(String memberPublicIp) {
+        this.memberPublicIp = memberPublicIp;
+    }
 
-	public void setMemberPublicIp(String memberPublicIp) {
-		this.memberPublicIp = memberPublicIp;
-	}
-    
 }
 

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e4b15b6f/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
index 21cc5e7..f35dc40 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/domain/topology/Service.java
@@ -34,14 +34,14 @@ public class Service implements Serializable{
     private final ServiceType serviceType;
     // Key: Cluster.clusterId
     private Map<String, Cluster> clusterIdClusterMap;
-    private Map<String, Port> portMap;
+    private Map<Integer, Port> portMap;
     private Properties properties;
 
     public Service(String serviceName, ServiceType serviceType) {
         this.serviceName = serviceName;
         this.serviceType = serviceType;
         this.clusterIdClusterMap = new HashMap<String, Cluster>();
-        this.portMap = new HashMap<String, Port>();
+        this.portMap = new HashMap<Integer, Port>();
     }
 
     public String getServiceName() {
@@ -76,36 +76,32 @@ public class Service implements Serializable{
         return this.clusterIdClusterMap.get(clusterId);
     }
 
-    public Collection<Port> getPorts() {
-        return portMap.values();
+    public Map<Integer, Port> getPorts() {
+        return Collections.unmodifiableMap(portMap);
     }
 
-    public void addPort(Port port) {
-        this.portMap.put(port.getProtocol(), port);
-    }
-
-    public void addPorts(Collection<Port> ports) {
-        for(Port port: ports) {
-            addPort(port);
+    public Port getPort(int proxy) {
+        if(portMap.containsKey(proxy)) {
+            return portMap.get(proxy);
         }
+        return null;
     }
 
-    public void removePort(Port port) {
-        this.portMap.remove(port.getProtocol());
+    public void addPort(Port port) {
+        this.portMap.put(port.getProxy(), port);
     }
 
-    public void removePort(String protocol) {
-        this.portMap.remove(protocol);
+    public void addPorts(Map<Integer, Port> portSet) {
+        this.portMap.putAll(portSet);
     }
 
-    public boolean portExists(Port port) {
-        return this.portMap.containsKey(port.getProtocol());
+    public void removePort(Port port) {
+        this.portMap.remove(port.getProxy());
     }
 
-    public Port getPort(String protocol) {
-        return this.portMap.get(protocol);
+    public boolean portExists(Port port) {
+        return this.portMap.containsKey(port.getProxy());
     }
-
     public Properties getProperties() {
         return properties;
     }

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e4b15b6f/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberActivatedEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberActivatedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberActivatedEvent.java
index 3cda807..22f3735 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberActivatedEvent.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/MemberActivatedEvent.java
@@ -20,9 +20,7 @@
 package org.apache.stratos.messaging.event.topology;
 
 import java.io.Serializable;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.*;
 
 import org.apache.stratos.messaging.domain.topology.Port;
 
@@ -38,7 +36,7 @@ public class MemberActivatedEvent extends TopologyEvent implements Serializable
     private final String networkPartitionId;
     private final String partitionId;
     private final String memberId;
-    private Map<String, Port> portMap;
+    private Map<Integer, Port> portMap;
     private String memberIp;
 
     public MemberActivatedEvent(String serviceName, String clusterId, String networkPartitionId, String partitionId, String memberId) {
@@ -47,7 +45,7 @@ public class MemberActivatedEvent extends TopologyEvent implements Serializable
         this.networkPartitionId = networkPartitionId;
         this.partitionId = partitionId;
         this.memberId = memberId;
-    	this.portMap = new HashMap<String, Port>();
+    	this.portMap = new HashMap<Integer, Port>();
     }
     
     public String getServiceName() {
@@ -69,29 +67,25 @@ public class MemberActivatedEvent extends TopologyEvent implements Serializable
     public String getMemberId() {
         return memberId;
     }
-    
-    public Collection<Port> getPorts() {
-        return portMap.values();
+
+    public Map<Integer, Port> getPorts() {
+        return Collections.unmodifiableMap(portMap);
     }
 
     public void addPort(Port port) {
-        this.portMap.put(port.getProtocol(), port);
+        this.portMap.put(port.getProxy(), port);
     }
 
-    public void removePort(Port port) {
-        this.portMap.remove(port.getProtocol());
+    public void addPorts(Map<Integer, Port> portSet) {
+        this.portMap.putAll(portSet);
     }
 
-    public void removePort(String portName) {
-        this.portMap.remove(portName);
+    public void removePort(Port port) {
+        this.portMap.remove(port.getProxy());
     }
 
     public boolean portExists(Port port) {
-        return this.portMap.containsKey(port.getProtocol());
-    }
-
-    public Port getPort(String portName) {
-        return this.portMap.get(portName);
+        return this.portMap.containsKey(port.getProxy());
     }
 
 	public String getMemberIp() {

http://git-wip-us.apache.org/repos/asf/incubator-stratos/blob/e4b15b6f/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ServiceCreatedEvent.java
----------------------------------------------------------------------
diff --git a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ServiceCreatedEvent.java b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ServiceCreatedEvent.java
index 846091e..1bd5ff4 100644
--- a/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ServiceCreatedEvent.java
+++ b/components/org.apache.stratos.messaging/src/main/java/org/apache/stratos/messaging/event/topology/ServiceCreatedEvent.java
@@ -23,10 +23,7 @@ import org.apache.stratos.messaging.domain.topology.Port;
 import org.apache.stratos.messaging.domain.topology.ServiceType;
 
 import java.io.Serializable;
-import java.util.Collection;
-import java.util.HashMap;
-import java.util.Map;
-import java.util.Properties;
+import java.util.*;
 
 /**
  * This event is fired by Cloud Controller when a service is added to a topology.
@@ -37,13 +34,13 @@ public class ServiceCreatedEvent extends TopologyEvent implements Serializable {
 
     private final String serviceName;
     private final ServiceType serviceType;
-    private final Map<String, Port> portMap;
+    private final Map<Integer, Port> portMap;
     private Properties properties;
 
     public ServiceCreatedEvent(String serviceName, ServiceType serviceType) {
         this.serviceName = serviceName;
         this.serviceType = serviceType;
-        this.portMap = new HashMap<String, Port>();
+        this.portMap = new HashMap<Integer, Port>();
     }
 
     public String getServiceName() {
@@ -54,28 +51,24 @@ public class ServiceCreatedEvent extends TopologyEvent implements Serializable {
         return serviceType;
     }
 
-    public Collection<Port> getPorts() {
-        return portMap.values();
+    public Map<Integer, Port> getPorts() {
+        return Collections.unmodifiableMap(portMap);
     }
 
     public void addPort(Port port) {
-        this.portMap.put(port.getProtocol(), port);
+        this.portMap.put(port.getProxy(), port);
     }
 
-    public void removePort(Port port) {
-        this.portMap.remove(port.getProtocol());
+    public void addPorts(Map<Integer, Port> portSet) {
+        this.portMap.putAll(portSet);
     }
 
-    public void removePort(String portName) {
-        this.portMap.remove(portName);
+    public void removePort(Port port) {
+        this.portMap.remove(port.getProxy());
     }
 
     public boolean portExists(Port port) {
-        return this.portMap.containsKey(port.getProtocol());
-    }
-
-    public Port getPort(String portName) {
-        return this.portMap.get(portName);
+        return this.portMap.containsKey(port.getProxy());
     }
 
     public Properties getProperties() {