You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@synapse.apache.org by us...@apache.org on 2011/11/08 10:38:36 UTC
svn commit: r1199166 - in
/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse:
config/xml/endpoints/ endpoints/
Author: uswick
Date: Tue Nov 8 09:38:36 2011
New Revision: 1199166
URL: http://svn.apache.org/viewvc?rev=1199166&view=rev
Log:
committing Recipient List Endpoint implementation. https://issues.apache.org/jira/browse/SYNAPSE-808
Added:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/RecipientListEndpointFactory.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/RecipientListEndpointSerializer.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/RecipientListEndpoint.java
Modified:
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointFactory.java
synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointSerializer.java
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointFactory.java?rev=1199166&r1=1199165&r2=1199166&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointFactory.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointFactory.java Tue Nov 8 09:38:36 2011
@@ -232,6 +232,12 @@ public abstract class EndpointFactory im
if (foElement != null) {
return FailoverEndpointFactory.getInstance();
}
+
+ OMElement rcplElement = configElement.getFirstChildWithName
+ (new QName(SynapseConstants.SYNAPSE_NAMESPACE, "recipientlist"));
+ if(rcplElement != null){
+ return RecipientListEndpointFactory.getInstance();
+ }
handleException("Invalid endpoint configuration.");
// just to make the compiler happy : never executes
Modified: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointSerializer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointSerializer.java?rev=1199166&r1=1199165&r2=1199166&view=diff
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointSerializer.java (original)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/EndpointSerializer.java Tue Nov 8 09:38:36 2011
@@ -19,22 +19,33 @@
package org.apache.synapse.config.xml.endpoints;
+import java.util.Collection;
+
+import javax.xml.namespace.QName;
+
import org.apache.axiom.om.OMElement;
import org.apache.axiom.om.OMFactory;
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.PropertyInclude;
import org.apache.synapse.SynapseConstants;
import org.apache.synapse.SynapseException;
-import org.apache.synapse.PropertyInclude;
-import org.apache.synapse.mediators.MediatorProperty;
-import org.apache.synapse.aspects.statistics.StatisticsConfigurable;
-import org.apache.synapse.config.xml.XMLConfigConstants;
import org.apache.synapse.config.xml.MediatorPropertySerializer;
-import org.apache.synapse.endpoints.*;
+import org.apache.synapse.endpoints.AbstractEndpoint;
+import org.apache.synapse.endpoints.AddressEndpoint;
+import org.apache.synapse.endpoints.DefaultEndpoint;
+import org.apache.synapse.endpoints.DynamicLoadbalanceEndpoint;
+import org.apache.synapse.endpoints.Endpoint;
import org.apache.synapse.endpoints.EndpointDefinition;
-
-import javax.xml.namespace.QName;
-import java.util.Collection;
+import org.apache.synapse.endpoints.FailoverEndpoint;
+import org.apache.synapse.endpoints.IndirectEndpoint;
+import org.apache.synapse.endpoints.LoadbalanceEndpoint;
+import org.apache.synapse.endpoints.RecipientListEndpoint;
+import org.apache.synapse.endpoints.ResolvingEndpoint;
+import org.apache.synapse.endpoints.SALoadbalanceEndpoint;
+import org.apache.synapse.endpoints.TemplateEndpoint;
+import org.apache.synapse.endpoints.WSDLEndpoint;
+import org.apache.synapse.mediators.MediatorProperty;
/**
* All endpoint serializers should implement this interface. Use EndpointSerializer to
@@ -135,6 +146,8 @@ public abstract class EndpointSerializer
return new FailoverEndpointSerializer();
} else if (endpoint instanceof TemplateEndpoint) {
return new TemplateEndpointSerializer();
+ } else if(endpoint instanceof RecipientListEndpoint){
+ return new RecipientListEndpointSerializer();
}
throw new SynapseException("Serializer for endpoint " +
Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/RecipientListEndpointFactory.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/RecipientListEndpointFactory.java?rev=1199166&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/RecipientListEndpointFactory.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/RecipientListEndpointFactory.java Tue Nov 8 09:38:36 2011
@@ -0,0 +1,173 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.config.xml.endpoints;
+
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Properties;
+
+import javax.xml.namespace.QName;
+
+import org.apache.axiom.om.OMAttribute;
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.clustering.Member;
+import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.config.xml.ValueFactory;
+import org.apache.synapse.config.xml.XMLConfigConstants;
+import org.apache.synapse.endpoints.Endpoint;
+import org.apache.synapse.endpoints.RecipientListEndpoint;
+import org.apache.synapse.mediators.Value;
+
+/**
+ * @author nuwan
+ * <p>
+ * Creates {@link RecipientListEndpoint} using an XML configuration.
+ * <p/>
+ * <endpoint [name="name"]>
+ * <recipientlist>
+ * <member hostName="host" httpPort="port" httpsPort="port">+
+ * </recipientlist>
+ * </endpoint>
+ * </p>
+ */
+public class RecipientListEndpointFactory extends EndpointFactory {
+
+ private static RecipientListEndpointFactory instance = new RecipientListEndpointFactory();
+
+ private static final QName MEMBER = new QName(SynapseConstants.SYNAPSE_NAMESPACE, "member");
+ private static final QName DYNAMIC_SET = new QName(SynapseConstants.SYNAPSE_NAMESPACE, "endpoints");
+
+ private RecipientListEndpointFactory(){
+ }
+
+ public static RecipientListEndpointFactory getInstance(){
+ return instance;
+ }
+
+ @Override
+ protected Endpoint createEndpoint(OMElement epConfig,
+ boolean anonymousEndpoint, Properties properties) {
+
+ OMElement recipientListElement = epConfig.getFirstChildWithName(
+ new QName(SynapseConstants.SYNAPSE_NAMESPACE, "recipientlist"));
+
+ if(recipientListElement != null){
+
+ //create endpoint
+ RecipientListEndpoint recipientListEndpoint = new RecipientListEndpoint();
+
+ // set endpoint name
+ OMAttribute name = epConfig.getAttribute(new QName(
+ org.apache.synapse.config.xml.XMLConfigConstants.NULL_NAMESPACE, "name"));
+
+ if (name != null) {
+ recipientListEndpoint.setName(name.getAttributeValue());
+ }
+
+ // set endpoints or members
+ if (recipientListElement
+ .getFirstChildWithName(XMLConfigConstants.ENDPOINT_ELT) != null) {
+ if (recipientListElement.getChildrenWithName((MEMBER)).hasNext()) {
+ String msg = "Invalid Synapse configuration. "
+ + "child elements";
+ log.error(msg);
+ throw new SynapseException(msg);
+ }
+ List<Endpoint> endpoints = getEndpoints(recipientListElement,
+ recipientListEndpoint, properties);
+ recipientListEndpoint.setChildren(endpoints);
+ }
+ else if(recipientListElement.getFirstChildWithName(MEMBER) != null){
+ if(recipientListElement.
+ getChildrenWithName((XMLConfigConstants.ENDPOINT_ELT)).hasNext()){
+ String msg =
+ "Invalid Synapse configuration. " +
+ "recipientListElement element cannot have both member & endpoint " +
+ "child elements";
+ log.error(msg);
+ throw new SynapseException(msg);
+ }
+
+ List<Member> members = getMembers(recipientListElement);
+ recipientListEndpoint.setMembers(members);
+ } else if (recipientListElement.getFirstChildWithName(DYNAMIC_SET) != null) {
+ OMElement dynamicSetElement = recipientListElement.getFirstChildWithName(DYNAMIC_SET);
+ Value dynamicEndpointSet = new ValueFactory().createValue("value", dynamicSetElement);
+ String maxStr = dynamicSetElement.getAttributeValue(new QName("max-cache"));
+ int maxCache = -1;
+ try {
+ maxCache = Integer.parseInt(maxStr);
+ } catch (NumberFormatException e) {
+
+ }
+ recipientListEndpoint = new RecipientListEndpoint(maxCache < 0 ?
+ RecipientListEndpoint.DEFAULT_MAX_POOL :
+ maxCache);
+ if (name != null) {
+ recipientListEndpoint.setName(name.getAttributeValue());
+ }
+ recipientListEndpoint.setDynamicEnpointSet(dynamicEndpointSet);
+ }
+
+ if (recipientListEndpoint.getChildren() == null &&
+ recipientListEndpoint.getMembers() == null &&
+ recipientListEndpoint.getDynamicEnpointSet() == null) {
+ String msg = "Invalid Synapse configuration.\n"
+ + "A RecipientListEndpoint must have child/member elements, but the RecipientListEndpoint "
+ + "'" + recipientListEndpoint.getName() + "' does not have any child/member/dynamic endpoint elements.";
+ log.error(msg);
+ throw new SynapseException(msg);
+ }
+
+ return recipientListEndpoint;
+ }
+
+ return null;
+ }
+
+ @SuppressWarnings("rawtypes")
+ private List<Member> getMembers(OMElement loadbalanceElement) {
+ List<Member> members = new ArrayList<Member>();
+
+ for(Iterator memberIter = loadbalanceElement.getChildrenWithName(MEMBER);
+ memberIter.hasNext();){
+
+ if(log.isDebugEnabled()){
+ log.debug("Getting Members..");
+ }
+
+ OMElement memberEle = (OMElement) memberIter.next();
+ Member member = new Member(memberEle.getAttributeValue(new QName("hostName")), -1);
+ String http = memberEle.getAttributeValue(new QName("httpPort"));
+ if (http != null) {
+ member.setHttpPort(Integer.parseInt(http));
+ }
+ String https = memberEle.getAttributeValue(new QName("httpsPort"));
+ if (https != null && https.trim().length() != 0) {
+ member.setHttpsPort(Integer.parseInt(https.trim()));
+ }
+ members.add(member);
+ }
+ return members;
+ }
+
+}
Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/RecipientListEndpointSerializer.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/RecipientListEndpointSerializer.java?rev=1199166&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/RecipientListEndpointSerializer.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/config/xml/endpoints/RecipientListEndpointSerializer.java Tue Nov 8 09:38:36 2011
@@ -0,0 +1,92 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.config.xml.endpoints;
+
+import org.apache.axiom.om.OMAbstractFactory;
+import org.apache.axiom.om.OMElement;
+import org.apache.axis2.clustering.Member;
+import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.config.xml.ValueSerializer;
+import org.apache.synapse.endpoints.Endpoint;
+import org.apache.synapse.endpoints.RecipientListEndpoint;
+
+/**
+ * @author nuwan
+ *
+ * erializes {@link RecipientListEndpoint} to an XML configuration.
+ *
+ * @see RecipientListEndpointFactory
+ *
+ */
+public class RecipientListEndpointSerializer extends EndpointSerializer {
+
+ @Override
+ protected OMElement serializeEndpoint(Endpoint endpoint) {
+ if (!(endpoint instanceof RecipientListEndpoint)) {
+ handleException("Invalid endpoint type.");
+ }
+
+ fac = OMAbstractFactory.getOMFactory();
+ OMElement endpointElement
+ = fac.createOMElement("endpoint", SynapseConstants.SYNAPSE_OMNAMESPACE);
+
+ RecipientListEndpoint recipientListEndpoint = (RecipientListEndpoint) endpoint;
+
+ // serialize the parameters
+ serializeProperties(recipientListEndpoint, endpointElement);
+
+ serializeCommonAttributes(endpoint,endpointElement);
+
+ OMElement recipientListElement
+ = fac.createOMElement("recipientlist", SynapseConstants.SYNAPSE_OMNAMESPACE);
+ endpointElement.addChild(recipientListElement);
+
+ // Serialize endpoint elements which are children of the recipientlist
+ // element
+ if (recipientListEndpoint.getChildren() != null) {
+ for (Endpoint childEndpoint : recipientListEndpoint.getChildren()) {
+ recipientListElement.addChild(EndpointSerializer
+ .getElementFromEndpoint(childEndpoint));
+ }
+ } else if (recipientListEndpoint.getMembers() != null) {
+ for (Member member : recipientListEndpoint.getMembers()) {
+ OMElement memberEle = fac.createOMElement(
+ "member", SynapseConstants.SYNAPSE_OMNAMESPACE, recipientListElement);
+ memberEle.addAttribute(fac.createOMAttribute(
+ "hostName", null, member.getHostName()));
+ memberEle.addAttribute(fac.createOMAttribute(
+ "httpPort", null, String.valueOf(member.getHttpPort())));
+ memberEle.addAttribute(fac.createOMAttribute(
+ "httpsPort", null, String.valueOf(member.getHttpsPort())));
+ recipientListElement.addChild(memberEle);
+ }
+ }else{
+ OMElement dynamicEpEle = fac.createOMElement(
+ "endpoints", SynapseConstants.SYNAPSE_OMNAMESPACE, recipientListElement);
+ new ValueSerializer().serializeValue(recipientListEndpoint.getDynamicEnpointSet(), "value", dynamicEpEle);
+ dynamicEpEle.addAttribute(fac.createOMAttribute("max-cache", null,
+ String.valueOf(recipientListEndpoint.getCurrentPoolSize())));
+ recipientListElement.addChild(dynamicEpEle);
+ }
+
+ return endpointElement;
+ }
+
+}
Added: synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/RecipientListEndpoint.java
URL: http://svn.apache.org/viewvc/synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/RecipientListEndpoint.java?rev=1199166&view=auto
==============================================================================
--- synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/RecipientListEndpoint.java (added)
+++ synapse/trunk/java/modules/core/src/main/java/org/apache/synapse/endpoints/RecipientListEndpoint.java Tue Nov 8 09:38:36 2011
@@ -0,0 +1,354 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+
+package org.apache.synapse.endpoints;
+
+import java.net.MalformedURLException;
+import java.net.URL;
+import java.util.*;
+
+import org.apache.axis2.AxisFault;
+import org.apache.axis2.addressing.EndpointReference;
+import org.apache.axis2.clustering.Member;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.synapse.MessageContext;
+import org.apache.synapse.SynapseConstants;
+import org.apache.synapse.SynapseException;
+import org.apache.synapse.core.SynapseEnvironment;
+import org.apache.synapse.core.axis2.Axis2MessageContext;
+import org.apache.synapse.mediators.Value;
+import org.apache.synapse.mediators.eip.EIPConstants;
+import org.apache.synapse.util.MessageHelper;
+
+/**
+ * A Recipient List endpoint can contain multiple child endpoints, member elements, or a
+ * dynamically evaluated set of recipient addresses.
+ *
+ * <p>It routes cloned copies of a message to each recipient. This will assume that
+ * all immediate child endpoints are identical in state (state is replicated) or state
+ * is not maintained at those endpoints.</p>
+ *
+ * @author nuwan
+ */
+public class RecipientListEndpoint extends AbstractEndpoint {
+
+    private static final Log log = LogFactory.getLog(RecipientListEndpoint.class);
+
+    /** Separator between addresses in the evaluated dynamic endpoint set. */
+    private static final String DELIMITER = ",";
+
+    /** Pool size used when none is given. */
+    public final static int DEFAULT_MAX_POOL = 20;
+
+    /**
+     * The list of members to which the message is delivered to
+     */
+    private List<Member> members;
+
+    /** Endpoints created for dynamic recipients, keyed by address, evicted in LRU order. */
+    private final Map<String, Endpoint> dynamicEndpointPool;
+
+    /** Value that evaluates to the delimited list of dynamic recipient addresses. */
+    private Value dynamicEnpointSet;
+
+    /**
+     * Should this recipient list failover;
+     */
+    private boolean failover;
+
+    /** Maximum size the dynamic endpoint pool was created with. */
+    private final int currentPool;
+
+    private SynapseEnvironment env = null;
+
+    /**
+     * Creates a recipient list whose dynamic endpoint pool holds at most
+     * {@code poolsize} endpoints.
+     *
+     * @param poolsize maximum number of pooled dynamic endpoints
+     */
+    public RecipientListEndpoint(int poolsize) {
+        dynamicEndpointPool = Collections.synchronizedMap(
+                new DynamicEndpointPool<String, Endpoint>(poolsize));
+        this.currentPool = poolsize;
+    }
+
+    /**
+     * Creates a recipient list with a dynamic endpoint pool of {@link #DEFAULT_MAX_POOL}
+     * entries. The pool is always created so that a dynamic endpoint set assigned later
+     * through {@link #setDynamicEnpointSet(Value)} can be used.
+     */
+    public RecipientListEndpoint() {
+        this(DEFAULT_MAX_POOL);
+    }
+
+    /**
+     * Initializes the endpoint (once) and remembers the environment, which is needed
+     * later to initialize endpoints created for dynamic recipients.
+     */
+    @Override
+    public void init(SynapseEnvironment synapseEnvironment) {
+        if (!initialized) {
+            super.init(synapseEnvironment);
+        }
+        this.env = synapseEnvironment;
+    }
+
+    @Override
+    public void destroy() {
+        super.destroy();
+    }
+
+    /**
+     * Sends a copy of the message to every recipient. Child endpoints take precedence
+     * over members, which take precedence over the dynamic endpoint set.
+     *
+     * @param synCtx the message to distribute
+     * @throws SynapseException if no kind of recipient is configured
+     */
+    @Override
+    public void send(MessageContext synCtx) {
+
+        if (log.isDebugEnabled()) {
+            log.debug("Sending using Recipient List " + toString());
+        }
+
+        List<Endpoint> children = getChildren();
+
+        //Service child endpoints
+        if (children != null && !children.isEmpty()) {
+            sendToEndpointList(synCtx, children);
+        }
+        //Service member elements if specified
+        else if (members != null && !members.isEmpty()) {
+            sendToApplicationMembers(synCtx);
+        }
+        else if (dynamicEnpointSet != null) {
+            sendToDynamicMembers(synCtx);
+        }
+        else {
+            String msg = "No child endpoints nor member elements available";
+            log.error(msg);
+            throw new SynapseException(msg);
+        }
+    }
+
+    /**
+     * Sends a cloned copy of the message to every endpoint in the list that is ready to
+     * send, and reports a failure when none is.
+     *
+     * @param synCtx   the original message
+     * @param children the candidate recipients
+     */
+    private void sendToEndpointList(MessageContext synCtx, List<Endpoint> children) {
+        int i = 0;
+        boolean foundEndpoint = false;
+
+        for (Endpoint childEndpoint : children) {
+
+            if (childEndpoint.readyToSend()) {
+                foundEndpoint = true;
+                MessageContext newCtx = null;
+                try {
+                    newCtx = MessageHelper.cloneMessageContext(synCtx);
+                } catch (AxisFault e) {
+                    handleException("Error cloning the message context", e);
+                }
+
+                //Used when aggregating responses
+                newCtx.setProperty(EIPConstants.MESSAGE_SEQUENCE,
+                        String.valueOf(i++) + EIPConstants.MESSAGE_SEQUENCE_DELEMITER +
+                                children.size());
+
+                // evaluate the endpoint properties
+                evaluateProperties(newCtx);
+
+                newCtx.pushFaultHandler(this);
+                childEndpoint.send(newCtx);
+            }
+        }
+
+        if (!foundEndpoint) {
+            String msg = "Recipient List endpoint : " +
+                    (getName() != null ? getName() : SynapseConstants.ANONYMOUS_ENDPOINT) +
+                    " - no ready child endpoints";
+            log.warn(msg);
+            informFailure(synCtx, SynapseConstants.ENDPOINT_FO_NONE_READY, msg);
+        }
+    }
+
+    /**
+     * Evaluates the dynamic endpoint set against the message, splits the result on
+     * {@code ","}, and sends a copy of the message to an address endpoint for each
+     * non-blank entry. Endpoints are reused from the pool when one exists for the address.
+     *
+     * @param synCtx the original message
+     */
+    private void sendToDynamicMembers(MessageContext synCtx) {
+        String dynamicUrlStr = dynamicEnpointSet.evaluateValue(synCtx);
+        if (dynamicUrlStr == null) {
+            // nothing was derived at all; treat it like an empty recipient list
+            log.warn("No recipient/s was derived from the expression : " + dynamicEnpointSet.toString());
+            return;
+        }
+
+        String[] dynamicUrlSet = dynamicUrlStr.split(DELIMITER);
+        if (dynamicUrlSet.length == 0) {
+            log.warn("No recipient/s was derived from the expression : " + dynamicEnpointSet.toString());
+            return;
+        }
+
+        List<Endpoint> children = new ArrayList<Endpoint>();
+        for (String rawUrl : dynamicUrlSet) {
+            // Trim before use so that "a, b" and "a,b" share pool entries and the
+            // address itself carries no surrounding whitespace.
+            String url = rawUrl.trim();
+            if (url.length() == 0) {
+                continue;
+            }
+
+            // get and put are individually synchronized; two threads racing on the same
+            // new address at worst each create an endpoint and the later put replaces
+            // the earlier pool entry.
+            Endpoint recipient = dynamicEndpointPool.get(url);
+            if (recipient == null) {
+                recipient = createDynamicEndpoint(url);
+                dynamicEndpointPool.put(url, recipient);
+            }
+            children.add(recipient);
+        }
+
+        if (children.size() > 0) {
+            sendToEndpointList(synCtx, children);
+        } else {
+            if (log.isDebugEnabled()) {
+                log.debug("Halted sending messages to recipients. No recipient found !!! : " + dynamicUrlStr);
+            }
+        }
+    }
+
+    /**
+     * Creates and initializes an address endpoint for a dynamically derived recipient.
+     *
+     * @param url the recipient address
+     * @return the initialized endpoint
+     */
+    private Endpoint createDynamicEndpoint(String url) {
+        AddressEndpoint endpoint = new AddressEndpoint();
+        endpoint.setEnableMBeanStats(false);
+        endpoint.setName("DYNAMIC_RECIPIENT_LIST_EP_" + UUID.randomUUID());
+        EndpointDefinition definition = new EndpointDefinition();
+        definition.setReplicationDisabled(true);
+        definition.setAddress(url);
+        endpoint.setDefinition(definition);
+        endpoint.init(env);
+        return endpoint;
+    }
+
+    /**<p>Iterates the <b>members</b> list, creates Address Endpoints
+     * from each member element and routes cloned copies of the message
+     * to each Address Endpoint. Only messages that arrived over the
+     * {@code http} or {@code https} transport are delivered; the same
+     * transport is used towards the members.</p>
+     * @param synCtx - The Original Message received by Synapse
+     */
+    private void sendToApplicationMembers(MessageContext synCtx) {
+
+        // The incoming transport is the same for every member, so resolve it once.
+        org.apache.axis2.context.MessageContext axis2MsgCtx = ((Axis2MessageContext) synCtx)
+                .getAxis2MessageContext();
+        String transport = axis2MsgCtx.getTransportIn().getName();
+        boolean http = "http".equals(transport);
+        boolean supportedTransport = http || "https".equals(transport);
+
+        boolean foundEndpoint = false;
+
+        if (!supportedTransport) {
+            log.error("Cannot deliver for non-HTTP/S transport " + transport);
+        } else {
+            int i = 0;
+            for (Member member : members) {
+
+                MessageContext newCtx = null;
+
+                try {
+                    newCtx = MessageHelper.cloneMessageContext(synCtx);
+                } catch (AxisFault e) {
+                    handleException("Error cloning the message context", e);
+                }
+
+                // Used when aggregating responses
+                newCtx.setProperty(
+                        EIPConstants.MESSAGE_SEQUENCE,
+                        String.valueOf(i++)
+                                + EIPConstants.MESSAGE_SEQUENCE_DELEMITER
+                                + members.size());
+
+                // evaluate the endpoint properties
+                evaluateProperties(newCtx);
+
+                // URL rewrite: keep only the path of the original address
+                String address = newCtx.getTo().getAddress();
+                if (address.indexOf(":") != -1) {
+                    try {
+                        address = new URL(address).getPath();
+                    } catch (MalformedURLException e) {
+                        String msg = "URL " + address + " is malformed";
+                        log.error(msg, e);
+                        throw new SynapseException(msg, e);
+                    }
+                }
+
+                int port = http ? member.getHttpPort() : member.getHttpsPort();
+                EndpointReference epr = new EndpointReference(transport
+                        + "://"
+                        + member.getHostName()
+                        + ":"
+                        + port + address);
+
+                newCtx.setTo(epr);
+                newCtx.pushFaultHandler(this);
+
+                AddressEndpoint endpoint = new AddressEndpoint();
+                EndpointDefinition definition = new EndpointDefinition();
+                endpoint.setDefinition(definition);
+                endpoint.init(newCtx.getEnvironment());
+
+                if (endpoint.readyToSend()) {
+                    foundEndpoint = true;
+                    endpoint.send(newCtx);
+                }
+            }
+        }
+
+        if (!foundEndpoint) {
+            String msg = "Recipient List endpoint : " +
+                    (getName() != null ? getName() : SynapseConstants.ANONYMOUS_ENDPOINT) +
+                    " - no ready child members";
+            log.warn(msg);
+            informFailure(synCtx, SynapseConstants.ENDPOINT_FO_NONE_READY, msg);
+        }
+    }
+
+    /**
+     * Reports whether this recipient list can accept a message.
+     *
+     * @return for a list of child endpoints, {@code true} when at least one child is ready;
+     *         for a member or dynamic configuration, {@code true} when members or a dynamic
+     *         set are configured, because those recipients are only created at send time
+     */
+    @Override
+    public boolean readyToSend() {
+        List<Endpoint> children = getChildren();
+        if (children == null) {
+            // No static children: readiness of members / dynamic recipients cannot be
+            // checked up front, so only report whether anything is configured.
+            return (members != null && !members.isEmpty()) || dynamicEnpointSet != null;
+        }
+
+        for (Endpoint endpoint : children) {
+            if (endpoint.readyToSend()) {
+                if (log.isDebugEnabled()) {
+                    log.debug("Recipient List " + this.toString()
+                            + " has at least one endpoint at ready state");
+                }
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /**
+     * Logs the failed recipient and reports the failure for the message.
+     *
+     * @param endpoint          the recipient that failed
+     * @param synMessageContext the message that could not be delivered
+     */
+    public void onChildEndpointFail(Endpoint endpoint, MessageContext synMessageContext) {
+        //we just log the failed recipient here
+        logOnChildEndpointFail(endpoint, synMessageContext);
+
+        // Build the message unconditionally: it is passed to informFailure as well,
+        // not just written to the debug log.
+        String msg = "Recipient List endpoint : " +
+                (getName() != null ? getName() : SynapseConstants.ANONYMOUS_ENDPOINT) +
+                " - one of the recipients encounterd an error while sending the message ";
+        if (log.isDebugEnabled()) {
+            log.debug(msg);
+        }
+        informFailure(synMessageContext, SynapseConstants.ENDPOINT_FO_NONE_READY, msg);
+    }
+
+
+    public List<Member> getMembers() {
+        return members;
+    }
+
+    public void setMembers(List<Member> members) {
+        this.members = members;
+    }
+
+    public boolean isFailover() {
+        return failover;
+    }
+
+    public void setFailover(boolean failover) {
+        this.failover = failover;
+    }
+
+    public Value getDynamicEnpointSet() {
+        return dynamicEnpointSet;
+    }
+
+    public void setDynamicEnpointSet(Value dynamicEnpointSet) {
+        this.dynamicEnpointSet = dynamicEnpointSet;
+    }
+
+    /**
+     * @return the maximum size the dynamic endpoint pool was created with
+     */
+    public int getCurrentPoolSize() {
+        return currentPool;
+    }
+
+    /**
+     * create a simple LRU cached Endpoint pool for dynamic endpoints
+     *
+     * @param <K> key type
+     * @param <V> pooled value type
+     */
+    private static class DynamicEndpointPool<K, V> extends LinkedHashMap<K, V> {
+
+        private static final long serialVersionUID = 1L;
+
+        private final int maxPoolSize;
+
+        public DynamicEndpointPool(final int max) {
+            // access-ordered so that the eldest entry is the least recently used one
+            super(max + 1, 1.0f, true);
+            this.maxPoolSize = max;
+        }
+
+        @Override
+        protected boolean removeEldestEntry(final Map.Entry<K, V> eldest) {
+            return super.size() > maxPoolSize;
+        }
+
+
+    }
+
+}