You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/05/23 13:18:27 UTC

svn commit: r777891 - in /camel/trunk/components/camel-mail/src: main/java/org/apache/camel/component/mail/MailConsumer.java test/java/org/apache/camel/component/mail/MailBatchConsumerTest.java

Author: davsclaus
Date: Sat May 23 11:18:27 2009
New Revision: 777891

URL: http://svn.apache.org/viewvc?rev=777891&view=rev
Log:
CAMEL-1640: mail consumer is now batch consumer.

Added:
    camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailBatchConsumerTest.java
      - copied, changed from r777814, camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailFetchSizeTest.java
Modified:
    camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java

Modified: camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java?rev=777891&r1=777890&r2=777891&view=diff
==============================================================================
--- camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java (original)
+++ camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java Sat May 23 11:18:27 2009
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.mail;
 
+import java.util.ArrayList;
+import java.util.List;
 import javax.mail.Flags;
 import javax.mail.Folder;
 import javax.mail.FolderNotFoundException;
@@ -24,6 +26,8 @@
 import javax.mail.Store;
 import javax.mail.search.FlagTerm;
 
+import org.apache.camel.BatchConsumer;
+import org.apache.camel.Exchange;
 import org.apache.camel.Processor;
 import org.apache.camel.impl.ScheduledPollConsumer;
 import org.apache.commons.logging.Log;
@@ -36,7 +40,7 @@
  *
  * @version $Revision$
  */
-public class MailConsumer extends ScheduledPollConsumer {
+public class MailConsumer extends ScheduledPollConsumer implements BatchConsumer {
     public static final long DEFAULT_CONSUMER_DELAY = 60 * 1000L;
     private static final transient Log LOG = LogFactory.getLog(MailConsumer.class);
 
@@ -102,7 +106,8 @@
                     messages = folder.getMessages();
                 }
 
-                processMessages(messages);
+                List<Exchange> exchanges = createExchanges(messages);
+                processBatch(exchanges);
 
             } else if (count == -1) {
                 throw new MessagingException("Folder: " + folder.getFullName() + " is closed");
@@ -122,40 +127,29 @@
         }
     }
 
-    protected void ensureIsConnected() throws MessagingException {
-        MailConfiguration config = endpoint.getConfiguration();
-
-        boolean connected = false;
-        try {
-            if (store != null && store.isConnected()) {
-                connected = true;
-            }
-        } catch (Exception e) {
-            LOG.debug("Exception while testing for is connected to MailStore: "
-                    + endpoint.getConfiguration().getMailStoreLogInformation()
-                    + ". Caused by: " + e.getMessage(), e);
-        }
-
-        if (!connected) {
-            if (LOG.isDebugEnabled()) {
-                LOG.debug("Connecting to MailStore: " + endpoint.getConfiguration().getMailStoreLogInformation());
-            }
-            store = sender.getSession().getStore(config.getProtocol());
-            store.connect(config.getHost(), config.getPort(), config.getUsername(), config.getPassword());
-        }
-
-        if (folder == null) {
-            folder = store.getFolder(config.getFolderName());
-            if (folder == null || !folder.exists()) {
-                throw new FolderNotFoundException(folder, "Folder not found or invalid: " + config.getFolderName());
+    public void processBatch(List<Exchange> exchanges) throws Exception {
+        int total = exchanges.size();
+        for (int index = 0; index < total && isRunAllowed(); index++) {
+            // only loop if we are started (allowed to run)
+            MailExchange exchange = (MailExchange) exchanges.get(index);
+            // add current index and total as properties
+            exchange.setProperty(Exchange.BATCH_INDEX, index);
+            exchange.setProperty(Exchange.BATCH_SIZE, total);
+            exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
+
+            // process the current exchange
+            processExchange(exchange);
+            if (!exchange.isFailed()) {
+                processCommit(exchange);
+            } else {
+                processRollback(exchange);
             }
         }
     }
 
-    /**
-     * Process all the messages
-     */
-    protected void processMessages(Message[] messages) throws Exception {
+    protected List<Exchange> createExchanges(Message[] messages) throws MessagingException {
+        List<Exchange> answer = new ArrayList<Exchange>();
+
         int fetchSize = endpoint.getConfiguration().getFetchSize();
         int count = fetchSize == -1 ? messages.length : Math.min(fetchSize, messages.length);
 
@@ -166,27 +160,22 @@
         for (int i = 0; i < count; i++) {
             Message message = messages[i];
             if (!message.getFlags().contains(Flags.Flag.DELETED)) {
-
                 MailExchange exchange = endpoint.createExchange(message);
-                process(exchange);
-
-                if (!exchange.isFailed()) {
-                    processCommit(exchange);
-                } else {
-                    processRollback(exchange);
-                }
+                answer.add(exchange);
             } else {
                 if (LOG.isDebugEnabled()) {
                     LOG.debug("Skipping message as it was flagged as deleted: " + MailUtils.dumpMessage(message));
                 }
             }
         }
+
+        return answer;
     }
 
     /**
      * Strategy to process the mail message.
      */
-    protected void process(MailExchange exchange) throws Exception {
+    protected void processExchange(MailExchange exchange) throws Exception {
         if (LOG.isDebugEnabled()) {
             LOG.debug("Processing message: " + MailUtils.dumpMessage(exchange.getIn().getMessage()));
         }
@@ -215,4 +204,34 @@
         LOG.warn("Exchange failed, so rolling back message status: " + exchange);
     }
 
+    private void ensureIsConnected() throws MessagingException {
+        MailConfiguration config = endpoint.getConfiguration();
+
+        boolean connected = false;
+        try {
+            if (store != null && store.isConnected()) {
+                connected = true;
+            }
+        } catch (Exception e) {
+            LOG.debug("Exception while testing for is connected to MailStore: "
+                    + endpoint.getConfiguration().getMailStoreLogInformation()
+                    + ". Caused by: " + e.getMessage(), e);
+        }
+
+        if (!connected) {
+            if (LOG.isDebugEnabled()) {
+                LOG.debug("Connecting to MailStore: " + endpoint.getConfiguration().getMailStoreLogInformation());
+            }
+            store = sender.getSession().getStore(config.getProtocol());
+            store.connect(config.getHost(), config.getPort(), config.getUsername(), config.getPassword());
+        }
+
+        if (folder == null) {
+            folder = store.getFolder(config.getFolderName());
+            if (folder == null || !folder.exists()) {
+                throw new FolderNotFoundException(folder, "Folder not found or invalid: " + config.getFolderName());
+            }
+        }
+    }
+
 }

Copied: camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailBatchConsumerTest.java (from r777814, camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailFetchSizeTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailBatchConsumerTest.java?p2=camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailBatchConsumerTest.java&p1=camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailFetchSizeTest.java&r1=777814&r2=777891&rev=777891&view=diff
==============================================================================
--- camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailFetchSizeTest.java (original)
+++ camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailBatchConsumerTest.java Sat May 23 11:18:27 2009
@@ -22,47 +22,39 @@
 import javax.mail.internet.MimeMessage;
 
 import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
 import org.apache.camel.builder.RouteBuilder;
 import org.apache.camel.component.mock.MockEndpoint;
 import org.jvnet.mock_javamail.Mailbox;
 import org.springframework.mail.javamail.JavaMailSenderImpl;
 
 /**
- * Unit test for fetch size.
+ * Unit test for batch consumer.
  */
-public class MailFetchSizeTest extends ContextTestSupport {
+public class MailBatchConsumerTest extends ContextTestSupport {
 
-    public void testFetchSize() throws Exception {
+    public void testBatchConsumer() throws Exception {
         prepareMailbox();
         Mailbox mailbox = Mailbox.get("jones@localhost");
         assertEquals(5, mailbox.size());
 
         MockEndpoint mock = getMockEndpoint("mock:result");
-        mock.expectedMessageCount(2);
-        mock.expectedBodiesReceived("Message 0", "Message 1");
-        // should be done within 2 seconds as no delay when started
-        mock.setResultWaitTime(2000L);
-        mock.assertIsSatisfied();
-
-        Thread.sleep(500);
-        assertEquals(3, mailbox.size());
-
-        // reset mock to assert the next batch of 2 messages polled
-        mock.reset();
-        mock.expectedMessageCount(2);
-        mock.expectedBodiesReceived("Message 2", "Message 3");
-        // should be done within 5 (delay) + 1 seconds (polling)
-        mock.setResultWaitTime(7000L);
-        mock.assertIsSatisfied();
-
-        Thread.sleep(500);
-        assertEquals(1, mailbox.size());
-
-        // reset mock to assert the last message polled
-        mock.reset();
-        mock.expectedMessageCount(1);
-        mock.expectedBodiesReceived("Message 4");
-        mock.assertIsSatisfied();
+        mock.expectedMessageCount(5);
+        mock.expectsAscending(body());
+        mock.message(0).property(Exchange.BATCH_INDEX).isEqualTo(0);
+        mock.message(1).property(Exchange.BATCH_INDEX).isEqualTo(1);
+        mock.message(2).property(Exchange.BATCH_INDEX).isEqualTo(2);
+        mock.message(3).property(Exchange.BATCH_INDEX).isEqualTo(3);
+        mock.message(4).property(Exchange.BATCH_INDEX).isEqualTo(4);
+        mock.message(0).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(1).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(2).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(3).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(3).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(4).property(Exchange.BATCH_COMPLETE).isEqualTo(true);
+        mock.expectedPropertyReceived(Exchange.BATCH_SIZE, 5);
+
+        assertMockEndpointsSatisfied();
     }
 
     private void prepareMailbox() throws Exception {
@@ -88,9 +80,9 @@
     protected RouteBuilder createRouteBuilder() throws Exception {
         return new RouteBuilder() {
             public void configure() throws Exception {
-                from("pop3://jones@localhost?password=secret&fetchSize=2&consumer.delay=5000"
+                from("pop3://jones@localhost?password=secret&consumer.delay=5000"
                     + "&delete=true").to("mock:result");
             }
         };
     }
-}
+}
\ No newline at end of file