You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by bt...@apache.org on 2020/06/21 09:16:27 UTC

[james-project] 04/12: JAMES-3202 Add `retrieveIndexedFlags` in MessageSearchIndex API

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 683504103a573c06b24211931acc6f1cd63847ab
Author: Rene Cordier <rc...@linagora.com>
AuthorDate: Wed Jun 10 17:40:32 2020 +0700

    JAMES-3202 Add `retrieveIndexedFlags` in MessageSearchIndex API
---
 .../ElasticSearchListeningMessageSearchIndex.java  |   6 +
 ...asticSearchListeningMessageSearchIndexTest.java |   3 +-
 .../lucene/search/LuceneMessageSearchIndex.java    |   5 +
 .../store/search/LazyMessageSearchIndex.java       | 307 +++++++++++----------
 .../store/search/ListeningMessageSearchIndex.java  |   7 +
 .../org/apache/james/FakeMessageSearchIndex.java   |   7 +
 6 files changed, 184 insertions(+), 151 deletions(-)

diff --git a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
index 091db6b..20caacb 100644
--- a/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
+++ b/mailbox/elasticsearch/src/main/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndex.java
@@ -29,6 +29,7 @@ import java.util.stream.Stream;
 
 import javax.inject.Inject;
 import javax.inject.Named;
+import javax.mail.Flags;
 
 import org.apache.james.backends.es.DocumentId;
 import org.apache.james.backends.es.ElasticSearchIndexer;
@@ -213,4 +214,9 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
             LOGGER.error("No messageUid for {} in mailbox {}", searchResult.getMessageUid(), searchResult.getMailboxId());
         }
     }
+
+    @Override
+    public Mono<Flags> retrieveIndexedFlags(Mailbox mailbox, MessageUid uid) {
+        return Mono.empty();
+    }
 }
diff --git a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java
index 5b617bc..b5a5359 100644
--- a/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java
+++ b/mailbox/elasticsearch/src/test/java/org/apache/james/mailbox/elasticsearch/events/ElasticSearchListeningMessageSearchIndexTest.java
@@ -92,9 +92,11 @@ class ElasticSearchListeningMessageSearchIndexTest {
     static final MessageUid MESSAGE_UID_1 = MessageUid.of(25);
     static final MessageUid MESSAGE_UID_2 = MessageUid.of(26);
     static final MessageUid MESSAGE_UID_3 = MessageUid.of(27);
+    static final MessageUid MESSAGE_UID_4 = MessageUid.of(28);
     static final MessageId MESSAGE_ID_1 = TestMessageId.of(18L);
     static final MessageId MESSAGE_ID_2 = TestMessageId.of(19L);
     static final MessageId MESSAGE_ID_3 = TestMessageId.of(20L);
+    static final MessageId MESSAGE_ID_4 = TestMessageId.of(21L);
 
     static final SimpleMailboxMessage.Builder MESSAGE_BUILDER = SimpleMailboxMessage.builder()
         .mailboxId(MAILBOX_ID)
@@ -441,5 +443,4 @@ class ElasticSearchListeningMessageSearchIndexTest {
         assertThatCode(() -> testee.deleteAll(session, mailbox.getMailboxId()).block())
             .doesNotThrowAnyException();
     }
-
 }
\ No newline at end of file
diff --git a/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java b/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java
index d3da37c..7337a4f 100644
--- a/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java
+++ b/mailbox/lucene/src/main/java/org/apache/james/mailbox/lucene/search/LuceneMessageSearchIndex.java
@@ -1297,4 +1297,9 @@ public class LuceneMessageSearchIndex extends ListeningMessageSearchIndex {
     public void commit() throws IOException {
         writer.commit();
     }
+
+    @Override
+    public Mono<Flags> retrieveIndexedFlags(Mailbox mailbox, MessageUid uid) {
+        return Mono.empty();
+    }
 }
\ No newline at end of file
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java
index eac5ee5..459cc22 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/LazyMessageSearchIndex.java
@@ -1,150 +1,157 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one   *
- * or more contributor license agreements.  See the NOTICE file *
- * distributed with this work for additional information        *
- * regarding copyright ownership.  The ASF licenses this file   *
- * to you under the Apache License, Version 2.0 (the            *
- * "License"); you may not use this file except in compliance   *
- * with the License.  You may obtain a copy of the License at   *
- *                                                              *
- *   http://www.apache.org/licenses/LICENSE-2.0                 *
- *                                                              *
- * Unless required by applicable law or agreed to in writing,   *
- * software distributed under the License is distributed on an  *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
- * KIND, either express or implied.  See the License for the    *
- * specific language governing permissions and limitations      *
- * under the License.                                           *
- ****************************************************************/
-package org.apache.james.mailbox.store.search;
-
-import java.util.Collection;
-import java.util.EnumSet;
-import java.util.Iterator;
-import java.util.List;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.stream.Stream;
-
-import org.apache.james.mailbox.MailboxManager;
-import org.apache.james.mailbox.MailboxManager.SearchCapabilities;
-import org.apache.james.mailbox.MailboxSession;
-import org.apache.james.mailbox.MessageUid;
-import org.apache.james.mailbox.SessionProvider;
-import org.apache.james.mailbox.events.Group;
-import org.apache.james.mailbox.exception.MailboxException;
-import org.apache.james.mailbox.exception.UnsupportedSearchException;
-import org.apache.james.mailbox.model.Mailbox;
-import org.apache.james.mailbox.model.MailboxId;
-import org.apache.james.mailbox.model.MessageId;
-import org.apache.james.mailbox.model.MessageRange;
-import org.apache.james.mailbox.model.SearchQuery;
-import org.apache.james.mailbox.model.UpdatedFlags;
-import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
-import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
-import org.apache.james.mailbox.store.mail.model.MailboxMessage;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.base.Preconditions;
-
-import reactor.core.publisher.Flux;
-import reactor.core.publisher.Mono;
-
-/**
- * {@link ListeningMessageSearchIndex} implementation which wraps another {@link ListeningMessageSearchIndex} and will forward all calls to it.
- * 
- * The only special thing about this is that it will index all the mails in the mailbox on the first call of {@link #search(MailboxSession, Mailbox, SearchQuery)}
- * 
- * This class is mostly useful for in-memory indexes or for indexed that should be recreated on every server restart.
- * 
- *
- */
-public class LazyMessageSearchIndex extends ListeningMessageSearchIndex {
-    public static class LazyMessageSearchIndexGroup extends Group {
-
-    }
-
-    private static final Logger LOGGER = LoggerFactory.getLogger(LazyMessageSearchIndex.class);
-    private static final Group GROUP = new LazyMessageSearchIndexGroup();
-
-    private final ListeningMessageSearchIndex index;
-    private final ConcurrentHashMap<MailboxId, Object> indexed = new ConcurrentHashMap<>();
-    private final MailboxSessionMapperFactory factory;
-    
-    
-    public LazyMessageSearchIndex(ListeningMessageSearchIndex index, MailboxSessionMapperFactory factory, SessionProvider sessionProvider) {
-        super(factory, sessionProvider);
-        this.index = index;
-        this.factory = factory;
-    }
-
-    @Override
-    public Group getDefaultGroup() {
-        return GROUP;
-    }
-    
-    @Override
-    public EnumSet<SearchCapabilities> getSupportedCapabilities(EnumSet<MailboxManager.MessageCapabilities> messageCapabilities) {
-        return EnumSet.noneOf(SearchCapabilities.class);
-    }
-
-    @Override
-    public Mono<Void> add(MailboxSession session, Mailbox mailbox, MailboxMessage message) {
-        return index.add(session, mailbox, message);
-    }
-
-    @Override
-    public Mono<Void> delete(MailboxSession session, Mailbox mailbox, Collection<MessageUid> expungedUids) {
-        return index.delete(session, mailbox, expungedUids);
-    }
-
-    @Override
-    public Mono<Void> deleteAll(MailboxSession session, MailboxId mailboxId) {
-        return index.deleteAll(session, mailboxId);
-    }
-
-    /**
-     * Lazy index the mailbox on first search request if it was not indexed before. After indexing is done it delegate the search request to the wrapped
-     * {@link MessageSearchIndex}. Be aware that concurrent search requests are blocked on the same "not-yet-indexed" mailbox till it the index process was 
-     * complete
-     * 
-     */
-    @Override
-    public Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException {
-        Preconditions.checkArgument(session != null, "'session' is mandatory");
-        MailboxId id = mailbox.getMailboxId();
-        
-        Object done = indexed.get(id);
-        if (done == null) {
-            done = new Object();
-            Object oldDone = indexed.putIfAbsent(id, done);
-            if (oldDone != null) {
-                done = oldDone;
-            }
-            synchronized (done) {
-                Iterator<MailboxMessage> messages = factory.getMessageMapper(session).findInMailbox(mailbox, MessageRange.all(), FetchType.Full, UNLIMITED);
-                while (messages.hasNext()) {
-                    final MailboxMessage message = messages.next();
-                    try {
-                        add(session, mailbox, message).block();
-                    } catch (Exception e) {
-                        LOGGER.error("Unable to index message {} in mailbox {}", message.getUid(), mailbox.getName(), e);
-                    }
-                }
-            }
-        }
-       
-        return index.search(session, mailbox, searchQuery);
-    }
-
-    @Override
-    public Mono<Void> update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList) {
-        return index.update(session, mailbox, updatedFlagsList);
-    }
-    
-
-    @Override
-    public Flux<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException {
-        throw new UnsupportedSearchException();
-    }
-}
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.mailbox.store.search;
+
+import java.util.Collection;
+import java.util.EnumSet;
+import java.util.Iterator;
+import java.util.List;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.stream.Stream;
+
+import javax.mail.Flags;
+
+import org.apache.james.mailbox.MailboxManager;
+import org.apache.james.mailbox.MailboxManager.SearchCapabilities;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.MessageUid;
+import org.apache.james.mailbox.SessionProvider;
+import org.apache.james.mailbox.events.Group;
+import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.exception.UnsupportedSearchException;
+import org.apache.james.mailbox.model.Mailbox;
+import org.apache.james.mailbox.model.MailboxId;
+import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.mailbox.model.MessageRange;
+import org.apache.james.mailbox.model.SearchQuery;
+import org.apache.james.mailbox.model.UpdatedFlags;
+import org.apache.james.mailbox.store.MailboxSessionMapperFactory;
+import org.apache.james.mailbox.store.mail.MessageMapper.FetchType;
+import org.apache.james.mailbox.store.mail.model.MailboxMessage;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.base.Preconditions;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * {@link ListeningMessageSearchIndex} implementation which wraps another {@link ListeningMessageSearchIndex} and will forward all calls to it.
+ * 
+ * The only special thing about this is that it will index all the mails in the mailbox on the first call of {@link #search(MailboxSession, Mailbox, SearchQuery)}
+ * 
+ * This class is mostly useful for in-memory indexes or for indexed that should be recreated on every server restart.
+ * 
+ *
+ */
+public class LazyMessageSearchIndex extends ListeningMessageSearchIndex {
+    public static class LazyMessageSearchIndexGroup extends Group {
+
+    }
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(LazyMessageSearchIndex.class);
+    private static final Group GROUP = new LazyMessageSearchIndexGroup();
+
+    private final ListeningMessageSearchIndex index;
+    private final ConcurrentHashMap<MailboxId, Object> indexed = new ConcurrentHashMap<>();
+    private final MailboxSessionMapperFactory factory;
+    
+    
+    public LazyMessageSearchIndex(ListeningMessageSearchIndex index, MailboxSessionMapperFactory factory, SessionProvider sessionProvider) {
+        super(factory, sessionProvider);
+        this.index = index;
+        this.factory = factory;
+    }
+
+    @Override
+    public Group getDefaultGroup() {
+        return GROUP;
+    }
+    
+    @Override
+    public EnumSet<SearchCapabilities> getSupportedCapabilities(EnumSet<MailboxManager.MessageCapabilities> messageCapabilities) {
+        return EnumSet.noneOf(SearchCapabilities.class);
+    }
+
+    @Override
+    public Mono<Void> add(MailboxSession session, Mailbox mailbox, MailboxMessage message) {
+        return index.add(session, mailbox, message);
+    }
+
+    @Override
+    public Mono<Void> delete(MailboxSession session, Mailbox mailbox, Collection<MessageUid> expungedUids) {
+        return index.delete(session, mailbox, expungedUids);
+    }
+
+    @Override
+    public Mono<Void> deleteAll(MailboxSession session, MailboxId mailboxId) {
+        return index.deleteAll(session, mailboxId);
+    }
+
+    /**
+     * Lazy index the mailbox on first search request if it was not indexed before. After indexing is done it delegate the search request to the wrapped
+     * {@link MessageSearchIndex}. Be aware that concurrent search requests are blocked on the same "not-yet-indexed" mailbox till it the index process was 
+     * complete
+     * 
+     */
+    @Override
+    public Stream<MessageUid> search(MailboxSession session, Mailbox mailbox, SearchQuery searchQuery) throws MailboxException {
+        Preconditions.checkArgument(session != null, "'session' is mandatory");
+        MailboxId id = mailbox.getMailboxId();
+        
+        Object done = indexed.get(id);
+        if (done == null) {
+            done = new Object();
+            Object oldDone = indexed.putIfAbsent(id, done);
+            if (oldDone != null) {
+                done = oldDone;
+            }
+            synchronized (done) {
+                Iterator<MailboxMessage> messages = factory.getMessageMapper(session).findInMailbox(mailbox, MessageRange.all(), FetchType.Full, UNLIMITED);
+                while (messages.hasNext()) {
+                    final MailboxMessage message = messages.next();
+                    try {
+                        add(session, mailbox, message).block();
+                    } catch (Exception e) {
+                        LOGGER.error("Unable to index message {} in mailbox {}", message.getUid(), mailbox.getName(), e);
+                    }
+                }
+            }
+        }
+       
+        return index.search(session, mailbox, searchQuery);
+    }
+
+    @Override
+    public Mono<Void> update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList) {
+        return index.update(session, mailbox, updatedFlagsList);
+    }
+    
+
+    @Override
+    public Flux<MessageId> search(MailboxSession session, Collection<MailboxId> mailboxIds, SearchQuery searchQuery, long limit) throws MailboxException {
+        throw new UnsupportedSearchException();
+    }
+
+    @Override
+    public Mono<Flags> retrieveIndexedFlags(Mailbox mailbox, MessageUid uid) {
+        return index.retrieveIndexedFlags(mailbox, uid);
+    }
+}
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java
index d9ebfe9..59f4260 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/search/ListeningMessageSearchIndex.java
@@ -21,6 +21,8 @@ package org.apache.james.mailbox.store.search;
 import java.util.Collection;
 import java.util.List;
 
+import javax.mail.Flags;
+
 import org.apache.james.mailbox.MailboxSession;
 import org.apache.james.mailbox.MessageUid;
 import org.apache.james.mailbox.SessionProvider;
@@ -142,4 +144,9 @@ public abstract class ListeningMessageSearchIndex implements MessageSearchIndex,
      * @param updatedFlagsList list of flags that were updated
      */
     public abstract Mono<Void> update(MailboxSession session, Mailbox mailbox, List<UpdatedFlags> updatedFlagsList);
+
+    /**
+     * Retrieves flags of an indexed message
+     */
+    public abstract Mono<Flags> retrieveIndexedFlags(Mailbox mailbox, MessageUid uid);
 }
diff --git a/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java b/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java
index 46fe6d0..3c7beef 100644
--- a/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java
+++ b/server/container/guice/memory-guice/src/main/java/org/apache/james/FakeMessageSearchIndex.java
@@ -24,6 +24,8 @@ import java.util.EnumSet;
 import java.util.List;
 import java.util.stream.Stream;
 
+import javax.mail.Flags;
+
 import org.apache.commons.lang3.NotImplementedException;
 import org.apache.james.mailbox.MailboxManager;
 import org.apache.james.mailbox.MailboxSession;
@@ -96,4 +98,9 @@ public class FakeMessageSearchIndex extends ListeningMessageSearchIndex {
     public ExecutionMode getExecutionMode() {
         throw new NotImplementedException("not implemented");
     }
+
+    @Override
+    public Mono<Flags> retrieveIndexedFlags(Mailbox mailbox, MessageUid uid) {
+        throw new NotImplementedException("not implemented");
+    }
 }


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org