You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/02/27 16:53:14 UTC

svn commit: r381368 - in /incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow: AbstractFlow.java DefaultFlowChooser.java Flow.java jca/JCAFlow.java jms/JMSFlow.java seda/SedaFlow.java st/STFlow.java

Author: gnodet
Date: Mon Feb 27 07:53:10 2006
New Revision: 381368

URL: http://svn.apache.org/viewcvs?rev=381368&view=rev
Log:
SM-319: multiple flows step 2. Use flow capabilities and exchange QoS to choose a flow

Modified:
    incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java
    incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/DefaultFlowChooser.java
    incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/Flow.java
    incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java
    incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java
    incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaFlow.java
    incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/st/STFlow.java

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java?rev=381368&r1=381367&r2=381368&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/AbstractFlow.java Mon Feb 27 07:53:10 2006
@@ -20,6 +20,7 @@
 
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.JbiConstants;
 import org.apache.servicemix.jbi.framework.ComponentNameSpace;
 import org.apache.servicemix.jbi.framework.LocalComponentConnector;
 import org.apache.servicemix.jbi.management.AttributeInfoHelper;
@@ -27,15 +28,18 @@
 import org.apache.servicemix.jbi.messaging.ExchangePacket;
 import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
 import org.apache.servicemix.jbi.nmr.Broker;
+import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;
 
 import javax.jbi.JBIException;
 import javax.jbi.management.LifeCycleMBean;
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.MessageExchange.Role;
+import javax.jbi.servicedesc.ServiceEndpoint;
 import javax.management.JMException;
 import javax.management.MBeanAttributeInfo;
 import javax.management.ObjectName;
+import javax.xml.namespace.QName;
 
 /**
  * A simple Straight through flow
@@ -95,8 +99,9 @@
      * @throws JBIException
      */
     public void shutDown() throws JBIException{
-    	if (log.isDebugEnabled())
+    	if (log.isDebugEnabled()) {
     		log.debug("Called Flow shutdown");
+        }
         super.shutDown();
     }
     
@@ -106,12 +111,9 @@
      * @throws JBIException
      */
     public void send(MessageExchange me) throws JBIException{
-    	// Check persistence
-    	if (log.isDebugEnabled())
+    	if (log.isDebugEnabled()) {
     		log.debug("Called Flow send");
-    	if (!canPersist() && isPersistent(me)) {
-    		throw new UnsupportedOperationException("persistence is not available on st flow");
-    	}
+        }
     	// do send
         try {
             lock.readLock().lock();
@@ -125,8 +127,9 @@
      * suspend the flow to prevent any message exchanges
      */
     public synchronized void suspend(){
-    	if (log.isDebugEnabled())
+    	if (log.isDebugEnabled()) {
     		log.debug("Called Flow suspend");
+        }
         lock.writeLock().lock();
         suspendThread = Thread.currentThread();
     }
@@ -136,8 +139,9 @@
      * resume message exchange processing
      */
     public synchronized void resume(){
-    	if (log.isDebugEnabled())
+    	if (log.isDebugEnabled()) {
     		log.debug("Called Flow resume");
+        }
         lock.writeLock().unlock();
         suspendThread = null;
     }
@@ -150,15 +154,6 @@
     protected abstract void doSend(MessageExchangeImpl me) throws JBIException;
 
     /**
-     * Ability for this flow to persist exchanges.
-     * 
-     * @return <code>true</code> if this flow can persist messages
-     */
-    protected boolean canPersist() {
-    	return false;
-    }
-    
-    /**
      * Distribute an ExchangePacket
      * 
      * @param packet
@@ -206,6 +201,55 @@
     	}
     }
 
+    /** Tells whether the exchange carries a JTA transaction property. */
+    protected boolean isTransacted(MessageExchange me) {
+        return me.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME) != null;
+    }
+
+    /** Tells whether the exchange has its SEND_SYNC property set to true. */
+    protected boolean isSynchronous(MessageExchange me) {
+        Boolean sync = (Boolean) me.getProperty(JbiConstants.SEND_SYNC);
+        return sync != null && sync.booleanValue();
+    }
+
+    /**
+     * Check whether the exchange can target an endpoint owned by another container.
+     * Candidates are the exchange endpoint or, when it is not set, the registry
+     * matches for its service name or, when that is not set, its interface name.
+     * @param me the exchange to check
+     * @return true if at least one candidate belongs to another container
+     */
+    protected boolean isClustered(MessageExchange me) {
+        ServiceEndpoint se = me.getEndpoint();
+        if (se != null) {
+            return isOwnedByOtherContainer(se);
+        }
+        QName serviceName = me.getService();
+        ServiceEndpoint[] eps;
+        if (serviceName != null) {
+            // Routing by service name
+            eps = broker.getContainer().getRegistry().getEndpointsForService(serviceName);
+        } else {
+            // Routing by interface name
+            eps = broker.getContainer().getRegistry().getEndpoints(me.getInterfaceName());
+        }
+        for (int i = 0; i < eps.length; i++) {
+            if (isOwnedByOtherContainer(eps[i])) {
+                return true;
+            }
+        }
+        return false;
+    }
+
+    /** True for an InternalEndpoint whose container name differs from this broker's. */
+    private boolean isOwnedByOtherContainer(ServiceEndpoint se) {
+        if (!(se instanceof InternalEndpoint)) {
+            return false;
+        }
+        String name = ((InternalEndpoint) se).getComponentNameSpace().getContainerName();
+        return !name.equals(broker.getContainerName());
+    }
+    
     public Broker getBroker() {
         return broker;
     }

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/DefaultFlowChooser.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/DefaultFlowChooser.java?rev=381368&r1=381367&r2=381368&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/DefaultFlowChooser.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/DefaultFlowChooser.java Mon Feb 27 07:53:10 2006
@@ -34,16 +34,22 @@
         if (flow != null) {
             for (int i = 0; i < flows.length; i++) {
                 if (flows[i].getName().equals(flow)) {
-                    return flows[i];
+                    if (flows[i].canHandle(exchange)) {
+                        return flows[i];
+                    } else {
+                        log.debug("Flow '" + flow + "' was specified but not able to handle exchange");
+                    }
                 }
             }
             log.debug("Flow '" + flow + "' was specified but not found");
         }
         // Check against flow capabilities
-        Object tx = exchange.getProperty(MessageExchange.JTA_TRANSACTION_PROPERTY_NAME);
-        Boolean sync = (Boolean) exchange.getProperty(JbiConstants.SEND_SYNC);
-        // TODO
-        return flows[0];
+        for (int i = 0; i < flows.length; i++) {
+            if (flows[i].canHandle(exchange)) {
+                return flows[i];
+            }
+        }
+        return null;
     }
-
+    
 }

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/Flow.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/Flow.java?rev=381368&r1=381367&r2=381368&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/Flow.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/Flow.java Mon Feb 27 07:53:10 2006
@@ -64,5 +64,18 @@
      * resume message exchange processing
      */
     public void resume();
+    
+    /**
+     * Get the broker associated with this flow.
+     * @return the broker this flow is associated with
+     */
+    public Broker getBroker();
+    
+    /**
+     * Check if the flow can support the requested QoS for this exchange.
+     * @param me the exchange to check
+     * @return true if this flow can handle the given exchange
+     */
+    public boolean canHandle(MessageExchange me);
         
 }

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java?rev=381368&r1=381367&r2=381368&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java Mon Feb 27 07:53:10 2006
@@ -21,6 +21,7 @@
 import java.util.Set;
 
 import javax.jbi.JBIException;
+import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.MessageExchange.Role;
 import javax.jms.Connection;
@@ -92,7 +93,7 @@
     private String password;
     private ConnectionFactory connectionFactory;
     private Connection connection;
-    private String broadcastDestinationName = "org.apache.servicemix.JMSFlow";
+    private String broadcastDestinationName = "org.apache.servicemix.JCAFlow";
     private Topic broadcastTopic;
     private Map networkNodeKeyMap = new ConcurrentHashMap();
     private Map networkComponentKeyMap = new ConcurrentHashMap();
@@ -263,10 +264,9 @@
         	connection.start();
         	broadcastTopic = new ActiveMQTopic(broadcastDestinationName);
             
-        broadcastSession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
-        broadcastTopic = new ActiveMQTopic(broadcastDestinationName);
-        advisoryTopic=AdvisorySupport.getConsumerAdvisoryTopic((ActiveMQDestination) broadcastTopic);
-            
+            broadcastSession=connection.createSession(false,Session.AUTO_ACKNOWLEDGE);
+            broadcastTopic = new ActiveMQTopic(broadcastDestinationName);
+            advisoryTopic=AdvisorySupport.getConsumerAdvisoryTopic((ActiveMQDestination) broadcastTopic);
         }
         catch (Exception e) {
             log.error("Failed to initialize JCAFlow", e);
@@ -354,14 +354,17 @@
     }
 
     /**
-     * Ability for this flow to persist exchanges.
-     * 
-     * @return <code>true</code> if this flow can persist messages
-     */
-    protected boolean canPersist() {
-    	return true;
+     * Check if the flow can support the requested QoS for this exchange
+     * @param me the exchange to check
+     * @return true if this flow can handle the given exchange
+     */
+    public boolean canHandle(MessageExchange me) {
+        // Refuse only an exchange that is both transacted and synchronous;
+        // isSynchronous is not evaluated when the exchange is not transacted.
+        final boolean transactedSync = isTransacted(me) && isSynchronous(me);
+        return !transactedSync;
     }
-
+    
     /**
      * Process state changes in Components
      * 
@@ -372,7 +375,7 @@
             String componentName=event.getPacket().getComponentNameSpace().getName();
             if(event.getStatus()==ComponentPacketEvent.ACTIVATED){
                 if(!connectorMap.containsKey(componentName)){
-                    ActiveMQActivationSpec ac=new ActiveMQActivationSpec();
+                    ActiveMQActivationSpec ac = new ActiveMQActivationSpec();
                     ac.setDestinationType("javax.jms.Queue");
                     ac.setDestination(INBOUND_PREFIX+componentName);
                     JCAConnector connector=new JCAConnector();
@@ -419,9 +422,6 @@
         ComponentNameSpace id = me.getRole() == Role.PROVIDER ? me.getDestinationId() : me.getSourceId();
         ComponentConnector cc = broker.getRegistry().getComponentConnector(id);
         if (cc != null) {
-        	if (me.getMirror().getSyncState() != MessageExchangeImpl.SYNC_STATE_ASYNC) {
-        		throw new IllegalStateException("sendSync can not be used on jca flow with external components");
-        	}
             try {
                 final String componentName = cc.getComponentNameSpace().getName();
                 String destination;

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java?rev=381368&r1=381367&r2=381368&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java Mon Feb 27 07:53:10 2006
@@ -41,6 +41,7 @@
 import org.apache.servicemix.jbi.nmr.flow.AbstractFlow;
 
 import javax.jbi.JBIException;
+import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.MessageExchange.Role;
 import javax.jms.DeliveryMode;
@@ -169,6 +170,18 @@
     }
 
     /**
+     * Decide whether this flow accepts the given exchange.
+     * @param me the exchange to check
+     * @return false if the exchange is transacted, true otherwise
+     */
+    public boolean canHandle(MessageExchange me) {
+        if (!isTransacted(me)) {
+            return true;
+        }
+        return false;
+    }
+    
+    /**
      * Initialize the Region
      * 
      * @param broker
@@ -221,11 +234,29 @@
             super.start();
             try {
                 broadcastConsumer = broadcastSession.createConsumer(broadcastTopic, null, true);
-                broadcastConsumer.setMessageListener(this);
-                Topic advisoryTopic=AdvisorySupport.getConsumerAdvisoryTopic((ActiveMQDestination) broadcastTopic);
-                advisoryConsumer=broadcastSession.createConsumer(advisoryTopic);
-                advisoryConsumer.setMessageListener(this);
-
+                broadcastConsumer.setMessageListener(new MessageListener() {
+                    public void onMessage(Message message) {
+                        try {
+                            if (started.get()) {
+                                ObjectMessage objMsg = (ObjectMessage) message;
+                                ComponentPacketEvent event = (ComponentPacketEvent) objMsg.getObject();
+                                String containerName = event.getPacket().getComponentNameSpace().getContainerName();
+                                processInBoundPacket(containerName, event);
+                            }
+                        } catch (Exception e) {
+                            log.error("Error processing incoming broadcast message", e);
+                        }
+                    }
+                });
+                Topic advisoryTopic = AdvisorySupport.getConsumerAdvisoryTopic((ActiveMQDestination) broadcastTopic);
+                advisoryConsumer = broadcastSession.createConsumer(advisoryTopic);
+                advisoryConsumer.setMessageListener(new MessageListener() {
+                    public void onMessage(Message message) {
+                        if (started.get()) {
+                            onAdvisoryMessage(((ActiveMQMessage) message).getDataStructure());
+                        }
+                    }
+                });
                 
                 // Start queue consumers for all components
                 for (Iterator i = broker.getRegistry().getLocalComponentConnectors().iterator();i.hasNext();) {
@@ -352,15 +383,12 @@
      * @throws MessagingException
      */
     public void doRouting(MessageExchangeImpl me) throws MessagingException{
-        ComponentNameSpace id=me.getRole()==Role.PROVIDER?me.getDestinationId():me.getSourceId();
-        ComponentConnector cc=broker.getRegistry().getComponentConnector(id);
-        if(cc!=null){
-            if (me.isTransacted() && me.getMirror().getSyncState() != MessageExchangeImpl.SYNC_STATE_ASYNC) {
-                throw new IllegalStateException("transacted sendSync can not be used on jca flow with external components");
-            }
+        ComponentNameSpace id = me.getRole() == Role.PROVIDER ? me.getDestinationId() : me.getSourceId();
+        ComponentConnector cc = broker.getRegistry().getComponentConnector(id);
+        if (cc != null) {
             // let ActiveMQ do the routing ...
             try{
-                String componentName=cc.getComponentNameSpace().getName();
+                String componentName = cc.getComponentNameSpace().getName();
                 String destination = "";
                 if (me.getRole() == Role.PROVIDER){
                     destination = INBOUND_PREFIX + componentName;
@@ -384,55 +412,26 @@
      * 
      * @param message
      */
-    public void onMessage(Message message) {
-        if (!started.get() || message == null) {
-            return;
-        }
+    public void onMessage(final Message message) {
         try {
-            if (message instanceof ObjectMessage) {
+            if (started.get()) {
                 ObjectMessage objMsg = (ObjectMessage) message;
-                Object obj = objMsg.getObject();
-                if (obj != null) {
-                    if (obj instanceof ComponentPacketEvent) {
-                        ComponentPacketEvent event = (ComponentPacketEvent) obj;
-                        String containerName = event.getPacket().getComponentNameSpace().getContainerName();
-                        processInBoundPacket(containerName, event);
+                final MessageExchangeImpl me = (MessageExchangeImpl) objMsg.getObject();
+                // Dispatch the message in another thread so as to free the JMS session;
+                // otherwise, if a component does a sendSync into the JMS flow, the whole
+                // flow is deadlocked.
+                broker.getWorkManager().scheduleWork(new Work() {
+                    public void release() {
                     }
-                    else if (obj instanceof MessageExchangeImpl) {
-                        final MessageExchangeImpl me = (MessageExchangeImpl) obj;
-                        // Dispatch the message in another thread so as to free the jms session
-                        // else if a component do a sendSync into the jms flow, the whole
-                        // flow is deadlocked 
-                        broker.getWorkManager().scheduleWork(new Work() {
-                            public void release() {
-                            }
-                            public void run() {
-                                try {
-                                    JMSFlow.super.doRouting(me);
-                                }
-                                catch (Throwable e) {
-                                    log.error("Caught an exception routing ExchangePacket: ", e);
-                                }
-                            }
-                        });
-                    }
-                }
-            } else if (message instanceof ActiveMQMessage) {
-                Object obj = ((ActiveMQMessage) message).getDataStructure();
-                if(obj instanceof ConsumerInfo){
-                    ConsumerInfo info=(ConsumerInfo) obj;
-                    subscriberSet.add(info.getConsumerId().getConnectionId());
-                    for(Iterator i=broker.getRegistry().getLocalComponentConnectors().iterator();i.hasNext();){
-                        LocalComponentConnector lcc=(LocalComponentConnector) i.next();
-                        ComponentPacket packet=lcc.getPacket();
-                        ComponentPacketEvent cpe=new ComponentPacketEvent(packet,ComponentPacketEvent.ACTIVATED);
-                        onEvent(cpe);
+                    public void run() {
+                        try {
+                            JMSFlow.super.doRouting(me);
+                        }
+                        catch (Throwable e) {
+                            log.error("Caught an exception routing ExchangePacket: ", e);
+                        }
                     }
-                }else if(obj instanceof RemoveInfo){
-                    ConsumerId id=(ConsumerId) ((RemoveInfo) obj).getObjectId();
-                    subscriberSet.remove(id.getConnectionId());
-                    removeAllPackets(id.getConnectionId());
-                }
+                });
             }
         }
         catch (JMSException jmsEx) {
@@ -440,6 +439,23 @@
         }
         catch (WorkException e) {
             log.error("Caught an exception routing ExchangePacket: ", e);
+        }
+    }
+    
+    protected void onAdvisoryMessage(Object obj) {
+        // ConsumerInfo: record the connection, then pass an ACTIVATED event to onEvent
+        // for each local component connector. RemoveInfo: forget that connection.
+        if (obj instanceof ConsumerInfo) {
+            subscriberSet.add(((ConsumerInfo) obj).getConsumerId().getConnectionId());
+            Iterator connectors = broker.getRegistry().getLocalComponentConnectors().iterator();
+            while (connectors.hasNext()) {
+                LocalComponentConnector connector = (LocalComponentConnector) connectors.next();
+                onEvent(new ComponentPacketEvent(connector.getPacket(), ComponentPacketEvent.ACTIVATED));
+            }
+        } else if (obj instanceof RemoveInfo) {
+            ConsumerId removed = (ConsumerId) ((RemoveInfo) obj).getObjectId();
+            subscriberSet.remove(removed.getConnectionId());
+            removeAllPackets(removed.getConnectionId());
         }
     }
 

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaFlow.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaFlow.java?rev=381368&r1=381367&r2=381368&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaFlow.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaFlow.java Mon Feb 27 07:53:10 2006
@@ -74,6 +74,15 @@
     }
 
     /**
+     * Decide whether this flow accepts the given exchange.
+     * @param me the exchange to check
+     * @return true only if the exchange is neither persistent nor clustered
+     */
+    public boolean canHandle(MessageExchange me) {
+        return !(isPersistent(me) || isClustered(me));
+    }
+    
+    /**
      * start the flow
      * 
      * @throws JBIException

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/st/STFlow.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/st/STFlow.java?rev=381368&r1=381367&r2=381368&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/st/STFlow.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/st/STFlow.java Mon Feb 27 07:53:10 2006
@@ -18,6 +18,7 @@
 import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
 import org.apache.servicemix.jbi.nmr.flow.AbstractFlow;
 
+import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.MessagingException;
 
 /**
@@ -48,6 +49,15 @@
      */
     public String getDescription(){
         return "st";
+    }
+    
+    /**
+     * Decide whether this flow accepts the given exchange.
+     * @param me the exchange to check
+     * @return true only if the exchange is neither persistent nor clustered
+     */
+    public boolean canHandle(MessageExchange me) {
+        return !(isPersistent(me) || isClustered(me));
     }
     
 }