You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/02/20 12:22:10 UTC

[1/3] camel git commit: Added todo

Repository: camel
Updated Branches:
  refs/heads/master 110c8d2f4 -> bbf15fdd8


Added todo


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/bbf15fdd
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/bbf15fdd
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/bbf15fdd

Branch: refs/heads/master
Commit: bbf15fdd85005b04eda892fba79af4200fdf2b58
Parents: 56ccbef
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Feb 20 12:21:14 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Feb 20 12:22:02 2016 +0100

----------------------------------------------------------------------
 .../camel/component/jms/ActiveMQOriginalDestinationTest.java       | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/bbf15fdd/components/camel-jms/src/test/java/org/apache/camel/component/jms/ActiveMQOriginalDestinationTest.java
----------------------------------------------------------------------
diff --git a/components/camel-jms/src/test/java/org/apache/camel/component/jms/ActiveMQOriginalDestinationTest.java b/components/camel-jms/src/test/java/org/apache/camel/component/jms/ActiveMQOriginalDestinationTest.java
index 5510376..4c6ca5b 100644
--- a/components/camel-jms/src/test/java/org/apache/camel/component/jms/ActiveMQOriginalDestinationTest.java
+++ b/components/camel-jms/src/test/java/org/apache/camel/component/jms/ActiveMQOriginalDestinationTest.java
@@ -94,6 +94,8 @@ public class ActiveMQOriginalDestinationTest extends CamelTestSupport {
      */
     private class OriginalDestinationPropagateStrategy implements MessageCreatedStrategy {
 
+        // TODO: This is supported out of the box from ActiveMQ 5.14 onwards, and hence remove OriginalDestinationPropagateStrategy
+
         @Override
         public void onMessageCreated(Message message, Session session, Exchange exchange, Throwable cause) {
             if (exchange.getIn() instanceof JmsMessage) {


[2/3] camel git commit: CAMEL-7247: mail consumer add support for idempontent repository so you can concurrent poll the same mailbox.

Posted by da...@apache.org.
CAMEL-7247: mail consumer add support for idempontent repository so you can concurrent poll the same mailbox.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/56ccbef1
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/56ccbef1
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/56ccbef1

Branch: refs/heads/master
Commit: 56ccbef11d794c413807ffc3f029a089f0c6542f
Parents: 4c49932
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Feb 20 12:20:07 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Feb 20 12:22:02 2016 +0100

----------------------------------------------------------------------
 .../component/mail/DefaultMailUidGenerator.java |  6 ++---
 .../camel/component/mail/MailConsumer.java      | 28 +++++++++-----------
 2 files changed, 15 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/56ccbef1/components/camel-mail/src/main/java/org/apache/camel/component/mail/DefaultMailUidGenerator.java
----------------------------------------------------------------------
diff --git a/components/camel-mail/src/main/java/org/apache/camel/component/mail/DefaultMailUidGenerator.java b/components/camel-mail/src/main/java/org/apache/camel/component/mail/DefaultMailUidGenerator.java
index 58772ea..829d408 100644
--- a/components/camel-mail/src/main/java/org/apache/camel/component/mail/DefaultMailUidGenerator.java
+++ b/components/camel-mail/src/main/java/org/apache/camel/component/mail/DefaultMailUidGenerator.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

http://git-wip-us.apache.org/repos/asf/camel/blob/56ccbef1/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
index 8ea2260..fc1417e 100644
--- a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
+++ b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
@@ -17,11 +17,8 @@
 package org.apache.camel.component.mail;
 
 import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.Queue;
 import javax.mail.Flags;
 import javax.mail.Folder;
@@ -40,6 +37,7 @@ import org.apache.camel.impl.ScheduledBatchPollingConsumer;
 import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.IntrospectionSupport;
+import org.apache.camel.util.KeyValueHolder;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -117,12 +115,13 @@ public class MailConsumer extends ScheduledBatchPollingConsumer {
         try {
             int count = folder.getMessageCount();
             if (count > 0) {
-                Map<String, Message> messages = retrieveMessages();
+                List<KeyValueHolder<String, Message>> messages = retrieveMessages();
 
                 // need to call setPeek on java-mail to avoid the message being flagged eagerly as SEEN on the server in case
                 // we process the message and rollback due an exception
                 if (getEndpoint().getConfiguration().isPeek()) {
-                    for (Message message : messages.values()) {
+                    for (KeyValueHolder<String, Message> entry : messages) {
+                        Message message = entry.getValue();
                         peekMessage(message);
                     }
                 }
@@ -242,8 +241,8 @@ public class MailConsumer extends ScheduledBatchPollingConsumer {
      * @return Messages from input folder according to the search and sort criteria stored in the endpoint
      * @throws MessagingException If message retrieval fails
      */
-    private Map<String, Message> retrieveMessages() throws MessagingException {
-        Map<String, Message> answer = new LinkedHashMap<String, Message>();
+    private List<KeyValueHolder<String, Message>> retrieveMessages() throws MessagingException {
+        List<KeyValueHolder<String, Message>> answer = new ArrayList<>();
 
         Message[] messages;
         final SortTerm[] sortTerm = getEndpoint().getSortTerm();
@@ -272,7 +271,7 @@ public class MailConsumer extends ScheduledBatchPollingConsumer {
         for (Message message : messages) {
             String key = getEndpoint().getMailUidGenerator().generateUuid(getEndpoint(), message);
             if (isValidMessage(key, message)) {
-                answer.put(key, message);
+                answer.add(new KeyValueHolder<>(key, message));
             }
         }
 
@@ -327,7 +326,7 @@ public class MailConsumer extends ScheduledBatchPollingConsumer {
         return null;
     }
 
-    protected Queue<Exchange> createExchanges(Map<String, Message> messages) throws MessagingException {
+    protected Queue<Exchange> createExchanges(List<KeyValueHolder<String, Message>> messages) throws MessagingException {
         Queue<Exchange> answer = new LinkedList<Exchange>();
 
         int fetchSize = getEndpoint().getConfiguration().getFetchSize();
@@ -337,13 +336,10 @@ public class MailConsumer extends ScheduledBatchPollingConsumer {
             LOG.debug("Fetching {} messages. Total {} messages.", count, messages.size());
         }
 
-        Iterator<String> it = messages.keySet().iterator();
-        int i = 0;
-        while (i < count) {
-            i++;
-
-            String key = it.next();
-            Message message = messages.get(key);
+        for (int i = 0; i < count; i++) {
+            KeyValueHolder<String, Message> holder = messages.get(i);
+            String key = holder.getKey();
+            Message message = holder.getValue();
 
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Mail #{} is of type: {} - {}", new Object[]{i, ObjectHelper.classCanonicalName(message), message});


[3/3] camel git commit: CAMEL-7247: mail consumer add support for idempontent repository so you can concurrent poll the same mailbox.

Posted by da...@apache.org.
CAMEL-7247: mail consumer add support for idempontent repository so you can concurrent poll the same mailbox.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/4c499320
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/4c499320
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/4c499320

Branch: refs/heads/master
Commit: 4c499320ce15d0325c17c9372ce5947bc8bba69d
Parents: 110c8d2
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Feb 20 12:14:08 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Feb 20 12:22:02 2016 +0100

----------------------------------------------------------------------
 .../component/mail/DefaultMailUidGenerator.java |  87 +++++++++++++
 .../camel/component/mail/MailConsumer.java      | 121 +++++++++++--------
 .../camel/component/mail/MailEndpoint.java      |  51 ++++++++
 .../camel/component/mail/MailUidGenerator.java  |  34 ++++++
 .../component/mail/MailBatchConsumerTest.java   |  23 ++--
 .../mail/MailCommitOnCompletionTest.java        |   1 +
 .../camel/component/mail/MailCopyToTest.java    |   1 +
 .../mail/MailDoNotDeleteIfProcessFailsTest.java |   2 +
 .../camel/component/mail/MailFetchSizeTest.java |   1 +
 .../component/mail/MailFetchSizeZeroTest.java   |   1 +
 ...mpotentRepositoryDuplicateNotRemoveTest.java |  58 +++++++++
 .../MailIdempotentRepositoryDuplicateTest.java  | 105 ++++++++++++++++
 .../mail/MailIdempotentRepositoryTest.java      | 100 +++++++++++++++
 .../mail/MailMaxMessagesPerPollTest.java        |   1 +
 .../component/mail/MailPollEnrichTest.java      |   1 +
 .../mail/MailPostProcessActionTest.java         |   1 +
 .../mail/MailProcessOnlyUnseenMessagesTest.java |   2 +
 .../component/mail/MailSearchTermTest.java      |   6 +
 .../MailSearchTermUriConfigLast24HoursTest.java |   6 +
 .../mail/MailSearchTermUriConfigTest.java       |   6 +
 .../mail/MailShutdownCompleteAllTasksTest.java  |   1 +
 ...MailShutdownCompleteCurrentTaskOnlyTest.java |   1 +
 .../camel/component/mail/MailSortTermTest.java  |   3 +
 .../component/mail/MailSortTermThreeTest.java   |   3 +
 .../component/mail/MailSortTermTwoTest.java     |   3 +
 25 files changed, 556 insertions(+), 63 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/4c499320/components/camel-mail/src/main/java/org/apache/camel/component/mail/DefaultMailUidGenerator.java
----------------------------------------------------------------------
diff --git a/components/camel-mail/src/main/java/org/apache/camel/component/mail/DefaultMailUidGenerator.java b/components/camel-mail/src/main/java/org/apache/camel/component/mail/DefaultMailUidGenerator.java
new file mode 100644
index 0000000..58772ea
--- /dev/null
+++ b/components/camel-mail/src/main/java/org/apache/camel/component/mail/DefaultMailUidGenerator.java
@@ -0,0 +1,87 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ * <p/>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p/>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mail;
+
+import java.util.Enumeration;
+import java.util.UUID;
+import javax.mail.Header;
+import javax.mail.Message;
+import javax.mail.MessagingException;
+
+import org.apache.camel.util.ObjectHelper;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class DefaultMailUidGenerator implements MailUidGenerator {
+
+    private static final Logger LOG = LoggerFactory.getLogger(DefaultMailUidGenerator.class);
+
+    @Override
+    public String generateUuid(MailEndpoint mailEndpoint, Message message) {
+        String answer = generateMessageIdHeader(message);
+        if (answer == null) {
+            answer = generateMessageHash(message);
+        }
+        // fallback and use message number
+        if (answer == null || ObjectHelper.isEmpty(answer)) {
+            answer = Integer.toString(message.getMessageNumber());
+        }
+        return answer;
+    }
+
+    private String generateMessageIdHeader(Message message) {
+        LOG.trace("generateMessageIdHeader for msg: {}", message);
+
+        // there should be a Message-ID header with the UID
+        try {
+            String values[] = message.getHeader("Message-ID");
+            if (values != null && values.length > 0) {
+                String uid = values[0];
+                LOG.trace("Message-ID header found: {}", uid);
+                return uid;
+            }
+        } catch (MessagingException e) {
+            LOG.warn("Cannot read headers from mail message. This exception will be ignored.", e);
+        }
+
+        return null;
+    }
+
+    public String generateMessageHash(Message message) {
+        LOG.trace("generateMessageHash for msg: {}", message);
+
+        String uid = null;
+
+        // create an UID based on message headers on the message, that ought to be unique
+        StringBuilder buffer = new StringBuilder();
+        try {
+            Enumeration<?> it = message.getAllHeaders();
+            while (it.hasMoreElements()) {
+                Header header = (Header) it.nextElement();
+                buffer.append(header.getName()).append("=").append(header.getValue()).append("\n");
+            }
+            if (buffer.length() > 0) {
+                LOG.trace("Generating UID from the following:\n {}", buffer);
+                uid = UUID.nameUUIDFromBytes(buffer.toString().getBytes()).toString();
+            }
+        } catch (MessagingException e) {
+            LOG.warn("Cannot read headers from mail message. This exception will be ignored.", e);
+        }
+
+        return uid;
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/4c499320/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
index 170b5ec..8ea2260 100644
--- a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
+++ b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
@@ -17,15 +17,15 @@
 package org.apache.camel.component.mail;
 
 import java.util.ArrayList;
-import java.util.Enumeration;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
+import java.util.Map;
 import java.util.Queue;
-import java.util.UUID;
 import javax.mail.Flags;
 import javax.mail.Folder;
 import javax.mail.FolderNotFoundException;
-import javax.mail.Header;
 import javax.mail.Message;
 import javax.mail.MessagingException;
 import javax.mail.Store;
@@ -49,7 +49,8 @@ import org.slf4j.LoggerFactory;
  * {@link javax.mail.Transport Transport} and dispatches them to the {@link Processor}
  */
 public class MailConsumer extends ScheduledBatchPollingConsumer {
-    public static final String POP3_UID = "CamelPop3Uid";
+    public static final String MAIL_MESSAGE_UID = "CamelMailMessageId";
+
     public static final long DEFAULT_CONSUMER_DELAY = 60 * 1000L;
     private static final Logger LOG = LoggerFactory.getLogger(MailConsumer.class);
 
@@ -116,11 +117,12 @@ public class MailConsumer extends ScheduledBatchPollingConsumer {
         try {
             int count = folder.getMessageCount();
             if (count > 0) {
-                Message[] messages = retrieveMessages();
+                Map<String, Message> messages = retrieveMessages();
+
                 // need to call setPeek on java-mail to avoid the message being flagged eagerly as SEEN on the server in case
                 // we process the message and rollback due an exception
                 if (getEndpoint().getConfiguration().isPeek()) {
-                    for (Message message : messages) {
+                    for (Message message : messages.values()) {
                         peekMessage(message);
                     }
                 }
@@ -240,7 +242,9 @@ public class MailConsumer extends ScheduledBatchPollingConsumer {
      * @return Messages from input folder according to the search and sort criteria stored in the endpoint
      * @throws MessagingException If message retrieval fails
      */
-    private Message[] retrieveMessages() throws MessagingException {
+    private Map<String, Message> retrieveMessages() throws MessagingException {
+        Map<String, Message> answer = new LinkedHashMap<String, Message>();
+
         Message[] messages;
         final SortTerm[] sortTerm = getEndpoint().getSortTerm();
         final SearchTerm searchTerm = computeSearchTerm();
@@ -264,7 +268,15 @@ public class MailConsumer extends ScheduledBatchPollingConsumer {
                 MailSorter.sortMessages(messages, sortTerm);
             }
         }
-        return messages;
+
+        for (Message message : messages) {
+            String key = getEndpoint().getMailUidGenerator().generateUuid(getEndpoint(), message);
+            if (isValidMessage(key, message)) {
+                answer.put(key, message);
+            }
+        }
+
+        return answer;
     }
 
     private Message[] retrieveAllMessages() throws MessagingException {
@@ -289,6 +301,20 @@ public class MailConsumer extends ScheduledBatchPollingConsumer {
         return msgs.toArray(new Message[msgs.size()]);
     }
 
+    private boolean isValidMessage(String key, Message msg) {
+        boolean answer = true;
+
+        if (getEndpoint().getIdempotentRepository() != null) {
+            if (!getEndpoint().getIdempotentRepository().add(key)) {
+                log.trace("This consumer is idempotent and the mail message has been consumed before matching idempotentKey: {}. Will skip this message: {}", key, msg);
+                answer = false;
+            }
+        }
+
+        log.debug("Message: {} with key: {} is valid: {}", msg, key, answer);
+        return answer;
+    }
+
     /**
      * @return Search term from endpoint (including "seen" check) or null if there is no search term
      */
@@ -301,18 +327,23 @@ public class MailConsumer extends ScheduledBatchPollingConsumer {
         return null;
     }
 
-    protected Queue<Exchange> createExchanges(Message[] messages) throws MessagingException {
+    protected Queue<Exchange> createExchanges(Map<String, Message> messages) throws MessagingException {
         Queue<Exchange> answer = new LinkedList<Exchange>();
 
         int fetchSize = getEndpoint().getConfiguration().getFetchSize();
-        int count = fetchSize == -1 ? messages.length : Math.min(fetchSize, messages.length);
+        int count = fetchSize == -1 ? messages.size() : Math.min(fetchSize, messages.size());
 
         if (LOG.isDebugEnabled()) {
-            LOG.debug("Fetching {} messages. Total {} messages.", count, messages.length);
+            LOG.debug("Fetching {} messages. Total {} messages.", count, messages.size());
         }
 
-        for (int i = 0; i < count; i++) {
-            Message message = messages[i];
+        Iterator<String> it = messages.keySet().iterator();
+        int i = 0;
+        while (i < count) {
+            i++;
+
+            String key = it.next();
+            Message message = messages.get(key);
 
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Mail #{} is of type: {} - {}", new Object[]{i, ObjectHelper.classCanonicalName(message), message});
@@ -330,13 +361,9 @@ public class MailConsumer extends ScheduledBatchPollingConsumer {
 
                 // If the protocol is POP3 we need to remember the uid on the exchange
                 // so we can find the mail message again later to be able to delete it
-                if (getEndpoint().getConfiguration().getProtocol().startsWith("pop3")) {
-                    String uid = generatePop3Uid(message);
-                    if (uid != null) {
-                        exchange.setProperty(POP3_UID, uid);
-                        LOG.trace("POP3 mail message using uid {}", uid);
-                    }
-                }
+                // we also need to remember the UUID for idempotent repository
+                exchange.setProperty(MAIL_MESSAGE_UID, key);
+
                 answer.add(exchange);
             } else {
                 if (LOG.isDebugEnabled()) {
@@ -372,16 +399,17 @@ public class MailConsumer extends ScheduledBatchPollingConsumer {
                 folder.open(Folder.READ_WRITE);
             }
 
+            String uid = (String) exchange.removeProperty(MAIL_MESSAGE_UID);
+
             // If the protocol is POP3, the message needs to be synced with the folder via the UID.
             // Otherwise setting the DELETE/SEEN flag won't delete the message.
-            String uid = (String) exchange.removeProperty(POP3_UID);
-            if (uid != null) {
+            if (getEndpoint().getConfiguration().getProtocol().startsWith("pop3")) {
                 int count = folder.getMessageCount();
                 Message found = null;
                 LOG.trace("Looking for POP3Message with UID {} from folder with {} mails", uid, count);
                 for (int i = 1; i <= count; ++i) {
                     Message msg = folder.getMessage(i);
-                    if (uid.equals(generatePop3Uid(msg))) {
+                    if (uid.equals(getEndpoint().getMailUidGenerator().generateUuid(getEndpoint(), msg))) {
                         LOG.debug("Found POP3Message with UID {} from folder with {} mails", uid, count);
                         found = msg;
                         break;
@@ -422,6 +450,16 @@ public class MailConsumer extends ScheduledBatchPollingConsumer {
                 LOG.trace("Exchange processed, so flagging message as SEEN");
                 mail.setFlag(Flags.Flag.SEEN, true);
             }
+
+            // need to confirm or remove on commit at last
+            if (getEndpoint().getIdempotentRepository() != null) {
+                if (getEndpoint().isIdempotentRepositoryRemoveOnCommit()) {
+                    getEndpoint().getIdempotentRepository().remove(uid);
+                } else {
+                    getEndpoint().getIdempotentRepository().confirm(uid);
+                }
+            }
+
         } catch (MessagingException e) {
             getExceptionHandler().handleException("Error occurred during committing mail message: " + mail, exchange, e);
         }
@@ -434,6 +472,14 @@ public class MailConsumer extends ScheduledBatchPollingConsumer {
      * @param exchange the exchange
      */
     protected void processRollback(Message mail, Exchange exchange) {
+
+        String uid = (String) exchange.removeProperty(MAIL_MESSAGE_UID);
+
+        // need to remove on rollback
+        if (getEndpoint().getIdempotentRepository() != null) {
+            getEndpoint().getIdempotentRepository().remove(uid);
+        }
+
         Exception cause = exchange.getException();
         if (cause != null) {
             LOG.warn("Exchange failed, so rolling back message status: " + exchange, cause);
@@ -442,35 +488,6 @@ public class MailConsumer extends ScheduledBatchPollingConsumer {
         }
     }
 
-    /**
-     * Generates an UID of the POP3Message
-     *
-     * @param message the POP3Message
-     * @return the generated uid
-     */
-    protected String generatePop3Uid(Message message) {
-        String uid = null;
-
-        // create an UID based on message headers on the POP3Message, that ought
-        // to be unique
-        StringBuilder buffer = new StringBuilder();
-        try {
-            Enumeration<?> it = message.getAllHeaders();
-            while (it.hasMoreElements()) {
-                Header header = (Header)it.nextElement();
-                buffer.append(header.getName()).append("=").append(header.getValue()).append("\n");
-            }
-            if (buffer.length() > 0) {
-                LOG.trace("Generating UID from the following:\n {}", buffer);
-                uid = UUID.nameUUIDFromBytes(buffer.toString().getBytes()).toString();
-            }
-        } catch (MessagingException e) {
-            LOG.warn("Cannot reader headers from mail message. This exception will be ignored.", e);
-        }
-
-        return uid;
-    }
-
     private void ensureIsConnected() throws MessagingException {
         MailConfiguration config = getEndpoint().getConfiguration();
 

http://git-wip-us.apache.org/repos/asf/camel/blob/4c499320/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailEndpoint.java
----------------------------------------------------------------------
diff --git a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailEndpoint.java b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailEndpoint.java
index 493c2e9..56e2d12 100644
--- a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailEndpoint.java
+++ b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailEndpoint.java
@@ -26,6 +26,7 @@ import org.apache.camel.Processor;
 import org.apache.camel.Producer;
 import org.apache.camel.impl.ScheduledPollEndpoint;
 import org.apache.camel.spi.HeaderFilterStrategy;
+import org.apache.camel.spi.IdempotentRepository;
 import org.apache.camel.spi.UriEndpoint;
 import org.apache.camel.spi.UriParam;
 
@@ -57,6 +58,12 @@ public class MailEndpoint extends ScheduledPollEndpoint {
     private SortTerm[] sortTerm;
     @UriParam(label = "consumer,advanced")
     private MailBoxPostProcessAction postProcessAction;
+    @UriParam(label = "consumer,filter")
+    private IdempotentRepository<String> idempotentRepository;
+    @UriParam(label = "consumer,filter", defaultValue = "true")
+    private boolean idempotentRepositoryRemoveOnCommit = true;
+    @UriParam(label = "consumer,advanced")
+    private MailUidGenerator mailUidGenerator = new DefaultMailUidGenerator();
 
     public MailEndpoint() {
         // ScheduledPollConsumer default delay is 500 millis and that is too often for polling a mailbox,
@@ -229,6 +236,50 @@ public class MailEndpoint extends ScheduledPollEndpoint {
         this.postProcessAction = postProcessAction;
     }
 
+    public IdempotentRepository<String> getIdempotentRepository() {
+        return idempotentRepository;
+    }
+
+    /**
+     * A pluggable repository org.apache.camel.spi.IdempotentRepository which allows to cluster
+     * consuming from the same mailbox, and let the repository coordinate whether a mail message
+     * is valid for the consumer to process.
+     * <p/>
+     * By default no repository is in use.
+     */
+    public void setIdempotentRepository(IdempotentRepository<String> idempotentRepository) {
+        this.idempotentRepository = idempotentRepository;
+    }
+
+    public boolean isIdempotentRepositoryRemoveOnCommit() {
+        return idempotentRepositoryRemoveOnCommit;
+    }
+
+    /**
+     * When using idempotent repository, then when the mail message has been successfully processed and
+     * is committed, should the message id be removed from the idempotent repository (default) or
+     * be kept in the repository.
+     * <p/>
+     * By default its assumed the message id is unique and has no value to be kept in the repository,
+     * because the mail message will be marked as seen/moved or deleted to prevent it from being
+     * consumed again. And therefore having the message id stored in the idempotent repository has
+     * little value. However this option allows to store the message id, for whatever reason you may have.
+     */
+    public void setIdempotentRepositoryRemoveOnCommit(boolean idempotentRepositoryRemoveOnCommit) {
+        this.idempotentRepositoryRemoveOnCommit = idempotentRepositoryRemoveOnCommit;
+    }
+
+    public MailUidGenerator getMailUidGenerator() {
+        return mailUidGenerator;
+    }
+
+    /**
+     * A pluggable {@link MailUidGenerator} that allows to use custom logic to generate UUID of the mail message.
+     */
+    public void setMailUidGenerator(MailUidGenerator mailUidGenerator) {
+        this.mailUidGenerator = mailUidGenerator;
+    }
+
     /**
      * Milliseconds before the next poll.
      */

http://git-wip-us.apache.org/repos/asf/camel/blob/4c499320/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailUidGenerator.java
----------------------------------------------------------------------
diff --git a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailUidGenerator.java b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailUidGenerator.java
new file mode 100644
index 0000000..bd0cd6c
--- /dev/null
+++ b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailUidGenerator.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mail;
+
+import javax.mail.Message;
+
+/**
+ * To generate an unique ID of the mail message.
+ */
+public interface MailUidGenerator {
+
+    /**
+     * Generates an unique ID of the mail message depending on if its POP3 or IMAP protocol.
+     *
+     * @param message the mail message
+     * @return the unique id, must never be <tt>null</tt>.
+     */
+    String generateUuid(MailEndpoint mailEndpoint, Message message);
+
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/4c499320/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailBatchConsumerTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailBatchConsumerTest.java b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailBatchConsumerTest.java
index 04b8a97..6f0ec89 100644
--- a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailBatchConsumerTest.java
+++ b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailBatchConsumerTest.java
@@ -47,17 +47,17 @@ public class MailBatchConsumerTest extends CamelTestSupport {
         MockEndpoint mock = getMockEndpoint("mock:result");
         mock.expectedMessageCount(5);
         mock.expectsAscending(body());
-        mock.message(0).property(Exchange.BATCH_INDEX).isEqualTo(0);
-        mock.message(1).property(Exchange.BATCH_INDEX).isEqualTo(1);
-        mock.message(2).property(Exchange.BATCH_INDEX).isEqualTo(2);
-        mock.message(3).property(Exchange.BATCH_INDEX).isEqualTo(3);
-        mock.message(4).property(Exchange.BATCH_INDEX).isEqualTo(4);
-        mock.message(0).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
-        mock.message(1).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
-        mock.message(2).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
-        mock.message(3).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
-        mock.message(3).property(Exchange.BATCH_COMPLETE).isEqualTo(false);
-        mock.message(4).property(Exchange.BATCH_COMPLETE).isEqualTo(true);
+        mock.message(0).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(0);
+        mock.message(1).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(1);
+        mock.message(2).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(2);
+        mock.message(3).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(3);
+        mock.message(4).exchangeProperty(Exchange.BATCH_INDEX).isEqualTo(4);
+        mock.message(0).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(1).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(2).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(3).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(3).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(false);
+        mock.message(4).exchangeProperty(Exchange.BATCH_COMPLETE).isEqualTo(true);
         mock.expectedPropertyReceived(Exchange.BATCH_SIZE, 5);
 
         assertMockEndpointsSatisfied();
@@ -77,6 +77,7 @@ public class MailBatchConsumerTest extends CamelTestSupport {
         Message[] messages = new Message[5];
         for (int i = 0; i < 5; i++) {
             messages[i] = new MimeMessage(sender.getSession());
+            messages[i].setHeader("Message-ID", "" + i);
             messages[i].setText("Message " + i);
         }
         folder.appendMessages(messages);

http://git-wip-us.apache.org/repos/asf/camel/blob/4c499320/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailCommitOnCompletionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailCommitOnCompletionTest.java b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailCommitOnCompletionTest.java
index 5daba56..6a6317f 100644
--- a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailCommitOnCompletionTest.java
+++ b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailCommitOnCompletionTest.java
@@ -70,6 +70,7 @@ public class MailCommitOnCompletionTest extends CamelTestSupport {
         Message[] messages = new Message[5];
         for (int i = 0; i < 5; i++) {
             messages[i] = new MimeMessage(sender.getSession());
+            messages[i].setHeader("Message-ID", "" + i);
             messages[i].setText("Message " + i);
         }
         folder.appendMessages(messages);

http://git-wip-us.apache.org/repos/asf/camel/blob/4c499320/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailCopyToTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailCopyToTest.java b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailCopyToTest.java
index 13c49fb..cf08c6e 100644
--- a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailCopyToTest.java
+++ b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailCopyToTest.java
@@ -66,6 +66,7 @@ public class MailCopyToTest extends CamelTestSupport {
         Message[] messages = new Message[5];
         for (int i = 0; i < 5; i++) {
             messages[i] = new MimeMessage(sender.getSession());
+            messages[i].setHeader("Message-ID", "" + i);
             messages[i].setText("Message " + i);
         }
         folder.appendMessages(messages);

http://git-wip-us.apache.org/repos/asf/camel/blob/4c499320/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailDoNotDeleteIfProcessFailsTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailDoNotDeleteIfProcessFailsTest.java b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailDoNotDeleteIfProcessFailsTest.java
index 4d4fcab..17c4171 100644
--- a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailDoNotDeleteIfProcessFailsTest.java
+++ b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailDoNotDeleteIfProcessFailsTest.java
@@ -68,9 +68,11 @@ public class MailDoNotDeleteIfProcessFailsTest extends CamelTestSupport {
         Message[] msg = new Message[2];
         msg[0] = new MimeMessage(sender.getSession());
         msg[0].setText("Message 1");
+        msg[0].setHeader("Message-ID", "0");
         msg[0].setFlag(Flags.Flag.SEEN, false);
         msg[1] = new MimeMessage(sender.getSession());
         msg[1].setText("Message 2");
+        msg[0].setHeader("Message-ID", "1");
         msg[1].setFlag(Flags.Flag.SEEN, true);
         folder.appendMessages(msg);
         folder.close(true);

http://git-wip-us.apache.org/repos/asf/camel/blob/4c499320/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailFetchSizeTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailFetchSizeTest.java b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailFetchSizeTest.java
index ad78c5d..35b5faf 100644
--- a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailFetchSizeTest.java
+++ b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailFetchSizeTest.java
@@ -85,6 +85,7 @@ public class MailFetchSizeTest extends CamelTestSupport {
         Message[] messages = new Message[5];
         for (int i = 0; i < 5; i++) {
             messages[i] = new MimeMessage(sender.getSession());
+            messages[i].setHeader("Message-ID", "" + i);
             messages[i].setText("Message " + i);
         }
         folder.appendMessages(messages);

http://git-wip-us.apache.org/repos/asf/camel/blob/4c499320/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailFetchSizeZeroTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailFetchSizeZeroTest.java b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailFetchSizeZeroTest.java
index 5de6241..99f7ba9 100644
--- a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailFetchSizeZeroTest.java
+++ b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailFetchSizeZeroTest.java
@@ -67,6 +67,7 @@ public class MailFetchSizeZeroTest extends CamelTestSupport {
         Message[] messages = new Message[5];
         for (int i = 0; i < 5; i++) {
             messages[i] = new MimeMessage(sender.getSession());
+            messages[i].setHeader("Message-ID", "" + i);
             messages[i].setText("Message " + i);
         }
         folder.appendMessages(messages);

http://git-wip-us.apache.org/repos/asf/camel/blob/4c499320/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailIdempotentRepositoryDuplicateNotRemoveTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailIdempotentRepositoryDuplicateNotRemoveTest.java b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailIdempotentRepositoryDuplicateNotRemoveTest.java
new file mode 100644
index 0000000..2c29626
--- /dev/null
+++ b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailIdempotentRepositoryDuplicateNotRemoveTest.java
@@ -0,0 +1,58 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mail;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.junit.Test;
+import org.jvnet.mock_javamail.Mailbox;
+
+/**
+ * Unit test for idempotent repository.
+ */
+public class MailIdempotentRepositoryDuplicateNotRemoveTest extends MailIdempotentRepositoryDuplicateTest {
+
+    @Test
+    public void testIdempotent() throws Exception {
+        assertEquals(1, myRepo.getCacheSize());
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        // no 3 is already in the idempotent repo
+        mock.expectedBodiesReceived("Message 0", "Message 1", "Message 2", "Message 4");
+
+        context.startRoute("foo");
+
+        assertMockEndpointsSatisfied();
+
+        // windows need a little slack
+        Thread.sleep(500);
+
+        assertEquals(0, Mailbox.get("jones@localhost").getNewMessageCount());
+
+        // they are not removed so we should have all 5 in the repo now
+        assertEquals(5, myRepo.getCacheSize());
+    }
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("imap://jones@localhost?password=secret&idempotentRepository=#myRepo&idempotentRepositoryRemoveOnCommit=false").routeId("foo").noAutoStartup()
+                        .to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/4c499320/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailIdempotentRepositoryDuplicateTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailIdempotentRepositoryDuplicateTest.java b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailIdempotentRepositoryDuplicateTest.java
new file mode 100644
index 0000000..54a52e2
--- /dev/null
+++ b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailIdempotentRepositoryDuplicateTest.java
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mail;
+
+import javax.mail.Folder;
+import javax.mail.Message;
+import javax.mail.Store;
+import javax.mail.internet.MimeMessage;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+import org.jvnet.mock_javamail.Mailbox;
+
+/**
+ * Unit test for idempotent repository.
+ */
+public class MailIdempotentRepositoryDuplicateTest extends CamelTestSupport {
+
+    MemoryIdempotentRepository myRepo = new MemoryIdempotentRepository();
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        jndi.bind("myRepo", myRepo);
+        return jndi;
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        // lets assume this ID is already done
+        myRepo.add("myuid-3");
+
+        prepareMailbox();
+        super.setUp();
+    }
+
+    @Test
+    public void testIdempotent() throws Exception {
+        assertEquals(1, myRepo.getCacheSize());
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        // no 3 is already in the idempotent repo
+        mock.expectedBodiesReceived("Message 0", "Message 1", "Message 2", "Message 4");
+
+        context.startRoute("foo");
+
+        assertMockEndpointsSatisfied();
+
+        // windows need a little slack
+        Thread.sleep(500);
+
+        assertEquals(0, Mailbox.get("jones@localhost").getNewMessageCount());
+
+        // they are removed on confirm
+        assertEquals(1, myRepo.getCacheSize());
+    }
+
+    private void prepareMailbox() throws Exception {
+        // connect to mailbox
+        Mailbox.clearAll();
+        JavaMailSender sender = new DefaultJavaMailSender();
+        Store store = sender.getSession().getStore("pop3");
+        store.connect("localhost", 25, "jones", "secret");
+        Folder folder = store.getFolder("INBOX");
+        folder.open(Folder.READ_WRITE);
+        folder.expunge();
+
+        // inserts 5 new messages
+        Message[] messages = new Message[5];
+        for (int i = 0; i < 5; i++) {
+            messages[i] = new MimeMessage(sender.getSession());
+            messages[i].setText("Message " + i);
+            messages[i].setHeader("Message-ID", "myuid-" + i);
+        }
+        folder.appendMessages(messages);
+        folder.close(true);
+    }
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("imap://jones@localhost?password=secret&idempotentRepository=#myRepo").routeId("foo").noAutoStartup()
+                        .to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/4c499320/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailIdempotentRepositoryTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailIdempotentRepositoryTest.java b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailIdempotentRepositoryTest.java
new file mode 100644
index 0000000..e481426
--- /dev/null
+++ b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailIdempotentRepositoryTest.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.mail;
+
+import javax.mail.Folder;
+import javax.mail.Message;
+import javax.mail.Store;
+import javax.mail.internet.MimeMessage;
+
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.processor.idempotent.MemoryIdempotentRepository;
+import org.apache.camel.test.junit4.CamelTestSupport;
+import org.junit.Test;
+import org.jvnet.mock_javamail.Mailbox;
+
+/**
+ * Unit test for idempotent repository.
+ */
+public class MailIdempotentRepositoryTest extends CamelTestSupport {
+
+    private MemoryIdempotentRepository myRepo = new MemoryIdempotentRepository();
+
+    @Override
+    protected JndiRegistry createRegistry() throws Exception {
+        JndiRegistry jndi = super.createRegistry();
+        jndi.bind("myRepo", myRepo);
+        return jndi;
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        prepareMailbox();
+        super.setUp();
+    }
+
+    @Test
+    public void testIdempotent() throws Exception {
+        assertEquals(0, myRepo.getCacheSize());
+
+        MockEndpoint mock = getMockEndpoint("mock:result");
+        mock.expectedMessageCount(5);
+
+        context.startRoute("foo");
+
+        assertMockEndpointsSatisfied();
+
+        // windows need a little slack
+        Thread.sleep(500);
+
+        assertEquals(0, Mailbox.get("jones@localhost").getNewMessageCount());
+        // they get deleted after processing by default so we should be back to 0
+        assertEquals(0, myRepo.getCacheSize());
+    }
+
+    private void prepareMailbox() throws Exception {
+        // connect to mailbox
+        Mailbox.clearAll();
+        JavaMailSender sender = new DefaultJavaMailSender();
+        Store store = sender.getSession().getStore("pop3");
+        store.connect("localhost", 25, "jones", "secret");
+        Folder folder = store.getFolder("INBOX");
+        folder.open(Folder.READ_WRITE);
+        folder.expunge();
+
+        // inserts 5 new messages
+        Message[] messages = new Message[5];
+        for (int i = 0; i < 5; i++) {
+            messages[i] = new MimeMessage(sender.getSession());
+            messages[i].setText("Message " + i);
+            messages[i].setHeader("Message-ID", "myuid-" + i);
+        }
+        folder.appendMessages(messages);
+        folder.close(true);
+    }
+
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            public void configure() throws Exception {
+                from("imap://jones@localhost?password=secret&idempotentRepository=#myRepo").routeId("foo").noAutoStartup()
+                        .to("mock:result");
+            }
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/camel/blob/4c499320/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailMaxMessagesPerPollTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailMaxMessagesPerPollTest.java b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailMaxMessagesPerPollTest.java
index 4af0249..012ce85 100644
--- a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailMaxMessagesPerPollTest.java
+++ b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailMaxMessagesPerPollTest.java
@@ -74,6 +74,7 @@ public class MailMaxMessagesPerPollTest extends CamelTestSupport {
         Message[] messages = new Message[5];
         for (int i = 0; i < 5; i++) {
             messages[i] = new MimeMessage(sender.getSession());
+            messages[i].setHeader("Message-ID", "" + i);
             messages[i].setText("Message " + i);
         }
         folder.appendMessages(messages);

http://git-wip-us.apache.org/repos/asf/camel/blob/4c499320/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailPollEnrichTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailPollEnrichTest.java b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailPollEnrichTest.java
index ba908d5..c84acef 100644
--- a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailPollEnrichTest.java
+++ b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailPollEnrichTest.java
@@ -78,6 +78,7 @@ public class MailPollEnrichTest extends CamelTestSupport {
         Message[] messages = new Message[5];
         for (int i = 0; i < 5; i++) {
             messages[i] = new MimeMessage(sender.getSession());
+            messages[i].setHeader("Message-ID", "" + i);
             messages[i].setText("Message " + i);
         }
         folder.appendMessages(messages);

http://git-wip-us.apache.org/repos/asf/camel/blob/4c499320/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailPostProcessActionTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailPostProcessActionTest.java b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailPostProcessActionTest.java
index c9a1655..f7c67c5 100644
--- a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailPostProcessActionTest.java
+++ b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailPostProcessActionTest.java
@@ -90,6 +90,7 @@ public class MailPostProcessActionTest extends CamelTestSupport {
         Message[] messages = new Message[1];
         messages[0] = new MimeMessage(sender.getSession());
         messages[0].setSubject("TestSubject");
+        messages[0].setHeader("Message-ID", "0");
         messages[0].setText("TestText");
 
         folder.appendMessages(messages);

http://git-wip-us.apache.org/repos/asf/camel/blob/4c499320/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailProcessOnlyUnseenMessagesTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailProcessOnlyUnseenMessagesTest.java b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailProcessOnlyUnseenMessagesTest.java
index ca4db4a..c73fb8f 100644
--- a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailProcessOnlyUnseenMessagesTest.java
+++ b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailProcessOnlyUnseenMessagesTest.java
@@ -72,9 +72,11 @@ public class MailProcessOnlyUnseenMessagesTest extends CamelTestSupport {
         Message[] msg = new Message[2];
         msg[0] = new MimeMessage(sender.getSession());
         msg[0].setText("Message 1");
+        msg[0].setHeader("Message-ID", "0");
         msg[0].setFlag(Flags.Flag.SEEN, true);
         msg[1] = new MimeMessage(sender.getSession());
         msg[1].setText("Message 2");
+        msg[0].setHeader("Message-ID", "1");
         msg[1].setFlag(Flags.Flag.SEEN, true);
         folder.appendMessages(msg);
         folder.close(true);

http://git-wip-us.apache.org/repos/asf/camel/blob/4c499320/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailSearchTermTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailSearchTermTest.java b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailSearchTermTest.java
index 5dfa1f3..a54a975 100644
--- a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailSearchTermTest.java
+++ b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailSearchTermTest.java
@@ -81,31 +81,37 @@ public class MailSearchTermTest extends CamelTestSupport {
         messages[0] = new MimeMessage(sender.getSession());
         messages[0].setSubject("Apache Camel rocks");
         messages[0].setText("I like riding the Camel");
+        messages[0].setHeader("Message-ID", "0");
         messages[0].setFrom(new InternetAddress("someone@somewhere.com"));
 
         messages[1] = new MimeMessage(sender.getSession());
         messages[1].setSubject("Order");
         messages[1].setText("Ordering Camel in Action");
+        messages[1].setHeader("Message-ID", "1");
         messages[1].setFrom(new InternetAddress("dude@somewhere.com"));
 
         messages[2] = new MimeMessage(sender.getSession());
         messages[2].setSubject("Order");
         messages[2].setText("Ordering ActiveMQ in Action");
+        messages[2].setHeader("Message-ID", "2");
         messages[2].setFrom(new InternetAddress("dude@somewhere.com"));
 
         messages[3] = new MimeMessage(sender.getSession());
         messages[3].setSubject("Buy pharmacy");
         messages[3].setText("This is spam");
+        messages[3].setHeader("Message-ID", "3");
         messages[3].setFrom(new InternetAddress("spam@me.com"));
 
         messages[4] = new MimeMessage(sender.getSession());
         messages[4].setSubject("Beers tonight?");
         messages[4].setText("We meet at 7pm the usual place");
+        messages[4].setHeader("Message-ID", "4");
         messages[4].setFrom(new InternetAddress("barney@simpsons.com"));
 
         messages[5] = new MimeMessage(sender.getSession());
         messages[5].setSubject("Spambot attack");
         messages[5].setText("I am attaching you");
+        messages[5].setHeader("Message-ID", "5");
         messages[5].setFrom(new InternetAddress("spambot@me.com"));
 
         folder.appendMessages(messages);

http://git-wip-us.apache.org/repos/asf/camel/blob/4c499320/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailSearchTermUriConfigLast24HoursTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailSearchTermUriConfigLast24HoursTest.java b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailSearchTermUriConfigLast24HoursTest.java
index 770dbc8..3a5463c 100644
--- a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailSearchTermUriConfigLast24HoursTest.java
+++ b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailSearchTermUriConfigLast24HoursTest.java
@@ -69,6 +69,7 @@ public class MailSearchTermUriConfigLast24HoursTest extends CamelTestSupport {
         messages[0] = new MimeMessage(sender.getSession());
         messages[0].setSubject("Apache Camel rocks");
         messages[0].setText("I like riding the Camel");
+        messages[0].setHeader("Message-ID", "0");
         messages[0].setFrom(new InternetAddress("someone@somewhere.com"));
         messages[0].setSentDate(new Date(twoDaysAgo));
 
@@ -76,29 +77,34 @@ public class MailSearchTermUriConfigLast24HoursTest extends CamelTestSupport {
         messages[1].setSubject("Order");
         messages[1].setText("Ordering Camel in Action");
         messages[1].setFrom(new InternetAddress("dude@somewhere.com"));
+        messages[1].setHeader("Message-ID", "1");
         messages[1].setSentDate(new Date(twoDaysAgo));
 
         messages[2] = new MimeMessage(sender.getSession());
         messages[2].setSubject("Order");
         messages[2].setText("Ordering ActiveMQ in Action");
+        messages[2].setHeader("Message-ID", "2");
         messages[2].setFrom(new InternetAddress("dude@somewhere.com"));
         messages[2].setSentDate(new Date(twentyHoursAgo));
 
         messages[3] = new MimeMessage(sender.getSession());
         messages[3].setSubject("Buy pharmacy");
         messages[3].setText("This is spam");
+        messages[3].setHeader("Message-ID", "3");
         messages[3].setFrom(new InternetAddress("spam@me.com"));
         messages[3].setSentDate(new Date(twentyHoursAgo));
 
         messages[4] = new MimeMessage(sender.getSession());
         messages[4].setSubject("Beers tonight?");
         messages[4].setText("We meet at 7pm the usual place");
+        messages[4].setHeader("Message-ID", "4");
         messages[4].setFrom(new InternetAddress("barney@simpsons.com"));
         messages[4].setSentDate(new Date(oneHourAgo));
 
         messages[5] = new MimeMessage(sender.getSession());
         messages[5].setSubject("Spambot attack");
         messages[5].setText("I am attaching you");
+        messages[5].setHeader("Message-ID", "5");
         messages[5].setFrom(new InternetAddress("spambot@me.com"));
         messages[5].setSentDate(new Date());
         messages[5].setSentDate(new Date(oneHourAgo));

http://git-wip-us.apache.org/repos/asf/camel/blob/4c499320/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailSearchTermUriConfigTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailSearchTermUriConfigTest.java b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailSearchTermUriConfigTest.java
index b1dd46a..4306ef1 100644
--- a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailSearchTermUriConfigTest.java
+++ b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailSearchTermUriConfigTest.java
@@ -62,31 +62,37 @@ public class MailSearchTermUriConfigTest extends CamelTestSupport {
         messages[0] = new MimeMessage(sender.getSession());
         messages[0].setSubject("Apache Camel rocks");
         messages[0].setText("I like riding the Camel");
+        messages[0].setHeader("Message-ID", "0");
         messages[0].setFrom(new InternetAddress("someone@somewhere.com"));
 
         messages[1] = new MimeMessage(sender.getSession());
         messages[1].setSubject("Order");
         messages[1].setText("Ordering Camel in Action");
+        messages[1].setHeader("Message-ID", "1");
         messages[1].setFrom(new InternetAddress("dude@somewhere.com"));
 
         messages[2] = new MimeMessage(sender.getSession());
         messages[2].setSubject("Order");
         messages[2].setText("Ordering ActiveMQ in Action");
+        messages[2].setHeader("Message-ID", "2");
         messages[2].setFrom(new InternetAddress("dude@somewhere.com"));
 
         messages[3] = new MimeMessage(sender.getSession());
         messages[3].setSubject("Buy pharmacy");
         messages[3].setText("This is spam");
+        messages[3].setHeader("Message-ID", "3");
         messages[3].setFrom(new InternetAddress("spam@me.com"));
 
         messages[4] = new MimeMessage(sender.getSession());
         messages[4].setSubject("Beers tonight?");
         messages[4].setText("We meet at 7pm the usual place");
+        messages[4].setHeader("Message-ID", "4");
         messages[4].setFrom(new InternetAddress("barney@simpsons.com"));
 
         messages[5] = new MimeMessage(sender.getSession());
         messages[5].setSubject("Spambot attack");
         messages[5].setText("I am attaching you");
+        messages[5].setHeader("Message-ID", "5");
         messages[5].setFrom(new InternetAddress("spambot@me.com"));
 
         folder.appendMessages(messages);

http://git-wip-us.apache.org/repos/asf/camel/blob/4c499320/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailShutdownCompleteAllTasksTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailShutdownCompleteAllTasksTest.java b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailShutdownCompleteAllTasksTest.java
index 357177b..c8abb30 100644
--- a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailShutdownCompleteAllTasksTest.java
+++ b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailShutdownCompleteAllTasksTest.java
@@ -89,6 +89,7 @@ public class MailShutdownCompleteAllTasksTest extends CamelTestSupport {
         for (int i = 0; i < 5; i++) {
             messages[i] = new MimeMessage(sender.getSession());
             messages[i].setText("Message " + i);
+            messages[i].setHeader("Message-ID", "" + i);
         }
         folder.appendMessages(messages);
         folder.close(true);

http://git-wip-us.apache.org/repos/asf/camel/blob/4c499320/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailShutdownCompleteCurrentTaskOnlyTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailShutdownCompleteCurrentTaskOnlyTest.java b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailShutdownCompleteCurrentTaskOnlyTest.java
index 05d37b3..506d475 100644
--- a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailShutdownCompleteCurrentTaskOnlyTest.java
+++ b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailShutdownCompleteCurrentTaskOnlyTest.java
@@ -72,6 +72,7 @@ public class MailShutdownCompleteCurrentTaskOnlyTest extends CamelTestSupport {
         for (int i = 0; i < 5; i++) {
             messages[i] = new MimeMessage(sender.getSession());
             messages[i].setText("Message " + i);
+            messages[i].setHeader("Message-ID", "" + i);
         }
         folder.appendMessages(messages);
         folder.close(true);

http://git-wip-us.apache.org/repos/asf/camel/blob/4c499320/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailSortTermTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailSortTermTest.java b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailSortTermTest.java
index 8131220..26ea02f 100644
--- a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailSortTermTest.java
+++ b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailSortTermTest.java
@@ -81,16 +81,19 @@ public class MailSortTermTest extends CamelTestSupport {
         Message[] messages = new Message[3];
         messages[0] = new MimeMessage(sender.getSession());
         messages[0].setText("Earlier date");
+        messages[0].setHeader("Message-ID", "0");
         messages[0].setSentDate(new Date(10000));
         messages[0].setSubject("Camel");
 
         messages[1] = new MimeMessage(sender.getSession());
         messages[1].setText("Later date");
+        messages[1].setHeader("Message-ID", "1");
         messages[1].setSentDate(new Date(20000));
         messages[1].setSubject("Camel");
 
         messages[2] = new MimeMessage(sender.getSession());
         messages[2].setText("Even later date");
+        messages[2].setHeader("Message-ID", "2");
         messages[2].setSentDate(new Date(30000));
         messages[2].setSubject("Invalid");
 

http://git-wip-us.apache.org/repos/asf/camel/blob/4c499320/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailSortTermThreeTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailSortTermThreeTest.java b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailSortTermThreeTest.java
index 70c1ec9..730ee0b 100644
--- a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailSortTermThreeTest.java
+++ b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailSortTermThreeTest.java
@@ -79,16 +79,19 @@ public class MailSortTermThreeTest extends CamelTestSupport {
         Message[] messages = new Message[3];
         messages[0] = new MimeMessage(sender.getSession());
         messages[0].setText("Earlier date");
+        messages[0].setHeader("Message-ID", "0");
         messages[0].setSentDate(new Date(10000));
         messages[0].setSubject("Camel");
 
         messages[1] = new MimeMessage(sender.getSession());
         messages[1].setText("Later date");
+        messages[1].setHeader("Message-ID", "1");
         messages[1].setSentDate(new Date(20000));
         messages[1].setSubject("Camel");
 
         messages[2] = new MimeMessage(sender.getSession());
         messages[2].setText("Even later date");
+        messages[2].setHeader("Message-ID", "2");
         messages[2].setSentDate(new Date(30000));
         messages[2].setSubject("Invalid");
 

http://git-wip-us.apache.org/repos/asf/camel/blob/4c499320/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailSortTermTwoTest.java
----------------------------------------------------------------------
diff --git a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailSortTermTwoTest.java b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailSortTermTwoTest.java
index c43eba9..ba6bbd5 100644
--- a/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailSortTermTwoTest.java
+++ b/components/camel-mail/src/test/java/org/apache/camel/component/mail/MailSortTermTwoTest.java
@@ -79,16 +79,19 @@ public class MailSortTermTwoTest extends CamelTestSupport {
         Message[] messages = new Message[3];
         messages[0] = new MimeMessage(sender.getSession());
         messages[0].setText("Earlier date");
+        messages[0].setHeader("Message-ID", "0");
         messages[0].setSentDate(new Date(10000));
         messages[0].setSubject("Camel");
 
         messages[1] = new MimeMessage(sender.getSession());
         messages[1].setText("Later date");
+        messages[1].setHeader("Message-ID", "1");
         messages[1].setSentDate(new Date(20000));
         messages[1].setSubject("Camel");
 
         messages[2] = new MimeMessage(sender.getSession());
         messages[2].setText("Even later date");
+        messages[2].setHeader("Message-ID", "2");
         messages[2].setSentDate(new Date(30000));
         messages[2].setSubject("Invalid");