You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by us...@apache.org on 2011/11/08 10:38:36 UTC

svn commit: r1199166 - in /synapse/trunk/java/modules/core/src/main/java/org/apache/synapse: config/xml/endpoints/ endpoints/

Author: uswick
Date: Tue Nov  8 09:38:36 2011
New Revision: 1199166

URL: http://svn.apache.org/viewvc?rev=1199166&view=rev
Log:
committing Recipient List Endpoint implementation. https://issues.apache.org/jira/browse/SYNAPSE-808

Added:
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/RecipientListEndpointFactory.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/RecipientListEndpointSerializer.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/RecipientListEndpoint.java
Modified:
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointFactory.java
    synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointSerializer.java

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointFactory.java?rev=1199166&r1=1199165&r2=1199166&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointFactory.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointFactory.java Tue Nov  8 09:38:36 2011
@@ -232,6 +232,12 @@ public abstract class EndpointFactory im
         if (foElement != null) {
             return FailoverEndpointFactory.getInstance();
         }
+        
+        OMElement rcplElement = configElement.getFirstChildWithName
+        		(new QName(SynapseConstants.SYNAPSE_NAMESPACE, "recipientlist"));
+        if(rcplElement != null){
+        	return RecipientListEndpointFactory.getInstance();
+        }
 
         handleException("Invalid endpoint configuration.");
         // just to make the compiler happy : never executes

Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointSerializer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointSerializer.java?rev=1199166&r1=1199165&r2=1199166&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointSerializer.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointSerializer.java Tue Nov  8 09:38:36 2011
@@ -19,22 +19,33 @@
 
 package org.apache.synapse.config.xml.endpoints;
 
+import java.util.Collection;
+
+import javax.xml.namespace.QName;
+
 import org.apache.axiom.om.OMElement;
 import org.apache.axiom.om.OMFactory;
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.PropertyInclude;
 import org.apache.synapse.SynapseConstants;
 import org.apache.synapse.SynapseException;
-import org.apache.synapse.PropertyInclude;
-import org.apache.synapse.mediators.MediatorProperty;
-import org.apache.synapse.aspects.statistics.StatisticsConfigurable;
-import org.apache.synapse.config.xml.XMLConfigConstants;
 import org.apache.synapse.config.xml.MediatorPropertySerializer;
-import org.apache.synapse.endpoints.*;
+import org.apache.synapse.endpoints.AbstractEndpoint;
+import org.apache.synapse.endpoints.AddressEndpoint;
+import org.apache.synapse.endpoints.DefaultEndpoint;
+import org.apache.synapse.endpoints.DynamicLoadbalanceEndpoint;
+import org.apache.synapse.endpoints.Endpoint;
 import org.apache.synapse.endpoints.EndpointDefinition;
-
-import javax.xml.namespace.QName;
-import java.util.Collection;
+import org.apache.synapse.endpoints.FailoverEndpoint;
+import org.apache.synapse.endpoints.IndirectEndpoint;
+import org.apache.synapse.endpoints.LoadbalanceEndpoint;
+import org.apache.synapse.endpoints.RecipientListEndpoint;
+import org.apache.synapse.endpoints.ResolvingEndpoint;
+import org.apache.synapse.endpoints.SALoadbalanceEndpoint;
+import org.apache.synapse.endpoints.TemplateEndpoint;
+import org.apache.synapse.endpoints.WSDLEndpoint;
+import org.apache.synapse.mediators.MediatorProperty;
 
 /**
  * All endpoint serializers should implement this interface. Use EndpointSerializer to
@@ -135,6 +146,8 @@ public abstract class EndpointSerializer
             return new FailoverEndpointSerializer();
         } else if (endpoint instanceof TemplateEndpoint) {
             return new TemplateEndpointSerializer();
+        } else if(endpoint instanceof RecipientListEndpoint){
+        	return new RecipientListEndpointSerializer();
         }
 
         throw new SynapseException("Serializer for endpoint " +

Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/RecipientListEndpointFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/RecipientListEndpointFactory.java?rev=1199166&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/RecipientListEndpointFactory.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/RecipientListEndpointFactory.java Tue Nov  8 09:38:36 2011
@@ -0,0 +1,173 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.config.xml.endpoints;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.om.OMAttribute;
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.clustering.Member;
+import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.config.xml.ValueFactory;
+import org.apache.synapse.config.xml.XMLConfigConstants;
+import org.apache.synapse.endpoints.Endpoint;
+import org.apache.synapse.endpoints.RecipientListEndpoint;
+import org.apache.synapse.mediators.Value;
+
+/**
+ * @author nuwan
+ * <p>
+ * Creates {@link RecipientListEndpoint} using an XML configuration.
+ * <p/>
+ * &lt;endpoint [name="name"]&gt;
+ * &lt;recipientlist&gt;
+ * &lt;member hostName="host" httpPort="port" httpsPort="port"&gt;+
+ * &lt;/recipientlist&gt;
+ * &lt;/endpoint&gt;
+ * </p>
+ */
+public class RecipientListEndpointFactory extends EndpointFactory {
+	
+	private static RecipientListEndpointFactory instance = new RecipientListEndpointFactory();
+	
+	private static final QName MEMBER = new QName(SynapseConstants.SYNAPSE_NAMESPACE, "member");
+	private static final QName DYNAMIC_SET = new QName(SynapseConstants.SYNAPSE_NAMESPACE, "endpoints");
+
+	/** Private: the only instance is the one handed out by {@link #getInstance()}. */
+	private RecipientListEndpointFactory(){
+	}
+	
+	/**
+	 * Returns the shared factory instance.
+	 *
+	 * @return the single {@code RecipientListEndpointFactory} held in the static field
+	 */
+	public static RecipientListEndpointFactory getInstance(){
+		return instance;
+	}
+
+	/**
+	 * Creates a {@link RecipientListEndpoint} from an &lt;endpoint&gt; element that contains
+	 * a &lt;recipientlist&gt; child.
+	 * <p>
+	 * The recipients are taken from the first kind of child found under &lt;recipientlist&gt;,
+	 * checked in this order: &lt;endpoint&gt; elements, &lt;member&gt; elements, a single
+	 * &lt;endpoints&gt; element (dynamic recipients, with an optional {@code max-cache}
+	 * attribute giving the endpoint pool size). Mixing &lt;endpoint&gt; and &lt;member&gt;
+	 * children is rejected.
+	 * </p>
+	 *
+	 * @param epConfig the &lt;endpoint&gt; element to read
+	 * @param anonymousEndpoint not used by this factory
+	 * @param properties handed on to {@code getEndpoints} when child endpoints are built
+	 * @return the configured endpoint, or {@code null} if there is no &lt;recipientlist&gt; child
+	 * @throws SynapseException if both member and endpoint children are present, or if no
+	 *         recipients of any kind are configured
+	 */
+	@Override
+	protected Endpoint createEndpoint(OMElement epConfig,
+			boolean anonymousEndpoint, Properties properties) {
+
+		OMElement recipientListElement = epConfig.getFirstChildWithName(
+				new QName(SynapseConstants.SYNAPSE_NAMESPACE, "recipientlist"));
+		if (recipientListElement == null) {
+			return null;
+		}
+
+		boolean hasEndpoints =
+				recipientListElement.getFirstChildWithName(XMLConfigConstants.ENDPOINT_ELT) != null;
+		boolean hasMembers = recipientListElement.getFirstChildWithName(MEMBER) != null;
+		OMElement dynamicSetElement = recipientListElement.getFirstChildWithName(DYNAMIC_SET);
+
+		// One check covers both orderings of the conflicting children.
+		if (hasEndpoints && hasMembers) {
+			String msg = "Invalid Synapse configuration. "
+					+ "recipientListElement element cannot have both member & endpoint "
+					+ "child elements";
+			log.error(msg);
+			throw new SynapseException(msg);
+		}
+
+		// Only the dynamic variant takes its pool size from the configuration.
+		boolean dynamic = !hasEndpoints && !hasMembers && dynamicSetElement != null;
+		RecipientListEndpoint recipientListEndpoint = dynamic
+				? new RecipientListEndpoint(readMaxCache(dynamicSetElement))
+				: new RecipientListEndpoint();
+
+		// set endpoint name
+		OMAttribute name = epConfig.getAttribute(
+				new QName(XMLConfigConstants.NULL_NAMESPACE, "name"));
+		if (name != null) {
+			recipientListEndpoint.setName(name.getAttributeValue());
+		}
+
+		// set endpoints, members or the dynamic endpoint set
+		if (hasEndpoints) {
+			List<Endpoint> endpoints = getEndpoints(recipientListElement,
+					recipientListEndpoint, properties);
+			recipientListEndpoint.setChildren(endpoints);
+		} else if (hasMembers) {
+			List<Member> members = getMembers(recipientListElement);
+			recipientListEndpoint.setMembers(members);
+		} else if (dynamic) {
+			Value dynamicEndpointSet = new ValueFactory().createValue("value", dynamicSetElement);
+			recipientListEndpoint.setDynamicEnpointSet(dynamicEndpointSet);
+		}
+
+		if (recipientListEndpoint.getChildren() == null &&
+				recipientListEndpoint.getMembers() == null &&
+				recipientListEndpoint.getDynamicEnpointSet() == null) {
+			String msg = "Invalid Synapse configuration.\n"
+				+ "A RecipientListEndpoint must have child/member elements, but the RecipientListEndpoint "
+				+ "'" + recipientListEndpoint.getName() + "' does not have any child/member/dynamic endpoint elements.";
+			log.error(msg);
+			throw new SynapseException(msg);
+		}
+
+		return recipientListEndpoint;
+	}
+
+	/**
+	 * Reads the optional {@code max-cache} attribute of the &lt;endpoints&gt; element.
+	 *
+	 * @param dynamicSetElement the &lt;endpoints&gt; element
+	 * @return the configured pool size, or {@link RecipientListEndpoint#DEFAULT_MAX_POOL} when
+	 *         the attribute is absent, blank, negative or not an integer
+	 */
+	private int readMaxCache(OMElement dynamicSetElement) {
+		String maxStr = dynamicSetElement.getAttributeValue(new QName("max-cache"));
+		if (maxStr == null || maxStr.trim().length() == 0) {
+			// attribute is optional
+			return RecipientListEndpoint.DEFAULT_MAX_POOL;
+		}
+		try {
+			int maxCache = Integer.parseInt(maxStr.trim());
+			if (maxCache >= 0) {
+				return maxCache;
+			}
+		} catch (NumberFormatException ignored) {
+			// reported together with negative values below
+		}
+		log.warn("Ignoring invalid max-cache value '" + maxStr + "', using default "
+				+ RecipientListEndpoint.DEFAULT_MAX_POOL);
+		return RecipientListEndpoint.DEFAULT_MAX_POOL;
+	}
+	
+	/**
+	 * Reads the &lt;member&gt; children of the given element into {@link Member} objects.
+	 * <p>
+	 * {@code hostName} is passed to the {@code Member} constructor as-is. {@code httpPort}
+	 * and {@code httpsPort} are both optional: a missing or blank value leaves the
+	 * corresponding port of the member untouched, and surrounding whitespace is ignored.
+	 * </p>
+	 *
+	 * @param loadbalanceElement the element whose &lt;member&gt; children are read
+	 * @return the members in iteration order; empty if there are none
+	 * @throws NumberFormatException if a non-blank port attribute is not an integer
+	 */
+	@SuppressWarnings("rawtypes")
+	private List<Member> getMembers(OMElement loadbalanceElement) {
+		List<Member> members = new ArrayList<Member>();
+
+		for (Iterator memberIter = loadbalanceElement.getChildrenWithName(MEMBER);
+				memberIter.hasNext();) {
+
+			if (log.isDebugEnabled()) {
+				log.debug("Getting Members..");
+			}
+
+			OMElement memberEle = (OMElement) memberIter.next();
+			Member member = new Member(memberEle.getAttributeValue(new QName("hostName")), -1);
+
+			// httpPort gets the same blank/whitespace handling as httpsPort
+			String http = memberEle.getAttributeValue(new QName("httpPort"));
+			if (http != null && http.trim().length() != 0) {
+				member.setHttpPort(Integer.parseInt(http.trim()));
+			}
+			String https = memberEle.getAttributeValue(new QName("httpsPort"));
+			if (https != null && https.trim().length() != 0) {
+				member.setHttpsPort(Integer.parseInt(https.trim()));
+			}
+			members.add(member);
+		}
+		return members;
+	}
+
+}

Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/RecipientListEndpointSerializer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/RecipientListEndpointSerializer.java?rev=1199166&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/RecipientListEndpointSerializer.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/RecipientListEndpointSerializer.java Tue Nov  8 09:38:36 2011
@@ -0,0 +1,92 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.config.xml.endpoints;
+
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.clustering.Member;
+import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.config.xml.ValueSerializer;
+import org.apache.synapse.endpoints.Endpoint;
+import org.apache.synapse.endpoints.RecipientListEndpoint;
+
+/**
+ * @author nuwan
+ * 
+ * Serializes {@link RecipientListEndpoint} to an XML configuration.
+ *
+ * @see RecipientListEndpointFactory
+ *
+ */
+public class RecipientListEndpointSerializer extends EndpointSerializer {
+
+	/**
+	 * Turns a {@link RecipientListEndpoint} into an &lt;endpoint&gt; element that wraps a
+	 * &lt;recipientlist&gt; element.
+	 * <p>
+	 * Inside &lt;recipientlist&gt; exactly one of the following is written: the serialized
+	 * child endpoints (when the endpoint has children), one &lt;member&gt; element per member
+	 * (when it has members), or otherwise an &lt;endpoints&gt; element carrying the dynamic
+	 * endpoint set and a {@code max-cache} attribute.
+	 * </p>
+	 *
+	 * @param endpoint the endpoint to serialize; must be a {@code RecipientListEndpoint}
+	 * @return the &lt;endpoint&gt; element
+	 */
+	@Override
+	protected OMElement serializeEndpoint(Endpoint endpoint) {
+		if (!(endpoint instanceof RecipientListEndpoint)) {
+			handleException("Invalid endpoint type.");
+		}
+
+		fac = OMAbstractFactory.getOMFactory();
+		OMElement endpointElement =
+				fac.createOMElement("endpoint", SynapseConstants.SYNAPSE_OMNAMESPACE);
+
+		RecipientListEndpoint rcplEndpoint = (RecipientListEndpoint) endpoint;
+
+		// parameters first, then the attributes every endpoint type shares
+		serializeProperties(rcplEndpoint, endpointElement);
+		serializeCommonAttributes(endpoint, endpointElement);
+
+		OMElement rcplElement =
+				fac.createOMElement("recipientlist", SynapseConstants.SYNAPSE_OMNAMESPACE);
+		endpointElement.addChild(rcplElement);
+
+		if (rcplEndpoint.getChildren() != null) {
+			// static child endpoints
+			for (Endpoint child : rcplEndpoint.getChildren()) {
+				OMElement childElement = EndpointSerializer.getElementFromEndpoint(child);
+				rcplElement.addChild(childElement);
+			}
+			return endpointElement;
+		}
+
+		if (rcplEndpoint.getMembers() != null) {
+			// application members
+			for (Member member : rcplEndpoint.getMembers()) {
+				addMemberElement(rcplElement, member);
+			}
+			return endpointElement;
+		}
+
+		// neither children nor members: write the dynamic endpoint set
+		OMElement dynamicSetElement = fac.createOMElement(
+				"endpoints", SynapseConstants.SYNAPSE_OMNAMESPACE, rcplElement);
+		new ValueSerializer().serializeValue(
+				rcplEndpoint.getDynamicEnpointSet(), "value", dynamicSetElement);
+		String poolSize = String.valueOf(rcplEndpoint.getCurrentPoolSize());
+		dynamicSetElement.addAttribute(fac.createOMAttribute("max-cache", null, poolSize));
+		rcplElement.addChild(dynamicSetElement);
+
+		return endpointElement;
+	}
+
+	/** Appends one &lt;member&gt; element describing {@code member} to {@code parent}. */
+	private void addMemberElement(OMElement parent, Member member) {
+		OMElement memberElement = fac.createOMElement(
+				"member", SynapseConstants.SYNAPSE_OMNAMESPACE, parent);
+		String httpPort = String.valueOf(member.getHttpPort());
+		String httpsPort = String.valueOf(member.getHttpsPort());
+		memberElement.addAttribute(fac.createOMAttribute("hostName", null, member.getHostName()));
+		memberElement.addAttribute(fac.createOMAttribute("httpPort", null, httpPort));
+		memberElement.addAttribute(fac.createOMAttribute("httpsPort", null, httpsPort));
+		parent.addChild(memberElement);
+	}
+
+}

Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/RecipientListEndpoint.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/RecipientListEndpoint.java?rev=1199166&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/RecipientListEndpoint.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/RecipientListEndpoint.java Tue Nov  8 09:38:36 2011
@@ -0,0 +1,354 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *   * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ */
+
+package org.apache.synapse.endpoints;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.*;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.clustering.Member;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.synapse.core.axis2.Axis2MessageContext;
+import org.apache.synapse.mediators.Value;
+import org.apache.synapse.mediators.eip.EIPConstants;
+import org.apache.synapse.util.MessageHelper;
+
+/**
+ * @author nuwan
+ * <p>
+ * A Recipient List endpoint can contain multiple child endpoints or member elements. 
+ * It routes cloned copies of messages to each child recipient. This will assume that 
+ * all immediate child endpoints are identical in state (state is replicated) or state 
+ * is not maintained at those endpoints.
+ * </p>
+ */
+public class RecipientListEndpoint extends AbstractEndpoint {
+
+    private static final Log log = LogFactory.getLog(RecipientListEndpoint.class);
+    private static final String DELIMETER = ",";
+    /**
+	 * The list of members to which the message is delivered
+	 */
+	private List<Member> members;
+
+    private Map<String,Endpoint> dynamicEndpointPool ;
+    private Value dynamicEnpointSet;
+    public final static int DEFAULT_MAX_POOL = 20;
+	/**
+	 * Should this recipient list failover;
+	 */
+	private boolean failover;
+    private int currentPool;
+
+    private SynapseEnvironment env = null;
+
+    /**
+     * Creates a recipient list whose dynamically created endpoints are kept in a
+     * synchronized pool of the given size.
+     *
+     * @param poolsize maximum number of pooled dynamic endpoints; also reported by
+     *                 {@link #getCurrentPoolSize()}
+     */
+    public RecipientListEndpoint(int poolsize){
+        Map<String, Endpoint> pool = new DynamicEndpointPool<String, Endpoint>(poolsize);
+        this.dynamicEndpointPool = Collections.synchronizedMap(pool);
+        this.currentPool = poolsize;
+    }
+
+    /**
+     * Creates a recipient list with an endpoint pool of {@link #DEFAULT_MAX_POOL} entries.
+     * <p>
+     * The pool is always created, so a dynamic endpoint set assigned later through
+     * {@link #setDynamicEnpointSet(Value)} can be sent to without hitting a null pool.
+     * </p>
+     */
+    public RecipientListEndpoint(){
+        this(DEFAULT_MAX_POOL);
+    }
+
+	/**
+	 * Initialises this endpoint through the superclass (only if not yet initialised) and
+	 * remembers the environment.
+	 *
+	 * @param synapseEnvironment the environment; later used to initialise the address
+	 *                           endpoints created for dynamic recipients
+	 */
+	@Override
+	public void init(SynapseEnvironment synapseEnvironment) {
+		if (!initialized) {
+			super.init(synapseEnvironment);
+		}
+        // stored on every call, even when the superclass initialisation is skipped
+        this.env = synapseEnvironment;
+	}
+
+	/** Delegates to the superclass; this class adds no clean-up of its own here. */
+	@Override
+	public void destroy() {
+		super.destroy();
+	}
+
+	@Override
+	public void send(MessageContext synCtx) {
+
+		if (log.isDebugEnabled()) {
+			log.debug("Sending using Recipient List " + toString());
+		}
+
+		List<Endpoint> children = getChildren();
+		
+		//Service child endpoints
+        if (children != null && !children.isEmpty()) {
+            sendToEndpointList(synCtx, children);
+        }
+        //Service member elements if specified
+        else if (members != null && !members.isEmpty()) {
+            sendToApplicationMembers(synCtx);
+        }
+        else if (dynamicEnpointSet != null) {
+            sendToDynamicMembers(synCtx);
+        }
+        else {
+            String msg = "No child endpoints nor member elements available";
+            log.error(msg);
+            throw new SynapseException(msg);
+        }
+    }
+
+    /**
+     * Sends a cloned copy of the message to every endpoint in the list that is ready to send.
+     * <p>
+     * Each clone is tagged with a message sequence property ("index, delimiter, list size")
+     * and gets this endpoint pushed as fault handler. If no endpoint is ready, a failure is
+     * reported on the original message.
+     * </p>
+     *
+     * @param synCtx   the original message
+     * @param children the candidate recipients
+     */
+    private void sendToEndpointList(MessageContext synCtx, List<Endpoint> children) {
+        int sequence = 0;
+        boolean anyReady = false;
+
+        for (Endpoint recipient : children) {
+            if (!recipient.readyToSend()) {
+                continue;
+            }
+            anyReady = true;
+
+            MessageContext clonedCtx = null;
+            try {
+                clonedCtx = MessageHelper.cloneMessageContext(synCtx);
+            } catch (AxisFault e) {
+                handleException("Error cloning the message context", e);
+            }
+
+            // Used when aggregating responses
+            String sequenceValue = String.valueOf(sequence++)
+                    + EIPConstants.MESSAGE_SEQUENCE_DELEMITER + children.size();
+            clonedCtx.setProperty(EIPConstants.MESSAGE_SEQUENCE, sequenceValue);
+
+            // evaluate the endpoint properties
+            evaluateProperties(clonedCtx);
+
+            clonedCtx.pushFaultHandler(this);
+            recipient.send(clonedCtx);
+        }
+
+        if (anyReady) {
+            return;
+        }
+
+        String endpointName = getName() != null ? getName() : SynapseConstants.ANONYMOUS_ENDPOINT;
+        String msg = "Recipient List endpoint : " + endpointName + " - no ready child endpoints";
+        log.warn(msg);
+        informFailure(synCtx, SynapseConstants.ENDPOINT_FO_NONE_READY, msg);
+    }
+
+    /**
+     * Sends a copy of the message to every recipient named by the dynamic endpoint set.
+     * <p>
+     * The set is evaluated against the message and split on {@code ","}. Each entry is
+     * trimmed; blank entries are skipped. For every remaining address an endpoint is taken
+     * from the pool, or an {@link AddressEndpoint} is created, initialised and added to the
+     * pool when the address is not pooled yet.
+     * </p>
+     *
+     * @param synCtx the message to distribute
+     */
+    private void sendToDynamicMembers(MessageContext synCtx) {
+        String dynamicUrlStr = dynamicEnpointSet.evaluateValue(synCtx);
+        // guard against a null evaluation result: handle it like an empty recipient list
+        String[] dynamicUrlSet =
+                dynamicUrlStr == null ? new String[0] : dynamicUrlStr.split(DELIMETER);
+        if (dynamicUrlSet.length == 0) {
+            log.warn("No recipient/s was derived from the expression : " + dynamicEnpointSet.toString());
+            return;
+        }
+
+        List<Endpoint> children = new ArrayList<Endpoint>();
+        for (String rawUrl : dynamicUrlSet) {
+            // "a, b" style lists must not leak the blank into the address or the pool key
+            String url = rawUrl.trim();
+            if (url.length() == 0) {
+                continue;
+            }
+
+            //get an Endpoint from the pool
+            Endpoint epFromPool = dynamicEndpointPool.get(url);
+            if (epFromPool != null) {
+                children.add(epFromPool);
+                continue;
+            }
+
+            AddressEndpoint endpoint = new AddressEndpoint();
+            endpoint.setEnableMBeanStats(false);
+            endpoint.setName("DYNAMIC_RECIPIENT_LIST_EP_" + UUID.randomUUID());
+            EndpointDefinition definition = new EndpointDefinition();
+            definition.setReplicationDisabled(true);
+            definition.setAddress(url);
+            endpoint.setDefinition(definition);
+            endpoint.init(env);
+            //finally add the newly created endpoint to the Pool
+            dynamicEndpointPool.put(url, endpoint);
+            children.add(endpoint);
+        }
+
+        if (children.size() > 0) {
+            sendToEndpointList(synCtx, children);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("Halted sending messages to recipients. No recipient found !!! : " + dynamicUrlStr);
+            }
+        }
+    }
+
+    /**
+     * Routes one cloned copy of the message to each entry of the <b>members</b> list.
+     * <p>
+     * Only messages that arrived over http or https are delivered. For every member the
+     * clone's To address is rewritten to
+     * {@code transport://memberHost:memberPort/originalPath} and the clone is sent through a
+     * freshly created {@link AddressEndpoint}. If nothing could be sent, a failure is
+     * reported on the original message.
+     * </p>
+     *
+     * @param synCtx - The Original Message received by Synapse
+     */
+    private void sendToApplicationMembers(MessageContext synCtx){
+
+        int sequence = 0;
+        boolean anySent = false;
+
+        for (Member member : members) {
+
+            org.apache.axis2.context.MessageContext axis2Ctx =
+                    ((Axis2MessageContext) synCtx).getAxis2MessageContext();
+            String transport = axis2Ctx.getTransportIn().getName();
+
+            boolean httpFamily = transport.equals("http") || transport.equals("https");
+            if (!httpFamily) {
+                //Skip member.
+                log.error("Cannot deliver for non-HTTP/S transport " + transport);
+                continue;
+            }
+
+            MessageContext clonedCtx = null;
+            try {
+                clonedCtx = MessageHelper.cloneMessageContext(synCtx);
+            } catch (AxisFault e) {
+                handleException("Error cloning the message context", e);
+            }
+
+            // Used when aggregating responses
+            String sequenceValue = String.valueOf(sequence++)
+                    + EIPConstants.MESSAGE_SEQUENCE_DELEMITER + members.size();
+            clonedCtx.setProperty(EIPConstants.MESSAGE_SEQUENCE, sequenceValue);
+
+            // evaluate the endpoint properties
+            evaluateProperties(clonedCtx);
+
+            // URL rewrite: keep only the path of the original To address
+            String address = clonedCtx.getTo().getAddress();
+            if (address.contains(":")) {
+                try {
+                    address = new URL(address).getPath();
+                } catch (MalformedURLException e) {
+                    String msg = "URL " + address + " is malformed";
+                    log.error(msg, e);
+                    throw new SynapseException(msg, e);
+                }
+            }
+
+            String host = member.getHostName();
+            int port;
+            if ("http".equals(transport)) {
+                port = member.getHttpPort();
+            } else {
+                port = member.getHttpsPort();
+            }
+            EndpointReference epr =
+                    new EndpointReference(transport + "://" + host + ":" + port + address);
+
+            clonedCtx.setTo(epr);
+            clonedCtx.pushFaultHandler(this);
+
+            AddressEndpoint memberEndpoint = new AddressEndpoint();
+            memberEndpoint.setDefinition(new EndpointDefinition());
+            memberEndpoint.init(clonedCtx.getEnvironment());
+
+            if (memberEndpoint.readyToSend()) {
+                anySent = true;
+                memberEndpoint.send(clonedCtx);
+            }
+        }
+
+        if (anySent) {
+            return;
+        }
+
+        String endpointName = getName() != null ? getName() : SynapseConstants.ANONYMOUS_ENDPOINT;
+        String msg = "Recipient List endpoint : " + endpointName + " - no ready child members";
+        log.warn(msg);
+        informFailure(synCtx, SynapseConstants.ENDPOINT_FO_NONE_READY, msg);
+    }
+
+	/**
+	 * Tells whether this recipient list can currently deliver a message.
+	 * <p>
+	 * With child endpoints configured, at least one of them must be ready. Without child
+	 * endpoints the answer mirrors the dispatch in {@link #send(MessageContext)}: members and
+	 * dynamic recipients are turned into endpoints only while sending, so the list counts as
+	 * ready whenever members or a dynamic endpoint set are configured.
+	 * </p>
+	 *
+	 * @return {@code true} if a send attempt can be made
+	 */
+	@Override
+	public boolean readyToSend(){
+		List<Endpoint> children = getChildren();
+
+		// getChildren() is null when the list is configured with members or a dynamic set
+		if (children == null || children.isEmpty()) {
+			return (members != null && !members.isEmpty()) || dynamicEnpointSet != null;
+		}
+
+		for (Endpoint endpoint : children) {
+			if (endpoint.readyToSend()) {
+				if (log.isDebugEnabled()) {
+					log.debug("Recipient List " + this.toString()
+							+ " has at least one endpoint at ready state");
+				}
+				return true;
+			}
+		}
+		return false;
+	}
+
+    /**
+     * Called when one of the recipients fails: logs the failed child and reports the
+     * failure on the message.
+     * <p>
+     * The failure text is always built, so the reported failure carries a message whether
+     * or not debug logging is enabled.
+     * </p>
+     *
+     * @param endpoint          the recipient that failed
+     * @param synMessageContext the message that could not be delivered to it
+     */
+    public void onChildEndpointFail(Endpoint endpoint, MessageContext synMessageContext) {
+        //we just log the failed recipient here
+        logOnChildEndpointFail(endpoint, synMessageContext);
+        String msg = "Recipient List endpoint : " +
+                     (getName() != null ? getName() : SynapseConstants.ANONYMOUS_ENDPOINT) +
+                     " - one of the recipients encounterd an error while sending the message ";
+        if (log.isDebugEnabled()) {
+            log.debug(msg);
+        }
+        informFailure(synMessageContext, SynapseConstants.ENDPOINT_FO_NONE_READY, msg);
+    }
+
+
+	/**
+	 * @return the configured members, or {@code null} if none were set
+	 */
+	public List<Member> getMembers() {
+		return members;
+	}
+
+	/**
+	 * @param members the members to deliver to; the list is stored as given, not copied
+	 */
+	public void setMembers(List<Member> members) {
+		this.members = members;
+	}
+
+	/**
+	 * @return the failover flag; NOTE(review): the send logic in this class does not read it
+	 */
+	public boolean isFailover() {
+		return failover;
+	}
+
+	/**
+	 * @param failover the new value of the failover flag
+	 */
+	public void setFailover(boolean failover) {
+		this.failover = failover;
+	}
+
+    /**
+     * @return the value that is evaluated per message to a comma separated list of
+     *         recipient addresses, or {@code null} if none was set
+     */
+    public Value getDynamicEnpointSet() {
+        return dynamicEnpointSet;
+    }
+
+    /**
+     * @param dynamicEnpointSet the value to evaluate per message to obtain the comma
+     *                          separated recipient addresses
+     */
+    public void setDynamicEnpointSet(Value dynamicEnpointSet) {
+        this.dynamicEnpointSet = dynamicEnpointSet;
+    }
+
+    /**
+     * @return the pool size this endpoint was constructed with (the configured maximum,
+     *         not the number of endpoints currently pooled)
+     */
+    public int getCurrentPoolSize() {
+        return currentPool;
+    }
+
+    /**
+     * A simple LRU cached Endpoint pool for dynamic endpoints.
+     * <p>
+     * Built on an access-ordered {@link LinkedHashMap}: when an insertion makes the map
+     * larger than {@code max}, the least recently accessed entry is removed. The map itself
+     * is not synchronized; the enclosing class wraps it with
+     * {@link Collections#synchronizedMap(Map)}.
+     * </p>
+     *
+     * @param <K> key type
+     * @param <V> pooled value type
+     */
+    private static class DynamicEndpointPool<K, V> extends LinkedHashMap<K, V> {
+
+        private static final long serialVersionUID = 1L;
+
+        private final int maxPoolSize;
+
+        /**
+         * @param max the largest number of entries the pool may hold
+         */
+        public DynamicEndpointPool(final int max) {
+            // load factor 1.0 with capacity max + 1; 'true' selects access order
+            super(max + 1, 1.0f, true);
+            this.maxPoolSize = max;
+        }
+
+        @Override
+        protected boolean removeEldestEntry(final Map.Entry<K, V> eldest) {
+            return super.size() > maxPoolSize;
+        }
+    }
+
+}