You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/03/13 09:55:59 UTC

svn commit: r753175 - in /camel/trunk/components/camel-mail/src: main/java/org/apache/camel/component/mail/ test/java/org/apache/camel/component/mail/

Author: davsclaus
Date: Fri Mar 13 08:55:58 2009
New Revision: 753175

URL: http://svn.apache.org/viewvc?rev=753175&view=rev
Log:
CAMEL-1178: mail consumer will now rollback if the processing on the exchange is failed, to allow consuming the same mail next time. Renamed some long options for Camel 2.0.

Added:
    camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailDoNotDeleteIfProcessFailsTest.java
      - copied, changed from r753137, camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailProcessOnlyUnseenMessagesTest.java
Modified:
    camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConfiguration.java
    camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
    camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailComponentTest.java
    camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailFetchSizeTest.java
    camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailProcessOnlyUnseenMessagesTest.java

Modified: camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConfiguration.java?rev=753175&r1=753174&r2=753175&view=diff
==============================================================================
--- camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConfiguration.java (original)
+++ camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConfiguration.java Fri Mar 13 08:55:58 2009
@@ -50,9 +50,9 @@
     private String defaultEncoding;
     private String from = DEFAULT_FROM;
     private String folderName = DEFAULT_FOLDER_NAME;
-    private boolean deleteProcessedMessages;
+    private boolean delete;
+    private boolean unseen = true;
     private boolean ignoreUriScheme;
-    private boolean processOnlyUnseenMessages = true;
     private Map<Message.RecipientType, String> recipients = new HashMap<Message.RecipientType, String>();
     private int fetchSize = -1;
     private boolean debugMode;
@@ -313,12 +313,12 @@
         this.from = from;
     }
 
-    public boolean isDeleteProcessedMessages() {
-        return deleteProcessedMessages;
+    public boolean isDelete() {
+        return delete;
     }
 
-    public void setDeleteProcessedMessages(boolean deleteProcessedMessages) {
-        this.deleteProcessedMessages = deleteProcessedMessages;
+    public void setDelete(boolean delete) {
+        this.delete = delete;
     }
 
     public String getFolderName() {
@@ -337,12 +337,12 @@
         this.ignoreUriScheme = ignoreUriScheme;
     }
 
-    public boolean isProcessOnlyUnseenMessages() {
-        return processOnlyUnseenMessages;
+    public boolean isUnseen() {
+        return unseen;
     }
 
-    public void setProcessOnlyUnseenMessages(boolean processOnlyUnseenMessages) {
-        this.processOnlyUnseenMessages = processOnlyUnseenMessages;
+    public void setUnseen(boolean unseen) {
+        this.unseen = unseen;
     }
 
     /**

Modified: camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java?rev=753175&r1=753174&r2=753175&view=diff
==============================================================================
--- camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java (original)
+++ camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java Fri Mar 13 08:55:58 2009
@@ -73,7 +73,7 @@
 
         if (store == null || folder == null) {
             throw new IllegalStateException("MailConsumer did not connect properly to the MailStore: "
-                                            + endpoint.getConfiguration().getMailStoreLogInformation());
+                    + endpoint.getConfiguration().getMailStoreLogInformation());
         }
 
         if (LOG.isDebugEnabled()) {
@@ -96,7 +96,7 @@
                 Message[] messages;
 
                 // should we process all messages or only unseen messages
-                if (endpoint.getConfiguration().isProcessOnlyUnseenMessages()) {
+                if (endpoint.getConfiguration().isUnseen()) {
                     messages = folder.search(new FlagTerm(new Flags(Flags.Flag.SEEN), false));
                 } else {
                     messages = folder.getMessages();
@@ -107,6 +107,8 @@
             } else if (count == -1) {
                 throw new MessagingException("Folder: " + folder.getFullName() + " is closed");
             }
+        } catch (Exception e) {
+            handleException(e);
         } finally {
             // need to ensure we release resources
             try {
@@ -153,8 +155,15 @@
         for (int i = 0; i < count; i++) {
             Message message = messages[i];
             if (!message.getFlags().contains(Flags.Flag.DELETED)) {
-                processMessage(message);
-                flagMessageProcessed(message);
+
+                MailExchange exchange = endpoint.createExchange(message);
+                process(exchange);
+
+                if (!exchange.isFailed()) {
+                    processCommit(exchange);
+                } else {
+                    processRollback(exchange);
+                }
             } else {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Skipping message as it was flagged as deleted: " + MailUtils.dumpMessage(message));
@@ -166,10 +175,9 @@
     /**
      * Strategy to process the mail message.
      */
-    protected void processMessage(Message message) throws Exception {
-        MailExchange exchange = endpoint.createExchange(message);
+    protected void process(MailExchange exchange) throws Exception {
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Processing message: " + MailUtils.dumpMessage(message));
+            LOG.debug("Processing message: " + MailUtils.dumpMessage(exchange.getIn().getMessage()));
         }
         getProcessor().process(exchange);
     }
@@ -177,11 +185,23 @@
     /**
      * Strategy to flag the message after being processed.
      */
-    protected void flagMessageProcessed(Message message) throws MessagingException {
-        if (endpoint.getConfiguration().isDeleteProcessedMessages()) {
+    protected void processCommit(MailExchange exchange) throws MessagingException {
+        Message message = exchange.getIn().getMessage();
+
+        if (endpoint.getConfiguration().isDelete()) {
+            LOG.debug("Exchange processed, so flagging message as DELETED");
             message.setFlag(Flags.Flag.DELETED, true);
         } else {
+            LOG.debug("Exchange processed, so flagging message as SEEN");
             message.setFlag(Flags.Flag.SEEN, true);
         }
     }
+
+    /**
+     * Strategy when processing the exchange failed.
+     */
+    protected void processRollback(MailExchange exchange) throws MessagingException {
+        LOG.warn("Exchange failed, so rolling back message status: " + exchange);
+    }
+
 }

Modified: camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailComponentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailComponentTest.java?rev=753175&r1=753174&r2=753175&view=diff
==============================================================================
--- camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailComponentTest.java (original)
+++ camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailComponentTest.java Fri Mar 13 08:55:58 2009
@@ -71,11 +71,11 @@
         assertEquals("encoding", null, config.getDefaultEncoding());
         assertEquals("from", "camel@localhost", config.getFrom());
         assertEquals("password", "secret", config.getPassword());
-        assertEquals(false, config.isDeleteProcessedMessages());
+        assertEquals(false, config.isDelete());
         assertEquals(false, config.isIgnoreUriScheme());
         assertEquals("fetchSize", -1, config.getFetchSize());
         assertEquals("contentType", "text/plain", config.getContentType());
-        assertEquals("processOnlyUnseenMessages", true, config.isProcessOnlyUnseenMessages());
+        assertEquals("unseen", true, config.isUnseen());
     }
 
     public void testDefaultPOP3Configuration() throws Exception {
@@ -90,11 +90,11 @@
         assertEquals("encoding", null, config.getDefaultEncoding());
         assertEquals("from", "camel@localhost", config.getFrom());
         assertEquals("password", "secret", config.getPassword());
-        assertEquals(false, config.isDeleteProcessedMessages());
+        assertEquals(false, config.isDelete());
         assertEquals(false, config.isIgnoreUriScheme());
         assertEquals("fetchSize", -1, config.getFetchSize());
         assertEquals("contentType", "text/plain", config.getContentType());
-        assertEquals("processOnlyUnseenMessages", true, config.isProcessOnlyUnseenMessages());
+        assertEquals("unseen", true, config.isUnseen());
     }
 
     public void testDefaultIMAPConfiguration() throws Exception {
@@ -109,17 +109,17 @@
         assertEquals("encoding", null, config.getDefaultEncoding());
         assertEquals("from", "camel@localhost", config.getFrom());
         assertEquals("password", "secret", config.getPassword());
-        assertEquals(false, config.isDeleteProcessedMessages());
+        assertEquals(false, config.isDelete());
         assertEquals(false, config.isIgnoreUriScheme());
         assertEquals("fetchSize", -1, config.getFetchSize());
         assertEquals("contentType", "text/plain", config.getContentType());
-        assertEquals("processOnlyUnseenMessages", true, config.isProcessOnlyUnseenMessages());
+        assertEquals("unseen", true, config.isUnseen());
     }
 
     public void testManyConfigurations() throws Exception {
         MailEndpoint endpoint = resolveMandatoryEndpoint("smtp://james@myhost:30/subject?password=secret"
-            + "&from=me@camelriders.org&deleteProcessedMessages=true&defaultEncoding=iso-8859-1&folderName=riders"
-            + "&contentType=text/html&processOnlyUnseenMessages=false");
+            + "&from=me@camelriders.org&delete=true&defaultEncoding=iso-8859-1&folderName=riders"
+            + "&contentType=text/html&unseen=false");
         MailConfiguration config = endpoint.getConfiguration();
         assertEquals("getProtocol()", "smtp", config.getProtocol());
         assertEquals("getHost()", "myhost", config.getHost());
@@ -130,10 +130,10 @@
         assertEquals("encoding", "iso-8859-1", config.getDefaultEncoding());
         assertEquals("from", "me@camelriders.org", config.getFrom());
         assertEquals("password", "secret", config.getPassword());
-        assertEquals(true, config.isDeleteProcessedMessages());
+        assertEquals(true, config.isDelete());
         assertEquals(false, config.isIgnoreUriScheme());
         assertEquals("fetchSize", -1, config.getFetchSize());
-        assertEquals("processOnlyUnseenMessages", false, config.isProcessOnlyUnseenMessages());
+        assertEquals("unseen", false, config.isUnseen());
         assertEquals("contentType", "text/html", config.getContentType());
     }
 
@@ -149,7 +149,7 @@
         assertEquals("encoding", null, config.getDefaultEncoding());
         assertEquals("from", "camel@localhost", config.getFrom());
         assertEquals("password", "secret", config.getPassword());
-        assertEquals(false, config.isDeleteProcessedMessages());
+        assertEquals(false, config.isDelete());
         assertEquals(false, config.isIgnoreUriScheme());
         assertEquals("fetchSize", -1, config.getFetchSize());
     }
@@ -166,7 +166,7 @@
         assertEquals("encoding", null, config.getDefaultEncoding());
         assertEquals("from", "camel@localhost", config.getFrom());
         assertEquals("password", "secret", config.getPassword());
-        assertEquals(false, config.isDeleteProcessedMessages());
+        assertEquals(false, config.isDelete());
         assertEquals(false, config.isIgnoreUriScheme());
         assertEquals("fetchSize", -1, config.getFetchSize());
     }

Copied: camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailDoNotDeleteIfProcessFailsTest.java (from r753137, camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailProcessOnlyUnseenMessagesTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailDoNotDeleteIfProcessFailsTest.java?p2=camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailDoNotDeleteIfProcessFailsTest.java&p1=camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailProcessOnlyUnseenMessagesTest.java&r1=753137&r2=753175&rev=753175&view=diff
==============================================================================
--- camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailProcessOnlyUnseenMessagesTest.java (original)
+++ camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailDoNotDeleteIfProcessFailsTest.java Fri Mar 13 08:55:58 2009
@@ -23,39 +23,35 @@
 import javax.mail.internet.MimeMessage;
 
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
 import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
 import org.jvnet.mock_javamail.Mailbox;
 import org.springframework.mail.javamail.JavaMailSenderImpl;
 
 /**
- * Unit test for processOnlyUnseenMessages option.
+ * Unit test for rollback option.
  */
-public class MailProcessOnlyUnseenMessagesTest extends ContextTestSupport {
+public class MailDoNotDeleteIfProcessFailsTest extends ContextTestSupport {
 
-    public void testProcessOnlyUnseenMessages() throws Exception {
+    private static int counter;
+
+    public void testRoolbackIfProcessFails() throws Exception {
         prepareMailbox();
 
-        sendBody("direct:a", "Message 3");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Message 1");
+        // the first 2 attempt should fail
+        getMockEndpoint("mock:error").expectedMessageCount(2);
+
+        assertMockEndpointsSatisfied();
 
-        MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedMessageCount(1);
-        mock.expectedBodiesReceived("Message 3");
-        mock.assertIsSatisfied();
-
-        // reset mock so we can make new assertions
-        mock.reset();
-
-        // send a new message, now we should only receive this new massages as all the others has been SEEN
-        sendBody("direct:a", "Message 4");
-        mock.expectedMessageCount(1);
-        mock.expectedBodiesReceived("Message 4");
-        mock.assertIsSatisfied();
+        assertEquals(3, counter);
     }
 
     private void prepareMailbox() throws Exception {
         // connect to mailbox
         Mailbox.clearAll();
+
         JavaMailSenderImpl sender = new JavaMailSenderImpl();
         Store store = sender.getSession().getStore("imap");
         store.connect("localhost", 25, "claus", "secret");
@@ -63,11 +59,11 @@
         folder.open(Folder.READ_WRITE);
         folder.expunge();
 
-        // inserts two messages with the SEEN flag
+        // inserts two new messages
         Message[] msg = new Message[2];
         msg[0] = new MimeMessage(sender.getSession());
         msg[0].setText("Message 1");
-        msg[0].setFlag(Flags.Flag.SEEN, true);
+        msg[0].setFlag(Flags.Flag.SEEN, false);
         msg[1] = new MimeMessage(sender.getSession());
         msg[1].setText("Message 2");
         msg[1].setFlag(Flags.Flag.SEEN, true);
@@ -78,11 +74,21 @@
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
-                from("direct:a").to("smtp://claus@localhost");
+                // no redelivery for unit test as we want it to be polled next time
+                errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(0).logStackTrace(false));
 
-                from("imap://localhost?username=claus&password=secret&processOnlyUnseenMessages=true&consumer.delay=1000").to("mock:result");
+                from("imap://localhost?username=claus&password=secret&unseen=true&delay=250")
+                        .process(new Processor() {
+                            public void process(Exchange exchange) throws Exception {
+                                counter++;
+                                if (counter < 3) {
+                                    throw new IllegalArgumentException("Forced by unit test");
+                                }
+                            }
+                        })
+                        .to("mock:result");
             }
         };
     }
 
-}
+}
\ No newline at end of file

Modified: camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailFetchSizeTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailFetchSizeTest.java?rev=753175&r1=753174&r2=753175&view=diff
==============================================================================
--- camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailFetchSizeTest.java (original)
+++ camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailFetchSizeTest.java Fri Mar 13 08:55:58 2009
@@ -89,7 +89,7 @@
         return new RouteBuilder() {
             public void configure() throws Exception {
                 from("pop3://jones@localhost?password=secret&fetchSize=2&consumer.delay=5000"
-                    + "&deleteProcessedMessages=true").to("mock:result");
+                    + "&delete=true").to("mock:result");
             }
         };
     }

Modified: camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailProcessOnlyUnseenMessagesTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailProcessOnlyUnseenMessagesTest.java?rev=753175&r1=753174&r2=753175&view=diff
==============================================================================
--- camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailProcessOnlyUnseenMessagesTest.java (original)
+++ camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailProcessOnlyUnseenMessagesTest.java Fri Mar 13 08:55:58 2009
@@ -29,7 +29,7 @@
 import org.springframework.mail.javamail.JavaMailSenderImpl;
 
 /**
- * Unit test for processOnlyUnseenMessages option.
+ * Unit test for unseen option.
  */
 public class MailProcessOnlyUnseenMessagesTest extends ContextTestSupport {
 
@@ -80,7 +80,7 @@
             public void configure() throws Exception {
                 from("direct:a").to("smtp://claus@localhost");
 
-                from("imap://localhost?username=claus&password=secret&processOnlyUnseenMessages=true&consumer.delay=1000").to("mock:result");
+                from("imap://localhost?username=claus&password=secret&unseen=true&consumer.delay=1000").to("mock:result");
             }
         };
     }