You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2009/03/13 09:55:59 UTC
svn commit: r753175 - in /camel/trunk/components/camel-mail/src:
main/java/org/apache/camel/component/mail/
test/java/org/apache/camel/component/mail/
Author: davsclaus
Date: Fri Mar 13 08:55:58 2009
New Revision: 753175
URL: http://svn.apache.org/viewvc?rev=753175&view=rev
Log:
CAMEL-1178: The mail consumer now rolls back if processing of the exchange fails, so that the same mail can be consumed again on the next poll. Renamed some long options for Camel 2.0.
Added:
camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailDoNotDeleteIfProcessFailsTest.java
- copied, changed from r753137, camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailProcessOnlyUnseenMessagesTest.java
Modified:
camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConfiguration.java
camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailComponentTest.java
camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailFetchSizeTest.java
camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailProcessOnlyUnseenMessagesTest.java
Modified: camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConfiguration.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConfiguration.java?rev=753175&r1=753174&r2=753175&view=diff
==============================================================================
--- camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConfiguration.java (original)
+++ camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConfiguration.java Fri Mar 13 08:55:58 2009
@@ -50,9 +50,9 @@
private String defaultEncoding;
private String from = DEFAULT_FROM;
private String folderName = DEFAULT_FOLDER_NAME;
- private boolean deleteProcessedMessages;
+ private boolean delete;
+ private boolean unseen = true;
private boolean ignoreUriScheme;
- private boolean processOnlyUnseenMessages = true;
private Map<Message.RecipientType, String> recipients = new HashMap<Message.RecipientType, String>();
private int fetchSize = -1;
private boolean debugMode;
@@ -313,12 +313,12 @@
this.from = from;
}
- public boolean isDeleteProcessedMessages() {
- return deleteProcessedMessages;
+ public boolean isDelete() {
+ return delete;
}
- public void setDeleteProcessedMessages(boolean deleteProcessedMessages) {
- this.deleteProcessedMessages = deleteProcessedMessages;
+ public void setDelete(boolean delete) {
+ this.delete = delete;
}
public String getFolderName() {
@@ -337,12 +337,12 @@
this.ignoreUriScheme = ignoreUriScheme;
}
- public boolean isProcessOnlyUnseenMessages() {
- return processOnlyUnseenMessages;
+ public boolean isUnseen() {
+ return unseen;
}
- public void setProcessOnlyUnseenMessages(boolean processOnlyUnseenMessages) {
- this.processOnlyUnseenMessages = processOnlyUnseenMessages;
+ public void setUnseen(boolean unseen) {
+ this.unseen = unseen;
}
/**
Modified: camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java?rev=753175&r1=753174&r2=753175&view=diff
==============================================================================
--- camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java (original)
+++ camel/trunk/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java Fri Mar 13 08:55:58 2009
@@ -73,7 +73,7 @@
if (store == null || folder == null) {
throw new IllegalStateException("MailConsumer did not connect properly to the MailStore: "
- + endpoint.getConfiguration().getMailStoreLogInformation());
+ + endpoint.getConfiguration().getMailStoreLogInformation());
}
if (LOG.isDebugEnabled()) {
@@ -96,7 +96,7 @@
Message[] messages;
// should we process all messages or only unseen messages
- if (endpoint.getConfiguration().isProcessOnlyUnseenMessages()) {
+ if (endpoint.getConfiguration().isUnseen()) {
messages = folder.search(new FlagTerm(new Flags(Flags.Flag.SEEN), false));
} else {
messages = folder.getMessages();
@@ -107,6 +107,8 @@
} else if (count == -1) {
throw new MessagingException("Folder: " + folder.getFullName() + " is closed");
}
+ } catch (Exception e) {
+ handleException(e);
} finally {
// need to ensure we release resources
try {
@@ -153,8 +155,15 @@
for (int i = 0; i < count; i++) {
Message message = messages[i];
if (!message.getFlags().contains(Flags.Flag.DELETED)) {
- processMessage(message);
- flagMessageProcessed(message);
+
+ MailExchange exchange = endpoint.createExchange(message);
+ process(exchange);
+
+ if (!exchange.isFailed()) {
+ processCommit(exchange);
+ } else {
+ processRollback(exchange);
+ }
} else {
if (LOG.isDebugEnabled()) {
LOG.debug("Skipping message as it was flagged as deleted: " + MailUtils.dumpMessage(message));
@@ -166,10 +175,9 @@
/**
* Strategy to process the mail message.
*/
- protected void processMessage(Message message) throws Exception {
- MailExchange exchange = endpoint.createExchange(message);
+ protected void process(MailExchange exchange) throws Exception {
if (LOG.isDebugEnabled()) {
- LOG.debug("Processing message: " + MailUtils.dumpMessage(message));
+ LOG.debug("Processing message: " + MailUtils.dumpMessage(exchange.getIn().getMessage()));
}
getProcessor().process(exchange);
}
@@ -177,11 +185,23 @@
/**
* Strategy to flag the message after being processed.
*/
- protected void flagMessageProcessed(Message message) throws MessagingException {
- if (endpoint.getConfiguration().isDeleteProcessedMessages()) {
+ protected void processCommit(MailExchange exchange) throws MessagingException {
+ Message message = exchange.getIn().getMessage();
+
+ if (endpoint.getConfiguration().isDelete()) {
+ LOG.debug("Exchange processed, so flagging message as DELETED");
message.setFlag(Flags.Flag.DELETED, true);
} else {
+ LOG.debug("Exchange processed, so flagging message as SEEN");
message.setFlag(Flags.Flag.SEEN, true);
}
}
+
+ /**
+ * Strategy when processing the exchange failed.
+ */
+ protected void processRollback(MailExchange exchange) throws MessagingException {
+ LOG.warn("Exchange failed, so rolling back message status: " + exchange);
+ }
+
}
Modified: camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailComponentTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailComponentTest.java?rev=753175&r1=753174&r2=753175&view=diff
==============================================================================
--- camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailComponentTest.java (original)
+++ camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailComponentTest.java Fri Mar 13 08:55:58 2009
@@ -71,11 +71,11 @@
assertEquals("encoding", null, config.getDefaultEncoding());
assertEquals("from", "camel@localhost", config.getFrom());
assertEquals("password", "secret", config.getPassword());
- assertEquals(false, config.isDeleteProcessedMessages());
+ assertEquals(false, config.isDelete());
assertEquals(false, config.isIgnoreUriScheme());
assertEquals("fetchSize", -1, config.getFetchSize());
assertEquals("contentType", "text/plain", config.getContentType());
- assertEquals("processOnlyUnseenMessages", true, config.isProcessOnlyUnseenMessages());
+ assertEquals("unseen", true, config.isUnseen());
}
public void testDefaultPOP3Configuration() throws Exception {
@@ -90,11 +90,11 @@
assertEquals("encoding", null, config.getDefaultEncoding());
assertEquals("from", "camel@localhost", config.getFrom());
assertEquals("password", "secret", config.getPassword());
- assertEquals(false, config.isDeleteProcessedMessages());
+ assertEquals(false, config.isDelete());
assertEquals(false, config.isIgnoreUriScheme());
assertEquals("fetchSize", -1, config.getFetchSize());
assertEquals("contentType", "text/plain", config.getContentType());
- assertEquals("processOnlyUnseenMessages", true, config.isProcessOnlyUnseenMessages());
+ assertEquals("unseen", true, config.isUnseen());
}
public void testDefaultIMAPConfiguration() throws Exception {
@@ -109,17 +109,17 @@
assertEquals("encoding", null, config.getDefaultEncoding());
assertEquals("from", "camel@localhost", config.getFrom());
assertEquals("password", "secret", config.getPassword());
- assertEquals(false, config.isDeleteProcessedMessages());
+ assertEquals(false, config.isDelete());
assertEquals(false, config.isIgnoreUriScheme());
assertEquals("fetchSize", -1, config.getFetchSize());
assertEquals("contentType", "text/plain", config.getContentType());
- assertEquals("processOnlyUnseenMessages", true, config.isProcessOnlyUnseenMessages());
+ assertEquals("unseen", true, config.isUnseen());
}
public void testManyConfigurations() throws Exception {
MailEndpoint endpoint = resolveMandatoryEndpoint("smtp://james@myhost:30/subject?password=secret"
- + "&from=me@camelriders.org&deleteProcessedMessages=true&defaultEncoding=iso-8859-1&folderName=riders"
- + "&contentType=text/html&processOnlyUnseenMessages=false");
+ + "&from=me@camelriders.org&delete=true&defaultEncoding=iso-8859-1&folderName=riders"
+ + "&contentType=text/html&unseen=false");
MailConfiguration config = endpoint.getConfiguration();
assertEquals("getProtocol()", "smtp", config.getProtocol());
assertEquals("getHost()", "myhost", config.getHost());
@@ -130,10 +130,10 @@
assertEquals("encoding", "iso-8859-1", config.getDefaultEncoding());
assertEquals("from", "me@camelriders.org", config.getFrom());
assertEquals("password", "secret", config.getPassword());
- assertEquals(true, config.isDeleteProcessedMessages());
+ assertEquals(true, config.isDelete());
assertEquals(false, config.isIgnoreUriScheme());
assertEquals("fetchSize", -1, config.getFetchSize());
- assertEquals("processOnlyUnseenMessages", false, config.isProcessOnlyUnseenMessages());
+ assertEquals("unseen", false, config.isUnseen());
assertEquals("contentType", "text/html", config.getContentType());
}
@@ -149,7 +149,7 @@
assertEquals("encoding", null, config.getDefaultEncoding());
assertEquals("from", "camel@localhost", config.getFrom());
assertEquals("password", "secret", config.getPassword());
- assertEquals(false, config.isDeleteProcessedMessages());
+ assertEquals(false, config.isDelete());
assertEquals(false, config.isIgnoreUriScheme());
assertEquals("fetchSize", -1, config.getFetchSize());
}
@@ -166,7 +166,7 @@
assertEquals("encoding", null, config.getDefaultEncoding());
assertEquals("from", "camel@localhost", config.getFrom());
assertEquals("password", "secret", config.getPassword());
- assertEquals(false, config.isDeleteProcessedMessages());
+ assertEquals(false, config.isDelete());
assertEquals(false, config.isIgnoreUriScheme());
assertEquals("fetchSize", -1, config.getFetchSize());
}
Copied: camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailDoNotDeleteIfProcessFailsTest.java (from r753137, camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailProcessOnlyUnseenMessagesTest.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailDoNotDeleteIfProcessFailsTest.java?p2=camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailDoNotDeleteIfProcessFailsTest.java&p1=camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailProcessOnlyUnseenMessagesTest.java&r1=753137&r2=753175&rev=753175&view=diff
==============================================================================
--- camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailProcessOnlyUnseenMessagesTest.java (original)
+++ camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailDoNotDeleteIfProcessFailsTest.java Fri Mar 13 08:55:58 2009
@@ -23,39 +23,35 @@
import javax.mail.internet.MimeMessage;
import org.apache.camel.ContextTestSupport;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
-import org.apache.camel.component.mock.MockEndpoint;
import org.jvnet.mock_javamail.Mailbox;
import org.springframework.mail.javamail.JavaMailSenderImpl;
/**
- * Unit test for processOnlyUnseenMessages option.
+ * Unit test for rollback option.
*/
-public class MailProcessOnlyUnseenMessagesTest extends ContextTestSupport {
+public class MailDoNotDeleteIfProcessFailsTest extends ContextTestSupport {
- public void testProcessOnlyUnseenMessages() throws Exception {
+ private static int counter;
+
+ public void testRoolbackIfProcessFails() throws Exception {
prepareMailbox();
- sendBody("direct:a", "Message 3");
+ getMockEndpoint("mock:result").expectedBodiesReceived("Message 1");
+ // the first 2 attempts should fail
+ getMockEndpoint("mock:error").expectedMessageCount(2);
+
+ assertMockEndpointsSatisfied();
- MockEndpoint mock = getMockEndpoint("mock:result");
- mock.expectedMessageCount(1);
- mock.expectedBodiesReceived("Message 3");
- mock.assertIsSatisfied();
-
- // reset mock so we can make new assertions
- mock.reset();
-
- // send a new message, now we should only receive this new massages as all the others has been SEEN
- sendBody("direct:a", "Message 4");
- mock.expectedMessageCount(1);
- mock.expectedBodiesReceived("Message 4");
- mock.assertIsSatisfied();
+ assertEquals(3, counter);
}
private void prepareMailbox() throws Exception {
// connect to mailbox
Mailbox.clearAll();
+
JavaMailSenderImpl sender = new JavaMailSenderImpl();
Store store = sender.getSession().getStore("imap");
store.connect("localhost", 25, "claus", "secret");
@@ -63,11 +59,11 @@
folder.open(Folder.READ_WRITE);
folder.expunge();
- // inserts two messages with the SEEN flag
+ // inserts two new messages
Message[] msg = new Message[2];
msg[0] = new MimeMessage(sender.getSession());
msg[0].setText("Message 1");
- msg[0].setFlag(Flags.Flag.SEEN, true);
+ msg[0].setFlag(Flags.Flag.SEEN, false);
msg[1] = new MimeMessage(sender.getSession());
msg[1].setText("Message 2");
msg[1].setFlag(Flags.Flag.SEEN, true);
@@ -78,11 +74,21 @@
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
- from("direct:a").to("smtp://claus@localhost");
+ // no redelivery for unit test as we want it to be polled next time
+ errorHandler(deadLetterChannel("mock:error").maximumRedeliveries(0).logStackTrace(false));
- from("imap://localhost?username=claus&password=secret&processOnlyUnseenMessages=true&consumer.delay=1000").to("mock:result");
+ from("imap://localhost?username=claus&password=secret&unseen=true&delay=250")
+ .process(new Processor() {
+ public void process(Exchange exchange) throws Exception {
+ counter++;
+ if (counter < 3) {
+ throw new IllegalArgumentException("Forced by unit test");
+ }
+ }
+ })
+ .to("mock:result");
}
};
}
-}
+}
\ No newline at end of file
Modified: camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailFetchSizeTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailFetchSizeTest.java?rev=753175&r1=753174&r2=753175&view=diff
==============================================================================
--- camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailFetchSizeTest.java (original)
+++ camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailFetchSizeTest.java Fri Mar 13 08:55:58 2009
@@ -89,7 +89,7 @@
return new RouteBuilder() {
public void configure() throws Exception {
from("pop3://jones@localhost?password=secret&fetchSize=2&consumer.delay=5000"
- + "&deleteProcessedMessages=true").to("mock:result");
+ + "&delete=true").to("mock:result");
}
};
}
Modified: camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailProcessOnlyUnseenMessagesTest.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailProcessOnlyUnseenMessagesTest.java?rev=753175&r1=753174&r2=753175&view=diff
==============================================================================
--- camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailProcessOnlyUnseenMessagesTest.java (original)
+++ camel/trunk/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailProcessOnlyUnseenMessagesTest.java Fri Mar 13 08:55:58 2009
@@ -29,7 +29,7 @@
import org.springframework.mail.javamail.JavaMailSenderImpl;
/**
- * Unit test for processOnlyUnseenMessages option.
+ * Unit test for unseen option.
*/
public class MailProcessOnlyUnseenMessagesTest extends ContextTestSupport {
@@ -80,7 +80,7 @@
public void configure() throws Exception {
from("direct:a").to("smtp://claus@localhost");
- from("imap://localhost?username=claus&password=secret&processOnlyUnseenMessages=true&consumer.delay=1000").to("mock:result");
+ from("imap://localhost?username=claus&password=secret&unseen=true&consumer.delay=1000").to("mock:result");
}
};
}