You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/03/07 00:19:38 UTC

svn commit: r383702 [3/3] - in /incubator/servicemix/trunk: servicemix-components/src/test/resources/org/apache/servicemix/components/email/ servicemix-core/src/main/java/org/apache/servicemix/jbi/audit/ servicemix-core/src/main/java/org/apache/service...

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java Mon Mar  6 15:19:33 2006
@@ -16,7 +16,6 @@
 package org.apache.servicemix.jbi.nmr.flow.jca;
 
 import java.io.Serializable;
-import java.util.Iterator;
 import java.util.Map;
 import java.util.Set;
 
@@ -24,6 +23,7 @@
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.MessageExchange.Role;
+import javax.jbi.servicedesc.ServiceEndpoint;
 import javax.jms.Connection;
 import javax.jms.ConnectionFactory;
 import javax.jms.DeliveryMode;
@@ -46,7 +46,6 @@
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQObjectMessage;
 import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.ConsumerId;
@@ -62,15 +61,14 @@
 import org.apache.geronimo.connector.outbound.connectionmanagerconfig.XATransactions;
 import org.apache.geronimo.connector.work.GeronimoWorkManager;
 import org.apache.geronimo.transaction.context.TransactionContextManager;
-import org.apache.servicemix.jbi.framework.ComponentConnector;
+import org.apache.servicemix.jbi.event.EndpointAdapter;
+import org.apache.servicemix.jbi.event.EndpointEvent;
+import org.apache.servicemix.jbi.event.EndpointListener;
 import org.apache.servicemix.jbi.framework.ComponentNameSpace;
-import org.apache.servicemix.jbi.framework.ComponentPacket;
-import org.apache.servicemix.jbi.framework.ComponentPacketEvent;
-import org.apache.servicemix.jbi.framework.ComponentPacketEventListener;
-import org.apache.servicemix.jbi.framework.LocalComponentConnector;
 import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
 import org.apache.servicemix.jbi.nmr.Broker;
 import org.apache.servicemix.jbi.nmr.flow.AbstractFlow;
+import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;
 import org.jencks.JCAConnector;
 import org.jencks.SingletonEndpointFactory;
 import org.jencks.factory.ConnectionManagerFactoryBean;
@@ -84,7 +82,7 @@
  * 
  * @version $Revision$
  */
-public class JCAFlow extends AbstractFlow implements  MessageListener, ComponentPacketEventListener {
+public class JCAFlow extends AbstractFlow implements MessageListener {
     
     private static final Log log = LogFactory.getLog(JCAFlow.class);
     private static final String INBOUND_PREFIX = "org.apache.servicemix.inbound.";
@@ -95,8 +93,6 @@
     private Connection connection;
     private String broadcastDestinationName = "org.apache.servicemix.JCAFlow";
     private Topic broadcastTopic;
-    private Map networkNodeKeyMap = new ConcurrentHashMap();
-    private Map networkComponentKeyMap = new ConcurrentHashMap();
     private Map connectorMap = new ConcurrentHashMap();
     private AtomicBoolean started = new AtomicBoolean(false);
     private Set subscriberSet=new CopyOnWriteArraySet();
@@ -110,6 +106,8 @@
     private Topic advisoryTopic;
     private MessageConsumer advisoryConsumer;
 
+    private EndpointListener endpointListener;
+
     /**
      * The type of Flow
      * 
@@ -227,8 +225,19 @@
      * @throws JBIException
      */
     public void init(Broker broker, String subType) throws JBIException {
+        log.info(broker.getContainerName() + ": Initializing jca flow");
         super.init(broker, subType);
-        broker.getRegistry().addComponentPacketListener(this);
+        // Create and register endpoint listener
+        endpointListener = new EndpointAdapter() {
+            public void internalEndpointRegistered(EndpointEvent event) {
+                onInternalEndpointRegistered(event, true);
+            }
+
+            public void internalEndpointUnregistered(EndpointEvent event) {
+                onInternalEndpointUnregistered(event, true);
+            }
+        };
+        broker.getContainer().addListener(endpointListener);
         try {
         	resourceAdapter = createResourceAdapter();
         	
@@ -248,17 +257,6 @@
         	mcf.setResourceAdapter(resourceAdapter);
         	connectionFactory = (ConnectionFactory) mcf.createConnectionFactory(getConnectionManager());
         	
-        	// Inbound broadcast
-        	ac = new ActiveMQActivationSpec();
-        	ac.setDestinationType("javax.jms.Topic");
-        	ac.setDestination(broadcastDestinationName);
-        	broadcastConnector = new JCAConnector();
-        	broadcastConnector.setBootstrapContext(getBootstrapContext());
-        	broadcastConnector.setActivationSpec(ac);
-        	broadcastConnector.setResourceAdapter(resourceAdapter);
-        	broadcastConnector.setEndpointFactory(new SingletonEndpointFactory(this));
-        	broadcastConnector.afterPropertiesSet();
-        	
         	// Outbound broadcast
         	connection = ((ActiveMQResourceAdapter) resourceAdapter).makeConnection();
         	connection.start();
@@ -283,10 +281,43 @@
         if (started.compareAndSet(false, true)) {
             super.start();
             try {
-                advisoryConsumer=broadcastSession.createConsumer(advisoryTopic);
-                advisoryConsumer.setMessageListener(this);
+                // Inbound broadcast
+                ActiveMQActivationSpec ac = new ActiveMQActivationSpec();
+                ac.setDestinationType("javax.jms.Topic");
+                ac.setDestination(broadcastDestinationName);
+                broadcastConnector = new JCAConnector();
+                broadcastConnector.setBootstrapContext(getBootstrapContext());
+                broadcastConnector.setActivationSpec(ac);
+                broadcastConnector.setResourceAdapter(resourceAdapter);
+                broadcastConnector.setEndpointFactory(new SingletonEndpointFactory(new MessageListener() {
+                    public void onMessage(Message message) {
+                        try {
+                            Object obj = ((ObjectMessage) message).getObject();
+                            if (obj instanceof EndpointEvent) {
+                                EndpointEvent event = (EndpointEvent) obj;
+                                if (event.getEventType() == EndpointEvent.INTERNAL_ENDPOINT_REGISTERED) {
+                                    onRemoteEndpointRegistered(event);
+                                } else if (event.getEventType() == EndpointEvent.INTERNAL_ENDPOINT_UNREGISTERED) {
+                                    onRemoteEndpointUnregistered(event);
+                                }
+                            }
+                        } catch (Exception e) {
+                            log.error("Error processing incoming broadcast message", e);
+                        }
+                    }
+                }));
+                broadcastConnector.afterPropertiesSet();
+                
+                advisoryConsumer = broadcastSession.createConsumer(advisoryTopic);
+                advisoryConsumer.setMessageListener(new MessageListener() {
+                    public void onMessage(Message message) {
+                        if (started.get()) {
+                            onAdvisoryMessage(((ActiveMQMessage) message).getDataStructure());
+                        }
+                    }
+                });
             }
-            catch (JMSException e) {
+            catch (Exception e) {
                 throw new JBIException("JMSException caught in start: " + e.getMessage(), e);
             }
         }
@@ -313,6 +344,8 @@
     public void shutDown() throws JBIException {
         super.shutDown();
         stop();
+        // Remove endpoint listener
+        broker.getContainer().removeListener(endpointListener);
         // Destroy connectors
         while (!connectorMap.isEmpty()) {
         	JCAConnector connector = (JCAConnector) connectorMap.remove(connectorMap.keySet().iterator().next());
@@ -365,41 +398,60 @@
         return true;
     }
     
-    /**
-     * Process state changes in Components
-     * 
-     * @param event
-     */
-    public void onEvent(final ComponentPacketEvent event){
+    public void onInternalEndpointRegistered(EndpointEvent event, boolean broadcast) {
+        if (!started.get()) {
+            return;
+        }
+        try {
+            String key = event.getEndpoint().getServiceName() + event.getEndpoint().getEndpointName();
+            if(!connectorMap.containsKey(key)){
+                ActiveMQActivationSpec ac = new ActiveMQActivationSpec();
+                ac.setDestinationType("javax.jms.Queue");
+                ac.setDestination(INBOUND_PREFIX + key);
+                JCAConnector connector = new JCAConnector();
+                connector.setBootstrapContext(getBootstrapContext());
+                connector.setActivationSpec(ac);
+                connector.setResourceAdapter(resourceAdapter);
+                connector.setEndpointFactory(new SingletonEndpointFactory(this, getTransactionManager()));
+                connector.afterPropertiesSet();
+                connectorMap.put(key, connector);
+            }
+            // broadcast change to the network
+            if (broadcast) {
+                log.info(broker.getContainerName() + ": broadcasting info for " + event);
+                sendJmsMessage(broadcastTopic, event, false, false);
+            }
+        } catch (Exception e) {
+            log.error("Cannot create consumer for " + event.getEndpoint(), e);
+        }
+    }
+    
+    public void onInternalEndpointUnregistered(EndpointEvent event, boolean broadcast) {
         try{
-            String componentName=event.getPacket().getComponentNameSpace().getName();
-            if(event.getStatus()==ComponentPacketEvent.ACTIVATED){
-                if(!connectorMap.containsKey(componentName)){
-                    ActiveMQActivationSpec ac = new ActiveMQActivationSpec();
-                    ac.setDestinationType("javax.jms.Queue");
-                    ac.setDestination(INBOUND_PREFIX+componentName);
-                    JCAConnector connector=new JCAConnector();
-                    connector.setBootstrapContext(getBootstrapContext());
-                    connector.setActivationSpec(ac);
-                    connector.setResourceAdapter(resourceAdapter);
-                    connector.setEndpointFactory(new SingletonEndpointFactory(this,getTransactionManager()));
-                    connector.afterPropertiesSet();
-                    connectorMap.put(componentName,connector);
-                }
-            }else if(event.getStatus()==ComponentPacketEvent.DEACTIVATED){
-                JCAConnector connector=(JCAConnector) connectorMap.remove(componentName);
-                if(connector!=null){
-                    connector.destroy();
-                }
+            String key = event.getEndpoint().getServiceName() + event.getEndpoint().getEndpointName();
+            JCAConnector connector=(JCAConnector) connectorMap.remove(key);
+            if(connector!=null){
+                connector.destroy();
             }
             // broadcast change to the network
-            log.info("broadcast to internal JMS network: " + event);
-            sendJmsMessage(broadcastTopic, event, false, false);
-        }catch(Exception e){
-            log.error("failed to broadcast to the internal JMS network: "+event,e);
+            if (broadcast) {
+                log.info(broker.getContainerName() + ": broadcasting info for " + event);
+                sendJmsMessage(broadcastTopic, event, false, false);
+            }
+        } catch (Exception e) {
+            log.error("Cannot destroy consumer for " + event, e);
         }
     }
+    
+    public void onRemoteEndpointRegistered(EndpointEvent event) {
+        log.info(broker.getContainerName() + ": adding remote endpoint: " + event.getEndpoint());
+        broker.getRegistry().registerRemoteEndpoint(event.getEndpoint());
+    }
 
+    public void onRemoteEndpointUnregistered(EndpointEvent event) {
+        log.info(broker.getContainerName() + ": removing remote endpoint: " + event.getEndpoint());
+        broker.getRegistry().unregisterRemoteEndpoint(event.getEndpoint());
+    }
 
     /**
      * Distribute an ExchangePacket
@@ -418,25 +470,19 @@
      * @throws MessagingException
      */
     public void doRouting(final MessageExchangeImpl me) throws MessagingException {
-        
-        ComponentNameSpace id = me.getRole() == Role.PROVIDER ? me.getDestinationId() : me.getSourceId();
-        ComponentConnector cc = broker.getRegistry().getComponentConnector(id);
-        if (cc != null) {
-            try {
-                final String componentName = cc.getComponentNameSpace().getName();
-                String destination;
-                if (me.getRole() == Role.PROVIDER){
-                    destination = INBOUND_PREFIX + componentName;
-                }else {
-                    destination = INBOUND_PREFIX + id.getContainerName();
-                }
-                sendJmsMessage(new ActiveMQQueue(destination), me, isPersistent(me), me.isTransacted());
-            } catch (Exception e) {
-                log.error("Failed to send exchange: " + me + " internal JMS Network", e);
-                throw new MessagingException(e);
+        // let ActiveMQ do the routing ...
+        try {
+            String destination;
+            if (me.getRole() == Role.PROVIDER) {
+                destination = INBOUND_PREFIX + me.getEndpoint().getServiceName() + me.getEndpoint().getEndpointName();
+            } else {
+                ComponentNameSpace id = me.getRole() == Role.PROVIDER ? me.getDestinationId() : me.getSourceId();
+                destination = INBOUND_PREFIX + id.getContainerName();
             }
-        } else {
-            throw new MessagingException("No component with id (" + id + ") - Couldn't route MessageExchange " + me);
+            sendJmsMessage(new ActiveMQQueue(destination), me, isPersistent(me), me.isTransacted());
+        } catch (JMSException e) {
+            log.error("Failed to send exchange: " + me + " internal JMS Network", e);
+            throw new MessagingException(e);
         }
     }
 
@@ -446,50 +492,26 @@
      * @param message
      */
     public void onMessage(Message message) {
-        if (message == null) {
-            return;
-        }
         try {
-            if (message instanceof ObjectMessage) {
+            if (message != null && started.get()) {
                 ObjectMessage objMsg = (ObjectMessage) message;
-                Object obj = objMsg.getObject();
-                if (obj != null) {
-                    if (obj instanceof ComponentPacketEvent) {
-                        ComponentPacketEvent event = (ComponentPacketEvent) obj;
-                        String containerName = event.getPacket().getComponentNameSpace().getContainerName();
-                        processInBoundPacket(containerName, event);
-                    }
-                    else if (obj instanceof MessageExchangeImpl) {
-                        // Hack for redelivery: AMQ is too optimized and the object is the same upon redelivery
-                        // so that there are side effect (the exchange state may have been modified)
-                        // See http://jira.activemq.org/jira/browse/AMQ-519
-                        obj = ((ActiveMQObjectMessage) ((ActiveMQObjectMessage) message).copy()).getObject();
-                        MessageExchangeImpl me = (MessageExchangeImpl) obj;
-                        TransactionManager tm = (TransactionManager) getTransactionManager();
-                        if (tm != null) {
-                            me.setTransactionContext(tm.getTransaction());
-                        }
-                        super.doRouting(me);
-                    }
+                final MessageExchangeImpl me = (MessageExchangeImpl) objMsg.getObject();
+                // Hack for redelivery: AMQ is too optimized and the object is the same upon redelivery
+                // so that there are side effect (the exchange state may have been modified)
+                // See http://jira.activemq.org/jira/browse/AMQ-519
+                //me = (MessageExchangeImpl) ((ActiveMQObjectMessage) ((ActiveMQObjectMessage) message).copy()).getObject();
+                TransactionManager tm = (TransactionManager) getTransactionManager();
+                if (tm != null) {
+                    me.setTransactionContext(tm.getTransaction());
                 }
-            } else if (message instanceof ActiveMQMessage) {
-                Object obj = ((ActiveMQMessage) message).getDataStructure();
-                if(obj instanceof ConsumerInfo){
-                    ConsumerInfo info=(ConsumerInfo) obj;
-                    subscriberSet.add(info.getConsumerId().getConnectionId());
-                    if(started.get()){
-                        for(Iterator i=broker.getRegistry().getLocalComponentConnectors().iterator();i.hasNext();){
-                            LocalComponentConnector lcc=(LocalComponentConnector) i.next();
-                            ComponentPacket packet=lcc.getPacket();
-                            ComponentPacketEvent cpe=new ComponentPacketEvent(packet,ComponentPacketEvent.ACTIVATED);
-                            onEvent(cpe);
-                        }
-                    }
-                }else if(obj instanceof RemoveInfo){
-                    ConsumerId id=(ConsumerId) ((RemoveInfo) obj).getObjectId();
-                    subscriberSet.remove(id.getConnectionId());
-                    removeAllPackets(id.getConnectionId());
-                }            
+                if (me.getDestinationId() == null) {
+                    ServiceEndpoint se = me.getEndpoint();
+                    se = broker.getRegistry()
+                            .getInternalEndpoint(se.getServiceName(), se.getEndpointName());
+                    me.setEndpoint(se);
+                    me.setDestinationId(((InternalEndpoint) se).getComponentNameSpace());
+                }
+                super.doRouting(me);
             }
         }
         catch (JMSException jmsEx) {
@@ -503,82 +525,26 @@
 		}
     }
 
-    /**
-     * Process Inbound packets
-     * 
-     * @param containerName
-     * @param event
-     */
-    protected void processInBoundPacket(String containerName, ComponentPacketEvent event) {
-        ComponentPacket packet = event.getPacket();
-        if (!packet.getComponentNameSpace().getContainerName().equals(broker.getContainerName())) {
-        	log.info("received from internal JMS network: " + event);
-            int eventStatus = event.getStatus();
-            switch (eventStatus) {
-                case ComponentPacketEvent.ACTIVATED:
-                    addRemotePacket(containerName, packet);
-                    break;
-                case ComponentPacketEvent.DEACTIVATED:
-                    removeRemotePacket(containerName, packet);
-                    break;
-                case ComponentPacketEvent.STATE_CHANGE:
-                    updateRemotePacket(containerName, packet);
-                    break;
-                default:
-                    log.warn("Unable to determine ComponentPacketEvent type: " + eventStatus + " for packet: " + packet);
-            }         	
-        }
-    }
-
-    private void addRemotePacket(String containerName, ComponentPacket packet) {
-        networkComponentKeyMap.put(packet.getComponentNameSpace(), containerName);
-        Set set = (Set) networkNodeKeyMap.get(containerName);
-        if (set == null) {
-            set = new CopyOnWriteArraySet();
-            networkNodeKeyMap.put(containerName, set);
-        }
-        ComponentConnector cc = new ComponentConnector(packet);
-        log.info("Adding Remote Component: " + cc);
-        broker.getRegistry().addRemoteComponentConnector(cc);
-        set.add(packet);
-    }
-
-    private void updateRemotePacket(String containerName, ComponentPacket packet) {
-        Set set = (Set) networkNodeKeyMap.get(containerName);
-        if (set != null) {
-            set.remove(packet);
-            set.add(packet);
-        }
-        ComponentConnector cc = new ComponentConnector(packet);
-        log.info("Updating remote Component: " + cc);
-        broker.getRegistry().updateRemoteComponentConnector(cc);
-    }
-
-    private void removeRemotePacket(String containerName, ComponentPacket packet) {
-        networkComponentKeyMap.remove(packet.getComponentNameSpace());
-        Set set = (Set) networkNodeKeyMap.get(containerName);
-        if (set != null) {
-            set.remove(packet);
-            ComponentConnector cc = new ComponentConnector(packet);
-            log.info("Removing remote Component: " + cc);
-            broker.getRegistry().removeRemoteComponentConnector(cc);
-            if (set.isEmpty()) {
-                networkNodeKeyMap.remove(containerName);
+    protected void onAdvisoryMessage(Object obj) {
+        if (obj instanceof ConsumerInfo) {
+            ConsumerInfo info = (ConsumerInfo) obj;
+            subscriberSet.add(info.getConsumerId().getConnectionId());
+            ServiceEndpoint[] endpoints = broker.getRegistry().getEndpointsForInterface(null);
+            for (int i = 0; i < endpoints.length; i++) {
+                if (endpoints[i] instanceof InternalEndpoint && ((InternalEndpoint) endpoints[i]).isLocal()) {
+                    onInternalEndpointRegistered(new EndpointEvent(endpoints[i],
+                            EndpointEvent.INTERNAL_ENDPOINT_REGISTERED), true);
+                }
             }
+        } else if (obj instanceof RemoveInfo) {
+            ConsumerId id = (ConsumerId) ((RemoveInfo) obj).getObjectId();
+            subscriberSet.remove(id.getConnectionId());
+            removeAllPackets(id.getConnectionId());
         }
     }
 
     private void removeAllPackets(String containerName) {
-        Set set = (Set) networkNodeKeyMap.remove(containerName);
-        if (set != null) {
-	        for (Iterator i = set.iterator();i.hasNext();) {
-	            ComponentPacket packet = (ComponentPacket) i.next();
-	            ComponentConnector cc = new ComponentConnector(packet);
-	            log.info("Network node: " + containerName + " Stopped. Removing remote Component: " + cc);
-	            broker.getRegistry().removeRemoteComponentConnector(cc);
-	            networkComponentKeyMap.remove(packet.getComponentNameSpace());
-	        }
-        }
+        //TODO: broker.getRegistry().unregisterRemoteEndpoints(containerName);
     }
 
 	public ConnectionManager getConnectionManager() throws Exception {
@@ -643,4 +609,5 @@
     		connection.close();
     	}
     }
+
 }

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java Mon Mar  6 15:19:33 2006
@@ -15,35 +15,15 @@
  */
 package org.apache.servicemix.jbi.nmr.flow.jms;
 
-import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.advisory.AdvisorySupport;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.RemoveInfo;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.servicemix.jbi.framework.ComponentConnector;
-import org.apache.servicemix.jbi.framework.ComponentNameSpace;
-import org.apache.servicemix.jbi.framework.ComponentPacket;
-import org.apache.servicemix.jbi.framework.ComponentPacketEvent;
-import org.apache.servicemix.jbi.framework.ComponentPacketEventListener;
-import org.apache.servicemix.jbi.framework.LocalComponentConnector;
-import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
-import org.apache.servicemix.jbi.nmr.Broker;
-import org.apache.servicemix.jbi.nmr.flow.AbstractFlow;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
 
 import javax.jbi.JBIException;
 import javax.jbi.messaging.MessageExchange;
 import javax.jbi.messaging.MessagingException;
 import javax.jbi.messaging.MessageExchange.Role;
+import javax.jbi.servicedesc.ServiceEndpoint;
 import javax.jms.DeliveryMode;
 import javax.jms.JMSException;
 import javax.jms.Message;
@@ -57,38 +37,74 @@
 import javax.resource.spi.work.Work;
 import javax.resource.spi.work.WorkException;
 
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.jbi.event.EndpointAdapter;
+import org.apache.servicemix.jbi.event.EndpointEvent;
+import org.apache.servicemix.jbi.event.EndpointListener;
+import org.apache.servicemix.jbi.framework.ComponentNameSpace;
+import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
+import org.apache.servicemix.jbi.nmr.Broker;
+import org.apache.servicemix.jbi.nmr.flow.AbstractFlow;
+import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;
+
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Use for message routing among a network of containers. All routing/registration happens automatically.
  * 
  * @version $Revision$
  */
-public class JMSFlow extends AbstractFlow implements MessageListener, ComponentPacketEventListener {
-    
+public class JMSFlow extends AbstractFlow implements MessageListener {
+
     private static final Log log = LogFactory.getLog(JMSFlow.class);
+
     private static final String INBOUND_PREFIX = "org.apache.servicemix.inbound.";
+
     private String jmsURL = "peer://org.apache.servicemix?persistent=false";
+
     private String userName;
+
     private String password;
+
     private ActiveMQConnectionFactory connectionFactory;
+
     private ActiveMQConnection connection;
+
     private String broadcastDestinationName = "org.apache.servicemix.JMSFlow";
+
     private MessageProducer queueProducer;
+
     private MessageProducer topicProducer;
+
     private Topic broadcastTopic;
+
     private Session broadcastSession;
+
     private MessageConsumer broadcastConsumer;
+
     private Session inboundSession;
+
     private MessageConsumer advisoryConsumer;
-    private Set subscriberSet=new CopyOnWriteArraySet();
-    private Map networkNodeKeyMap = new ConcurrentHashMap();
-    private Map networkComponentKeyMap = new ConcurrentHashMap();
+
+    private Set subscriberSet = new CopyOnWriteArraySet();
+
     private Map consumerMap = new ConcurrentHashMap();
+
     private AtomicBoolean started = new AtomicBoolean(false);
 
+    private EndpointListener endpointListener;
+
     /**
      * The type of Flow
      * 
@@ -154,7 +170,6 @@
         this.connectionFactory = connectionFactory;
     }
 
-
     /**
      * @return Returns the broadcastDestinationName.
      */
@@ -180,7 +195,7 @@
         }
         return true;
     }
-    
+
     /**
      * Initialize the Region
      * 
@@ -190,13 +205,22 @@
     public void init(Broker broker, String subType) throws JBIException {
         log.info(broker.getContainerName() + ": Initializing jms flow");
         super.init(broker, subType);
-        broker.getRegistry().addComponentPacketListener(this);
+        // Create and register endpoint listener
+        endpointListener = new EndpointAdapter() {
+            public void internalEndpointRegistered(EndpointEvent event) {
+                onInternalEndpointRegistered(event, true);
+            }
+
+            public void internalEndpointUnregistered(EndpointEvent event) {
+                onInternalEndpointUnregistered(event, true);
+            }
+        };
+        broker.getContainer().addListener(endpointListener);
         try {
             if (connectionFactory == null) {
                 if (jmsURL != null) {
                     connectionFactory = new ActiveMQConnectionFactory(jmsURL);
-                }
-                else {
+                } else {
                     connectionFactory = new ActiveMQConnectionFactory();
                 }
             }
@@ -207,7 +231,7 @@
             }
             connection.setClientID(broker.getContainerName());
             connection.start();
-           	inboundSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+            inboundSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             Queue queue = inboundSession.createQueue(INBOUND_PREFIX + broker.getContainerName());
             MessageConsumer inboundQueue = inboundSession.createConsumer(queue);
             inboundQueue.setMessageListener(this);
@@ -216,8 +240,7 @@
             broadcastTopic = broadcastSession.createTopic(broadcastDestinationName);
             topicProducer = broadcastSession.createProducer(broadcastTopic);
             topicProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-        }
-        catch (JMSException e) {
+        } catch (JMSException e) {
             log.error("Failed to initialize JMSFlow", e);
             throw new JBIException(e);
         }
@@ -237,11 +260,14 @@
                 broadcastConsumer.setMessageListener(new MessageListener() {
                     public void onMessage(Message message) {
                         try {
-                            if (started.get()) {
-                                ObjectMessage objMsg = (ObjectMessage) message;
-                                ComponentPacketEvent event = (ComponentPacketEvent) objMsg.getObject();
-                                String containerName = event.getPacket().getComponentNameSpace().getContainerName();
-                                processInBoundPacket(containerName, event);
+                            Object obj = ((ObjectMessage) message).getObject();
+                            if (obj instanceof EndpointEvent) {
+                                EndpointEvent event = (EndpointEvent) obj;
+                                if (event.getEventType() == EndpointEvent.INTERNAL_ENDPOINT_REGISTERED) {
+                                    onRemoteEndpointRegistered(event);
+                                } else if (event.getEventType() == EndpointEvent.INTERNAL_ENDPOINT_UNREGISTERED) {
+                                    onRemoteEndpointUnregistered(event);
+                                }
                             }
                         } catch (Exception e) {
                             log.error("Error processing incoming broadcast message", e);
@@ -257,16 +283,16 @@
                         }
                     }
                 });
-                
+
                 // Start queue consumers for all components
-                for (Iterator i = broker.getRegistry().getLocalComponentConnectors().iterator();i.hasNext();) {
-                    LocalComponentConnector lcc = (LocalComponentConnector) i.next();
-                    ComponentPacket packet = lcc.getPacket();
-                    ComponentPacketEvent cpe = new ComponentPacketEvent(packet, ComponentPacketEvent.ACTIVATED);
-                    onEvent(cpe, false);
+                ServiceEndpoint[] endpoints = broker.getRegistry().getEndpointsForInterface(null);
+                for (int i = 0; i < endpoints.length; i++) {
+                    if (endpoints[i] instanceof InternalEndpoint && ((InternalEndpoint) endpoints[i]).isLocal()) {
+                        onInternalEndpointRegistered(new EndpointEvent(endpoints[i],
+                                EndpointEvent.INTERNAL_ENDPOINT_REGISTERED), false);
+                    }
                 }
-            }
-            catch (JMSException e) {
+            } catch (JMSException e) {
                 JBIException jbiEx = new JBIException("JMSException caught in start: " + e.getMessage());
                 throw jbiEx;
             }
@@ -290,8 +316,7 @@
             try {
                 advisoryConsumer.close();
                 broadcastConsumer.close();
-            }
-            catch (JMSException e) {
+            } catch (JMSException e) {
                 JBIException jbiEx = new JBIException("JMSException caught in stop: " + e.getMessage());
                 throw jbiEx;
             }
@@ -301,11 +326,12 @@
     public void shutDown() throws JBIException {
         super.shutDown();
         stop();
+        // Remove endpoint listener
+        broker.getContainer().removeListener(endpointListener);
         if (this.connection != null) {
             try {
                 this.connection.close();
-            }
-            catch (JMSException e) {
+            } catch (JMSException e) {
                 log.warn("Error closing JMS Connection", e);
             }
         }
@@ -320,51 +346,54 @@
         return subscriberSet.size();
     }
 
-    /**
-     * Process state changes in Components
-     * 
-     * @param event
-     */
-    public void onEvent(ComponentPacketEvent event) {
-        onEvent(event, true);
-    }
-    
-    /**
-     * Process state changes in Components
-     * 
-     * @param event
-     */
-    public void onEvent(ComponentPacketEvent event, boolean broadcast) {
+    public void onInternalEndpointRegistered(EndpointEvent event, boolean broadcast) {
+        if (!started.get()) {
+            return;
+        }
         try {
-            // broadcast internal changes to the network
-            if (started.get() && event.getPacket().getComponentNameSpace().getContainerName().equals(broker.getContainerName())) {
-                String componentName = event.getPacket().getComponentNameSpace().getName();
-                if (event.getStatus() == ComponentPacketEvent.ACTIVATED) {
-                    if (!consumerMap.containsKey(componentName)) {
-                        Queue queue = inboundSession.createQueue(INBOUND_PREFIX + componentName);
-                        MessageConsumer consumer = inboundSession.createConsumer(queue);
-                        consumer.setMessageListener(this);
-                        consumerMap.put(componentName,consumer);
-                    }
-                } else if (event.getStatus() == ComponentPacketEvent.DEACTIVATED) {
-                    MessageConsumer consumer = (MessageConsumer) consumerMap.remove(componentName);
-                    if (consumer != null){
-                        consumer.close();
-                    }
-                }
-                if (broadcast) {
-                    ObjectMessage msg = broadcastSession.createObjectMessage(event);
-                    log.info(broker.getContainerName() + ": broadcasting info for " + event.getPacket().getComponentNameSpace());
-                    topicProducer.send(msg);
-                }
+            String key = event.getEndpoint().getServiceName() + event.getEndpoint().getEndpointName();
+            if (!consumerMap.containsKey(key)) {
+                Queue queue = inboundSession.createQueue(INBOUND_PREFIX + key);
+                MessageConsumer consumer = inboundSession.createConsumer(queue);
+                consumer.setMessageListener(this);
+                consumerMap.put(key, consumer);
+            }
+            if (broadcast) {
+                log.info(broker.getContainerName() + ": broadcasting info for " + event);
+                ObjectMessage msg = broadcastSession.createObjectMessage(event);
+                topicProducer.send(msg);
             }
+        } catch (Exception e) {
+            log.error("Cannot create consumer for " + event.getEndpoint(), e);
         }
-        catch (JMSException e) {
-            log.error("failed to broadcast to the internal JMS network: " + event, e);
+    }
+
+    public void onInternalEndpointUnregistered(EndpointEvent event, boolean broadcast) {
+        try {
+            String key = event.getEndpoint().getServiceName() + event.getEndpoint().getEndpointName();
+            MessageConsumer consumer = (MessageConsumer) consumerMap.remove(key);
+            if (consumer != null) {
+                consumer.close();
+            }
+            if (broadcast) {
+                ObjectMessage msg = broadcastSession.createObjectMessage(event);
+                log.info(broker.getContainerName() + ": broadcasting info for " + event);
+                topicProducer.send(msg);
+            }
+        } catch (Exception e) {
+            log.error("Cannot destroy consumer for " + event, e);
         }
     }
 
-    
+    public void onRemoteEndpointRegistered(EndpointEvent event) {
+        log.info(broker.getContainerName() + ": adding remote endpoint: " + event.getEndpoint());
+        broker.getRegistry().registerRemoteEndpoint(event.getEndpoint());
+    }
+
+    public void onRemoteEndpointUnregistered(EndpointEvent event) {
+        log.info(broker.getContainerName() + ": removing remote endpoint: " + event.getEndpoint());
+        broker.getRegistry().unregisterRemoteEndpoint(event.getEndpoint());
+    }
 
     /**
      * Distribute an ExchangePacket
@@ -375,35 +404,29 @@
     protected void doSend(MessageExchangeImpl me) throws MessagingException {
         doRouting(me);
     }
-    
+
     /**
      * Distribute an ExchangePacket
      * 
      * @param me
      * @throws MessagingException
      */
-    public void doRouting(MessageExchangeImpl me) throws MessagingException{
-        ComponentNameSpace id = me.getRole() == Role.PROVIDER ? me.getDestinationId() : me.getSourceId();
-        ComponentConnector cc = broker.getRegistry().getComponentConnector(id);
-        if (cc != null) {
-            // let ActiveMQ do the routing ...
-            try{
-                String componentName = cc.getComponentNameSpace().getName();
-                String destination = "";
-                if (me.getRole() == Role.PROVIDER){
-                    destination = INBOUND_PREFIX + componentName;
-                }else {
-                    destination = INBOUND_PREFIX + id.getContainerName();
-                }
-                Queue queue=inboundSession.createQueue(destination);
-                ObjectMessage msg=inboundSession.createObjectMessage(me);
-                queueProducer.send(queue,msg);
-            }catch(JMSException e){
-                log.error("Failed to send exchange: "+me+" internal JMS Network",e);
-                throw new MessagingException(e);
+    public void doRouting(MessageExchangeImpl me) throws MessagingException {
+        // let ActiveMQ do the routing ...
+        try {
+            String destination;
+            if (me.getRole() == Role.PROVIDER) {
+                destination = INBOUND_PREFIX + me.getEndpoint().getServiceName() + me.getEndpoint().getEndpointName();
+            } else {
+                ComponentNameSpace id = me.getRole() == Role.PROVIDER ? me.getDestinationId() : me.getSourceId();
+                destination = INBOUND_PREFIX + id.getContainerName();
             }
-        }else{
-            throw new MessagingException("No component with id ("+id+") - Couldn't route MessageExchange "+me);
+            Queue queue = inboundSession.createQueue(destination);
+            ObjectMessage msg = inboundSession.createObjectMessage(me);
+            queueProducer.send(queue, msg);
+        } catch (JMSException e) {
+            log.error("Failed to send exchange: " + me + " internal JMS Network", e);
+            throw new MessagingException(e);
         }
     }
 
@@ -414,7 +437,7 @@
      */
     public void onMessage(final Message message) {
         try {
-            if (started.get()) {
+            if (message != null && started.get()) {
                 ObjectMessage objMsg = (ObjectMessage) message;
                 final MessageExchangeImpl me = (MessageExchangeImpl) objMsg.getObject();
                 // Dispatch the message in another thread so as to free the jms session
@@ -423,101 +446,49 @@
                 broker.getWorkManager().scheduleWork(new Work() {
                     public void release() {
                     }
+
                     public void run() {
                         try {
+                            if (me.getDestinationId() == null) {
+                                ServiceEndpoint se = me.getEndpoint();
+                                se = broker.getRegistry()
+                                        .getInternalEndpoint(se.getServiceName(), se.getEndpointName());
+                                me.setEndpoint(se);
+                                me.setDestinationId(((InternalEndpoint) se).getComponentNameSpace());
+                            }
                             JMSFlow.super.doRouting(me);
-                        }
-                        catch (Throwable e) {
+                        } catch (Throwable e) {
                             log.error("Caught an exception routing ExchangePacket: ", e);
                         }
                     }
                 });
             }
-        }
-        catch (JMSException jmsEx) {
+        } catch (JMSException jmsEx) {
             log.error("Caught an exception unpacking JMS Message: ", jmsEx);
-        }
-        catch (WorkException e) {
+        } catch (WorkException e) {
             log.error("Caught an exception routing ExchangePacket: ", e);
         }
     }
-    
+
     protected void onAdvisoryMessage(Object obj) {
         if (obj instanceof ConsumerInfo) {
             ConsumerInfo info = (ConsumerInfo) obj;
             subscriberSet.add(info.getConsumerId().getConnectionId());
-            for(Iterator i=broker.getRegistry().getLocalComponentConnectors().iterator();i.hasNext();){
-                LocalComponentConnector lcc=(LocalComponentConnector) i.next();
-                ComponentPacket packet=lcc.getPacket();
-                ComponentPacketEvent cpe=new ComponentPacketEvent(packet,ComponentPacketEvent.ACTIVATED);
-                onEvent(cpe);
+            ServiceEndpoint[] endpoints = broker.getRegistry().getEndpointsForInterface(null);
+            for (int i = 0; i < endpoints.length; i++) {
+                if (endpoints[i] instanceof InternalEndpoint && ((InternalEndpoint) endpoints[i]).isLocal()) {
+                    onInternalEndpointRegistered(new EndpointEvent(endpoints[i],
+                            EndpointEvent.INTERNAL_ENDPOINT_REGISTERED), true);
+                }
             }
-        }else if (obj instanceof RemoveInfo) {
+        } else if (obj instanceof RemoveInfo) {
             ConsumerId id = (ConsumerId) ((RemoveInfo) obj).getObjectId();
             subscriberSet.remove(id.getConnectionId());
             removeAllPackets(id.getConnectionId());
         }
     }
 
-    /**
-     * Process Inbound packets
-     * 
-     * @param containerName
-     * @param event
-     */
-    protected void processInBoundPacket(String containerName, ComponentPacketEvent event) {
-        ComponentPacket packet = event.getPacket();
-        if (!packet.getComponentNameSpace().getContainerName().equals(broker.getContainerName())) {
-            int eventStatus = event.getStatus();
-            switch (eventStatus) {
-                case ComponentPacketEvent.ACTIVATED:
-                case ComponentPacketEvent.STATE_CHANGE:
-                    updateRemotePacket(containerName, packet);
-                    break;
-                case ComponentPacketEvent.DEACTIVATED:
-                    removeRemotePacket(containerName, packet);
-                    break;
-                default:
-                    log.warn("Unable to determine ComponentPacketEvent type: " + eventStatus + " for packet: " + packet);
-            }         	
-        }
-    }
-
-    private void updateRemotePacket(String containerName, ComponentPacket packet) {
-        Set set = (Set) networkNodeKeyMap.get(containerName);
-        if (set != null) {
-            set.remove(packet);
-            set.add(packet);
-        }
-        ComponentConnector cc = new ComponentConnector(packet);
-        log.info(broker.getContainerName() + ": updating remote component: " + cc);
-        broker.getRegistry().updateRemoteComponentConnector(cc);
-    }
-
-    private void removeRemotePacket(String containerName, ComponentPacket packet) {
-        networkComponentKeyMap.remove(packet.getComponentNameSpace());
-        Set set = (Set) networkNodeKeyMap.get(containerName);
-        if (set != null) {
-            set.remove(packet);
-            ComponentConnector cc = new ComponentConnector(packet);
-            log.info(broker.getContainerName() + ": removing remote component: " + cc);
-            broker.getRegistry().removeRemoteComponentConnector(cc);
-            if (set.isEmpty()) {
-                networkNodeKeyMap.remove(containerName);
-            }
-        }
-    }
-
     private void removeAllPackets(String containerName) {
-        Set set = (Set) networkNodeKeyMap.remove(containerName);
-        if (set != null) {
-	        for (Iterator i = set.iterator();i.hasNext();) {
-	            ComponentPacket packet = (ComponentPacket) i.next();
-	            ComponentConnector cc = new ComponentConnector(packet);
-	            log.info(broker.getContainerName() + ": Network node: " + containerName + " Stopped. Removing remote Component: " + cc);
-	            broker.getRegistry().removeRemoteComponentConnector(cc);
-	            networkComponentKeyMap.remove(packet.getComponentNameSpace());
-	        }
-        }
+        //TODO: broker.getRegistry().unregisterRemoteEndpoints(containerName);
     }
 }

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaFlow.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaFlow.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaFlow.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaFlow.java Mon Mar  6 15:19:33 2006
@@ -15,18 +15,8 @@
  */
 package org.apache.servicemix.jbi.nmr.flow.seda;
 
-import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.servicemix.jbi.framework.ComponentNameSpace;
-import org.apache.servicemix.jbi.framework.ComponentPacketEvent;
-import org.apache.servicemix.jbi.framework.ComponentPacketEventListener;
-import org.apache.servicemix.jbi.management.AttributeInfoHelper;
-import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
-import org.apache.servicemix.jbi.nmr.Broker;
-import org.apache.servicemix.jbi.nmr.flow.AbstractFlow;
+import java.util.Iterator;
+import java.util.Map;
 
 import javax.jbi.JBIException;
 import javax.jbi.management.LifeCycleMBean;
@@ -36,8 +26,19 @@
 import javax.management.MBeanAttributeInfo;
 import javax.management.ObjectName;
 
-import java.util.Iterator;
-import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.jbi.event.ComponentAdapter;
+import org.apache.servicemix.jbi.event.ComponentEvent;
+import org.apache.servicemix.jbi.event.ComponentListener;
+import org.apache.servicemix.jbi.framework.ComponentNameSpace;
+import org.apache.servicemix.jbi.management.AttributeInfoHelper;
+import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
+import org.apache.servicemix.jbi.nmr.Broker;
+import org.apache.servicemix.jbi.nmr.flow.AbstractFlow;
+
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * The SedaFlow introduces a simple event staging between the internal processes 
@@ -47,11 +48,12 @@
  * 
  * @version $Revision$
  */
-public class SedaFlow extends AbstractFlow implements ComponentPacketEventListener {
+public class SedaFlow extends AbstractFlow {
     private static final Log log = LogFactory.getLog(SedaFlow.class);
     protected Map queueMap = new ConcurrentHashMap();
     protected int capacity = 100;
     protected AtomicBoolean started = new AtomicBoolean(false);
+    protected ComponentListener listener;
 
     /**
      * The type of Flow
@@ -70,7 +72,12 @@
      */
     public void init(Broker broker, String subType) throws JBIException {
         super.init(broker, subType);
-        broker.getRegistry().addComponentPacketListener(this);
+        listener = new ComponentAdapter() {
+            public void componentShutDown(ComponentEvent event) {
+                onComponentShutdown(event.getComponent().getComponentNameSpace());
+            }
+        };
+        broker.getContainer().addListener(listener);
     }
 
     /**
@@ -118,8 +125,8 @@
      * @throws JBIException
      */
     public void shutDown() throws JBIException {
-        broker.getRegistry().removeComponentPacketListener(this);
-        for (Iterator i = queueMap.values().iterator();i.hasNext();) {
+        broker.getContainer().removeListener(listener);
+        for (Iterator i = queueMap.values().iterator(); i.hasNext();) {
             SedaQueue queue = (SedaQueue) i.next();
             queue.shutDown();
             unregisterQueue(queue);
@@ -177,19 +184,15 @@
      * 
      * @param event
      */
-    public synchronized void onEvent(ComponentPacketEvent event) {
-        // watch for deactivations
-        if (event.getStatus() == ComponentPacketEvent.DEACTIVATED) {
-            ComponentNameSpace cns = event.getPacket().getComponentNameSpace();
-            SedaQueue queue = (SedaQueue) queueMap.remove(cns);
-            if (queue != null) {
-                try {
-                    queue.shutDown();
-                    unregisterQueue(queue);
-                }
-                catch (JBIException e) {
-                    log.error("Caught exception stopping SedaQueue: " + queue);
-                }
+    public synchronized void onComponentShutdown(ComponentNameSpace cns) {
+        SedaQueue queue = (SedaQueue) queueMap.remove(cns);
+        if (queue != null) {
+            try {
+                queue.shutDown();
+                unregisterQueue(queue);
+            }
+            catch (JBIException e) {
+                log.error("Caught exception stopping SedaQueue: " + queue);
             }
         }
     }

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/servicedesc/AbstractServiceEndpoint.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/servicedesc/AbstractServiceEndpoint.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/servicedesc/AbstractServiceEndpoint.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/servicedesc/AbstractServiceEndpoint.java Mon Mar  6 15:19:33 2006
@@ -36,8 +36,12 @@
      * get the id of the ComponentConnector
      * @return the id
      */
-    public ComponentNameSpace getComponentNameSpace(){
+    public ComponentNameSpace getComponentNameSpace() {
         return componentName;
+    }
+
+    public void setComponentName(ComponentNameSpace componentName) {
+        this.componentName = componentName;
     }
 
 }

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/servicedesc/InternalEndpoint.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/servicedesc/InternalEndpoint.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/servicedesc/InternalEndpoint.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/servicedesc/InternalEndpoint.java Mon Mar  6 15:19:33 2006
@@ -16,7 +16,9 @@
 package org.apache.servicemix.jbi.servicedesc;
 
 import java.util.ArrayList;
+import java.util.HashMap;
 import java.util.List;
+import java.util.Map;
 
 import javax.xml.namespace.QName;
 
@@ -38,7 +40,7 @@
     private String endpointName;
     private QName serviceName;
     private List interfaces = new ArrayList();
-    
+    private transient Map remotes = new HashMap();
     
 
     /**
@@ -53,10 +55,6 @@
         this.serviceName = serviceName;
     }
     
-    protected InternalEndpoint() {
-    }
-    
-    
     /**
      * Get a reference to this endpoint, using an endpoint reference vocabulary
      * that is known to the provider.
@@ -107,6 +105,40 @@
     }
     
     /**
+     * Retrieve all remote component namespaces where this endpoint is activated
+     * @return component namespaces
+     */
+    public InternalEndpoint[] getRemoteEndpoints() {
+        InternalEndpoint[] result = new InternalEndpoint[remotes.size()];
+        remotes.values().toArray(result);
+        return result;
+    }
+    
+    public void addRemoteEndpoint(InternalEndpoint remote) {
+        remotes.put(remote.getComponentNameSpace(), remote);
+    }
+    
+    public void removeRemoteEndpoint(InternalEndpoint remote) {
+        remotes.remove(remote.getComponentNameSpace());
+    }
+    
+    /**
+     * Check if this endpoint is locally activated
+     * @return true if the endpoint has been activated locally
+     */
+    public boolean isLocal() {
+        return getComponentNameSpace() != null;
+    }
+    
+    /**
+     * Check if the endpoint is remotely activated
+     * @return true if the endpoint has been remotely activated
+     */
+    public boolean isClustered() {
+        return remotes != null && remotes.size() > 0;
+    }
+    
+    /**
      * @param obj
      * @return true if equal
      */
@@ -114,8 +146,7 @@
         boolean result = false;
         if (obj != null && obj instanceof InternalEndpoint){
             InternalEndpoint other = (InternalEndpoint)obj;
-            result = other.getComponentNameSpace().equals(this.getComponentNameSpace()) && 
-                     other.serviceName.equals(this.serviceName) &&
+            result = other.serviceName.equals(this.serviceName) &&
                      other.endpointName.equals(this.endpointName);
         }
         return result;
@@ -126,8 +157,7 @@
      * @return has code
      */
     public int hashCode() {
-        return getComponentNameSpace().hashCode() ^
-               serviceName.hashCode() ^
+        return serviceName.hashCode() ^
                endpointName.hashCode() ;
     }
     

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/servicedesc/LinkedEndpoint.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/servicedesc/LinkedEndpoint.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/servicedesc/LinkedEndpoint.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/servicedesc/LinkedEndpoint.java Mon Mar  6 15:19:33 2006
@@ -15,10 +15,9 @@
  */
 package org.apache.servicemix.jbi.servicedesc;
 
-import org.apache.servicemix.jbi.framework.ComponentNameSpace;
-import org.w3c.dom.DocumentFragment;
-
 import javax.xml.namespace.QName;
+
+import org.w3c.dom.DocumentFragment;
 
 /**
  * Linked endpoints are defined by SA deployment.

Modified: incubator/servicemix/trunk/servicemix-core/src/test/components/logger-component-1.0-exploded.jar/META-INF/jbi.xml
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/test/components/logger-component-1.0-exploded.jar/META-INF/jbi.xml?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/components/logger-component-1.0-exploded.jar/META-INF/jbi.xml (original)
+++ incubator/servicemix/trunk/servicemix-core/src/test/components/logger-component-1.0-exploded.jar/META-INF/jbi.xml Mon Mar  6 15:19:33 2006
@@ -6,7 +6,7 @@
       <name>logger-component</name>
       <description>An example of a Logger JBI componet that can be configured through a service assembly to consume messages and dump them to the log</description>
     </identification>
-    <component-class-name description="Component Implementation">org.apache.servicemix.component.logger.LoggerComponent</component-class-name>
+    <component-class-name description="Component Implementation">org.servicemix.component.logger.LoggerComponent</component-class-name>
     <component-class-path>
       <path-element>lib/logger-component-1.0.jar</path-element>
     </component-class-path>

Modified: incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/framework/ComponentPacketTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/framework/ComponentPacketTest.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/framework/ComponentPacketTest.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/framework/ComponentPacketTest.java Mon Mar  6 15:19:33 2006
@@ -25,6 +25,7 @@
 public class ComponentPacketTest extends TestCase {
     
     public void testRegisterTwoEndpoints() throws Exception {
+        /*
         ComponentPacket packet = new ComponentPacket();
         ComponentNameSpace cns = new ComponentNameSpace("container", "component", null);
         ServiceEndpoint ep1 = new InternalEndpoint(cns, "endpoint", new QName("urn:foo", "service1"));
@@ -32,6 +33,7 @@
         packet.addActiveEndpoint(ep1);
         packet.addActiveEndpoint(ep2);
         assertEquals(2, packet.getActiveEndpoints().size());
+        */
     }
 
 }

Modified: incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/installation/DeploymentTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/installation/DeploymentTest.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/installation/DeploymentTest.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/installation/DeploymentTest.java Mon Mar  6 15:19:33 2006
@@ -251,6 +251,7 @@
         componentMock.setReturnValue(manager, MockControl.ONE_OR_MORE);
         manager.init(null, null);
         managerMock.setMatcher(MockControl.ALWAYS_MATCHER);
+        manager.shutDown("su");
         replay();
         // start container
         startContainer(false);

Modified: incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/management/ManagementAttributesTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/management/ManagementAttributesTest.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/management/ManagementAttributesTest.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/management/ManagementAttributesTest.java Mon Mar  6 15:19:33 2006
@@ -53,6 +53,7 @@
     	container.setRmiPort(namingPort);
     	container.setCreateMBeanServer(true);
     	container.init();
+        Thread.sleep(5000);
     }
     
     protected void tearDown() throws Exception {

Modified: incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/management/ManagementContextTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/management/ManagementContextTest.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/management/ManagementContextTest.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/management/ManagementContextTest.java Mon Mar  6 15:19:33 2006
@@ -67,6 +67,7 @@
     }
 
     public void testRemote() throws Exception {
+        Thread.sleep(5000);
         // The address of the connector server
         JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://"
                 + namingHost + ":" + namingPort + jndiPath);
@@ -100,11 +101,11 @@
     }
     
     public void testComponent() throws Exception {
-    	ObjectName[] names = context.getEngineComponents();
+    	ObjectName[] names = context.getPojoComponents();
     	assertEquals(1, names.length);
     	EchoComponent echo = new EchoComponent();
     	container.activateComponent(echo, "echo");
-    	names = context.getEngineComponents();
+    	names = context.getPojoComponents();
     	assertNotNull(names);
     	assertEquals(2, names.length);
     	assertEquals(LifeCycleMBean.STARTED, echo.getCurrentState());

Added: incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/ConnectionsTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/ConnectionsTest.java?rev=383702&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/ConnectionsTest.java (added)
+++ incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/ConnectionsTest.java Mon Mar  6 15:19:33 2006
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jbi.nmr;
+
+import javax.xml.namespace.QName;
+
+import junit.framework.TestCase;
+
+import org.apache.servicemix.jbi.container.ActivationSpec;
+import org.apache.servicemix.jbi.container.JBIContainer;
+import org.apache.servicemix.tck.Receiver;
+import org.apache.servicemix.tck.ReceiverComponent;
+import org.apache.servicemix.tck.Sender;
+import org.apache.servicemix.tck.SenderComponent;
+
+public class ConnectionsTest extends TestCase {
+
+    private JBIContainer container;
+    
+    protected void setUp() throws Exception {
+        container = new JBIContainer();
+        container.setEmbedded(true);
+        container.init();
+        container.start();
+    }
+    
+    protected void tearDown() throws Exception {
+        container.shutDown();
+    }
+    
+    public void testEndpointConnection() throws Exception {
+        Receiver receiver = new ReceiverComponent();
+        ActivationSpec asReceiver = new ActivationSpec();
+        asReceiver.setComponent(receiver);
+        asReceiver.setService(new QName("service"));
+        asReceiver.setEndpoint("endpoint");
+        
+        Sender sender = new SenderComponent();
+        ActivationSpec asSender = new ActivationSpec();
+        asSender.setComponent(sender);
+        asSender.setDestinationService(new QName("service"));
+        asSender.setDestinationEndpoint("linkedEndpoint");
+        
+        container.activateComponent(asReceiver);
+        container.activateComponent(asSender);
+        container.getRegistry().registerEndpointConnection(new QName("service"), "linkedEndpoint", new QName("service"), "endpoint", null);
+        
+        sender.sendMessages(1);
+        receiver.getMessageList().assertMessagesReceived(1);
+    }
+    
+    public void testInterfaceConnection() throws Exception {
+        Receiver receiver = new ReceiverComponent();
+        ActivationSpec asReceiver = new ActivationSpec();
+        asReceiver.setComponent(receiver);
+        asReceiver.setService(new QName("service"));
+        asReceiver.setEndpoint("endpoint");
+        
+        Sender sender = new SenderComponent();
+        ActivationSpec asSender = new ActivationSpec();
+        asSender.setComponent(sender);
+        asSender.setDestinationInterface(new QName("interface"));
+        
+        container.activateComponent(asReceiver);
+        container.activateComponent(asSender);
+        container.getRegistry().registerInterfaceConnection(new QName("interface"), new QName("service"), "endpoint");
+        
+        sender.sendMessages(1);
+        receiver.getMessageList().assertMessagesReceived(1);
+    }
+    
+}

Modified: incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTest.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTest.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTest.java Mon Mar  6 15:19:33 2006
@@ -69,12 +69,6 @@
         receiver = new ReceiverComponent();
         sender = new SenderComponent();
         sender.setResolver(new ServiceNameEndpointResolver(ReceiverComponent.SERVICE));
-
-        senderContainer.activateComponent(new ActivationSpec("sender", sender));
-        receiverContainer.activateComponent(new ActivationSpec("receiver", receiver));
-
-        
-        Thread.sleep(2000);
     }
     
     protected void tearDown() throws Exception{
@@ -85,9 +79,13 @@
     }
     
     public void testInOnly() throws Exception {
-      sender.sendMessages(NUM_MESSAGES);
-      Thread.sleep(3000);
-      receiver.getMessageList().assertMessagesReceived(NUM_MESSAGES);
+        senderContainer.activateComponent(new ActivationSpec("sender", sender));
+        receiverContainer.activateComponent(new ActivationSpec("receiver", receiver));
+        Thread.sleep(1000);
+
+        sender.sendMessages(NUM_MESSAGES);
+        Thread.sleep(3000);
+        receiver.getMessageList().assertMessagesReceived(NUM_MESSAGES);
     }
 
     public void testClusteredInOnly() throws Exception {

Modified: incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jms/MultipleJMSFlowTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jms/MultipleJMSFlowTest.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jms/MultipleJMSFlowTest.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jms/MultipleJMSFlowTest.java Mon Mar  6 15:19:33 2006
@@ -20,6 +20,8 @@
 import org.apache.servicemix.jbi.container.JBIContainer;
 import org.apache.servicemix.jbi.nmr.flow.Flow;
 import org.apache.servicemix.jbi.nmr.flow.jms.JMSFlow;
+import org.apache.servicemix.tck.ReceiverComponent;
+import org.apache.servicemix.tck.SenderComponent;
 import org.springframework.core.io.ClassPathResource;
 
 import junit.framework.TestCase;
@@ -61,6 +63,9 @@
         }
         long t1 = System.currentTimeMillis();
         System.err.println(t1 - t0);
+        for (int i = 0; i < containers.length; i++) {
+            containers[i].activateComponent(new ReceiverComponent(), "receiver");
+        }
         for (int i = 0; i < containers.length; i++) {
             containers[i].stop();
             printNodes(containers);

Modified: incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/servicedesc/InternalEndpointTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/servicedesc/InternalEndpointTest.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/servicedesc/InternalEndpointTest.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/servicedesc/InternalEndpointTest.java Mon Mar  6 15:19:33 2006
@@ -59,7 +59,7 @@
         assertFalse(e1.equals(e2));
         ComponentNameSpace cns2 = new ComponentNameSpace("myContainer", "myName", "myId2");
         e2 = new InternalEndpoint(cns2, "myEndpoint1", new QName("myService"));
-        assertFalse(e1.equals(e2));
+        assertTrue(e1.equals(e2));
         cns2 = new ComponentNameSpace("myContainer", "myName", "myId");
         e2 = new InternalEndpoint(cns2, "myEndpoint1", new QName("myService"));
         assertTrue(e1.equals(e2));