You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2020/11/20 02:18:16 UTC

[james-project] 10/18: JAMES-3440 Utility to populate EmailQueryView

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 98f7c13d5342ab3050f9a4ed88aba90a81b6521f
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Nov 17 17:03:37 2020 +0700

    JAMES-3440 Utility to populate EmailQueryView
---
 .../data/jmap/EmailQueryViewPopulator.java         | 209 +++++++++++++++++++++
 1 file changed, 209 insertions(+)

diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/EmailQueryViewPopulator.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/EmailQueryViewPopulator.java
new file mode 100644
index 0000000..2141fbd
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/EmailQueryViewPopulator.java
@@ -0,0 +1,209 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.webadmin.data.jmap;
+
+import static org.apache.james.mailbox.MailboxManager.MailboxSearchFetchType.Minimal;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.time.Duration;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.util.Date;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.inject.Inject;
+
+import org.apache.james.jmap.api.projections.EmailQueryView;
+import org.apache.james.mailbox.MailboxManager;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.MessageManager;
+import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.model.FetchGroup;
+import org.apache.james.mailbox.model.MailboxId;
+import org.apache.james.mailbox.model.MailboxMetaData;
+import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.mailbox.model.MessageRange;
+import org.apache.james.mailbox.model.MessageResult;
+import org.apache.james.mailbox.model.search.MailboxQuery;
+import org.apache.james.mime4j.dom.Message;
+import org.apache.james.mime4j.stream.MimeConfig;
+import org.apache.james.task.Task;
+import org.apache.james.task.Task.Result;
+import org.apache.james.user.api.UsersRepository;
+import org.apache.james.user.api.UsersRepositoryException;
+import org.apache.james.util.ReactorUtils;
+import org.apache.james.util.streams.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.fge.lambdas.Throwing;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+/**
+ * Recomputes the {@link EmailQueryView} projection from the content of the mailboxes.
+ *
+ * <p>Every user listed by the {@link UsersRepository} is visited; for each of their private
+ * mailboxes, every message is read with {@link FetchGroup#HEADERS} and saved into the view
+ * together with its sent and received dates. Outcome counters are accumulated in the
+ * {@link Progress} instance supplied by the caller.
+ */
+public class EmailQueryViewPopulator {
+    private static final Logger LOGGER = LoggerFactory.getLogger(EmailQueryViewPopulator.class);
+    /** Time window the {@code messagesPerSecond} running option is applied to. */
+    private static final Duration PERIOD = Duration.ofSeconds(1);
+    /** Concurrency passed to {@code flatMap} when iterating over users. */
+    public static final int USER_CONCURRENCY = 1;
+    /** Concurrency passed to {@code flatMap} when iterating over the mailboxes of one user. */
+    public static final int MAILBOX_CONCURRENCY = 1;
+
+    /**
+     * Counters describing how far a population run has gone, each backed by an {@link AtomicLong}.
+     *
+     * <p>Only the enclosing class increments them; the getters expose a snapshot of each value.
+     */
+    static class Progress {
+        private final AtomicLong processedUserCount;
+        private final AtomicLong processedMessageCount;
+        private final AtomicLong failedUserCount;
+        private final AtomicLong failedMessageCount;
+
+        Progress() {
+            // Initialised in declaration order; every counter starts at zero.
+            processedUserCount = new AtomicLong();
+            processedMessageCount = new AtomicLong();
+            failedUserCount = new AtomicLong();
+            failedMessageCount = new AtomicLong();
+        }
+
+        private void incrementProcessedUserCount() {
+            processedUserCount.incrementAndGet();
+        }
+
+        private void incrementProcessedMessageCount() {
+            processedMessageCount.incrementAndGet();
+        }
+
+        private void incrementFailedUserCount() {
+            failedUserCount.incrementAndGet();
+        }
+
+        private void incrementFailedMessageCount() {
+            failedMessageCount.incrementAndGet();
+        }
+
+        /** @return number of users picked up so far (counted before their mailboxes are read) */
+        long getProcessedUserCount() {
+            return processedUserCount.get();
+        }
+
+        /** @return number of messages successfully saved into the view */
+        long getProcessedMessageCount() {
+            return processedMessageCount.get();
+        }
+
+        /** @return number of users whose processing was abandoned after a {@link MailboxException} */
+        long getFailedUserCount() {
+            return failedUserCount.get();
+        }
+
+        /** @return number of messages that could not be parsed or saved */
+        long getFailedMessageCount() {
+            return failedMessageCount.get();
+        }
+    }
+
+    private final UsersRepository usersRepository;
+    private final MailboxManager mailboxManager;
+    private final EmailQueryView emailQueryView;
+
+    @Inject
+    EmailQueryViewPopulator(UsersRepository usersRepository,
+                            MailboxManager mailboxManager,
+                            EmailQueryView emailQueryView) {
+        this.usersRepository = usersRepository;
+        this.mailboxManager = mailboxManager;
+        this.emailQueryView = emailQueryView;
+    }
+
+    /**
+     * Populates the view with one entry per message found in the private mailboxes of every user.
+     *
+     * @param progress counters updated while the run advances
+     * @param runningOptions supplies the number of messages to handle per {@link #PERIOD}
+     * @return the combination (via {@link Task#combine}) of the per-message results, or
+     *         {@link Result#COMPLETED} when there was no message at all
+     */
+    Mono<Result> populateView(Progress progress, RunningOptions runningOptions) {
+        return correctProjection(listAllMailboxMessages(progress), runningOptions, progress);
+    }
+
+    /**
+     * Lists the messages of every user, opening a system session per user.
+     *
+     * <p>A failure to list users is reported as an error signal rather than thrown.
+     */
+    private Flux<MessageResult> listAllMailboxMessages(Progress progress) {
+        try {
+            return Iterators.toFlux(usersRepository.list())
+                .map(mailboxManager::createSystemSession)
+                // Counted as soon as the user is picked up, before any of its mailboxes is read.
+                .doOnNext(any -> progress.incrementProcessedUserCount())
+                .flatMap(session -> listUserMailboxMessages(progress, session), USER_CONCURRENCY);
+        } catch (UsersRepositoryException e) {
+            return Flux.error(e);
+        }
+    }
+
+    /**
+     * Lists the messages held in the private mailboxes of the session's user.
+     *
+     * <p>A {@link MailboxException} ends the listing for this user only: it is logged, the
+     * failed user counter is incremented and the flux completes without further elements.
+     */
+    private Flux<MessageResult> listUserMailboxMessages(Progress progress, MailboxSession session) {
+        return listUsersMailboxes(session)
+            .flatMap(mailboxMetadata -> retrieveMailbox(session, mailboxMetadata), MAILBOX_CONCURRENCY)
+            .flatMap(Throwing.function(messageManager -> listAllMessages(messageManager, session)), MAILBOX_CONCURRENCY)
+            .onErrorResume(MailboxException.class, e -> {
+                LOGGER.error("JMAP emailQuery view re-computation aborted for {} as we failed listing user mailboxes", session.getUser(), e);
+                progress.incrementFailedUserCount();
+                return Flux.empty();
+            });
+    }
+
+    /**
+     * Computes and saves the view entry of a single message.
+     *
+     * <p>The received date is the message internal date. The sent date is the date exposed by
+     * the parsed MIME message, falling back to the internal date when absent. Both are
+     * expressed in UTC.
+     *
+     * @return {@link Result#COMPLETED} once saved; {@link Result#PARTIAL} when parsing or saving
+     *         failed, in which case the error is logged and the failed message counter incremented
+     */
+    private Mono<Result> correctProjection(MessageResult messageResult, Progress progress) {
+        return Mono.fromCallable(() -> {
+            MailboxId mailboxId = messageResult.getMailboxId();
+            MessageId messageId = messageResult.getMessageId();
+            ZonedDateTime receivedAt = ZonedDateTime.ofInstant(messageResult.getInternalDate().toInstant(), ZoneOffset.UTC);
+            Message mime4JMessage = parseMessage(messageResult);
+            try {
+                Date sentAtDate = Optional.ofNullable(mime4JMessage.getDate()).orElse(messageResult.getInternalDate());
+                ZonedDateTime sentAt = ZonedDateTime.ofInstant(sentAtDate.toInstant(), ZoneOffset.UTC);
+
+                return new EmailQueryView.Entry(mailboxId, messageId, sentAt, receivedAt);
+            } finally {
+                // Only the date was needed: release whatever the parsed message still holds.
+                mime4JMessage.dispose();
+            }
+        })
+            .flatMap(entry -> emailQueryView.save(entry.getMailboxId(), entry.getSentAt(), entry.getReceivedAt(), entry.getMessageId()))
+            .thenReturn(Result.COMPLETED)
+            .doOnSuccess(any -> progress.incrementProcessedMessageCount())
+            .onErrorResume(e -> {
+                LOGGER.error("JMAP emailQuery view re-computation aborted for {} - {} - {}",
+                    messageResult.getMailboxId(),
+                    messageResult.getMessageId(),
+                    messageResult.getUid(), e);
+                progress.incrementFailedMessageCount();
+                return Mono.just(Result.PARTIAL);
+            });
+    }
+
+    /**
+     * Saves every message of {@code entries} into the view, throttled to
+     * {@code runningOptions.getMessagesPerSecond()} elements per {@link #PERIOD}.
+     *
+     * @return the per-message results folded with {@link Task#combine}, or
+     *         {@link Result#COMPLETED} when {@code entries} is empty
+     */
+    private Mono<Result> correctProjection(Flux<MessageResult> entries, RunningOptions runningOptions, Progress progress) {
+        return entries.transform(ReactorUtils.<MessageResult, Result>throttle()
+                .elements(runningOptions.getMessagesPerSecond())
+                .per(PERIOD)
+                .forOperation(entry -> correctProjection(entry, progress)))
+            .reduce(Task::combine)
+            .switchIfEmpty(Mono.just(Result.COMPLETED));
+    }
+
+    /** Searches the private mailboxes of the session's user, fetching minimal metadata. */
+    private Flux<MailboxMetaData> listUsersMailboxes(MailboxSession session) {
+        return mailboxManager.search(MailboxQuery.privateMailboxesBuilder(session).build(), Minimal, session);
+    }
+
+    /** Resolves the {@link MessageManager} of a mailbox; the lookup only runs upon subscription. */
+    private Mono<MessageManager> retrieveMailbox(MailboxSession session, MailboxMetaData mailboxMetadata) {
+        return Mono.fromCallable(() -> mailboxManager.getMailbox(mailboxMetadata.getId(), session));
+    }
+
+    /**
+     * Lists all the messages of a mailbox using the {@link FetchGroup#HEADERS} fetch group.
+     *
+     * <p>A {@link MailboxException} raised while opening the listing is reported as an error signal.
+     */
+    private Flux<MessageResult> listAllMessages(MessageManager messageManager, MailboxSession session) {
+        try {
+            return Iterators.toFlux(messageManager.getMessages(MessageRange.all(), FetchGroup.HEADERS, session));
+        } catch (MailboxException e) {
+            return Flux.error(e);
+        }
+    }
+
+    /**
+     * Parses the content of a message with the permissive MIME configuration.
+     *
+     * <p>The content stream is closed before returning, whether parsing succeeds or fails.
+     *
+     * @throws IOException when the content cannot be read or parsed
+     * @throws MailboxException when the content cannot be obtained from {@code messageResult}
+     */
+    private Message parseMessage(MessageResult messageResult) throws IOException, MailboxException {
+        try (InputStream content = messageResult.getFullContent().getInputStream()) {
+            return Message.Builder
+                .of()
+                .use(MimeConfig.PERMISSIVE)
+                .parse(content)
+                .build();
+        }
+    }
+}


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org