You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/05/23 13:18:27 UTC
svn commit: r777891 - in /camel/trunk/components/camel-mail/src:
main/java/org/apache/camel/component/mail/MailConsumer.java
test/java/org/apache/camel/component/mail/MailBatchConsumerTest.java
Author: davsclaus
Date: Sat May 23 11:18:27 2009
New Revision: 777891
URL: http://svn.apache.org/viewvc?rev=777891&view=rev
Log:
CAMEL-1640: The mail consumer is now a batch consumer.
Added:
camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailBatchConsumerTest.java
- copied, changed from r777814, camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailFetchSizeTest.java
Modified:
camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
Modified: camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java?rev=777891&r1=777890&r2=777891&view=diff
==============================================================================
--- camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java (original)
+++ camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java Sat May 23 11:18:27 2009
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.mail;
+import java.util.ArrayList;
+import java.util.List;
import javax.mail.Flags;
import javax.mail.Folder;
import javax.mail.FolderNotFoundException;
@@ -24,6 +26,8 @@
import javax.mail.Store;
import javax.mail.search.FlagTerm;
+import org.apache.camel.BatchConsumer;
+import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.impl.ScheduledPollConsumer;
import org.apache.commons.logging.Log;
@@ -36,7 +40,7 @@
*
* @version $Revision$
*/
-public class MailConsumer extends ScheduledPollConsumer {
+public class MailConsumer extends ScheduledPollConsumer implements BatchConsumer {
public static final long DEFAULT_CONSUMER_DELAY = 60 * 1000L;
private static final transient Log LOG = LogFactory.getLog(MailConsumer.class);
@@ -102,7 +106,8 @@
messages = folder.getMessages();
}
- processMessages(messages);
+ List<Exchange> exchanges = createExchanges(messages);
+ processBatch(exchanges);
} else if (count == -1) {
throw new MessagingException("Folder: " + folder.getFullName() + " is closed");
@@ -122,40 +127,29 @@
}
}
- protected void ensureIsConnected() throws MessagingException {
- MailConfiguration config = endpoint.getConfiguration();
-
- boolean connected = false;
- try {
- if (store != null && store.isConnected()) {
- connected = true;
- }
- } catch (Exception e) {
- LOG.debug("Exception while testing for is connected to MailStore: "
- + endpoint.getConfiguration().getMailStoreLogInformation()
- + ". Caused by: " + e.getMessage(), e);
- }
-
- if (!connected) {
- if (LOG.isDebugEnabled()) {
- LOG.debug("Connecting to MailStore: " + endpoint.getConfiguration().getMailStoreLogInformation());
- }
- store = sender.getSession().getStore(config.getProtocol());
- store.connect(config.getHost(), config.getPort(), config.getUsername(), config.getPassword());
- }
-
- if (folder == null) {
- folder = store.getFolder(config.getFolderName());
- if (folder == null || !folder.exists()) {
- throw new FolderNotFoundException(folder, "Folder not found or invalid: " + config.getFolderName());
+ public void processBatch(List<Exchange> exchanges) throws Exception {
+ int total = exchanges.size();
+ for (int index = 0; index < total && isRunAllowed(); index++) {
+ // only loop if we are started (allowed to run)
+ MailExchange exchange = (MailExchange) exchanges.get(index);
+ // add current index and total as properties
+ exchange.setProperty(Exchange.BATCH_INDEX, index);
+ exchange.setProperty(Exchange.BATCH_SIZE, total);
+ exchange.setProperty(Exchange.BATCH_COMPLETE, index == total - 1);
+
+ // process the current exchange
+ processExchange(exchange);
+ if (!exchange.isFailed()) {
+ processCommit(exchange);
+ } else {
+ processRollback(exchange);
}
}
}
- /**
- * Process all the messages
- */
- protected void processMessages(Message[] messages) throws Exception {
+ protected List<Exchange> createExchanges(Message[] messages) throws MessagingException {
+ List<Exchange> answer = new ArrayList<Exchange>();
+
int fetchSize = endpoint.getConfiguration().getFetchSize();
int count = fetchSize == -1 ? messages.length : Math.min(fetchSize, messages.length);
@@ -166,27 +160,22 @@
for (int i = 0; i < count; i++) {
Message message = messages[i];
if (!message.getFlags().contains(Flags.Flag.DELETED)) {
-
MailExchange exchange = endpoint.createExchange(message);
- process(exchange);
-
- if (!exchange.isFailed()) {
- processCommit(exchange);
- } else {
- processRollback(exchange);
- }
+ answer.add(exchange);
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping message as it was flagged as deleted: " + MailUtils.dumpMessage(message));
}
}
}
+
+ return answer;
}
/**
* Strategy to process the mail message.
*/
- protected void process(MailExchange exchange) throws Exception {
+ protected void processExchange(MailExchange exchange) throws Exception {
if (LOG.isDebugEnabled()) {
LOG.debug("Processing message: " + MailUtils.dumpMessage(exchange.getIn().getMessage()));
}
@@ -215,4 +204,34 @@
LOG.warn("Exchange failed, so rolling back message status: " + exchange);
}
+ private void ensureIsConnected() throws MessagingException {
+ MailConfiguration config = endpoint.getConfiguration();
+
+ boolean connected = false;
+ try {
+ if (store != null && store.isConnected()) {
+ connected = true;
+ }
+ } catch (Exception e) {
+ LOG.debug("Exception while testing for is connected to MailStore: "
+ + endpoint.getConfiguration().getMailStoreLogInformation()
+ + ". Caused by: " + e.getMessage(), e);
+ }
+
+ if (!connected) {
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Connecting to MailStore: " + endpoint.getConfiguration().getMailStoreLogInformation());
+ }
+ store = sender.getSession().getStore(config.getProtocol());
+ store.connect(config.getHost(), config.getPort(), config.getUsername(), config.getPassword());
+ }
+
+ if (folder == null) {
+ folder = store.getFolder(config.getFolderName());
+ if (folder == null || !folder.exists()) {
+ throw new FolderNotFoundException(folder, "Folder not found or invalid: " + config.getFolderName());
+ }
+ }
+ }
+
}
Copied: camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailBatchConsumerTest.java (from r777814, camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailFetchSizeTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailBatchConsumerTest.java?p2=camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailBatchConsumerTest.java&p1=camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailFetchSizeTest.java&r1=777814&r2=777891&rev=777891&view=diff
==============================================================================
--- camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailFetchSizeTest.java (original)
+++ camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailBatchConsumerTest.java Sat May 23 11:18:27 2009
@@ -22,47 +22,39 @@
import javax.mail.internet.MimeMessage;
import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
import org.jvnet.mock_javamail.Mailbox;
import org.springframework.mail.javamail.JavaMailSenderImpl;
/**
- * Unit test for fetch size.
+ * Unit test for batch consumer.
*/
-public class MailFetchSizeTest extends ContextTestSupport {
+public class MailBatchConsumerTest extends ContextTestSupport {
- public void testFetchSize() throws Exception {
+ public void testBatchConsumer() throws Exception {
prepareMailbox();
Mailbox mailbox = Mailbox.get("jones@localhost");
assertEquals(5, mailbox.size());
MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedMessageCount(2);
- mock.expectedBodiesReceived("Message 0", "Message 1");
- // should be done within 2 seconds as no delay when started
- mock.setResultWaitTime(2000L);
- mock.assertIsSatisfied();
-
- Thread.sleep(500);
- assertEquals(3, mailbox.size());
-
- // reset mock to assert the next batch of 2 messages polled
- mock.reset();
- mock.expectedMessageCount(2);
- mock.expectedBodiesReceived("Message 2", "Message 3");
- // should be done within 5 (delay) + 1 seconds (polling)
- mock.setResultWaitTime(7000L);
- mock.assertIsSatisfied();
-
- Thread.sleep(500);
- assertEquals(1, mailbox.size());
-
- // reset mock to assert the last message polled
- mock.reset();
- mock.expectedMessageCount(1);
- mock.expectedBodiesReceived("Message 4");
- mock.assertIsSatisfied();
+ mock.expectedMessageCount(5);
+ mock.expectsAscending(body());
+ mock.message(0).property(Exchange.BATCH_INDEX).isEqualTo(0);
+ mock.message(1).property(Exchange.BATCH_INDEX).isEqualTo(1);
+ mock.message(2).property(Exchange.BATCH_INDEX).isEqualTo(2);
+ mock.message(3).property(Exchange.BATCH_INDEX).isEqualTo(3);
+ mock.message(4).property(Exchange.BATCH_INDEX).isEqualTo(4);
+ mock.message(0).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
+ mock.message(1).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
+ mock.message(2).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
+ mock.message(3).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
+ mock.message(3).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
+ mock.message(4).property(Exchange.BATCH_COMPLETE).isEqualTo(true);
+ mock.expectedPropertyReceived(Exchange.BATCH_SIZE, 5);
+
+ assertMockEndpointsSatisfied();
}
private void prepareMailbox() throws Exception {
@@ -88,9 +80,9 @@
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
- from("pop3://jones@localhost?password=secret&fetchSize=2&consumer.delay=5000"
+ from("pop3://jones@localhost?password=secret&consumer.delay=5000"
+ "&delete=true").to("mock:result");
}
};
}
-}
+}
\ No newline at end of file