You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by ge...@apache.org on 2008/09/02 12:50:48 UTC
svn commit: r691194 - in /servicemix/sandbox/gertv/smx-sling: ./
servicemix-audit-jcr/
servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/
Author: gertv
Date: Tue Sep 2 03:50:48 2008
New Revision: 691194
URL: http://svn.apache.org/viewvc?rev=691194&view=rev
Log:
Asynchronous message auditing -- thx to vladislav
Added:
servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/ExchangeAcceptedProducer.java
servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/ExchangeSentProducer.java
Modified:
servicemix/sandbox/gertv/smx-sling/pom.xml
servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/pom.xml
servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/AsynchronousAbstractAuditor.java
servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/JcrAuditor.java
Modified: servicemix/sandbox/gertv/smx-sling/pom.xml
URL: http://svn.apache.org/viewvc/servicemix/sandbox/gertv/smx-sling/pom.xml?rev=691194&r1=691193&r2=691194&view=diff
==============================================================================
--- servicemix/sandbox/gertv/smx-sling/pom.xml (original)
+++ servicemix/sandbox/gertv/smx-sling/pom.xml Tue Sep 2 03:50:48 2008
@@ -20,6 +20,7 @@
<properties>
<servicemix-version>3.3-SNAPSHOT</servicemix-version>
<sling-version>2.0.2-incubator</sling-version>
+ <spring-version>2.5.5</spring-version>
</properties>
<repositories>
<repository>
Modified: servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/pom.xml
URL: http://svn.apache.org/viewvc/servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/pom.xml?rev=691194&r1=691193&r2=691194&view=diff
==============================================================================
--- servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/pom.xml (original)
+++ servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/pom.xml Tue Sep 2 03:50:48 2008
@@ -22,6 +22,11 @@
<version>1.0</version>
</dependency>
<dependency>
+ <groupId>org.springframework</groupId>
+ <artifactId>spring-jms</artifactId>
+ <version>${spring-version}</version>
+ </dependency>
+ <dependency>
<groupId>org.apache.sling</groupId>
<artifactId>org.apache.sling.jcr.jackrabbit.client</artifactId>
<version>${sling-version}</version>
Modified: servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/AsynchronousAbstractAuditor.java
URL: http://svn.apache.org/viewvc/servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/AsynchronousAbstractAuditor.java?rev=691194&r1=691193&r2=691194&view=diff
==============================================================================
--- servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/AsynchronousAbstractAuditor.java (original)
+++ servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/AsynchronousAbstractAuditor.java Tue Sep 2 03:50:48 2008
@@ -1,39 +1,250 @@
package org.apache.servicemix.audit.jcr;
-import javax.jbi.JBIException;
+import java.net.URISyntaxException;
+import javax.jbi.JBIException;
+import javax.jbi.messaging.MessageExchange;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
import org.apache.servicemix.jbi.audit.AbstractAuditor;
import org.apache.servicemix.jbi.event.ExchangeEvent;
+import org.apache.servicemix.jbi.messaging.MessageExchangeImpl;
+import org.springframework.jms.core.JmsTemplate;
+import org.springframework.jms.listener.DefaultMessageListenerContainer;
+
/**
*
- * Abstract class for take care of all the serialization and multi-threading stuff
+ * Abstract class for take care of all the serialization and multi-threading
+ * stuff
*
* @author vkrejcirik
*
+ *
*/
public abstract class AsynchronousAbstractAuditor extends AbstractAuditor {
+ private static final Log LOG = LogFactory
+ .getLog(AsynchronousAbstractAuditor.class);
+
+ private Session acceptedSession;
+ private Session sentSession;
+
+ private ActiveMQConnection connection;
+
+ private Destination acceptedDestination;
+ private Destination sentDestination;
+
+ private ExchangeAcceptedProducer acceptedProducer;
+ private ExchangeSentProducer sentProducer;
+
+ private AcceptedListener acceptedListener;
+ private SentListener sentListener;
+ private ExceptionListener exceptionListener;
+
+ private JmsTemplate acceptedJmsTemplate;
+ private JmsTemplate sentJmsTemplate;
+ private DefaultMessageListenerContainer sentListenerContainer;
+ private DefaultMessageListenerContainer acceptedListenerContainer;
+
public void doStart() throws JBIException {
+
+ try {
+ connection = ActiveMQConnection
+ .makeConnection("tcp://localhost:61616");
+ //connection.start();
+
+ acceptedSession = connection.createSession(false,
+ Session.AUTO_ACKNOWLEDGE);
+
+ sentSession = connection.createSession(false,
+ Session.AUTO_ACKNOWLEDGE);
+
+ acceptedDestination = acceptedSession
+ .createQueue("messages.accepted");
+
+ sentDestination = sentSession.createQueue("messages.sent");
+
+ } catch (URISyntaxException e) {
+ throw new JBIException("URI syntax is wrong", e);
+ } catch (JMSException e1) {
+ throw new JBIException(
+ "Error while creating queue for exchange", e1);
+ }
+
+ sentListenerContainer = new DefaultMessageListenerContainer();
+ acceptedListenerContainer = new DefaultMessageListenerContainer();
+
+ ActiveMQConnectionFactory sentConnectionFactory = new ActiveMQConnectionFactory();
+ sentConnectionFactory.setBrokerURL("tcp://localhost:61616");
+
+ ActiveMQConnectionFactory acceptedConnectionFactory = new ActiveMQConnectionFactory();
+ acceptedConnectionFactory.setBrokerURL("tcp://localhost:61616");
+
+ sentListenerContainer.setConnectionFactory(sentConnectionFactory);
+ acceptedListenerContainer.setConnectionFactory(acceptedConnectionFactory);
+
+ sentListenerContainer.setConcurrentConsumers(5);
+ acceptedListenerContainer.setConcurrentConsumers(5);
+
+ sentListenerContainer.setDestination(sentDestination);
+ acceptedListenerContainer.setDestination(acceptedDestination);
+
+ sentListener = new SentListener();
+ acceptedListener = new AcceptedListener();
+
+ acceptedJmsTemplate = new JmsTemplate();
+ sentJmsTemplate = new JmsTemplate();
+
+ acceptedJmsTemplate.setConnectionFactory(acceptedConnectionFactory);
+ acceptedJmsTemplate.setDefaultDestination(acceptedDestination);
+
+ sentJmsTemplate.setConnectionFactory(sentConnectionFactory);
+ sentJmsTemplate.setDefaultDestination(sentDestination);
+
+ sentProducer = new ExchangeSentProducer(sentJmsTemplate);
+ acceptedProducer = new ExchangeAcceptedProducer(acceptedJmsTemplate);
+
+ sentListenerContainer.setMessageListener(sentListener);
+ acceptedListenerContainer.setMessageListener(acceptedListener);
+
+ //exceptionListener = new ExceptionListener();
+
+ sentListenerContainer.setExceptionListener(exceptionListener);
+
+ sentListenerContainer.setAutoStartup(true);
+ sentListenerContainer.afterPropertiesSet();
+ acceptedListenerContainer.setAutoStartup(true);
+ acceptedListenerContainer.afterPropertiesSet();
+ try {
+ connection.start();
+ } catch (JMSException e) {
+ e.printStackTrace();
+ }
super.doStart();
}
-
-
- public void exchangeSent(ExchangeEvent event) {
- onExchangeSent(event);
+
+ @Override
+ protected void doStop() throws JBIException {
+
+ try {
+
+ sentListenerContainer.stop();
+ sentListenerContainer.shutdown();
+ acceptedListenerContainer.stop();
+ acceptedListenerContainer.shutdown();
+
+ acceptedSession.close();
+ sentSession.close();
+ connection.close();
+
+ } catch (JMSException e) {
+ throw new JBIException("Close session or connection failed", e);
+ }
+
+
+ super.doStop();
+ }
+
+ public void exchangeSent(ExchangeEvent event) {
+
+ MessageExchange messageExchange = event.getExchange();
+ ObjectMessage objectExchange = null;
+
+ try {
+ objectExchange = sentSession
+ .createObjectMessage((MessageExchangeImpl) messageExchange);
+ sentProducer.sendMessage(objectExchange);
+
+ } catch (JMSException e) {
+ LOG.error("Error while serializing sent message exchange.");
+ }
+
}
public void exchangeAccepted(ExchangeEvent event) {
- onExchangeAccepted(event);
+
+ MessageExchange messageExchange = event.getExchange();
+ ObjectMessage objectExchange = null;
+
+ try {
+ objectExchange = acceptedSession
+ .createObjectMessage((MessageExchangeImpl) messageExchange);
+ acceptedProducer.sendMessage(objectExchange);
+
+ } catch (JMSException e) {
+ LOG.error("Error while serializing accepted message exchange.");
+ }
+
super.exchangeAccepted(event);
}
-
- public abstract void onExchangeSent(ExchangeEvent event);
- public abstract void onExchangeAccepted(ExchangeEvent event);
+ public abstract void onExchangeSent(MessageExchange exchange);
+
+ public abstract void onExchangeAccepted(MessageExchange exchange);
+
+ public class SentListener implements MessageListener {
+
+ public void onMessage(Message message) {
+
+ if (message instanceof ObjectMessage) {
+
+ try {
+ LOG.debug("receive message");
+ ObjectMessage m = (ObjectMessage) message;
+ MessageExchange exchange = (MessageExchange) m.getObject();
+
+ //System.out.println(exchange);
+
+ onExchangeSent(exchange);
+
+ } catch (JMSException e) {
+ LOG.error("Error while receiving message.");
+ }
+ } else {
+ throw new IllegalArgumentException(
+ "Message must be of type ObjectMessage");
+ }
+ }
+ }
+
+ public class AcceptedListener implements MessageListener {
+
+ public void onMessage(Message message) {
+
+ if (message instanceof ObjectMessage) {
+
+ try {
+ LOG.debug("receive message");
+ ObjectMessage m = (ObjectMessage) message;
+ MessageExchange exchange = (MessageExchange) m.getObject();
+
+ //System.out.println(exchange);
+
+ onExchangeAccepted(exchange);
+
+ } catch (JMSException e) {
+ LOG.error("Error while receiving message.");
+ }
+ } else {
+ throw new IllegalArgumentException(
+ "Message must be of type ObjectMessage");
+ }
+ }
+ }
}
Added: servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/ExchangeAcceptedProducer.java
URL: http://svn.apache.org/viewvc/servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/ExchangeAcceptedProducer.java?rev=691194&view=auto
==============================================================================
--- servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/ExchangeAcceptedProducer.java (added)
+++ servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/ExchangeAcceptedProducer.java Tue Sep 2 03:50:48 2008
@@ -0,0 +1,19 @@
+package org.apache.servicemix.audit.jcr;
+
+import javax.jms.ObjectMessage;
+
+import org.springframework.jms.core.JmsTemplate;
+
+public class ExchangeAcceptedProducer {
+
+ private JmsTemplate jmsTemplate;
+
+ public ExchangeAcceptedProducer(JmsTemplate jmsTemplate) {
+ this.jmsTemplate = jmsTemplate;
+ }
+
+ public void sendMessage(ObjectMessage message) {
+
+ jmsTemplate.convertAndSend(message);
+ }
+}
Added: servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/ExchangeSentProducer.java
URL: http://svn.apache.org/viewvc/servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/ExchangeSentProducer.java?rev=691194&view=auto
==============================================================================
--- servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/ExchangeSentProducer.java (added)
+++ servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/ExchangeSentProducer.java Tue Sep 2 03:50:48 2008
@@ -0,0 +1,20 @@
+package org.apache.servicemix.audit.jcr;
+
+import javax.jms.ObjectMessage;
+
+import org.springframework.jms.core.JmsTemplate;
+
+public class ExchangeSentProducer {
+
+ private JmsTemplate jmsTemplate;
+
+ public ExchangeSentProducer(JmsTemplate jmsTemplate) {
+ this.jmsTemplate = jmsTemplate;
+ }
+
+ public void sendMessage(ObjectMessage message) {
+
+ jmsTemplate.convertAndSend(message);
+ }
+
+}
Modified: servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/JcrAuditor.java
URL: http://svn.apache.org/viewvc/servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/JcrAuditor.java?rev=691194&r1=691193&r2=691194&view=diff
==============================================================================
--- servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/JcrAuditor.java (original)
+++ servicemix/sandbox/gertv/smx-sling/servicemix-audit-jcr/src/main/java/org/apache/servicemix/audit/jcr/JcrAuditor.java Tue Sep 2 03:50:48 2008
@@ -11,7 +11,6 @@
import org.apache.commons.logging.Log;
import org.apache.commons.logging.LogFactory;
import org.apache.servicemix.jbi.audit.AuditorException;
-import org.apache.servicemix.jbi.event.ExchangeEvent;
/**
*
@@ -41,6 +40,7 @@
super.doStart();
}
+
protected Session getSession() throws LoginException, RepositoryException {
if (session.get() == null) {
Session session = repository.login(new SimpleCredentials("admin",
@@ -50,23 +50,23 @@
return session.get();
}
- public void onExchangeSent(ExchangeEvent event) {
+ public void onExchangeSent(MessageExchange exchange) {
try {
- strategy.processExchange(event.getExchange(), getSession());
+ strategy.processExchange(exchange, getSession());
getSession().save();
LOG.info("Successfully stored information about message exchange "
- + event.getExchange().getExchangeId()
+ + exchange.getExchangeId()
+ " in the JCR repository");
} catch (Exception e) {
LOG.error("Unable to store information about message exchange "
- + event.getExchange().getExchangeId(), e);
+ + exchange.getExchangeId(), e);
}
}
@Override
- public void onExchangeAccepted(ExchangeEvent event) {
+ public void onExchangeAccepted(MessageExchange exchange) {
}