You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2006/01/03 14:08:53 UTC

svn commit: r365624 - in /incubator/servicemix/trunk/servicemix-core/src: main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java test/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlowTest.java

Author: gnodet
Date: Tue Jan  3 05:08:46 2006
New Revision: 365624

URL: http://svn.apache.org/viewcvs?rev=365624&view=rev
Log:
Remove JCA flow dependency on Spring

Modified:
    incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java
    incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlowTest.java

Modified: incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java?rev=365624&r1=365623&r2=365624&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/main/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlow.java Tue Jan  3 05:08:46 2006
@@ -15,13 +15,36 @@
  */
 package org.apache.servicemix.jbi.nmr.flow.jca;
 
-import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
-import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
-import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
+import java.io.Serializable;
+import java.util.Iterator;
+import java.util.Map;
+import java.util.Set;
 
+import javax.jbi.JBIException;
+import javax.jbi.messaging.MessagingException;
+import javax.jbi.messaging.MessageExchange.Role;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.resource.spi.BootstrapContext;
+import javax.resource.spi.ConnectionManager;
+import javax.resource.spi.ResourceAdapter;
+import javax.resource.spi.ResourceAdapterInternalException;
+import javax.transaction.SystemException;
+import javax.transaction.TransactionManager;
 
 import org.apache.activemq.advisory.AdvisorySupport;
 import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQQueue;
 import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.command.ConsumerInfo;
@@ -48,32 +71,10 @@
 import org.jencks.JCAConnector;
 import org.jencks.SingletonEndpointFactory;
 import org.jencks.factory.ConnectionManagerFactoryBean;
-import org.springframework.jms.core.JmsTemplate;
-import org.springframework.jms.core.MessageCreator;
-
-import javax.jbi.JBIException;
-import javax.jbi.messaging.MessagingException;
-import javax.jbi.messaging.MessageExchange.Role;
-import javax.jms.Connection;
-import javax.jms.ConnectionFactory;
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.ObjectMessage;
-import javax.jms.Session;
-import javax.jms.Topic;
-import javax.resource.spi.BootstrapContext;
-import javax.resource.spi.ConnectionManager;
-import javax.resource.spi.ResourceAdapter;
-import javax.resource.spi.ResourceAdapterInternalException;
-import javax.transaction.SystemException;
-import javax.transaction.TransactionManager;
 
-import java.util.Iterator;
-import java.util.Map;
-import java.util.Set;
+import edu.emory.mathcs.backport.java.util.concurrent.ConcurrentHashMap;
+import edu.emory.mathcs.backport.java.util.concurrent.CopyOnWriteArraySet;
+import edu.emory.mathcs.backport.java.util.concurrent.atomic.AtomicBoolean;
 
 /**
  * Use for message routing among a network of containers. All routing/registration happens automatically.
@@ -98,8 +99,6 @@
     private Set subscriberSet=new CopyOnWriteArraySet();
     private TransactionContextManager transactionContextManager;
     private ConnectionManager connectionManager;
-    private JmsTemplate jmsTemplate;
-    private JmsTemplate jmsPersistentTemplate;
     private BootstrapContext bootstrapContext;
     private ResourceAdapter resourceAdapter;
     private JCAConnector containerConnector;
@@ -244,11 +243,7 @@
         	// Outbound connector
         	ActiveMQManagedConnectionFactory mcf = new ActiveMQManagedConnectionFactory();
         	mcf.setResourceAdapter(resourceAdapter);
-        	ConnectionFactory cf = (ConnectionFactory) mcf.createConnectionFactory(getConnectionManager());
-        	jmsTemplate = new JmsTemplate(cf);
-        	jmsTemplate.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
-        	jmsPersistentTemplate = new JmsTemplate(cf);
-        	jmsPersistentTemplate.setDeliveryMode(DeliveryMode.PERSISTENT);
+        	connectionFactory = (ConnectionFactory) mcf.createConnectionFactory(getConnectionManager());
         	
         	// Inbound broadcast
         	ac = new ActiveMQActivationSpec();
@@ -393,12 +388,8 @@
                 }
             }
             // broadcast change to the network
-            log.info("broadcast to internal JMS network: "+event);
-            jmsTemplate.send(broadcastTopic,new MessageCreator(){
-                public Message createMessage(Session session) throws JMSException{
-                    return session.createObjectMessage(event);
-                }
-            });
+            log.info("broadcast to internal JMS network: " + event);
+            sendJmsMessage(broadcastTopic, event, false, false);
         }catch(Exception e){
             log.error("failed to broadcast to the internal JMS network: "+event,e);
         }
@@ -431,18 +422,13 @@
         	}
             try {
                 final String componentName = cc.getComponentNameSpace().getName();
-                JmsTemplate jt = isPersistent(me) ? jmsPersistentTemplate : jmsTemplate;
-                String destination = "";
+                String destination;
                 if (me.getRole() == Role.PROVIDER){
                     destination = INBOUND_PREFIX + componentName;
                 }else {
                     destination = INBOUND_PREFIX + id.getContainerName();
                 }
-                jt.send(destination, new MessageCreator() {
-					public Message createMessage(Session session) throws JMSException {
-	                    return session.createObjectMessage(me);
-					}
-				});
+                sendJmsMessage(new ActiveMQQueue(destination), me, isPersistent(me), me.isTransacted());
             } catch (Exception e) {
                 log.error("Failed to send exchange: " + me + " internal JMS Network", e);
                 throw new MessagingException(e);
@@ -631,5 +617,18 @@
     
     public String toString(){
         return broker.getContainerName() + " JCAFlow";
+    }
+    
+    private void sendJmsMessage(Destination dest, Serializable object, boolean persistent, boolean transacted) throws JMSException {
+    	Connection connection = connectionFactory.createConnection();
+    	try {
+    		Session session = connection.createSession(transacted, transacted ? Session.SESSION_TRANSACTED : Session.AUTO_ACKNOWLEDGE);
+    		ObjectMessage msg = session.createObjectMessage(object);
+    		MessageProducer producer = session.createProducer(dest);
+    		producer.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+    		producer.send(msg);
+    	} finally {
+    		connection.close();
+    	}
     }
 }

Modified: incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlowTest.java
URL: http://svn.apache.org/viewcvs/incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlowTest.java?rev=365624&r1=365623&r2=365624&view=diff
==============================================================================
--- incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlowTest.java (original)
+++ incubator/servicemix/trunk/servicemix-core/src/test/java/org/apache/servicemix/jbi/nmr/flow/jca/JCAFlowTest.java Tue Jan  3 05:08:46 2006
@@ -104,7 +104,7 @@
         broker.stop();
     }
     
-    public void XtestInOnly() throws Exception {
+    public void testInOnly() throws Exception {
         final SenderComponent sender = new SenderComponent();
         final ReceiverComponent receiver =  new ReceiverComponent();
         sender.setResolver(new ServiceNameEndpointResolver(ReceiverComponent.SERVICE));
@@ -119,7 +119,7 @@
         receiver.getMessageList().assertMessagesReceived(NUM_MESSAGES);
     }
     
-    public void XtestTxInOnly() throws Exception {
+    public void testTxInOnly() throws Exception {
         final SenderComponent sender = new SenderComponent();
         final ReceiverComponent receiver =  new ReceiverComponent();
         sender.setResolver(new ServiceNameEndpointResolver(ReceiverComponent.SERVICE));
@@ -178,8 +178,6 @@
         
         sender.sendMessages(NUM_MESSAGES);
         Thread.sleep(3000);
-        System.out.println("REC1 =" + receiver1.getMessageList().getMessageCount());
-        System.out.println("REC2 =" + receiver2.getMessageList().getMessageCount());
         assertTrue(receiver1.getMessageList().hasReceivedMessage());
         assertFalse(receiver2.getMessageList().hasReceivedMessage());
         receiver1.getMessageList().flushMessages();