You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2016/02/20 12:22:11 UTC

[2/3] camel git commit: CAMEL-7247: mail consumer add support for idempotent repository so you can concurrently poll the same mailbox.

CAMEL-7247: mail consumer add support for idempotent repository so you can concurrently poll the same mailbox.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/56ccbef1
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/56ccbef1
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/56ccbef1

Branch: refs/heads/master
Commit: 56ccbef11d794c413807ffc3f029a089f0c6542f
Parents: 4c49932
Author: Claus Ibsen <da...@apache.org>
Authored: Sat Feb 20 12:20:07 2016 +0100
Committer: Claus Ibsen <da...@apache.org>
Committed: Sat Feb 20 12:22:02 2016 +0100

----------------------------------------------------------------------
 .../component/mail/DefaultMailUidGenerator.java |  6 ++---
 .../camel/component/mail/MailConsumer.java      | 28 +++++++++-----------
 2 files changed, 15 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/56ccbef1/components/camel-mail/src/main/java/org/apache/camel/component/mail/DefaultMailUidGenerator.java
----------------------------------------------------------------------
diff --git a/components/camel-mail/src/main/java/org/apache/camel/component/mail/DefaultMailUidGenerator.java b/components/camel-mail/src/main/java/org/apache/camel/component/mail/DefaultMailUidGenerator.java
index 58772ea..829d408 100644
--- a/components/camel-mail/src/main/java/org/apache/camel/component/mail/DefaultMailUidGenerator.java
+++ b/components/camel-mail/src/main/java/org/apache/camel/component/mail/DefaultMailUidGenerator.java
@@ -5,9 +5,9 @@
  * The ASF licenses this file to You under the Apache License, Version 2.0
  * (the "License"); you may not use this file except in compliance with
  * the License.  You may obtain a copy of the License at
- * <p/>
- * http://www.apache.org/licenses/LICENSE-2.0
- * <p/>
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
  * Unless required by applicable law or agreed to in writing, software
  * distributed under the License is distributed on an "AS IS" BASIS,
  * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.

http://git-wip-us.apache.org/repos/asf/camel/blob/56ccbef1/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
----------------------------------------------------------------------
diff --git a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
index 8ea2260..fc1417e 100644
--- a/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
+++ b/components/camel-mail/src/main/java/org/apache/camel/component/mail/MailConsumer.java
@@ -17,11 +17,8 @@
 package org.apache.camel.component.mail;
 
 import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
 import java.util.LinkedList;
 import java.util.List;
-import java.util.Map;
 import java.util.Queue;
 import javax.mail.Flags;
 import javax.mail.Folder;
@@ -40,6 +37,7 @@ import org.apache.camel.impl.ScheduledBatchPollingConsumer;
 import org.apache.camel.support.SynchronizationAdapter;
 import org.apache.camel.util.CastUtils;
 import org.apache.camel.util.IntrospectionSupport;
+import org.apache.camel.util.KeyValueHolder;
 import org.apache.camel.util.ObjectHelper;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -117,12 +115,13 @@ public class MailConsumer extends ScheduledBatchPollingConsumer {
         try {
             int count = folder.getMessageCount();
             if (count > 0) {
-                Map<String, Message> messages = retrieveMessages();
+                List<KeyValueHolder<String, Message>> messages = retrieveMessages();
 
                 // need to call setPeek on java-mail to avoid the message being flagged eagerly as SEEN on the server in case
                 // we process the message and rollback due an exception
                 if (getEndpoint().getConfiguration().isPeek()) {
-                    for (Message message : messages.values()) {
+                    for (KeyValueHolder<String, Message> entry : messages) {
+                        Message message = entry.getValue();
                         peekMessage(message);
                     }
                 }
@@ -242,8 +241,8 @@ public class MailConsumer extends ScheduledBatchPollingConsumer {
      * @return Messages from input folder according to the search and sort criteria stored in the endpoint
      * @throws MessagingException If message retrieval fails
      */
-    private Map<String, Message> retrieveMessages() throws MessagingException {
-        Map<String, Message> answer = new LinkedHashMap<String, Message>();
+    private List<KeyValueHolder<String, Message>> retrieveMessages() throws MessagingException {
+        List<KeyValueHolder<String, Message>> answer = new ArrayList<>();
 
         Message[] messages;
         final SortTerm[] sortTerm = getEndpoint().getSortTerm();
@@ -272,7 +271,7 @@ public class MailConsumer extends ScheduledBatchPollingConsumer {
         for (Message message : messages) {
             String key = getEndpoint().getMailUidGenerator().generateUuid(getEndpoint(), message);
             if (isValidMessage(key, message)) {
-                answer.put(key, message);
+                answer.add(new KeyValueHolder<>(key, message));
             }
         }
 
@@ -327,7 +326,7 @@ public class MailConsumer extends ScheduledBatchPollingConsumer {
         return null;
     }
 
-    protected Queue<Exchange> createExchanges(Map<String, Message> messages) throws MessagingException {
+    protected Queue<Exchange> createExchanges(List<KeyValueHolder<String, Message>> messages) throws MessagingException {
         Queue<Exchange> answer = new LinkedList<Exchange>();
 
         int fetchSize = getEndpoint().getConfiguration().getFetchSize();
@@ -337,13 +336,10 @@ public class MailConsumer extends ScheduledBatchPollingConsumer {
             LOG.debug("Fetching {} messages. Total {} messages.", count, messages.size());
         }
 
-        Iterator<String> it = messages.keySet().iterator();
-        int i = 0;
-        while (i < count) {
-            i++;
-
-            String key = it.next();
-            Message message = messages.get(key);
+        for (int i = 0; i < count; i++) {
+            KeyValueHolder<String, Message> holder = messages.get(i);
+            String key = holder.getKey();
+            Message message = holder.getValue();
 
             if (LOG.isTraceEnabled()) {
                 LOG.trace("Mail #{} is of type: {} - {}", new Object[]{i, ObjectHelper.classCanonicalName(message), message});