You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/02/27 16:53:14 UTC
svn commit: r381368 - in
/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow:
AbstractFlow.java DefaultFlowChooser.java Flow.java jca/JCAFlow.java
jms/JMSFlow.java seda/SedaFlow.java st/STFlow.java
Author: gnodet
Date: Mon Feb 27 07:53:10 2006
New Revision: 381368
URL: http://svn.apache.org/viewcvs?rev=381368&view=rev
Log:
SM-319: multiple flows step 2. Use flow capabilities and exchange QoS to choose a flow
Modified:
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/DefaultFlowChooser.java
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/Flow.java
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaFlow.java
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/st/STFlow.java
Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java?rev=381368&r1=381367&r2=381368&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java Mon Feb 27 07:53:10 2006
@@ -20,6 +20,7 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.JbiConstants;
import org.apache.servicemix.jbi.framework.ComponentNameSpace;
import org.apache.servicemix.jbi.framework.LocalComponentConnector;
import org.apache.servicemix.jbi.management.AttributeInfoHelper;
@@ -27,15 +28,18 @@
import org.apache.servicemix.jbi.messaging.ExchangePacket;
import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
import org.apache.servicemix.jbi.nmr.Broker;
+import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;
import javax.jbi.JBIException;
import javax.jbi.management.LifeCycleMBean;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.MessageExchange.Role;
+import javax.jbi.servicedesc.ServiceEndpoint;
import javax.management.JMException;
import javax.management.MBeanAttributeInfo;
import javax.management.ObjectName;
+import javax.xml.namespace.QName;
/**
* A simple Straight through flow
@@ -95,8 +99,9 @@
* @throws JBIException
*/
public void shutDown() throws JBIException{
- if (log.isDebugEnabled())
+ if (log.isDebugEnabled()) {
log.debug("Called Flow shutdown");
+ }
super.shutDown();
}
@@ -106,12 +111,9 @@
* @throws JBIException
*/
public void send(MessageExchange me) throws JBIException{
- // Check persistence
- if (log.isDebugEnabled())
+ if (log.isDebugEnabled()) {
log.debug("Called Flow send");
- if (!canPersist() && isPersistent(me)) {
- throw new UnsupportedOperationException("persistence is not available on st flow");
- }
+ }
// do send
try {
lock.readLock().lock();
@@ -125,8 +127,9 @@
* suspend the flow to prevent any message exchanges
*/
public synchronized void suspend(){
- if (log.isDebugEnabled())
+ if (log.isDebugEnabled()) {
log.debug("Called Flow suspend");
+ }
lock.writeLock().lock();
suspendThread = Thread.currentThread();
}
@@ -136,8 +139,9 @@
* resume message exchange processing
*/
public synchronized void resume(){
- if (log.isDebugEnabled())
+ if (log.isDebugEnabled()) {
log.debug("Called Flow resume");
+ }
lock.writeLock().unlock();
suspendThread = null;
}
@@ -150,15 +154,6 @@
protected abstract void doSend(MessageExchangeImpl me) throws JBIException;
/**
- * Ability for this flow to persist exchanges.
- *
- * @return <code>true</code> if this flow can persist messages
- */
- protected boolean canPersist() {
- return false;
- }
-
- /**
* Distribute an ExchangePacket
*
* @param packet
@@ -206,6 +201,55 @@
}
}
+ protected boolean isTransacted(MessageExchange me) {
+ return me.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME) != null;
+ }
+
+ protected boolean isSynchronous(MessageExchange me) {
+ Boolean sync = (Boolean) me.getProperty(JbiConstants.SEND_SYNC);
+ return sync != null && sync.booleanValue();
+ }
+
+ protected boolean isClustered(MessageExchange me) {
+ ServiceEndpoint se = me.getEndpoint();
+ if (se == null) {
+ // Routing by service name
+ QName serviceName = me.getService();
+ if (serviceName != null) {
+ ServiceEndpoint[] eps = broker.getContainer().getRegistry().getEndpointsForService(serviceName);
+ for (int i = 0; i < eps.length; i++) {
+ if (eps[i] instanceof InternalEndpoint) {
+ String name = ((InternalEndpoint) eps[i]).getComponentNameSpace().getContainerName();
+ if (!name.equals(broker.getContainerName())) {
+ return true;
+ }
+ }
+ }
+ return false;
+ } else {
+ // Routing by interface name
+ QName interfaceName = me.getInterfaceName();
+ ServiceEndpoint[] eps = broker.getContainer().getRegistry().getEndpoints(interfaceName);
+ for (int i = 0; i < eps.length; i++) {
+ if (eps[i] instanceof InternalEndpoint) {
+ String name = ((InternalEndpoint) eps[i]).getComponentNameSpace().getContainerName();
+ if (!name.equals(broker.getContainerName())) {
+ return true;
+ }
+ }
+ }
+ return false;
+ }
+ // Routing by endpoint
+ } else if (se instanceof InternalEndpoint) {
+ String name = ((InternalEndpoint) se).getComponentNameSpace().getContainerName();
+ return !name.equals(broker.getContainerName());
+ // Unknown: assume this is not clustered
+ } else {
+ return false;
+ }
+ }
+
public Broker getBroker() {
return broker;
}
Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/DefaultFlowChooser.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/DefaultFlowChooser.java?rev=381368&r1=381367&r2=381368&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/DefaultFlowChooser.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/DefaultFlowChooser.java Mon Feb 27 07:53:10 2006
@@ -34,16 +34,22 @@
if (flow != null) {
for (int i = 0; i < flows.length; i++) {
if (flows[i].getName().equals(flow)) {
- return flows[i];
+ if (flows[i].canHandle(exchange)) {
+ return flows[i];
+ } else {
+ log.debug("Flow '" + flow + "' was specified but not able to handle exchange");
+ }
}
}
log.debug("Flow '" + flow + "' was specified but not found");
}
// Check against flow capabilities
- Object tx = exchange.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
- Boolean sync = (Boolean) exchange.getProperty(JbiConstants.SEND_SYNC);
- // TODO
- return flows[0];
+ for (int i = 0; i < flows.length; i++) {
+ if (flows[i].canHandle(exchange)) {
+ return flows[i];
+ }
+ }
+ return null;
}
-
+
}
Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/Flow.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/Flow.java?rev=381368&r1=381367&r2=381368&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/Flow.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/Flow.java Mon Feb 27 07:53:10 2006
@@ -64,5 +64,18 @@
* resume message exchange processing
*/
public void resume();
+
+ /**
+ * Get the broker associated with this flow
+ *
+ */
+ public Broker getBroker();
+
+ /**
+ * Check if the flow can support the requested QoS for this exchange
+ * @param me the exchange to check
+ * @return true if this flow can handle the given exchange
+ */
+ public boolean canHandle(MessageExchange me);
}
Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java?rev=381368&r1=381367&r2=381368&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java Mon Feb 27 07:53:10 2006
@@ -21,6 +21,7 @@
import java.util.Set;
import javax.jbi.JBIException;
+import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.MessageExchange.Role;
import javax.jms.Connection;
@@ -92,7 +93,7 @@
private String password;
private ConnectionFactory connectionFactory;
private Connection connection;
- private String broadcastDestinationName = "org.apache.servicemix.JMSFlow";
+ private String broadcastDestinationName = "org.apache.servicemix.JCAFlow";
private Topic broadcastTopic;
private Map networkNodeKeyMap = new ConcurrentHashMap();
private Map networkComponentKeyMap = new ConcurrentHashMap();
@@ -263,10 +264,9 @@
connection.start();
broadcastTopic = new ActiveMQTopic(broadcastDestinationName);
- broadcastSession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
- broadcastTopic = new ActiveMQTopic(broadcastDestinationName);
- advisoryTopic=AdvisorySupport.getConsumerAdvisoryTopic((ActiveMQDestination) broadcastTopic);
-
+ broadcastSession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+ broadcastTopic = new ActiveMQTopic(broadcastDestinationName);
+ advisoryTopic=AdvisorySupport.getConsumerAdvisoryTopic((ActiveMQDestination) broadcastTopic);
}
catch (Exception e) {
log.error("Failed to initialize JCAFlow", e);
@@ -354,14 +354,17 @@
}
/**
- * Ability for this flow to persist exchanges.
- *
- * @return <code>true</code> if this flow can persist messages
- */
- protected boolean canPersist() {
- return true;
+ * Check if the flow can support the requested QoS for this exchange
+ * @param me the exchange to check
+ * @return true if this flow can handle the given exchange
+ */
+ public boolean canHandle(MessageExchange me) {
+ if (isTransacted(me) && isSynchronous(me)) {
+ return false;
+ }
+ return true;
}
-
+
/**
* Process state changes in Components
*
@@ -372,7 +375,7 @@
String componentName=event.getPacket().getComponentNameSpace().getName();
if(event.getStatus()==ComponentPacketEvent.ACTIVATED){
if(!connectorMap.containsKey(componentName)){
- ActiveMQActivationSpec ac=new ActiveMQActivationSpec();
+ ActiveMQActivationSpec ac = new ActiveMQActivationSpec();
ac.setDestinationType("javax.jms.Queue");
ac.setDestination(INBOUND_PREFIX+componentName);
JCAConnector connector=new JCAConnector();
@@ -419,9 +422,6 @@
ComponentNameSpace id = me.getRole() == Role.PROVIDER ? me.getDestinationId() : me.getSourceId();
ComponentConnector cc = broker.getRegistry().getComponentConnector(id);
if (cc != null) {
- if (me.getMirror().getSyncState() != MessageExchangeImpl.SYNC_STATE_ASYNC) {
- throw new IllegalStateException("sendSync can not be used on jca flow with external components");
- }
try {
final String componentName = cc.getComponentNameSpace().getName();
String destination;
Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java?rev=381368&r1=381367&r2=381368&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java Mon Feb 27 07:53:10 2006
@@ -41,6 +41,7 @@
import org.apache.servicemix.jbi.nmr.flow.AbstractFlow;
import javax.jbi.JBIException;
+import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.MessageExchange.Role;
import javax.jms.DeliveryMode;
@@ -169,6 +170,18 @@
}
/**
+ * Check if the flow can support the requested QoS for this exchange
+ * @param me the exchange to check
+ * @return true if this flow can handle the given exchange
+ */
+ public boolean canHandle(MessageExchange me) {
+ if (isTransacted(me)) {
+ return false;
+ }
+ return true;
+ }
+
+ /**
* Initialize the Region
*
* @param broker
@@ -221,11 +234,29 @@
super.start();
try {
broadcastConsumer = broadcastSession.createConsumer(broadcastTopic, null, true);
- broadcastConsumer.setMessageListener(this);
- Topic advisoryTopic=AdvisorySupport.getConsumerAdvisoryTopic((ActiveMQDestination) broadcastTopic);
- advisoryConsumer=broadcastSession.createConsumer(advisoryTopic);
- advisoryConsumer.setMessageListener(this);
-
+ broadcastConsumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message message) {
+ try {
+ if (started.get()) {
+ ObjectMessage objMsg = (ObjectMessage) message;
+ ComponentPacketEvent event = (ComponentPacketEvent) objMsg.getObject();
+ String containerName = event.getPacket().getComponentNameSpace().getContainerName();
+ processInBoundPacket(containerName, event);
+ }
+ } catch (Exception e) {
+ log.error("Error processing incoming broadcast message", e);
+ }
+ }
+ });
+ Topic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic((ActiveMQDestination) broadcastTopic);
+ advisoryConsumer = broadcastSession.createConsumer(advisoryTopic);
+ advisoryConsumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message message) {
+ if (started.get()) {
+ onAdvisoryMessage(((ActiveMQMessage) message).getDataStructure());
+ }
+ }
+ });
// Start queue consumers for all components
for (Iterator i = broker.getRegistry().getLocalComponentConnectors().iterator();i.hasNext();) {
@@ -352,15 +383,12 @@
* @throws MessagingException
*/
public void doRouting(MessageExchangeImpl me) throws MessagingException{
- ComponentNameSpace id=me.getRole()==Role.PROVIDER?me.getDestinationId():me.getSourceId();
- ComponentConnector cc=broker.getRegistry().getComponentConnector(id);
- if(cc!=null){
- if (me.isTransacted() && me.getMirror().getSyncState() != MessageExchangeImpl.SYNC_STATE_ASYNC) {
- throw new IllegalStateException("transacted sendSync can not be used on jca flow with external components");
- }
+ ComponentNameSpace id = me.getRole() == Role.PROVIDER ? me.getDestinationId() : me.getSourceId();
+ ComponentConnector cc = broker.getRegistry().getComponentConnector(id);
+ if (cc != null) {
// let ActiveMQ do the routing ...
try{
- String componentName=cc.getComponentNameSpace().getName();
+ String componentName = cc.getComponentNameSpace().getName();
String destination = "";
if (me.getRole() == Role.PROVIDER){
destination = INBOUND_PREFIX + componentName;
@@ -384,55 +412,26 @@
*
* @param message
*/
- public void onMessage(Message message) {
- if (!started.get() || message == null) {
- return;
- }
+ public void onMessage(final Message message) {
try {
- if (message instanceof ObjectMessage) {
+ if (started.get()) {
ObjectMessage objMsg = (ObjectMessage) message;
- Object obj = objMsg.getObject();
- if (obj != null) {
- if (obj instanceof ComponentPacketEvent) {
- ComponentPacketEvent event = (ComponentPacketEvent) obj;
- String containerName = event.getPacket().getComponentNameSpace().getContainerName();
- processInBoundPacket(containerName, event);
+ final MessageExchangeImpl me = (MessageExchangeImpl) objMsg.getObject();
+ // Dispatch the message in another thread so as to free the jms session
+ // else if a component do a sendSync into the jms flow, the whole
+ // flow is deadlocked
+ broker.getWorkManager().scheduleWork(new Work() {
+ public void release() {
}
- else if (obj instanceof MessageExchangeImpl) {
- final MessageExchangeImpl me = (MessageExchangeImpl) obj;
- // Dispatch the message in another thread so as to free the jms session
- // else if a component do a sendSync into the jms flow, the whole
- // flow is deadlocked
- broker.getWorkManager().scheduleWork(new Work() {
- public void release() {
- }
- public void run() {
- try {
- JMSFlow.super.doRouting(me);
- }
- catch (Throwable e) {
- log.error("Caught an exception routing ExchangePacket: ", e);
- }
- }
- });
- }
- }
- } else if (message instanceof ActiveMQMessage) {
- Object obj = ((ActiveMQMessage) message).getDataStructure();
- if(obj instanceof ConsumerInfo){
- ConsumerInfo info=(ConsumerInfo) obj;
- subscriberSet.add(info.getConsumerId().getConnectionId());
- for(Iterator i=broker.getRegistry().getLocalComponentConnectors().iterator();i.hasNext();){
- LocalComponentConnector lcc=(LocalComponentConnector) i.next();
- ComponentPacket packet=lcc.getPacket();
- ComponentPacketEvent cpe=new ComponentPacketEvent(packet,ComponentPacketEvent.ACTIVATED);
- onEvent(cpe);
+ public void run() {
+ try {
+ JMSFlow.super.doRouting(me);
+ }
+ catch (Throwable e) {
+ log.error("Caught an exception routing ExchangePacket: ", e);
+ }
}
- }else if(obj instanceof RemoveInfo){
- ConsumerId id=(ConsumerId) ((RemoveInfo) obj).getObjectId();
- subscriberSet.remove(id.getConnectionId());
- removeAllPackets(id.getConnectionId());
- }
+ });
}
}
catch (JMSException jmsEx) {
@@ -440,6 +439,23 @@
}
catch (WorkException e) {
log.error("Caught an exception routing ExchangePacket: ", e);
+ }
+ }
+
+ protected void onAdvisoryMessage(Object obj) {
+ if (obj instanceof ConsumerInfo) {
+ ConsumerInfo info = (ConsumerInfo) obj;
+ subscriberSet.add(info.getConsumerId().getConnectionId());
+ for(Iterator i=broker.getRegistry().getLocalComponentConnectors().iterator();i.hasNext();){
+ LocalComponentConnector lcc=(LocalComponentConnector) i.next();
+ ComponentPacket packet=lcc.getPacket();
+ ComponentPacketEvent cpe=new ComponentPacketEvent(packet,ComponentPacketEvent.ACTIVATED);
+ onEvent(cpe);
+ }
+ }else if (obj instanceof RemoveInfo) {
+ ConsumerId id = (ConsumerId) ((RemoveInfo) obj).getObjectId();
+ subscriberSet.remove(id.getConnectionId());
+ removeAllPackets(id.getConnectionId());
}
}
Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaFlow.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaFlow.java?rev=381368&r1=381367&r2=381368&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaFlow.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaFlow.java Mon Feb 27 07:53:10 2006
@@ -74,6 +74,15 @@
}
/**
+ * Check if the flow can support the requested QoS for this exchange
+ * @param me the exchange to check
+ * @return true if this flow can handle the given exchange
+ */
+ public boolean canHandle(MessageExchange me) {
+ return !isPersistent(me) && !isClustered(me);
+ }
+
+ /**
* start the flow
*
* @throws JBIException
Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/st/STFlow.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/st/STFlow.java?rev=381368&r1=381367&r2=381368&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/st/STFlow.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/st/STFlow.java Mon Feb 27 07:53:10 2006
@@ -18,6 +18,7 @@
import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
import org.apache.servicemix.jbi.nmr.flow.AbstractFlow;
+import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
/**
@@ -48,6 +49,15 @@
*/
public String getDescription(){
return "st";
+ }
+
+ /**
+ * Check if the flow can support the requested QoS for this exchange
+ * @param me the exchange to check
+ * @return true if this flow can handle the given exchange
+ */
+ public boolean canHandle(MessageExchange me) {
+ return !isPersistent(me) && !isClustered(me);
}
}