You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2008/09/02 12:50:48 UTC

svn commit: r691194 - in /servicemix/sandbox/gertv/smx-sling: ./ servicemix-audit-jcr/ servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/

Author: gertv
Date: Tue Sep  2 03:50:48 2008
New Revision: 691194

URL: http://svn.apache.org/viewvc?rev=691194&view=rev
Log:
Asynchronous message auditing -- thx to vladislav

Added:
    servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/ExchangeAcceptedProducer.java
    servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/ExchangeSentProducer.java
Modified:
    servicemix/sandbox/gertv/smx-sling/pom.xml
    servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/pom.xml
    servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/AsynchronousAbstractAuditor.java
    servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/JcrAuditor.java

Modified: servicemix/sandbox/gertv/smx-sling/pom.xml
URL: http://svn.apache.org/viewvc/servicemix/sandbox/gertv/smx-sling/pom.xml?rev=691194&r1=691193&r2=691194&view=diff
==============================================================================
--- servicemix/sandbox/gertv/smx-sling/pom.xml (original)
+++ servicemix/sandbox/gertv/smx-sling/pom.xml Tue Sep  2 03:50:48 2008
@@ -20,6 +20,7 @@
   <properties>
     <servicemix-version>3.3-SNAPSHOT</servicemix-version>
     <sling-version>2.0.2-incubator</sling-version>
+    <spring-version>2.5.5</spring-version>
   </properties>
   <repositories>
     <repository>

Modified: servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/pom.xml
URL: http://svn.apache.org/viewvc/servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/pom.xml?rev=691194&r1=691193&r2=691194&view=diff
==============================================================================
--- servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/pom.xml (original)
+++ servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/pom.xml Tue Sep  2 03:50:48 2008
@@ -22,6 +22,11 @@
       <version>1.0</version>
     </dependency>
     <dependency>
+            <groupId>org.springframework</groupId>
+            <artifactId>spring-jms</artifactId>
+            <version>${spring-version}</version>
+    </dependency>
+    <dependency>
       <groupId>org.apache.sling</groupId>
       <artifactId>org.apache.sling.jcr.jackrabbit.client</artifactId>
       <version>${sling-version}</version>

Modified: servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/AsynchronousAbstractAuditor.java
URL: http://svn.apache.org/viewvc/servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/AsynchronousAbstractAuditor.java?rev=691194&r1=691193&r2=691194&view=diff
==============================================================================
--- servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/AsynchronousAbstractAuditor.java (original)
+++ servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/AsynchronousAbstractAuditor.java Tue Sep  2 03:50:48 2008
@@ -1,39 +1,250 @@
 package org.apache.servicemix.audit.jcr;
 
-import javax.jbi.JBIException;
+import java.net.URISyntaxException;
 
+import javax.jbi.JBIException;
+import javax.jbi.messaging.MessageExchange;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
 import org.apache.servicemix.jbi.audit.AbstractAuditor;
 import org.apache.servicemix.jbi.event.ExchangeEvent;
+import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+
 
 /**
  * 
- * Abstract class for take care of all the serialization and multi-threading stuff
+ * Abstract class that takes care of all the serialization and multi-threading
+ * stuff
  * 
  * @author vkrejcirik
  * 
+ * 
  */
 public abstract class AsynchronousAbstractAuditor extends AbstractAuditor {
 
+    private static final Log LOG = LogFactory
+            .getLog(AsynchronousAbstractAuditor.class);
+
+    private Session acceptedSession;
+    private Session sentSession;
+
+    private ActiveMQConnection connection;
+
+    private Destination acceptedDestination;
+    private Destination sentDestination;
+
+    private ExchangeAcceptedProducer acceptedProducer;
+    private ExchangeSentProducer sentProducer;
+
+    private AcceptedListener acceptedListener;
+    private SentListener sentListener;
+    private ExceptionListener exceptionListener;
+    
+    private JmsTemplate acceptedJmsTemplate;
+    private JmsTemplate sentJmsTemplate;
     
+    private DefaultMessageListenerContainer sentListenerContainer;
+    private DefaultMessageListenerContainer acceptedListenerContainer;
+
     public void doStart() throws JBIException {
+        // Open one connection with a separate session per audit queue.
+        try {
+            connection = ActiveMQConnection
+                    .makeConnection("tcp://localhost:61616");
+
+            // Both sessions are non-transacted and auto-acknowledging.
+            acceptedSession = connection.createSession(false,
+                    Session.AUTO_ACKNOWLEDGE);
+
+            sentSession = connection.createSession(false,
+                    Session.AUTO_ACKNOWLEDGE);
+
+            acceptedDestination = acceptedSession
+                    .createQueue("messages.accepted");
+
+            sentDestination = sentSession.createQueue("messages.sent");
+
+        } catch (URISyntaxException e) {
+            throw new JBIException("URI syntax is wrong", e);
+        } catch (JMSException e1) {
+            throw new JBIException(
+                    "Error while creating queue for exchange", e1);
+        }
+
+        sentListenerContainer = new DefaultMessageListenerContainer();
+        acceptedListenerContainer = new DefaultMessageListenerContainer();
+
+        ActiveMQConnectionFactory sentConnectionFactory = new ActiveMQConnectionFactory();
+        sentConnectionFactory.setBrokerURL("tcp://localhost:61616");
+
+        ActiveMQConnectionFactory acceptedConnectionFactory = new ActiveMQConnectionFactory();
+        acceptedConnectionFactory.setBrokerURL("tcp://localhost:61616");
+
+        sentListenerContainer.setConnectionFactory(sentConnectionFactory);
+        acceptedListenerContainer.setConnectionFactory(acceptedConnectionFactory);
+
+        sentListenerContainer.setConcurrentConsumers(5);
+        acceptedListenerContainer.setConcurrentConsumers(5);
+
+        sentListenerContainer.setDestination(sentDestination);
+        acceptedListenerContainer.setDestination(acceptedDestination);
+
+        sentListener = new SentListener();
+        acceptedListener = new AcceptedListener();
+
+        acceptedJmsTemplate = new JmsTemplate();
+        sentJmsTemplate = new JmsTemplate();
+
+        acceptedJmsTemplate.setConnectionFactory(acceptedConnectionFactory);
+        acceptedJmsTemplate.setDefaultDestination(acceptedDestination);
+
+        sentJmsTemplate.setConnectionFactory(sentConnectionFactory);
+        sentJmsTemplate.setDefaultDestination(sentDestination);
+
+        sentProducer = new ExchangeSentProducer(sentJmsTemplate);
+        acceptedProducer = new ExchangeAcceptedProducer(acceptedJmsTemplate);
+
+        sentListenerContainer.setMessageListener(sentListener);
+        acceptedListenerContainer.setMessageListener(acceptedListener);
+
+        // NOTE(review): exceptionListener is never assigned in this class, so
+        // this registers null, and only on the sent container -- confirm intent.
+        sentListenerContainer.setExceptionListener(exceptionListener);
+
+        sentListenerContainer.setAutoStartup(true);
+        sentListenerContainer.afterPropertiesSet();
         
+        acceptedListenerContainer.setAutoStartup(true);
+        acceptedListenerContainer.afterPropertiesSet();
         
+        try {
+            connection.start();
+        } catch (JMSException e) {
+            LOG.error("Unable to start JMS connection", e);
+        }
         
         super.doStart();
     }
-    
-    
-    public void exchangeSent(ExchangeEvent event) {      
-        onExchangeSent(event);
+
+    @Override
+    protected void doStop() throws JBIException {
+        // Stops and shuts down both listener containers, then closes both sessions and the connection.
+        try {
+            
+            sentListenerContainer.stop();
+            sentListenerContainer.shutdown();
+            acceptedListenerContainer.stop();
+            acceptedListenerContainer.shutdown();
+            // A JMSException from any close() below is rethrown as JBIException, skipping super.doStop().
+            acceptedSession.close();
+            sentSession.close();
+            connection.close();
+
+        } catch (JMSException e) {
+            throw new JBIException("Close session or connection failed", e);
+        }
+
+        
+        super.doStop();
+    }
+
+    /** Wraps the sent exchange in an ObjectMessage and hands it to sentProducer. */
+    public void exchangeSent(ExchangeEvent event) {
+        MessageExchange messageExchange = event.getExchange();
+        try {
+            // NOTE(review): assumes the exchange is a MessageExchangeImpl, and
+            // sentSession is shared by all callers -- confirm both are safe.
+            ObjectMessage objectExchange = sentSession
+                    .createObjectMessage((MessageExchangeImpl) messageExchange);
+            sentProducer.sendMessage(objectExchange);
+        } catch (JMSException e) {
+            // Pass the exception along so the log shows why the audit failed.
+            LOG.error("Error while serializing sent message exchange.", e);
+        }
+
     }
 
     public void exchangeAccepted(ExchangeEvent event) {
-        onExchangeAccepted(event);
+        // Wrap the exchange in an ObjectMessage and hand it to acceptedProducer;
+        // onExchangeAccepted is now invoked from AcceptedListener instead.
+        MessageExchange messageExchange = event.getExchange();
+        try {
+            // NOTE(review): assumes the exchange is a MessageExchangeImpl, and
+            // acceptedSession is shared by all callers -- confirm both are safe.
+            ObjectMessage objectExchange = acceptedSession
+                    .createObjectMessage((MessageExchangeImpl) messageExchange);
+            acceptedProducer.sendMessage(objectExchange);
+        } catch (JMSException e) {
+            LOG.error("Error while serializing accepted message exchange.", e);
+        }
+
         super.exchangeAccepted(event);
     }
-    
-    public abstract void onExchangeSent(ExchangeEvent event);
 
-    public abstract void onExchangeAccepted(ExchangeEvent event);
+    public abstract void onExchangeSent(MessageExchange exchange);
+
+    public abstract void onExchangeAccepted(MessageExchange exchange);
+
+    /**
+     * JMS listener that unwraps a queued exchange and passes it to
+     * onExchangeSent of the enclosing auditor.
+     */
+    public class SentListener implements MessageListener {
+        /**
+         * @throws IllegalArgumentException if the message is not an ObjectMessage
+         */
+        public void onMessage(Message message) {
+            if (!(message instanceof ObjectMessage)) {
+                throw new IllegalArgumentException(
+                        "Message must be of type ObjectMessage");
+            }
+            try {
+                LOG.debug("receive message");
+                ObjectMessage m = (ObjectMessage) message;
+                MessageExchange exchange = (MessageExchange) m.getObject();
+                onExchangeSent(exchange);
+            } catch (JMSException e) {
+                // Log the cause as well; the bare text gives no hint of what failed.
+                LOG.error("Error while receiving message.", e);
+            }
+        }
+    }
+
+    /**
+     * JMS listener that unwraps a queued exchange and passes it to
+     * onExchangeAccepted of the enclosing auditor.
+     */
+    public class AcceptedListener implements MessageListener {
+        /**
+         * @throws IllegalArgumentException if the message is not an ObjectMessage
+         */
+        public void onMessage(Message message) {
+            if (!(message instanceof ObjectMessage)) {
+                throw new IllegalArgumentException(
+                        "Message must be of type ObjectMessage");
+            }
+            try {
+                LOG.debug("receive message");
+                ObjectMessage m = (ObjectMessage) message;
+                MessageExchange exchange = (MessageExchange) m.getObject();
+                onExchangeAccepted(exchange);
+            } catch (JMSException e) {
+                // Log the cause as well; the bare text gives no hint of what failed.
+                LOG.error("Error while receiving message.", e);
+            }
+        }
+    }
 
 }

Added: servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/ExchangeAcceptedProducer.java
URL: http://svn.apache.org/viewvc/servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/ExchangeAcceptedProducer.java?rev=691194&view=auto
==============================================================================
--- servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/ExchangeAcceptedProducer.java (added)
+++ servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/ExchangeAcceptedProducer.java Tue Sep  2 03:50:48 2008
@@ -0,0 +1,19 @@
+package org.apache.servicemix.audit.jcr;
+
+import javax.jms.ObjectMessage;
+
+import org.springframework.jms.core.JmsTemplate;
+/** Forwards accepted-exchange messages to the JmsTemplate it was constructed with. */
+public class ExchangeAcceptedProducer {
+    private JmsTemplate jmsTemplate;
+
+    /** Stores the template that sendMessage delegates to. */
+    public ExchangeAcceptedProducer(JmsTemplate jmsTemplate) {
+        this.jmsTemplate = jmsTemplate;
+    }
+
+    /** Passes the given message to convertAndSend on the wrapped template. */
+    public void sendMessage(ObjectMessage message) {
+        this.jmsTemplate.convertAndSend(message);
+    }
+}

Added: servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/ExchangeSentProducer.java
URL: http://svn.apache.org/viewvc/servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/ExchangeSentProducer.java?rev=691194&view=auto
==============================================================================
--- servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/ExchangeSentProducer.java (added)
+++ servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/ExchangeSentProducer.java Tue Sep  2 03:50:48 2008
@@ -0,0 +1,20 @@
+package org.apache.servicemix.audit.jcr;
+
+import javax.jms.ObjectMessage;
+
+import org.springframework.jms.core.JmsTemplate;
+/** Forwards sent-exchange messages to the JmsTemplate it was constructed with. */
+public class ExchangeSentProducer {
+
+    private JmsTemplate jmsTemplate;
+
+    /** Stores the template that sendMessage delegates to. */
+    public ExchangeSentProducer(JmsTemplate jmsTemplate) {
+        this.jmsTemplate = jmsTemplate;
+    }
+
+    /** Passes the given message to convertAndSend on the wrapped template. */
+    public void sendMessage(ObjectMessage message) {
+        this.jmsTemplate.convertAndSend(message);
+    }
+}

Modified: servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/JcrAuditor.java
URL: http://svn.apache.org/viewvc/servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/JcrAuditor.java?rev=691194&r1=691193&r2=691194&view=diff
==============================================================================
--- servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/JcrAuditor.java (original)
+++ servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/JcrAuditor.java Tue Sep  2 03:50:48 2008
@@ -11,7 +11,6 @@
 import org.apache.commons.logging.Log;
 import org.apache.commons.logging.LogFactory;
 import org.apache.servicemix.jbi.audit.AuditorException;
-import org.apache.servicemix.jbi.event.ExchangeEvent;
 
 /**
  * 
@@ -41,6 +40,7 @@
         super.doStart();
     }
 
+    
     protected Session getSession() throws LoginException, RepositoryException {
         if (session.get() == null) {
             Session session = repository.login(new SimpleCredentials("admin",
@@ -50,23 +50,23 @@
         return session.get();
     }
 
-    public void onExchangeSent(ExchangeEvent event) {
+    public void onExchangeSent(MessageExchange exchange) {
 
         try {
-            strategy.processExchange(event.getExchange(), getSession());
+            strategy.processExchange(exchange, getSession());
             getSession().save();
 
             LOG.info("Successfully stored information about message exchange "
-                    + event.getExchange().getExchangeId()
+                    + exchange.getExchangeId()
                     + " in the JCR repository");
         } catch (Exception e) {
             LOG.error("Unable to store information about message exchange "
-                    + event.getExchange().getExchangeId(), e);
+                    + exchange.getExchangeId(), e);
         }
     }
 
     @Override
-    public void onExchangeAccepted(ExchangeEvent event) {
+    public void onExchangeAccepted(MessageExchange exchange) {
 
     }