You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2020/11/20 02:18:16 UTC
[james-project] 10/18: JAMES-3440 Utility to populate EmailQueryView
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 98f7c13d5342ab3050f9a4ed88aba90a81b6521f
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Tue Nov 17 17:03:37 2020 +0700
JAMES-3440 Utility to populate EmailQueryView
---
.../data/jmap/EmailQueryViewPopulator.java | 209 +++++++++++++++++++++
1 file changed, 209 insertions(+)
diff --git a/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/EmailQueryViewPopulator.java b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/EmailQueryViewPopulator.java
new file mode 100644
index 0000000..2141fbd
--- /dev/null
+++ b/server/protocols/webadmin/webadmin-jmap/src/main/java/org/apache/james/webadmin/data/jmap/EmailQueryViewPopulator.java
@@ -0,0 +1,209 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.webadmin.data.jmap;
+
+import static org.apache.james.mailbox.MailboxManager.MailboxSearchFetchType.Minimal;
+
+import java.io.IOException;
+import java.time.Duration;
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+import java.util.Date;
+import java.util.Optional;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.inject.Inject;
+
+import org.apache.james.jmap.api.projections.EmailQueryView;
+import org.apache.james.mailbox.MailboxManager;
+import org.apache.james.mailbox.MailboxSession;
+import org.apache.james.mailbox.MessageManager;
+import org.apache.james.mailbox.exception.MailboxException;
+import org.apache.james.mailbox.model.FetchGroup;
+import org.apache.james.mailbox.model.MailboxId;
+import org.apache.james.mailbox.model.MailboxMetaData;
+import org.apache.james.mailbox.model.MessageId;
+import org.apache.james.mailbox.model.MessageRange;
+import org.apache.james.mailbox.model.MessageResult;
+import org.apache.james.mailbox.model.search.MailboxQuery;
+import org.apache.james.mime4j.dom.Message;
+import org.apache.james.mime4j.stream.MimeConfig;
+import org.apache.james.task.Task;
+import org.apache.james.task.Task.Result;
+import org.apache.james.user.api.UsersRepository;
+import org.apache.james.user.api.UsersRepositoryException;
+import org.apache.james.util.ReactorUtils;
+import org.apache.james.util.streams.Iterators;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.github.fge.lambdas.Throwing;
+
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
+public class EmailQueryViewPopulator {
+ private static final Logger LOGGER = LoggerFactory.getLogger(EmailQueryViewPopulator.class);
+ private static final Duration PERIOD = Duration.ofSeconds(1);
+ public static final int USER_CONCURRENCY = 1;
+ public static final int MAILBOX_CONCURRENCY = 1;
+
+ static class Progress {
+ private final AtomicLong processedUserCount;
+ private final AtomicLong processedMessageCount;
+ private final AtomicLong failedUserCount;
+ private final AtomicLong failedMessageCount;
+
+ Progress() {
+ failedUserCount = new AtomicLong();
+ processedMessageCount = new AtomicLong();
+ processedUserCount = new AtomicLong();
+ failedMessageCount = new AtomicLong();
+ }
+
+ private void incrementProcessedUserCount() {
+ processedUserCount.incrementAndGet();
+ }
+
+ private void incrementProcessedMessageCount() {
+ processedMessageCount.incrementAndGet();
+ }
+
+ private void incrementFailedUserCount() {
+ failedUserCount.incrementAndGet();
+ }
+
+ private void incrementFailedMessageCount() {
+ failedMessageCount.incrementAndGet();
+ }
+
+ long getProcessedUserCount() {
+ return processedUserCount.get();
+ }
+
+ long getProcessedMessageCount() {
+ return processedMessageCount.get();
+ }
+
+ long getFailedUserCount() {
+ return failedUserCount.get();
+ }
+
+ long getFailedMessageCount() {
+ return failedMessageCount.get();
+ }
+ }
+
+ private final UsersRepository usersRepository;
+ private final MailboxManager mailboxManager;
+ private final EmailQueryView emailQueryView;
+
+ @Inject
+ EmailQueryViewPopulator(UsersRepository usersRepository,
+ MailboxManager mailboxManager,
+ EmailQueryView emailQueryView) {
+ this.usersRepository = usersRepository;
+ this.mailboxManager = mailboxManager;
+ this.emailQueryView = emailQueryView;
+ }
+
+ Mono<Result> populateView(Progress progress, RunningOptions runningOptions) {
+ return correctProjection(listAllMailboxMessages(progress), runningOptions, progress);
+ }
+
+ private Flux<MessageResult> listAllMailboxMessages(Progress progress) {
+ try {
+ return Iterators.toFlux(usersRepository.list())
+ .map(mailboxManager::createSystemSession)
+ .doOnNext(any -> progress.incrementProcessedUserCount())
+ .flatMap(session -> listUserMailboxMessages(progress, session), USER_CONCURRENCY);
+ } catch (UsersRepositoryException e) {
+ return Flux.error(e);
+ }
+ }
+
+ private Flux<MessageResult> listUserMailboxMessages(Progress progress, MailboxSession session) {
+ return listUsersMailboxes(session)
+ .flatMap(mailboxMetadata -> retrieveMailbox(session, mailboxMetadata), MAILBOX_CONCURRENCY)
+ .flatMap(Throwing.function(messageManager -> listAllMessages(messageManager, session)), MAILBOX_CONCURRENCY)
+ .onErrorResume(MailboxException.class, e -> {
+ LOGGER.error("JMAP emailQuery view re-computation aborted for {} as we failed listing user mailboxes", session.getUser(), e);
+ progress.incrementFailedUserCount();
+ return Flux.empty();
+ });
+ }
+
+ private Mono<Result> correctProjection(MessageResult messageResult, Progress progress) {
+ return Mono.fromCallable(() -> {
+ MailboxId mailboxId = messageResult.getMailboxId();
+ MessageId messageId = messageResult.getMessageId();
+ ZonedDateTime receivedAt = ZonedDateTime.ofInstant(messageResult.getInternalDate().toInstant(), ZoneOffset.UTC);
+ Message mime4JMessage = parseMessage(messageResult);
+ Date sentAtDate = Optional.ofNullable(mime4JMessage.getDate()).orElse(messageResult.getInternalDate());
+ ZonedDateTime sentAt = ZonedDateTime.ofInstant(sentAtDate.toInstant(), ZoneOffset.UTC);
+
+ return new EmailQueryView.Entry(mailboxId, messageId, sentAt, receivedAt);
+ })
+ .flatMap(entry -> emailQueryView.save(entry.getMailboxId(), entry.getSentAt(), entry.getReceivedAt(), entry.getMessageId()))
+ .thenReturn(Result.COMPLETED)
+ .doOnSuccess(any -> progress.incrementProcessedMessageCount())
+ .onErrorResume(e -> {
+ LOGGER.error("JMAP emailQuery view re-computation aborted for {} - {} - {}",
+ messageResult.getMailboxId(),
+ messageResult.getMessageId(),
+ messageResult.getUid(), e);
+ progress.incrementFailedMessageCount();
+ return Mono.just(Result.PARTIAL);
+ });
+ }
+
+ private Mono<Result> correctProjection(Flux<MessageResult> entries, RunningOptions runningOptions, Progress progress) {
+ return entries.transform(ReactorUtils.<MessageResult, Result>throttle()
+ .elements(runningOptions.getMessagesPerSecond())
+ .per(PERIOD)
+ .forOperation(entry -> correctProjection(entry, progress)))
+ .reduce(Task::combine)
+ .switchIfEmpty(Mono.just(Result.COMPLETED));
+ }
+
+ private Flux<MailboxMetaData> listUsersMailboxes(MailboxSession session) {
+ return mailboxManager.search(MailboxQuery.privateMailboxesBuilder(session).build(), Minimal, session);
+ }
+
+ private Mono<MessageManager> retrieveMailbox(MailboxSession session, MailboxMetaData mailboxMetadata) {
+ return Mono.fromCallable(() -> mailboxManager.getMailbox(mailboxMetadata.getId(), session));
+ }
+
+ private Flux<MessageResult> listAllMessages(MessageManager messageManager, MailboxSession session) {
+ try {
+ return Iterators.toFlux(messageManager.getMessages(MessageRange.all(), FetchGroup.HEADERS, session));
+ } catch (MailboxException e) {
+ return Flux.error(e);
+ }
+ }
+
+ private Message parseMessage(MessageResult messageResult) throws IOException, MailboxException {
+ return Message.Builder
+ .of()
+ .use(MimeConfig.PERMISSIVE)
+ .parse(messageResult.getFullContent().getInputStream())
+ .build();
+ }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org