You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/03/07 00:19:38 UTC
svn commit: r383702 [3/3] - in /incubator/servicemix/trunk:
servicemix-components/src/test/resources/org/apache/servicemix/components/email/
servicemix-core/src/main/java/org/apache/servicemix/jbi/audit/
servicemix-core/src/main/java/org/apache/service...
Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java Mon Mar 6 15:19:33 2006
@@ -16,7 +16,6 @@
package org.apache.servicemix.jbi.nmr.flow.jca;
import java.io.Serializable;
-import java.util.Iterator;
import java.util.Map;
import java.util.Set;
@@ -24,6 +23,7 @@
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.MessageExchange.Role;
+import javax.jbi.servicedesc.ServiceEndpoint;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.DeliveryMode;
@@ -46,7 +46,6 @@
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ActiveMQObjectMessage;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerId;
@@ -62,15 +61,14 @@
import org.apache.geronimo.connector.outbound.connectionmanagerconfig.XATransactions;
import org.apache.geronimo.connector.work.GeronimoWorkManager;
import org.apache.geronimo.transaction.context.TransactionContextManager;
-import org.apache.servicemix.jbi.framework.ComponentConnector;
+import org.apache.servicemix.jbi.event.EndpointAdapter;
+import org.apache.servicemix.jbi.event.EndpointEvent;
+import org.apache.servicemix.jbi.event.EndpointListener;
import org.apache.servicemix.jbi.framework.ComponentNameSpace;
-import org.apache.servicemix.jbi.framework.ComponentPacket;
-import org.apache.servicemix.jbi.framework.ComponentPacketEvent;
-import org.apache.servicemix.jbi.framework.ComponentPacketEventListener;
-import org.apache.servicemix.jbi.framework.LocalComponentConnector;
import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
import org.apache.servicemix.jbi.nmr.Broker;
import org.apache.servicemix.jbi.nmr.flow.AbstractFlow;
+import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;
import org.jencks.JCAConnector;
import org.jencks.SingletonEndpointFactory;
import org.jencks.factory.ConnectionManagerFactoryBean;
@@ -84,7 +82,7 @@
*
* @version $Revision$
*/
-public class JCAFlow extends AbstractFlow implements MessageListener, ComponentPacketEventListener {
+public class JCAFlow extends AbstractFlow implements MessageListener {
private static final Log log = LogFactory.getLog(JCAFlow.class);
private static final String INBOUND_PREFIX = "org.apache.servicemix.inbound.";
@@ -95,8 +93,6 @@
private Connection connection;
private String broadcastDestinationName = "org.apache.servicemix.JCAFlow";
private Topic broadcastTopic;
- private Map networkNodeKeyMap = new ConcurrentHashMap();
- private Map networkComponentKeyMap = new ConcurrentHashMap();
private Map connectorMap = new ConcurrentHashMap();
private AtomicBoolean started = new AtomicBoolean(false);
private Set subscriberSet=new CopyOnWriteArraySet();
@@ -110,6 +106,8 @@
private Topic advisoryTopic;
private MessageConsumer advisoryConsumer;
+ private EndpointListener endpointListener;
+
/**
* The type of Flow
*
@@ -227,8 +225,19 @@
* @throws JBIException
*/
public void init(Broker broker, String subType) throws JBIException {
+ log.info(broker.getContainerName() + ": Initializing jca flow");
super.init(broker, subType);
- broker.getRegistry().addComponentPacketListener(this);
+ // Create and register endpoint listener
+ endpointListener = new EndpointAdapter() {
+ public void internalEndpointRegistered(EndpointEvent event) {
+ onInternalEndpointRegistered(event, true);
+ }
+
+ public void internalEndpointUnregistered(EndpointEvent event) {
+ onInternalEndpointUnregistered(event, true);
+ }
+ };
+ broker.getContainer().addListener(endpointListener);
try {
resourceAdapter = createResourceAdapter();
@@ -248,17 +257,6 @@
mcf.setResourceAdapter(resourceAdapter);
connectionFactory = (ConnectionFactory) mcf.createConnectionFactory(getConnectionManager());
- // Inbound broadcast
- ac = new ActiveMQActivationSpec();
- ac.setDestinationType("javax.jms.Topic");
- ac.setDestination(broadcastDestinationName);
- broadcastConnector = new JCAConnector();
- broadcastConnector.setBootstrapContext(getBootstrapContext());
- broadcastConnector.setActivationSpec(ac);
- broadcastConnector.setResourceAdapter(resourceAdapter);
- broadcastConnector.setEndpointFactory(new SingletonEndpointFactory(this));
- broadcastConnector.afterPropertiesSet();
-
// Outbound broadcast
connection = ((ActiveMQResourceAdapter) resourceAdapter).makeConnection();
connection.start();
@@ -283,10 +281,43 @@
if (started.compareAndSet(false, true)) {
super.start();
try {
- advisoryConsumer=broadcastSession.createConsumer(advisoryTopic);
- advisoryConsumer.setMessageListener(this);
+ // Inbound broadcast
+ ActiveMQActivationSpec ac = new ActiveMQActivationSpec();
+ ac.setDestinationType("javax.jms.Topic");
+ ac.setDestination(broadcastDestinationName);
+ broadcastConnector = new JCAConnector();
+ broadcastConnector.setBootstrapContext(getBootstrapContext());
+ broadcastConnector.setActivationSpec(ac);
+ broadcastConnector.setResourceAdapter(resourceAdapter);
+ broadcastConnector.setEndpointFactory(new SingletonEndpointFactory(new MessageListener() {
+ public void onMessage(Message message) {
+ try {
+ Object obj = ((ObjectMessage) message).getObject();
+ if (obj instanceof EndpointEvent) {
+ EndpointEvent event = (EndpointEvent) obj;
+ if (event.getEventType() == EndpointEvent.INTERNAL_ENDPOINT_REGISTERED) {
+ onRemoteEndpointRegistered(event);
+ } else if (event.getEventType() == EndpointEvent.INTERNAL_ENDPOINT_UNREGISTERED) {
+ onRemoteEndpointUnregistered(event);
+ }
+ }
+ } catch (Exception e) {
+ log.error("Error processing incoming broadcast message", e);
+ }
+ }
+ }));
+ broadcastConnector.afterPropertiesSet();
+
+ advisoryConsumer = broadcastSession.createConsumer(advisoryTopic);
+ advisoryConsumer.setMessageListener(new MessageListener() {
+ public void onMessage(Message message) {
+ if (started.get()) {
+ onAdvisoryMessage(((ActiveMQMessage) message).getDataStructure());
+ }
+ }
+ });
}
- catch (JMSException e) {
+ catch (Exception e) {
throw new JBIException("JMSException caught in start: " + e.getMessage(), e);
}
}
@@ -313,6 +344,8 @@
public void shutDown() throws JBIException {
super.shutDown();
stop();
+ // Remove endpoint listener
+ broker.getContainer().removeListener(endpointListener);
// Destroy connectors
while (!connectorMap.isEmpty()) {
JCAConnector connector = (JCAConnector) connectorMap.remove(connectorMap.keySet().iterator().next());
@@ -365,41 +398,60 @@
return true;
}
- /**
- * Process state changes in Components
- *
- * @param event
- */
- public void onEvent(final ComponentPacketEvent event){
+ public void onInternalEndpointRegistered(EndpointEvent event, boolean broadcast) {
+ if (!started.get()) {
+ return;
+ }
+ try {
+ String key = event.getEndpoint().getServiceName() + event.getEndpoint().getEndpointName();
+ if(!connectorMap.containsKey(key)){
+ ActiveMQActivationSpec ac = new ActiveMQActivationSpec();
+ ac.setDestinationType("javax.jms.Queue");
+ ac.setDestination(INBOUND_PREFIX + key);
+ JCAConnector connector = new JCAConnector();
+ connector.setBootstrapContext(getBootstrapContext());
+ connector.setActivationSpec(ac);
+ connector.setResourceAdapter(resourceAdapter);
+ connector.setEndpointFactory(new SingletonEndpointFactory(this, getTransactionManager()));
+ connector.afterPropertiesSet();
+ connectorMap.put(key, connector);
+ }
+ // broadcast change to the network
+ if (broadcast) {
+ log.info(broker.getContainerName() + ": broadcasting info for " + event);
+ sendJmsMessage(broadcastTopic, event, false, false);
+ }
+ } catch (Exception e) {
+ log.error("Cannot create consumer for " + event.getEndpoint(), e);
+ }
+ }
+
+ public void onInternalEndpointUnregistered(EndpointEvent event, boolean broadcast) {
try{
- String componentName=event.getPacket().getComponentNameSpace().getName();
- if(event.getStatus()==ComponentPacketEvent.ACTIVATED){
- if(!connectorMap.containsKey(componentName)){
- ActiveMQActivationSpec ac = new ActiveMQActivationSpec();
- ac.setDestinationType("javax.jms.Queue");
- ac.setDestination(INBOUND_PREFIX+componentName);
- JCAConnector connector=new JCAConnector();
- connector.setBootstrapContext(getBootstrapContext());
- connector.setActivationSpec(ac);
- connector.setResourceAdapter(resourceAdapter);
- connector.setEndpointFactory(new SingletonEndpointFactory(this,getTransactionManager()));
- connector.afterPropertiesSet();
- connectorMap.put(componentName,connector);
- }
- }else if(event.getStatus()==ComponentPacketEvent.DEACTIVATED){
- JCAConnector connector=(JCAConnector) connectorMap.remove(componentName);
- if(connector!=null){
- connector.destroy();
- }
+ String key = event.getEndpoint().getServiceName() + event.getEndpoint().getEndpointName();
+ JCAConnector connector=(JCAConnector) connectorMap.remove(key);
+ if(connector!=null){
+ connector.destroy();
}
// broadcast change to the network
- log.info("broadcast to internal JMS network: " + event);
- sendJmsMessage(broadcastTopic, event, false, false);
- }catch(Exception e){
- log.error("failed to broadcast to the internal JMS network: "+event,e);
+ if (broadcast) {
+ log.info(broker.getContainerName() + ": broadcasting info for " + event);
+ sendJmsMessage(broadcastTopic, event, false, false);
+ }
+ } catch (Exception e) {
+ log.error("Cannot destroy consumer for " + event, e);
}
}
+
+ public void onRemoteEndpointRegistered(EndpointEvent event) {
+ log.info(broker.getContainerName() + ": adding remote endpoint: " + event.getEndpoint());
+ broker.getRegistry().registerRemoteEndpoint(event.getEndpoint());
+ }
+ public void onRemoteEndpointUnregistered(EndpointEvent event) {
+ log.info(broker.getContainerName() + ": removing remote endpoint: " + event.getEndpoint());
+ broker.getRegistry().unregisterRemoteEndpoint(event.getEndpoint());
+ }
/**
* Distribute an ExchangePacket
@@ -418,25 +470,19 @@
* @throws MessagingException
*/
public void doRouting(final MessageExchangeImpl me) throws MessagingException {
-
- ComponentNameSpace id = me.getRole() == Role.PROVIDER ? me.getDestinationId() : me.getSourceId();
- ComponentConnector cc = broker.getRegistry().getComponentConnector(id);
- if (cc != null) {
- try {
- final String componentName = cc.getComponentNameSpace().getName();
- String destination;
- if (me.getRole() == Role.PROVIDER){
- destination = INBOUND_PREFIX + componentName;
- }else {
- destination = INBOUND_PREFIX + id.getContainerName();
- }
- sendJmsMessage(new ActiveMQQueue(destination), me, isPersistent(me), me.isTransacted());
- } catch (Exception e) {
- log.error("Failed to send exchange: " + me + " internal JMS Network", e);
- throw new MessagingException(e);
+ // let ActiveMQ do the routing ...
+ try {
+ String destination;
+ if (me.getRole() == Role.PROVIDER) {
+ destination = INBOUND_PREFIX + me.getEndpoint().getServiceName() + me.getEndpoint().getEndpointName();
+ } else {
+ ComponentNameSpace id = me.getRole() == Role.PROVIDER ? me.getDestinationId() : me.getSourceId();
+ destination = INBOUND_PREFIX + id.getContainerName();
}
- } else {
- throw new MessagingException("No component with id (" + id + ") - Couldn't route MessageExchange " + me);
+ sendJmsMessage(new ActiveMQQueue(destination), me, isPersistent(me), me.isTransacted());
+ } catch (JMSException e) {
+ log.error("Failed to send exchange: " + me + " internal JMS Network", e);
+ throw new MessagingException(e);
}
}
@@ -446,50 +492,26 @@
* @param message
*/
public void onMessage(Message message) {
- if (message == null) {
- return;
- }
try {
- if (message instanceof ObjectMessage) {
+ if (message != null && started.get()) {
ObjectMessage objMsg = (ObjectMessage) message;
- Object obj = objMsg.getObject();
- if (obj != null) {
- if (obj instanceof ComponentPacketEvent) {
- ComponentPacketEvent event = (ComponentPacketEvent) obj;
- String containerName = event.getPacket().getComponentNameSpace().getContainerName();
- processInBoundPacket(containerName, event);
- }
- else if (obj instanceof MessageExchangeImpl) {
- // Hack for redelivery: AMQ is too optimized and the object is the same upon redelivery
- // so that there are side effect (the exchange state may have been modified)
- // See http://jira.activemq.org/jira/browse/AMQ-519
- obj = ((ActiveMQObjectMessage) ((ActiveMQObjectMessage) message).copy()).getObject();
- MessageExchangeImpl me = (MessageExchangeImpl) obj;
- TransactionManager tm = (TransactionManager) getTransactionManager();
- if (tm != null) {
- me.setTransactionContext(tm.getTransaction());
- }
- super.doRouting(me);
- }
+ final MessageExchangeImpl me = (MessageExchangeImpl) objMsg.getObject();
+ // Hack for redelivery: AMQ is too optimized and the object is the same upon redelivery
+ // so that there are side effect (the exchange state may have been modified)
+ // See http://jira.activemq.org/jira/browse/AMQ-519
+ //me = (MessageExchangeImpl) ((ActiveMQObjectMessage) ((ActiveMQObjectMessage) message).copy()).getObject();
+ TransactionManager tm = (TransactionManager) getTransactionManager();
+ if (tm != null) {
+ me.setTransactionContext(tm.getTransaction());
}
- } else if (message instanceof ActiveMQMessage) {
- Object obj = ((ActiveMQMessage) message).getDataStructure();
- if(obj instanceof ConsumerInfo){
- ConsumerInfo info=(ConsumerInfo) obj;
- subscriberSet.add(info.getConsumerId().getConnectionId());
- if(started.get()){
- for(Iterator i=broker.getRegistry().getLocalComponentConnectors().iterator();i.hasNext();){
- LocalComponentConnector lcc=(LocalComponentConnector) i.next();
- ComponentPacket packet=lcc.getPacket();
- ComponentPacketEvent cpe=new ComponentPacketEvent(packet,ComponentPacketEvent.ACTIVATED);
- onEvent(cpe);
- }
- }
- }else if(obj instanceof RemoveInfo){
- ConsumerId id=(ConsumerId) ((RemoveInfo) obj).getObjectId();
- subscriberSet.remove(id.getConnectionId());
- removeAllPackets(id.getConnectionId());
- }
+ if (me.getDestinationId() == null) {
+ ServiceEndpoint se = me.getEndpoint();
+ se = broker.getRegistry()
+ .getInternalEndpoint(se.getServiceName(), se.getEndpointName());
+ me.setEndpoint(se);
+ me.setDestinationId(((InternalEndpoint) se).getComponentNameSpace());
+ }
+ super.doRouting(me);
}
}
catch (JMSException jmsEx) {
@@ -503,82 +525,26 @@
}
}
- /**
- * Process Inbound packets
- *
- * @param containerName
- * @param event
- */
- protected void processInBoundPacket(String containerName, ComponentPacketEvent event) {
- ComponentPacket packet = event.getPacket();
- if (!packet.getComponentNameSpace().getContainerName().equals(broker.getContainerName())) {
- log.info("received from internal JMS network: " + event);
- int eventStatus = event.getStatus();
- switch (eventStatus) {
- case ComponentPacketEvent.ACTIVATED:
- addRemotePacket(containerName, packet);
- break;
- case ComponentPacketEvent.DEACTIVATED:
- removeRemotePacket(containerName, packet);
- break;
- case ComponentPacketEvent.STATE_CHANGE:
- updateRemotePacket(containerName, packet);
- break;
- default:
- log.warn("Unable to determine ComponentPacketEvent type: " + eventStatus + " for packet: " + packet);
- }
- }
- }
-
- private void addRemotePacket(String containerName, ComponentPacket packet) {
- networkComponentKeyMap.put(packet.getComponentNameSpace(), containerName);
- Set set = (Set) networkNodeKeyMap.get(containerName);
- if (set == null) {
- set = new CopyOnWriteArraySet();
- networkNodeKeyMap.put(containerName, set);
- }
- ComponentConnector cc = new ComponentConnector(packet);
- log.info("Adding Remote Component: " + cc);
- broker.getRegistry().addRemoteComponentConnector(cc);
- set.add(packet);
- }
-
- private void updateRemotePacket(String containerName, ComponentPacket packet) {
- Set set = (Set) networkNodeKeyMap.get(containerName);
- if (set != null) {
- set.remove(packet);
- set.add(packet);
- }
- ComponentConnector cc = new ComponentConnector(packet);
- log.info("Updating remote Component: " + cc);
- broker.getRegistry().updateRemoteComponentConnector(cc);
- }
-
- private void removeRemotePacket(String containerName, ComponentPacket packet) {
- networkComponentKeyMap.remove(packet.getComponentNameSpace());
- Set set = (Set) networkNodeKeyMap.get(containerName);
- if (set != null) {
- set.remove(packet);
- ComponentConnector cc = new ComponentConnector(packet);
- log.info("Removing remote Component: " + cc);
- broker.getRegistry().removeRemoteComponentConnector(cc);
- if (set.isEmpty()) {
- networkNodeKeyMap.remove(containerName);
+ protected void onAdvisoryMessage(Object obj) {
+ if (obj instanceof ConsumerInfo) {
+ ConsumerInfo info = (ConsumerInfo) obj;
+ subscriberSet.add(info.getConsumerId().getConnectionId());
+ ServiceEndpoint[] endpoints = broker.getRegistry().getEndpointsForInterface(null);
+ for (int i = 0; i < endpoints.length; i++) {
+ if (endpoints[i] instanceof InternalEndpoint && ((InternalEndpoint) endpoints[i]).isLocal()) {
+ onInternalEndpointRegistered(new EndpointEvent(endpoints[i],
+ EndpointEvent.INTERNAL_ENDPOINT_REGISTERED), true);
+ }
}
+ } else if (obj instanceof RemoveInfo) {
+ ConsumerId id = (ConsumerId) ((RemoveInfo) obj).getObjectId();
+ subscriberSet.remove(id.getConnectionId());
+ removeAllPackets(id.getConnectionId());
}
}
private void removeAllPackets(String containerName) {
- Set set = (Set) networkNodeKeyMap.remove(containerName);
- if (set != null) {
- for (Iterator i = set.iterator();i.hasNext();) {
- ComponentPacket packet = (ComponentPacket) i.next();
- ComponentConnector cc = new ComponentConnector(packet);
- log.info("Network node: " + containerName + " Stopped. Removing remote Component: " + cc);
- broker.getRegistry().removeRemoteComponentConnector(cc);
- networkComponentKeyMap.remove(packet.getComponentNameSpace());
- }
- }
+ //TODO: broker.getRegistry().unregisterRemoteEndpoints(containerName);
}
public ConnectionManager getConnectionManager() throws Exception {
@@ -643,4 +609,5 @@
connection.close();
}
}
+
}
Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlow.java Mon Mar 6 15:19:33 2006
@@ -15,35 +15,15 @@
*/
package org.apache.servicemix.jbi.nmr.flow.jms;
-import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
-import org.apache.activemq.advisory.AdvisorySupport;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.apache.activemq.command.ActiveMQMessage;
-import org.apache.activemq.command.ConsumerId;
-import org.apache.activemq.command.ConsumerInfo;
-import org.apache.activemq.command.RemoveInfo;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.servicemix.jbi.framework.ComponentConnector;
-import org.apache.servicemix.jbi.framework.ComponentNameSpace;
-import org.apache.servicemix.jbi.framework.ComponentPacket;
-import org.apache.servicemix.jbi.framework.ComponentPacketEvent;
-import org.apache.servicemix.jbi.framework.ComponentPacketEventListener;
-import org.apache.servicemix.jbi.framework.LocalComponentConnector;
-import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
-import org.apache.servicemix.jbi.nmr.Broker;
-import org.apache.servicemix.jbi.nmr.flow.AbstractFlow;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
import javax.jbi.JBIException;
import javax.jbi.messaging.MessageExchange;
import javax.jbi.messaging.MessagingException;
import javax.jbi.messaging.MessageExchange.Role;
+import javax.jbi.servicedesc.ServiceEndpoint;
import javax.jms.DeliveryMode;
import javax.jms.JMSException;
import javax.jms.Message;
@@ -57,38 +37,74 @@
import javax.resource.spi.work.Work;
import javax.resource.spi.work.WorkException;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.advisory.AdvisorySupport;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ConsumerId;
+import org.apache.activemq.command.ConsumerInfo;
+import org.apache.activemq.command.RemoveInfo;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.jbi.event.EndpointAdapter;
+import org.apache.servicemix.jbi.event.EndpointEvent;
+import org.apache.servicemix.jbi.event.EndpointListener;
+import org.apache.servicemix.jbi.framework.ComponentNameSpace;
+import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
+import org.apache.servicemix.jbi.nmr.Broker;
+import org.apache.servicemix.jbi.nmr.flow.AbstractFlow;
+import org.apache.servicemix.jbi.servicedesc.InternalEndpoint;
+
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
/**
* Use for message routing among a network of containers. All routing/registration happens automatically.
*
* @version $Revision$
*/
-public class JMSFlow extends AbstractFlow implements MessageListener, ComponentPacketEventListener {
-
+public class JMSFlow extends AbstractFlow implements MessageListener {
+
private static final Log log = LogFactory.getLog(JMSFlow.class);
+
private static final String INBOUND_PREFIX = "org.apache.servicemix.inbound.";
+
private String jmsURL = "peer://org.apache.servicemix?persistent=false";
+
private String userName;
+
private String password;
+
private ActiveMQConnectionFactory connectionFactory;
+
private ActiveMQConnection connection;
+
private String broadcastDestinationName = "org.apache.servicemix.JMSFlow";
+
private MessageProducer queueProducer;
+
private MessageProducer topicProducer;
+
private Topic broadcastTopic;
+
private Session broadcastSession;
+
private MessageConsumer broadcastConsumer;
+
private Session inboundSession;
+
private MessageConsumer advisoryConsumer;
- private Set subscriberSet=new CopyOnWriteArraySet();
- private Map networkNodeKeyMap = new ConcurrentHashMap();
- private Map networkComponentKeyMap = new ConcurrentHashMap();
+
+ private Set subscriberSet = new CopyOnWriteArraySet();
+
private Map consumerMap = new ConcurrentHashMap();
+
private AtomicBoolean started = new AtomicBoolean(false);
+ private EndpointListener endpointListener;
+
/**
* The type of Flow
*
@@ -154,7 +170,6 @@
this.connectionFactory = connectionFactory;
}
-
/**
* @return Returns the broadcastDestinationName.
*/
@@ -180,7 +195,7 @@
}
return true;
}
-
+
/**
* Initialize the Region
*
@@ -190,13 +205,22 @@
public void init(Broker broker, String subType) throws JBIException {
log.info(broker.getContainerName() + ": Initializing jms flow");
super.init(broker, subType);
- broker.getRegistry().addComponentPacketListener(this);
+ // Create and register endpoint listener
+ endpointListener = new EndpointAdapter() {
+ public void internalEndpointRegistered(EndpointEvent event) {
+ onInternalEndpointRegistered(event, true);
+ }
+
+ public void internalEndpointUnregistered(EndpointEvent event) {
+ onInternalEndpointUnregistered(event, true);
+ }
+ };
+ broker.getContainer().addListener(endpointListener);
try {
if (connectionFactory == null) {
if (jmsURL != null) {
connectionFactory = new ActiveMQConnectionFactory(jmsURL);
- }
- else {
+ } else {
connectionFactory = new ActiveMQConnectionFactory();
}
}
@@ -207,7 +231,7 @@
}
connection.setClientID(broker.getContainerName());
connection.start();
- inboundSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ inboundSession = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
Queue queue = inboundSession.createQueue(INBOUND_PREFIX + broker.getContainerName());
MessageConsumer inboundQueue = inboundSession.createConsumer(queue);
inboundQueue.setMessageListener(this);
@@ -216,8 +240,7 @@
broadcastTopic = broadcastSession.createTopic(broadcastDestinationName);
topicProducer = broadcastSession.createProducer(broadcastTopic);
topicProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- }
- catch (JMSException e) {
+ } catch (JMSException e) {
log.error("Failed to initialize JMSFlow", e);
throw new JBIException(e);
}
@@ -237,11 +260,14 @@
broadcastConsumer.setMessageListener(new MessageListener() {
public void onMessage(Message message) {
try {
- if (started.get()) {
- ObjectMessage objMsg = (ObjectMessage) message;
- ComponentPacketEvent event = (ComponentPacketEvent) objMsg.getObject();
- String containerName = event.getPacket().getComponentNameSpace().getContainerName();
- processInBoundPacket(containerName, event);
+ Object obj = ((ObjectMessage) message).getObject();
+ if (obj instanceof EndpointEvent) {
+ EndpointEvent event = (EndpointEvent) obj;
+ if (event.getEventType() == EndpointEvent.INTERNAL_ENDPOINT_REGISTERED) {
+ onRemoteEndpointRegistered(event);
+ } else if (event.getEventType() == EndpointEvent.INTERNAL_ENDPOINT_UNREGISTERED) {
+ onRemoteEndpointUnregistered(event);
+ }
}
} catch (Exception e) {
log.error("Error processing incoming broadcast message", e);
@@ -257,16 +283,16 @@
}
}
});
-
+
// Start queue consumers for all components
- for (Iterator i = broker.getRegistry().getLocalComponentConnectors().iterator();i.hasNext();) {
- LocalComponentConnector lcc = (LocalComponentConnector) i.next();
- ComponentPacket packet = lcc.getPacket();
- ComponentPacketEvent cpe = new ComponentPacketEvent(packet, ComponentPacketEvent.ACTIVATED);
- onEvent(cpe, false);
+ ServiceEndpoint[] endpoints = broker.getRegistry().getEndpointsForInterface(null);
+ for (int i = 0; i < endpoints.length; i++) {
+ if (endpoints[i] instanceof InternalEndpoint && ((InternalEndpoint) endpoints[i]).isLocal()) {
+ onInternalEndpointRegistered(new EndpointEvent(endpoints[i],
+ EndpointEvent.INTERNAL_ENDPOINT_REGISTERED), false);
+ }
}
- }
- catch (JMSException e) {
+ } catch (JMSException e) {
JBIException jbiEx = new JBIException("JMSException caught in start: " + e.getMessage());
throw jbiEx;
}
@@ -290,8 +316,7 @@
try {
advisoryConsumer.close();
broadcastConsumer.close();
- }
- catch (JMSException e) {
+ } catch (JMSException e) {
JBIException jbiEx = new JBIException("JMSException caught in stop: " + e.getMessage());
throw jbiEx;
}
@@ -301,11 +326,12 @@
public void shutDown() throws JBIException {
super.shutDown();
stop();
+ // Remove endpoint listener
+ broker.getContainer().removeListener(endpointListener);
if (this.connection != null) {
try {
this.connection.close();
- }
- catch (JMSException e) {
+ } catch (JMSException e) {
log.warn("Error closing JMS Connection", e);
}
}
@@ -320,51 +346,54 @@
return subscriberSet.size();
}
- /**
- * Process state changes in Components
- *
- * @param event
- */
- public void onEvent(ComponentPacketEvent event) {
- onEvent(event, true);
- }
-
- /**
- * Process state changes in Components
- *
- * @param event
- */
- public void onEvent(ComponentPacketEvent event, boolean broadcast) {
+ public void onInternalEndpointRegistered(EndpointEvent event, boolean broadcast) {
+ if (!started.get()) {
+ return;
+ }
try {
- // broadcast internal changes to the network
- if (started.get() && event.getPacket().getComponentNameSpace().getContainerName().equals(broker.getContainerName())) {
- String componentName = event.getPacket().getComponentNameSpace().getName();
- if (event.getStatus() == ComponentPacketEvent.ACTIVATED) {
- if (!consumerMap.containsKey(componentName)) {
- Queue queue = inboundSession.createQueue(INBOUND_PREFIX + componentName);
- MessageConsumer consumer = inboundSession.createConsumer(queue);
- consumer.setMessageListener(this);
- consumerMap.put(componentName,consumer);
- }
- } else if (event.getStatus() == ComponentPacketEvent.DEACTIVATED) {
- MessageConsumer consumer = (MessageConsumer) consumerMap.remove(componentName);
- if (consumer != null){
- consumer.close();
- }
- }
- if (broadcast) {
- ObjectMessage msg = broadcastSession.createObjectMessage(event);
- log.info(broker.getContainerName() + ": broadcasting info for " + event.getPacket().getComponentNameSpace());
- topicProducer.send(msg);
- }
+ String key = event.getEndpoint().getServiceName() + event.getEndpoint().getEndpointName();
+ if (!consumerMap.containsKey(key)) {
+ Queue queue = inboundSession.createQueue(INBOUND_PREFIX + key);
+ MessageConsumer consumer = inboundSession.createConsumer(queue);
+ consumer.setMessageListener(this);
+ consumerMap.put(key, consumer);
+ }
+ if (broadcast) {
+ log.info(broker.getContainerName() + ": broadcasting info for " + event);
+ ObjectMessage msg = broadcastSession.createObjectMessage(event);
+ topicProducer.send(msg);
}
+ } catch (Exception e) {
+ log.error("Cannot create consumer for " + event.getEndpoint(), e);
}
- catch (JMSException e) {
- log.error("failed to broadcast to the internal JMS network: " + event, e);
+ }
+
+ public void onInternalEndpointUnregistered(EndpointEvent event, boolean broadcast) {
+ try {
+ String key = event.getEndpoint().getServiceName() + event.getEndpoint().getEndpointName();
+ MessageConsumer consumer = (MessageConsumer) consumerMap.remove(key);
+ if (consumer != null) {
+ consumer.close();
+ }
+ if (broadcast) {
+ ObjectMessage msg = broadcastSession.createObjectMessage(event);
+ log.info(broker.getContainerName() + ": broadcasting info for " + event);
+ topicProducer.send(msg);
+ }
+ } catch (Exception e) {
+ log.error("Cannot destroy consumer for " + event, e);
}
}
-
+ public void onRemoteEndpointRegistered(EndpointEvent event) {
+ log.info(broker.getContainerName() + ": adding remote endpoint: " + event.getEndpoint());
+ broker.getRegistry().registerRemoteEndpoint(event.getEndpoint());
+ }
+
+ public void onRemoteEndpointUnregistered(EndpointEvent event) {
+ log.info(broker.getContainerName() + ": removing remote endpoint: " + event.getEndpoint());
+ broker.getRegistry().unregisterRemoteEndpoint(event.getEndpoint());
+ }
/**
* Distribute an ExchangePacket
@@ -375,35 +404,29 @@
protected void doSend(MessageExchangeImpl me) throws MessagingException {
doRouting(me);
}
-
+
/**
* Distribute an ExchangePacket
*
* @param me
* @throws MessagingException
*/
- public void doRouting(MessageExchangeImpl me) throws MessagingException{
- ComponentNameSpace id = me.getRole() == Role.PROVIDER ? me.getDestinationId() : me.getSourceId();
- ComponentConnector cc = broker.getRegistry().getComponentConnector(id);
- if (cc != null) {
- // let ActiveMQ do the routing ...
- try{
- String componentName = cc.getComponentNameSpace().getName();
- String destination = "";
- if (me.getRole() == Role.PROVIDER){
- destination = INBOUND_PREFIX + componentName;
- }else {
- destination = INBOUND_PREFIX + id.getContainerName();
- }
- Queue queue=inboundSession.createQueue(destination);
- ObjectMessage msg=inboundSession.createObjectMessage(me);
- queueProducer.send(queue,msg);
- }catch(JMSException e){
- log.error("Failed to send exchange: "+me+" internal JMS Network",e);
- throw new MessagingException(e);
+ public void doRouting(MessageExchangeImpl me) throws MessagingException {
+ // let ActiveMQ do the routing ...
+ try {
+ String destination;
+ if (me.getRole() == Role.PROVIDER) {
+ destination = INBOUND_PREFIX + me.getEndpoint().getServiceName() + me.getEndpoint().getEndpointName();
+ } else {
+ ComponentNameSpace id = me.getRole() == Role.PROVIDER ? me.getDestinationId() : me.getSourceId();
+ destination = INBOUND_PREFIX + id.getContainerName();
}
- }else{
- throw new MessagingException("No component with id ("+id+") - Couldn't route MessageExchange "+me);
+ Queue queue = inboundSession.createQueue(destination);
+ ObjectMessage msg = inboundSession.createObjectMessage(me);
+ queueProducer.send(queue, msg);
+ } catch (JMSException e) {
+ log.error("Failed to send exchange: " + me + " internal JMS Network", e);
+ throw new MessagingException(e);
}
}
@@ -414,7 +437,7 @@
*/
public void onMessage(final Message message) {
try {
- if (started.get()) {
+ if (message != null && started.get()) {
ObjectMessage objMsg = (ObjectMessage) message;
final MessageExchangeImpl me = (MessageExchangeImpl) objMsg.getObject();
// Dispatch the message in another thread so as to free the jms session
@@ -423,101 +446,49 @@
broker.getWorkManager().scheduleWork(new Work() {
public void release() {
}
+
public void run() {
try {
+ if (me.getDestinationId() == null) {
+ ServiceEndpoint se = me.getEndpoint();
+ se = broker.getRegistry()
+ .getInternalEndpoint(se.getServiceName(), se.getEndpointName());
+ me.setEndpoint(se);
+ me.setDestinationId(((InternalEndpoint) se).getComponentNameSpace());
+ }
JMSFlow.super.doRouting(me);
- }
- catch (Throwable e) {
+ } catch (Throwable e) {
log.error("Caught an exception routing ExchangePacket: ", e);
}
}
});
}
- }
- catch (JMSException jmsEx) {
+ } catch (JMSException jmsEx) {
log.error("Caught an exception unpacking JMS Message: ", jmsEx);
- }
- catch (WorkException e) {
+ } catch (WorkException e) {
log.error("Caught an exception routing ExchangePacket: ", e);
}
}
-
+
protected void onAdvisoryMessage(Object obj) {
if (obj instanceof ConsumerInfo) {
ConsumerInfo info = (ConsumerInfo) obj;
subscriberSet.add(info.getConsumerId().getConnectionId());
- for(Iterator i=broker.getRegistry().getLocalComponentConnectors().iterator();i.hasNext();){
- LocalComponentConnector lcc=(LocalComponentConnector) i.next();
- ComponentPacket packet=lcc.getPacket();
- ComponentPacketEvent cpe=new ComponentPacketEvent(packet,ComponentPacketEvent.ACTIVATED);
- onEvent(cpe);
+ ServiceEndpoint[] endpoints = broker.getRegistry().getEndpointsForInterface(null);
+ for (int i = 0; i < endpoints.length; i++) {
+ if (endpoints[i] instanceof InternalEndpoint && ((InternalEndpoint) endpoints[i]).isLocal()) {
+ onInternalEndpointRegistered(new EndpointEvent(endpoints[i],
+ EndpointEvent.INTERNAL_ENDPOINT_REGISTERED), true);
+ }
}
- }else if (obj instanceof RemoveInfo) {
+ } else if (obj instanceof RemoveInfo) {
ConsumerId id = (ConsumerId) ((RemoveInfo) obj).getObjectId();
subscriberSet.remove(id.getConnectionId());
removeAllPackets(id.getConnectionId());
}
}
- /**
- * Process Inbound packets
- *
- * @param containerName
- * @param event
- */
- protected void processInBoundPacket(String containerName, ComponentPacketEvent event) {
- ComponentPacket packet = event.getPacket();
- if (!packet.getComponentNameSpace().getContainerName().equals(broker.getContainerName())) {
- int eventStatus = event.getStatus();
- switch (eventStatus) {
- case ComponentPacketEvent.ACTIVATED:
- case ComponentPacketEvent.STATE_CHANGE:
- updateRemotePacket(containerName, packet);
- break;
- case ComponentPacketEvent.DEACTIVATED:
- removeRemotePacket(containerName, packet);
- break;
- default:
- log.warn("Unable to determine ComponentPacketEvent type: " + eventStatus + " for packet: " + packet);
- }
- }
- }
-
- private void updateRemotePacket(String containerName, ComponentPacket packet) {
- Set set = (Set) networkNodeKeyMap.get(containerName);
- if (set != null) {
- set.remove(packet);
- set.add(packet);
- }
- ComponentConnector cc = new ComponentConnector(packet);
- log.info(broker.getContainerName() + ": updating remote component: " + cc);
- broker.getRegistry().updateRemoteComponentConnector(cc);
- }
-
- private void removeRemotePacket(String containerName, ComponentPacket packet) {
- networkComponentKeyMap.remove(packet.getComponentNameSpace());
- Set set = (Set) networkNodeKeyMap.get(containerName);
- if (set != null) {
- set.remove(packet);
- ComponentConnector cc = new ComponentConnector(packet);
- log.info(broker.getContainerName() + ": removing remote component: " + cc);
- broker.getRegistry().removeRemoteComponentConnector(cc);
- if (set.isEmpty()) {
- networkNodeKeyMap.remove(containerName);
- }
- }
- }
-
private void removeAllPackets(String containerName) {
- Set set = (Set) networkNodeKeyMap.remove(containerName);
- if (set != null) {
- for (Iterator i = set.iterator();i.hasNext();) {
- ComponentPacket packet = (ComponentPacket) i.next();
- ComponentConnector cc = new ComponentConnector(packet);
- log.info(broker.getContainerName() + ": Network node: " + containerName + " Stopped. Removing remote Component: " + cc);
- broker.getRegistry().removeRemoteComponentConnector(cc);
- networkComponentKeyMap.remove(packet.getComponentNameSpace());
- }
- }
+ //TODO: broker.getRegistry().unregisterRemoteEndpoints(containerName);
}
}
Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaFlow.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaFlow.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaFlow.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/seda/SedaFlow.java Mon Mar 6 15:19:33 2006
@@ -15,18 +15,8 @@
*/
package org.apache.servicemix.jbi.nmr.flow.seda;
-import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
-
-import org.apache.commons.logging.Log;
-import org.apache.commons.logging.LogFactory;
-import org.apache.servicemix.jbi.framework.ComponentNameSpace;
-import org.apache.servicemix.jbi.framework.ComponentPacketEvent;
-import org.apache.servicemix.jbi.framework.ComponentPacketEventListener;
-import org.apache.servicemix.jbi.management.AttributeInfoHelper;
-import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
-import org.apache.servicemix.jbi.nmr.Broker;
-import org.apache.servicemix.jbi.nmr.flow.AbstractFlow;
+import java.util.Iterator;
+import java.util.Map;
import javax.jbi.JBIException;
import javax.jbi.management.LifeCycleMBean;
@@ -36,8 +26,19 @@
import javax.management.MBeanAttributeInfo;
import javax.management.ObjectName;
-import java.util.Iterator;
-import java.util.Map;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+import org.apache.servicemix.jbi.event.ComponentAdapter;
+import org.apache.servicemix.jbi.event.ComponentEvent;
+import org.apache.servicemix.jbi.event.ComponentListener;
+import org.apache.servicemix.jbi.framework.ComponentNameSpace;
+import org.apache.servicemix.jbi.management.AttributeInfoHelper;
+import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
+import org.apache.servicemix.jbi.nmr.Broker;
+import org.apache.servicemix.jbi.nmr.flow.AbstractFlow;
+
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
/**
* The SedaFlow introduces a simple event staging between the internal processes
@@ -47,11 +48,12 @@
*
* @version $Revision$
*/
-public class SedaFlow extends AbstractFlow implements ComponentPacketEventListener {
+public class SedaFlow extends AbstractFlow {
private static final Log log = LogFactory.getLog(SedaFlow.class);
protected Map queueMap = new ConcurrentHashMap();
protected int capacity = 100;
protected AtomicBoolean started = new AtomicBoolean(false);
+ protected ComponentListener listener;
/**
* The type of Flow
@@ -70,7 +72,12 @@
*/
public void init(Broker broker, String subType) throws JBIException {
super.init(broker, subType);
- broker.getRegistry().addComponentPacketListener(this);
+ listener = new ComponentAdapter() {
+ public void componentShutDown(ComponentEvent event) {
+ onComponentShutdown(event.getComponent().getComponentNameSpace());
+ }
+ };
+ broker.getContainer().addListener(listener);
}
/**
@@ -118,8 +125,8 @@
* @throws JBIException
*/
public void shutDown() throws JBIException {
- broker.getRegistry().removeComponentPacketListener(this);
- for (Iterator i = queueMap.values().iterator();i.hasNext();) {
+ broker.getContainer().removeListener(listener);
+ for (Iterator i = queueMap.values().iterator(); i.hasNext();) {
SedaQueue queue = (SedaQueue) i.next();
queue.shutDown();
unregisterQueue(queue);
@@ -177,19 +184,15 @@
*
* @param event
*/
- public synchronized void onEvent(ComponentPacketEvent event) {
- // watch for deactivations
- if (event.getStatus() == ComponentPacketEvent.DEACTIVATED) {
- ComponentNameSpace cns = event.getPacket().getComponentNameSpace();
- SedaQueue queue = (SedaQueue) queueMap.remove(cns);
- if (queue != null) {
- try {
- queue.shutDown();
- unregisterQueue(queue);
- }
- catch (JBIException e) {
- log.error("Caught exception stopping SedaQueue: " + queue);
- }
+ public synchronized void onComponentShutdown(ComponentNameSpace cns) {
+ SedaQueue queue = (SedaQueue) queueMap.remove(cns);
+ if (queue != null) {
+ try {
+ queue.shutDown();
+ unregisterQueue(queue);
+ }
+ catch (JBIException e) {
+ log.error("Caught exception stopping SedaQueue: " + queue);
}
}
}
Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/servicedesc/AbstractServiceEndpoint.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/servicedesc/AbstractServiceEndpoint.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/servicedesc/AbstractServiceEndpoint.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/servicedesc/AbstractServiceEndpoint.java Mon Mar 6 15:19:33 2006
@@ -36,8 +36,12 @@
* get the id of the ComponentConnector
* @return the id
*/
- public ComponentNameSpace getComponentNameSpace(){
+ public ComponentNameSpace getComponentNameSpace() {
return componentName;
+ }
+
+ public void setComponentName(ComponentNameSpace componentName) {
+ this.componentName = componentName;
}
}
Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/servicedesc/InternalEndpoint.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/servicedesc/InternalEndpoint.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/servicedesc/InternalEndpoint.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/servicedesc/InternalEndpoint.java Mon Mar 6 15:19:33 2006
@@ -16,7 +16,9 @@
package org.apache.servicemix.jbi.servicedesc;
import java.util.ArrayList;
+import java.util.HashMap;
import java.util.List;
+import java.util.Map;
import javax.xml.namespace.QName;
@@ -38,7 +40,7 @@
private String endpointName;
private QName serviceName;
private List interfaces = new ArrayList();
-
+ private transient Map remotes = new HashMap();
/**
@@ -53,10 +55,6 @@
this.serviceName = serviceName;
}
- protected InternalEndpoint() {
- }
-
-
/**
* Get a reference to this endpoint, using an endpoint reference vocabulary
* that is known to the provider.
@@ -107,6 +105,40 @@
}
/**
+ * Retrieve all remote component namespaces where this endpoint is activated
+ * @return component namespaces
+ */
+ public InternalEndpoint[] getRemoteEndpoints() {
+ InternalEndpoint[] result = new InternalEndpoint[remotes.size()];
+ remotes.values().toArray(result);
+ return result;
+ }
+
+ public void addRemoteEndpoint(InternalEndpoint remote) {
+ remotes.put(remote.getComponentNameSpace(), remote);
+ }
+
+ public void removeRemoteEndpoint(InternalEndpoint remote) {
+ remotes.remove(remote.getComponentNameSpace());
+ }
+
+ /**
+ * Check if this endpoint is locally activated
+ * @return true if the endpoint has been activated locally
+ */
+ public boolean isLocal() {
+ return getComponentNameSpace() != null;
+ }
+
+ /**
+ * Check if the endpoint is remotely activated
+ * @return true if the endpoint has been remotely activated
+ */
+ public boolean isClustered() {
+ return remotes != null && remotes.size() > 0;
+ }
+
+ /**
* @param obj
* @return true if equal
*/
@@ -114,8 +146,7 @@
boolean result = false;
if (obj != null && obj instanceof InternalEndpoint){
InternalEndpoint other = (InternalEndpoint)obj;
- result = other.getComponentNameSpace().equals(this.getComponentNameSpace()) &&
- other.serviceName.equals(this.serviceName) &&
+ result = other.serviceName.equals(this.serviceName) &&
other.endpointName.equals(this.endpointName);
}
return result;
@@ -126,8 +157,7 @@
* @return has code
*/
public int hashCode() {
- return getComponentNameSpace().hashCode() ^
- serviceName.hashCode() ^
+ return serviceName.hashCode() ^
endpointName.hashCode() ;
}
Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/servicedesc/LinkedEndpoint.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/servicedesc/LinkedEndpoint.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/servicedesc/LinkedEndpoint.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/servicedesc/LinkedEndpoint.java Mon Mar 6 15:19:33 2006
@@ -15,10 +15,9 @@
*/
package org.apache.servicemix.jbi.servicedesc;
-import org.apache.servicemix.jbi.framework.ComponentNameSpace;
-import org.w3c.dom.DocumentFragment;
-
import javax.xml.namespace.QName;
+
+import org.w3c.dom.DocumentFragment;
/**
* Linked endpoints are defined by SA deployment.
Modified: incubator/servicemix/trunk/servicemix-core/src/test/components/logger-component-1.0-exploded.jar/META-INF/jbi.xml
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/test/components/logger-component-1.0-exploded.jar/META-INF/jbi.xml?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/components/logger-component-1.0-exploded.jar/META-INF/jbi.xml (original)
+++ incubator/servicemix/trunk/servicemix-core/src/test/components/logger-component-1.0-exploded.jar/META-INF/jbi.xml Mon Mar 6 15:19:33 2006
@@ -6,7 +6,7 @@
<name>logger-component</name>
<description>An example of a Logger JBI componet that can be configured through a service assembly to consume messages and dump them to the log</description>
</identification>
- <component-class-name description="Component Implementation">org.apache.servicemix.component.logger.LoggerComponent</component-class-name>
+ <component-class-name description="Component Implementation">org.servicemix.component.logger.LoggerComponent</component-class-name>
<component-class-path>
<path-element>lib/logger-component-1.0.jar</path-element>
</component-class-path>
Modified: incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/framework/ComponentPacketTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/framework/ComponentPacketTest.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/framework/ComponentPacketTest.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/framework/ComponentPacketTest.java Mon Mar 6 15:19:33 2006
@@ -25,6 +25,7 @@
public class ComponentPacketTest extends TestCase {
public void testRegisterTwoEndpoints() throws Exception {
+ /*
ComponentPacket packet = new ComponentPacket();
ComponentNameSpace cns = new ComponentNameSpace("container", "component", null);
ServiceEndpoint ep1 = new InternalEndpoint(cns, "endpoint", new QName("urn:foo", "service1"));
@@ -32,6 +33,7 @@
packet.addActiveEndpoint(ep1);
packet.addActiveEndpoint(ep2);
assertEquals(2, packet.getActiveEndpoints().size());
+ */
}
}
Modified: incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/installation/DeploymentTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/installation/DeploymentTest.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/installation/DeploymentTest.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/installation/DeploymentTest.java Mon Mar 6 15:19:33 2006
@@ -251,6 +251,7 @@
componentMock.setReturnValue(manager, MockControl.ONE_OR_MORE);
manager.init(null, null);
managerMock.setMatcher(MockControl.ALWAYS_MATCHER);
+ manager.shutDown("su");
replay();
// start container
startContainer(false);
Modified: incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/management/ManagementAttributesTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/management/ManagementAttributesTest.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/management/ManagementAttributesTest.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/management/ManagementAttributesTest.java Mon Mar 6 15:19:33 2006
@@ -53,6 +53,7 @@
container.setRmiPort(namingPort);
container.setCreateMBeanServer(true);
container.init();
+ Thread.sleep(5000);
}
protected void tearDown() throws Exception {
Modified: incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/management/ManagementContextTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/management/ManagementContextTest.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/management/ManagementContextTest.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/management/ManagementContextTest.java Mon Mar 6 15:19:33 2006
@@ -67,6 +67,7 @@
}
public void testRemote() throws Exception {
+ Thread.sleep(5000);
// The address of the connector server
JMXServiceURL url = new JMXServiceURL("service:jmx:rmi:///jndi/rmi://"
+ namingHost + ":" + namingPort + jndiPath);
@@ -100,11 +101,11 @@
}
public void testComponent() throws Exception {
- ObjectName[] names = context.getEngineComponents();
+ ObjectName[] names = context.getPojoComponents();
assertEquals(1, names.length);
EchoComponent echo = new EchoComponent();
container.activateComponent(echo, "echo");
- names = context.getEngineComponents();
+ names = context.getPojoComponents();
assertNotNull(names);
assertEquals(2, names.length);
assertEquals(LifeCycleMBean.STARTED, echo.getCurrentState());
Added: incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/ConnectionsTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/ConnectionsTest.java?rev=383702&view=auto
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/ConnectionsTest.java (added)
+++ incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/ConnectionsTest.java Mon Mar 6 15:19:33 2006
@@ -0,0 +1,85 @@
+/*
+ * Copyright 2005-2006 The Apache Software Foundation.
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.servicemix.jbi.nmr;
+
+import javax.xml.namespace.QName;
+
+import junit.framework.TestCase;
+
+import org.apache.servicemix.jbi.container.ActivationSpec;
+import org.apache.servicemix.jbi.container.JBIContainer;
+import org.apache.servicemix.tck.Receiver;
+import org.apache.servicemix.tck.ReceiverComponent;
+import org.apache.servicemix.tck.Sender;
+import org.apache.servicemix.tck.SenderComponent;
+
+public class ConnectionsTest extends TestCase {
+
+ private JBIContainer container;
+
+ protected void setUp() throws Exception {
+ container = new JBIContainer();
+ container.setEmbedded(true);
+ container.init();
+ container.start();
+ }
+
+ protected void tearDown() throws Exception {
+ container.shutDown();
+ }
+
+ public void testEndpointConnection() throws Exception {
+ Receiver receiver = new ReceiverComponent();
+ ActivationSpec asReceiver = new ActivationSpec();
+ asReceiver.setComponent(receiver);
+ asReceiver.setService(new QName("service"));
+ asReceiver.setEndpoint("endpoint");
+
+ Sender sender = new SenderComponent();
+ ActivationSpec asSender = new ActivationSpec();
+ asSender.setComponent(sender);
+ asSender.setDestinationService(new QName("service"));
+ asSender.setDestinationEndpoint("linkedEndpoint");
+
+ container.activateComponent(asReceiver);
+ container.activateComponent(asSender);
+ container.getRegistry().registerEndpointConnection(new QName("service"), "linkedEndpoint", new QName("service"), "endpoint", null);
+
+ sender.sendMessages(1);
+ receiver.getMessageList().assertMessagesReceived(1);
+ }
+
+ public void testInterfaceConnection() throws Exception {
+ Receiver receiver = new ReceiverComponent();
+ ActivationSpec asReceiver = new ActivationSpec();
+ asReceiver.setComponent(receiver);
+ asReceiver.setService(new QName("service"));
+ asReceiver.setEndpoint("endpoint");
+
+ Sender sender = new SenderComponent();
+ ActivationSpec asSender = new ActivationSpec();
+ asSender.setComponent(sender);
+ asSender.setDestinationInterface(new QName("interface"));
+
+ container.activateComponent(asReceiver);
+ container.activateComponent(asSender);
+ container.getRegistry().registerInterfaceConnection(new QName("interface"), new QName("service"), "endpoint");
+
+ sender.sendMessages(1);
+ receiver.getMessageList().assertMessagesReceived(1);
+ }
+
+}
Modified: incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTest.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTest.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jms/JMSFlowTest.java Mon Mar 6 15:19:33 2006
@@ -69,12 +69,6 @@
receiver = new ReceiverComponent();
sender = new SenderComponent();
sender.setResolver(new ServiceNameEndpointResolver(ReceiverComponent.SERVICE));
-
- senderContainer.activateComponent(new ActivationSpec("sender", sender));
- receiverContainer.activateComponent(new ActivationSpec("receiver", receiver));
-
-
- Thread.sleep(2000);
}
protected void tearDown() throws Exception{
@@ -85,9 +79,13 @@
}
public void testInOnly() throws Exception {
- sender.sendMessages(NUM_MESSAGES);
- Thread.sleep(3000);
- receiver.getMessageList().assertMessagesReceived(NUM_MESSAGES);
+ senderContainer.activateComponent(new ActivationSpec("sender", sender));
+ receiverContainer.activateComponent(new ActivationSpec("receiver", receiver));
+ Thread.sleep(1000);
+
+ sender.sendMessages(NUM_MESSAGES);
+ Thread.sleep(3000);
+ receiver.getMessageList().assertMessagesReceived(NUM_MESSAGES);
}
public void testClusteredInOnly() throws Exception {
Modified: incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jms/MultipleJMSFlowTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jms/MultipleJMSFlowTest.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jms/MultipleJMSFlowTest.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jms/MultipleJMSFlowTest.java Mon Mar 6 15:19:33 2006
@@ -20,6 +20,8 @@
import org.apache.servicemix.jbi.container.JBIContainer;
import org.apache.servicemix.jbi.nmr.flow.Flow;
import org.apache.servicemix.jbi.nmr.flow.jms.JMSFlow;
+import org.apache.servicemix.tck.ReceiverComponent;
+import org.apache.servicemix.tck.SenderComponent;
import org.springframework.core.io.ClassPathResource;
import junit.framework.TestCase;
@@ -61,6 +63,9 @@
}
long t1 = System.currentTimeMillis();
System.err.println(t1 - t0);
+ for (int i = 0; i < containers.length; i++) {
+ containers[i].activateComponent(new ReceiverComponent(), "receiver");
+ }
for (int i = 0; i < containers.length; i++) {
containers[i].stop();
printNodes(containers);
Modified: incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/servicedesc/InternalEndpointTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/servicedesc/InternalEndpointTest.java?rev=383702&r1=383701&r2=383702&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/servicedesc/InternalEndpointTest.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/servicedesc/InternalEndpointTest.java Mon Mar 6 15:19:33 2006
@@ -59,7 +59,7 @@
assertFalse(e1.equals(e2));
ComponentNameSpace cns2 = new ComponentNameSpace("myContainer", "myName", "myId2");
e2 = new InternalEndpoint(cns2, "myEndpoint1", new QName("myService"));
- assertFalse(e1.equals(e2));
+ assertTrue(e1.equals(e2));
cns2 = new ComponentNameSpace("myContainer", "myName", "myId");
e2 = new InternalEndpoint(cns2, "myEndpoint1", new QName("myService"));
assertTrue(e1.equals(e2));