You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/01/03 14:08:53 UTC
svn commit: r365624 - in /incubator/servicemix/trunk/servicemix-core/src:
main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java
test/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlowTest.java
Author: gnodet
Date: Tue Jan 3 05:08:46 2006
New Revision: 365624
URL: http://svn.apache.org/viewcvs?rev=365624&view=rev
Log:
Remove jca flow dependency on spring
Modified:
incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java
incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlowTest.java
Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java?rev=365624&r1=365623&r2=365624&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java Tue Jan 3 05:08:46 2006
@@ -15,13 +15,36 @@
*/
package org.apache.servicemix.jbi.nmr.flow.jca;
-import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
+import javax.jbi.JBIException;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.MessageExchange.Role;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.resource.spi.BootstrapContext;
+import javax.resource.spi.ConnectionManager;
+import javax.resource.spi.ResourceAdapter;
+import javax.resource.spi.ResourceAdapterInternalException;
+import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.command.ConsumerInfo;
@@ -48,32 +71,10 @@
import org.jencks.JCAConnector;
import org.jencks.SingletonEndpointFactory;
import org.jencks.factory.ConnectionManagerFactoryBean;
-import org.springframework.jms.core.JmsTemplate;
-import org.springframework.jms.core.MessageCreator;
-
-import javax.jbi.JBIException;
-import javax.jbi.messaging.MessagingException;
-import javax.jbi.messaging.MessageExchange.Role;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.resource.spi.BootstrapContext;
-import javax.resource.spi.ConnectionManager;
-import javax.resource.spi.ResourceAdapter;
-import javax.resource.spi.ResourceAdapterInternalException;
-import javax.transaction.SystemException;
-import javax.transaction.TransactionManager;
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
/**
* Use for message routing among a network of containers. All routing/registration happens automatically.
@@ -98,8 +99,6 @@
private Set subscriberSet=new CopyOnWriteArraySet();
private TransactionContextManager transactionContextManager;
private ConnectionManager connectionManager;
- private JmsTemplate jmsTemplate;
- private JmsTemplate jmsPersistentTemplate;
private BootstrapContext bootstrapContext;
private ResourceAdapter resourceAdapter;
private JCAConnector containerConnector;
@@ -244,11 +243,7 @@
// Outbound connector
ActiveMQManagedConnectionFactory mcf = new ActiveMQManagedConnectionFactory();
mcf.setResourceAdapter(resourceAdapter);
- ConnectionFactory cf = (ConnectionFactory) mcf.createConnectionFactory(getConnectionManager());
- jmsTemplate = new JmsTemplate(cf);
- jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
- jmsPersistentTemplate = new JmsTemplate(cf);
- jmsPersistentTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
+ connectionFactory = (ConnectionFactory) mcf.createConnectionFactory(getConnectionManager());
// Inbound broadcast
ac = new ActiveMQActivationSpec();
@@ -393,12 +388,8 @@
}
}
// broadcast change to the network
- log.info("broadcast to internal JMS network: "+event);
- jmsTemplate.send(broadcastTopic,new MessageCreator(){
- public Message createMessage(Session session) throws JMSException{
- return session.createObjectMessage(event);
- }
- });
+ log.info("broadcast to internal JMS network: " + event);
+ sendJmsMessage(broadcastTopic, event, false, false);
}catch(Exception e){
log.error("failed to broadcast to the internal JMS network: "+event,e);
}
@@ -431,18 +422,13 @@
}
try {
final String componentName = cc.getComponentNameSpace().getName();
- JmsTemplate jt = isPersistent(me) ? jmsPersistentTemplate : jmsTemplate;
- String destination = "";
+ String destination;
if (me.getRole() == Role.PROVIDER){
destination = INBOUND_PREFIX + componentName;
}else {
destination = INBOUND_PREFIX + id.getContainerName();
}
- jt.send(destination, new MessageCreator() {
- public Message createMessage(Session session) throws JMSException {
- return session.createObjectMessage(me);
- }
- });
+ sendJmsMessage(new ActiveMQQueue(destination), me, isPersistent(me), me.isTransacted());
} catch (Exception e) {
log.error("Failed to send exchange: " + me + " internal JMS Network", e);
throw new MessagingException(e);
@@ -631,5 +617,18 @@
public String toString(){
return broker.getContainerName() + " JCAFlow";
+ }
+
+ private void sendJmsMessage(Destination dest, Serializable object, boolean persistent, boolean transacted) throws JMSException {
+ Connection connection = connectionFactory.createConnection();
+ try {
+ Session session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+ ObjectMessage msg = session.createObjectMessage(object);
+ MessageProducer producer = session.createProducer(dest);
+ producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+ producer.send(msg);
+ } finally {
+ connection.close();
+ }
}
}
Modified: incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlowTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlowTest.java?rev=365624&r1=365623&r2=365624&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlowTest.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlowTest.java Tue Jan 3 05:08:46 2006
@@ -104,7 +104,7 @@
broker.stop();
}
- public void XtestInOnly() throws Exception {
+ public void testInOnly() throws Exception {
final SenderComponent sender = new SenderComponent();
final ReceiverComponent receiver = new ReceiverComponent();
sender.setResolver(new ServiceNameEndpointResolver(ReceiverComponent.SERVICE));
@@ -119,7 +119,7 @@
receiver.getMessageList().assertMessagesReceived(NUM_MESSAGES);
}
- public void XtestTxInOnly() throws Exception {
+ public void testTxInOnly() throws Exception {
final SenderComponent sender = new SenderComponent();
final ReceiverComponent receiver = new ReceiverComponent();
sender.setResolver(new ServiceNameEndpointResolver(ReceiverComponent.SERVICE));
@@ -178,8 +178,6 @@
sender.sendMessages(NUM_MESSAGES);
Thread.sleep(3000);
- System.out.println("REC1 =" + receiver1.getMessageList().getMessageCount());
- System.out.println("REC2 =" + receiver2.getMessageList().getMessageCount());
assertTrue(receiver1.getMessageList().hasReceivedMessage());
assertFalse(receiver2.getMessageList().hasReceivedMessage());
receiver1.getMessageList().flushMessages();