Posted to notifications@james.apache.org by bt...@apache.org on 2022/05/13 04:43:16 UTC

[james-project] branch master updated (67682418b7 -> 14fb2f6d7c)

This is an automated email from the ASF dual-hosted git repository.

btellier pushed a change to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git


    from 67682418b7 [FIX] Filter non error logs as done previously
     new 643cd023a6 JAMES-3719 Reactive textual content extraction with Apache Tika
     new c9d4adf901 JAMES-3719 Avoid copying mime part content when we are not to extract its content
     new 14fb2f6d7c JAMES-3719 Reading inputstream might be blocking

The 3 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 .../james/mailbox/extractor/TextExtractor.java     |  11 ++
 .../ElasticSearchListeningMessageSearchIndex.java  |  25 ++--
 .../elasticsearch/v7/json/IndexableMessage.java    | 138 +++++++++++----------
 .../v7/json/MessageToElasticSearchJson.java        |  33 ++---
 .../mailbox/elasticsearch/v7/json/MimePart.java    | 128 ++++++++++++-------
 .../v7/json/MimePartContainerBuilder.java          |   7 +-
 .../elasticsearch/v7/json/MimePartParser.java      |   8 +-
 .../v7/json/RootMimePartContainerBuilder.java      |  11 +-
 .../v7/json/IndexableMessageTest.java              |  41 +++---
 .../v7/json/MessageToElasticSearchJsonTest.java    |  28 ++---
 .../elasticsearch/v7/json/MimePartTest.java        |   9 +-
 .../store/extractor/DefaultTextExtractor.java      |  20 ++-
 .../store/extractor/JsoupTextExtractor.java        |  28 +++++
 mailbox/tika/pom.xml                               |   8 ++
 .../james/mailbox/tika/CachingTextExtractor.java   |  76 ++++++------
 .../tika/ContentTypeFilteringTextExtractor.java    |  15 +++
 .../apache/james/mailbox/tika/TikaHttpClient.java  |   5 +-
 .../james/mailbox/tika/TikaHttpClientImpl.java     |  57 ++++++---
 .../james/mailbox/tika/TikaTextExtractor.java      |  28 +++--
 .../mailbox/tika/CachingTextExtractorTest.java     |  80 +++---------
 .../james/mailbox/tika/TikaTextExtractorTest.java  |   9 +-
 pom.xml                                            |   5 +
 22 files changed, 446 insertions(+), 324 deletions(-)


---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 01/03: JAMES-3719 Reactive textual content extraction with Apache Tika

Posted by bt...@apache.org.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 643cd023a6bbe57094fb03ed55a76f44f1d050bc
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Mar 2 14:31:25 2022 +0700

    JAMES-3719 Reactive textual content extraction with Apache Tika
    
    Tika was called from reactive code and was making blocking HTTP calls from
    within the MIME parsing code.
    
    This causes:
     - Unneeded thread consumption, as some threads sit waiting for the Tika
       response.
     - Potentially dangerous blocking calls: for instance, the InVM event bus was
       making such calls on the parallel thread pool, where it is critical NOT to
       block.
     - HTTP connections were opened on a per-call basis and never reused.
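For comparison, here is a minimal sketch of how the last two problems can be avoided using only the JDK's `java.net.http` client. This commit itself uses reactor-netty, not this code. One long-lived `HttpClient` is shared so its connections can be kept alive and reused, and `sendAsync` returns a `CompletableFuture` instead of parking the calling thread. The `TikaRequests` class, its method names and the `http://localhost:9998` base URL (Tika server's default port) are assumptions for illustration. `PUT /tika` with `Accept: text/plain` is the Tika server's plain-text extraction endpoint.

```java
import java.net.URI;
import java.net.http.HttpClient;
import java.net.http.HttpRequest;
import java.net.http.HttpResponse;
import java.time.Duration;
import java.util.concurrent.CompletableFuture;

// Hypothetical helper, NOT James' TikaHttpClientImpl: a JDK-only analogue of a
// pooled, non-blocking call to a Tika server.
final class TikaRequests {
    // One shared client for the whole application: it keeps connections alive
    // and reuses them across requests instead of opening one per call.
    private static final HttpClient CLIENT = HttpClient.newBuilder()
        .connectTimeout(Duration.ofSeconds(5))
        .build();

    private TikaRequests() {
    }

    // Builds a PUT /tika request asking the server for plain text.
    static HttpRequest extractionRequest(URI tikaBase, byte[] content, String contentType) {
        return HttpRequest.newBuilder(tikaBase.resolve("/tika"))
            .header("Content-Type", contentType)
            .header("Accept", "text/plain")
            .timeout(Duration.ofSeconds(30))
            .PUT(HttpRequest.BodyPublishers.ofByteArray(content))
            .build();
    }

    // Sends the request without blocking the caller: the returned future
    // completes once the response body has been read.
    static CompletableFuture<String> extractText(URI tikaBase, byte[] content, String contentType) {
        return CLIENT.sendAsync(extractionRequest(tikaBase, content, contentType),
                HttpResponse.BodyHandlers.ofString())
            .thenApply(response -> {
                if (response.statusCode() != 200) {
                    throw new IllegalStateException("Tika returned HTTP " + response.statusCode());
                }
                return response.body();
            });
    }
}
```

A Reactor caller could adapt such a future with `Mono.fromFuture`; reactor-netty's own HTTP client already returns Reactor types, which is one reason to prefer it in reactive code.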
    
    We introduce the following changes:
     - Reactification of the TextExtractor API.
     - We re-implement the HTTP calls made by TikaTextExtractor with reactor-netty,
       which lets us pool HTTP connections and make the calls in a non-blocking,
       reactive fashion.
     - We provide a reactive cache using the Caffeine caching library. Guava
       caches are blocking and thus not an option.
     - We decouple text extraction from the MIME parsing phase by introducing
       an intermediate POJO. Doing so requires a post-parsing copy of the
       content.
     - Finally, we reactify index content generation in the ElasticSearch code.
    
    TODO: only do the copy if necessary. We don't want to copy large attachments
    for which no text is going to be extracted.
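The reactive-cache idea can be illustrated without Caffeine. The following is a minimal JDK-only sketch, not the CachingTextExtractor from this commit: it caches the *future* of an extraction rather than its result, so concurrent requests for the same content share one in-flight call and no thread blocks while waiting. The `AsyncExtractionCache` name, the SHA-256 content key and the evict-on-failure policy are assumptions for illustration. Caffeine's `AsyncCache` additionally offers size bounds and expiry, which this sketch omits (it is unbounded).

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
import java.util.function.Function;

// Hypothetical illustration: an unbounded cache of in-flight or completed
// extractions, keyed by a hash of the raw content.
final class AsyncExtractionCache {
    private final ConcurrentMap<String, CompletableFuture<String>> cache = new ConcurrentHashMap<>();
    private final Function<byte[], CompletableFuture<String>> extractor;

    AsyncExtractionCache(Function<byte[], CompletableFuture<String>> extractor) {
        this.extractor = extractor;
    }

    CompletableFuture<String> get(byte[] content) {
        String key = sha256(content);
        // Only the future is stored; the extraction itself runs asynchronously,
        // so no thread waits inside the map for the result.
        CompletableFuture<String> future = cache.computeIfAbsent(key, k -> extractor.apply(content));
        // Drop failed entries so that a transient error is retried next time.
        future.whenComplete((text, error) -> {
            if (error != null) {
                cache.remove(key, future);
            }
        });
        return future;
    }

    int size() {
        return cache.size();
    }

    private static String sha256(byte[] content) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256").digest(content);
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            throw new IllegalStateException("SHA-256 is required on every JVM", e);
        }
    }
}
```

This mirrors why Guava is ruled out above: a Guava `LoadingCache.get` computes the value synchronously and makes concurrent callers wait, whereas caching a future lets every caller compose on the same pending result.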
---
 .../james/mailbox/extractor/TextExtractor.java     |   6 +
 .../ElasticSearchListeningMessageSearchIndex.java  |  25 ++--
 .../elasticsearch/v7/json/IndexableMessage.java    | 138 +++++++++++----------
 .../v7/json/MessageToElasticSearchJson.java        |  33 ++---
 .../mailbox/elasticsearch/v7/json/MimePart.java    |  82 ++++++++----
 .../v7/json/MimePartContainerBuilder.java          |   4 +-
 .../elasticsearch/v7/json/MimePartParser.java      |   6 +-
 .../v7/json/RootMimePartContainerBuilder.java      |   6 +-
 .../v7/json/IndexableMessageTest.java              |  40 +++---
 .../v7/json/MessageToElasticSearchJsonTest.java    |  28 ++---
 .../elasticsearch/v7/json/MimePartTest.java        |   5 +-
 mailbox/tika/pom.xml                               |   8 ++
 .../james/mailbox/tika/CachingTextExtractor.java   |  76 ++++++------
 .../tika/ContentTypeFilteringTextExtractor.java    |  10 ++
 .../apache/james/mailbox/tika/TikaHttpClient.java  |   5 +-
 .../james/mailbox/tika/TikaHttpClientImpl.java     |  57 ++++++---
 .../james/mailbox/tika/TikaTextExtractor.java      |  28 +++--
 .../mailbox/tika/CachingTextExtractorTest.java     |  80 +++---------
 .../james/mailbox/tika/TikaTextExtractorTest.java  |   9 +-
 pom.xml                                            |   5 +
 20 files changed, 363 insertions(+), 288 deletions(-)

diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/extractor/TextExtractor.java b/mailbox/api/src/main/java/org/apache/james/mailbox/extractor/TextExtractor.java
index 6b911225b6..2fdf7e555e 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/extractor/TextExtractor.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/extractor/TextExtractor.java
@@ -23,8 +23,14 @@ import java.io.InputStream;
 
 import org.apache.james.mailbox.model.ContentType;
 
+import reactor.core.publisher.Mono;
+
 public interface TextExtractor {
 
     ParsedContent extractContent(InputStream inputStream, ContentType contentType) throws Exception;
 
+    default Mono<ParsedContent> extractContentReactive(InputStream inputStream, ContentType contentType) {
+        return Mono.fromCallable(() -> extractContent(inputStream, contentType));
+    }
+
 }
diff --git a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/events/ElasticSearchListeningMessageSearchIndex.java b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/events/ElasticSearchListeningMessageSearchIndex.java
index 591a5ddcb5..29ef16119b 100644
--- a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/events/ElasticSearchListeningMessageSearchIndex.java
+++ b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/events/ElasticSearchListeningMessageSearchIndex.java
@@ -162,23 +162,22 @@ public class ElasticSearchListeningMessageSearchIndex extends ListeningMessageSe
         RoutingKey from = routingKeyFactory.from(mailbox.getMailboxId());
         DocumentId id = indexIdFor(mailbox.getMailboxId(), message.getUid());
 
-        return Mono.fromCallable(() -> generateIndexedJson(mailbox, message, session))
+        return generateIndexedJson(mailbox, message, session)
             .flatMap(jsonContent -> elasticSearchIndexer.index(id, jsonContent, from))
             .then();
     }
 
-    private String generateIndexedJson(Mailbox mailbox, MailboxMessage message, MailboxSession session) throws JsonProcessingException {
-        try {
-            return messageToElasticSearchJson.convertToJson(message);
-        } catch (Exception e) {
-            LOGGER.warn("Indexing mailbox {}-{} of user {} on message {} without attachments ",
-                mailbox.getName(),
-                mailbox.getMailboxId().serialize(),
-                session.getUser().asString(),
-                message.getUid(),
-                e);
-            return messageToElasticSearchJson.convertToJsonWithoutAttachment(message);
-        }
+    private Mono<String> generateIndexedJson(Mailbox mailbox, MailboxMessage message, MailboxSession session) {
+        return messageToElasticSearchJson.convertToJson(message)
+            .onErrorResume(e -> {
+                LOGGER.warn("Indexing mailbox {}-{} of user {} on message {} without attachments ",
+                    mailbox.getName(),
+                    mailbox.getMailboxId().serialize(),
+                    session.getUser().asString(),
+                    message.getUid(),
+                    e);
+                return messageToElasticSearchJson.convertToJsonWithoutAttachment(message);
+            });
     }
 
     @Override
diff --git a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/IndexableMessage.java b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/IndexableMessage.java
index 072513fbaa..d224b43d56 100644
--- a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/IndexableMessage.java
+++ b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/IndexableMessage.java
@@ -39,6 +39,8 @@ import com.fasterxml.jackson.annotation.JsonProperty;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 
+import reactor.core.publisher.Mono;
+
 public class IndexableMessage {
 
     private static final DateTimeFormatter DATE_TIME_FORMATTER = DateTimeFormatter.ofPattern("yyyy-MM-dd'T'HH:mm:ssZ");
@@ -62,7 +64,7 @@ public class IndexableMessage {
         private Builder() {
         }
 
-        public IndexableMessage build() {
+        public Mono<IndexableMessage> build() {
             Preconditions.checkNotNull(message.getMailboxId());
             Preconditions.checkNotNull(textExtractor);
             Preconditions.checkNotNull(indexAttachments);
@@ -95,73 +97,77 @@ public class IndexableMessage {
             return this;
         }
 
-        private IndexableMessage instantiateIndexedMessage() throws IOException, MimeException {
+        private Mono<IndexableMessage> instantiateIndexedMessage() throws IOException, MimeException {
             String messageId = SearchUtil.getSerializedMessageIdIfSupportedByUnderlyingStorageOrNull(message);
             String threadId = SearchUtil.getSerializedThreadIdIfSupportedByUnderlyingStorageOrNull(message);
-            MimePart parsingResult = new MimePartParser(message, textExtractor).parse();
-
-            Optional<String> bodyText = parsingResult.locateFirstTextBody();
-            Optional<String> bodyHtml = parsingResult.locateFirstHtmlBody();
-
-            boolean hasAttachment = MessageAttachmentMetadata.hasNonInlinedAttachment(message.getAttachments());
-            List<MimePart> attachments = setFlattenedAttachments(parsingResult, indexAttachments);
-
-            HeaderCollection headerCollection = parsingResult.getHeaderCollection();
-            ZonedDateTime internalDate = getSanitizedInternalDate(message, zoneId);
-
-            List<HeaderCollection.Header> headers = headerCollection.getHeaders();
-            Subjects subjects = Subjects.from(headerCollection.getSubjectSet());
-            EMailers from = EMailers.from(headerCollection.getFromAddressSet());
-            EMailers to = EMailers.from(headerCollection.getToAddressSet());
-            EMailers cc = EMailers.from(headerCollection.getCcAddressSet());
-            EMailers bcc = EMailers.from(headerCollection.getBccAddressSet());
-            String sentDate = DATE_TIME_FORMATTER.format(headerCollection.getSentDate().orElse(internalDate));
-            Optional<String> mimeMessageID = headerCollection.getMessageID();
-
-            long uid = message.getUid().asLong();
-            String mailboxId = message.getMailboxId().serialize();
-            ModSeq modSeq = message.getModSeq();
-            long size = message.getFullContentOctets();
-            String date = DATE_TIME_FORMATTER.format(getSanitizedInternalDate(message, zoneId));
-            String mediaType = message.getMediaType();
-            String subType = message.getSubType();
-            boolean isAnswered = message.isAnswered();
-            boolean isDeleted = message.isDeleted();
-            boolean isDraft = message.isDraft();
-            boolean isFlagged = message.isFlagged();
-            boolean isRecent = message.isRecent();
-            boolean isUnRead = !message.isSeen();
-            String[] userFlags = message.createFlags().getUserFlags();
-
-            return new IndexableMessage(
-                    attachments,
-                    bcc,
-                    bodyHtml,
-                    bodyText,
-                    cc,
-                    date,
-                    from,
-                    hasAttachment,
-                    headers,
-                    isAnswered,
-                    isDeleted,
-                    isDraft,
-                    isFlagged,
-                    isRecent,
-                    isUnRead,
-                    mailboxId,
-                    mediaType,
-                    messageId,
-                    threadId,
-                    modSeq,
-                    sentDate,
-                    size,
-                    subjects,
-                    subType,
-                    to,
-                    uid,
-                    userFlags,
-                    mimeMessageID);
+
+            return new MimePartParser(message, textExtractor).parse()
+                .asMimePart(textExtractor)
+                .map(parsingResult -> {
+
+                    Optional<String> bodyText = parsingResult.locateFirstTextBody();
+                    Optional<String> bodyHtml = parsingResult.locateFirstHtmlBody();
+
+                    boolean hasAttachment = MessageAttachmentMetadata.hasNonInlinedAttachment(message.getAttachments());
+                    List<MimePart> attachments = setFlattenedAttachments(parsingResult, indexAttachments);
+
+                    HeaderCollection headerCollection = parsingResult.getHeaderCollection();
+                    ZonedDateTime internalDate = getSanitizedInternalDate(message, zoneId);
+
+                    List<HeaderCollection.Header> headers = headerCollection.getHeaders();
+                    Subjects subjects = Subjects.from(headerCollection.getSubjectSet());
+                    EMailers from = EMailers.from(headerCollection.getFromAddressSet());
+                    EMailers to = EMailers.from(headerCollection.getToAddressSet());
+                    EMailers cc = EMailers.from(headerCollection.getCcAddressSet());
+                    EMailers bcc = EMailers.from(headerCollection.getBccAddressSet());
+                    String sentDate = DATE_TIME_FORMATTER.format(headerCollection.getSentDate().orElse(internalDate));
+                    Optional<String> mimeMessageID = headerCollection.getMessageID();
+
+                    long uid = message.getUid().asLong();
+                    String mailboxId = message.getMailboxId().serialize();
+                    ModSeq modSeq = message.getModSeq();
+                    long size = message.getFullContentOctets();
+                    String date = DATE_TIME_FORMATTER.format(getSanitizedInternalDate(message, zoneId));
+                    String mediaType = message.getMediaType();
+                    String subType = message.getSubType();
+                    boolean isAnswered = message.isAnswered();
+                    boolean isDeleted = message.isDeleted();
+                    boolean isDraft = message.isDraft();
+                    boolean isFlagged = message.isFlagged();
+                    boolean isRecent = message.isRecent();
+                    boolean isUnRead = !message.isSeen();
+                    String[] userFlags = message.createFlags().getUserFlags();
+
+                    return new IndexableMessage(
+                        attachments,
+                        bcc,
+                        bodyHtml,
+                        bodyText,
+                        cc,
+                        date,
+                        from,
+                        hasAttachment,
+                        headers,
+                        isAnswered,
+                        isDeleted,
+                        isDraft,
+                        isFlagged,
+                        isRecent,
+                        isUnRead,
+                        mailboxId,
+                        mediaType,
+                        messageId,
+                        threadId,
+                        modSeq,
+                        sentDate,
+                        size,
+                        subjects,
+                        subType,
+                        to,
+                        uid,
+                        userFlags,
+                        mimeMessageID);
+                });
         }
 
         private List<MimePart> setFlattenedAttachments(MimePart parsingResult, IndexAttachments indexAttachments) {
diff --git a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MessageToElasticSearchJson.java b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MessageToElasticSearchJson.java
index 6a8168c34c..564d750083 100644
--- a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MessageToElasticSearchJson.java
+++ b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MessageToElasticSearchJson.java
@@ -33,8 +33,11 @@ import com.fasterxml.jackson.core.JsonProcessingException;
 import com.fasterxml.jackson.databind.ObjectMapper;
 import com.fasterxml.jackson.datatype.guava.GuavaModule;
 import com.fasterxml.jackson.datatype.jdk8.Jdk8Module;
+import com.github.fge.lambdas.Throwing;
 import com.google.common.base.Preconditions;
 
+import reactor.core.publisher.Mono;
+
 public class MessageToElasticSearchJson {
 
     private final ObjectMapper mapper;
@@ -56,24 +59,26 @@ public class MessageToElasticSearchJson {
         this(textExtractor, ZoneId.systemDefault(), indexAttachments);
     }
 
-    public String convertToJson(MailboxMessage message) throws JsonProcessingException {
+    public Mono<String> convertToJson(MailboxMessage message) {
         Preconditions.checkNotNull(message);
 
-        return mapper.writeValueAsString(IndexableMessage.builder()
-                .message(message)
-                .extractor(textExtractor)
-                .zoneId(zoneId)
-                .indexAttachments(indexAttachments)
-                .build());
+        return IndexableMessage.builder()
+            .message(message)
+            .extractor(textExtractor)
+            .zoneId(zoneId)
+            .indexAttachments(indexAttachments)
+            .build()
+            .map(Throwing.function(mapper::writeValueAsString));
     }
 
-    public String convertToJsonWithoutAttachment(MailboxMessage message) throws JsonProcessingException {
-        return mapper.writeValueAsString(IndexableMessage.builder()
-                .message(message)
-                .extractor(textExtractor)
-                .zoneId(zoneId)
-                .indexAttachments(IndexAttachments.NO)
-                .build());
+    public Mono<String> convertToJsonWithoutAttachment(MailboxMessage message) {
+        return IndexableMessage.builder()
+            .message(message)
+            .extractor(textExtractor)
+            .zoneId(zoneId)
+            .indexAttachments(IndexAttachments.NO)
+            .build()
+            .map(Throwing.function(mapper::writeValueAsString));
     }
 
     public String getUpdatedJsonMessagePart(Flags flags, ModSeq modSeq) throws JsonProcessingException {
diff --git a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePart.java b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePart.java
index 722bd75756..918306fddf 100644
--- a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePart.java
+++ b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePart.java
@@ -19,6 +19,7 @@
 
 package org.apache.james.mailbox.elasticsearch.v7.json;
 
+import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
@@ -40,17 +41,21 @@ import org.slf4j.LoggerFactory;
 
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
+import com.github.fge.lambdas.Throwing;
 import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 
+import reactor.core.publisher.Flux;
+import reactor.core.publisher.Mono;
+
 public class MimePart {
 
     public static class Builder implements MimePartContainerBuilder {
 
         private final HeaderCollection.Builder headerCollectionBuilder;
         private Optional<InputStream> bodyContent;
-        private final List<MimePart> children;
+        private final List<ParsedMimePart> children;
         private Optional<MediaType> mediaType;
         private Optional<SubType> subType;
         private Optional<String> fileName;
@@ -85,7 +90,7 @@ public class MimePart {
         }
 
         @Override
-        public Builder addChild(MimePart mimePart) {
+        public Builder addChild(ParsedMimePart mimePart) {
             children.add(mimePart);
             return this;
         }
@@ -129,11 +134,11 @@ public class MimePart {
         }
 
         @Override
-        public MimePart build() {
-            Optional<ParsedContent> parsedContent = parseContent(textExtractor);
-            return new MimePart(
+        public ParsedMimePart build() {
+            return new ParsedMimePart(
                 headerCollectionBuilder.build(),
-                parsedContent.flatMap(ParsedContent::getTextualContent),
+                bodyContent,
+                charset,
                 mediaType,
                 subType,
                 fileName,
@@ -141,27 +146,61 @@ public class MimePart {
                 contentDisposition,
                 children);
         }
+    }
 
-        private Optional<ParsedContent> parseContent(TextExtractor textExtractor) {
-            if (bodyContent.isPresent()) {
-                try {
-                    return Optional.of(extractText(textExtractor, bodyContent.get()));
-                } catch (Exception e) {
-                    LOGGER.warn("Failed parsing attachment", e);
-                }
-            }
-            return Optional.empty();
+    public static class ParsedMimePart {
+        private final HeaderCollection headerCollection;
+        private final Optional<byte[]> bodyContent;
+        private final Optional<Charset> charset;
+        private final Optional<MediaType> mediaType;
+        private final Optional<SubType> subType;
+        private final Optional<String> fileName;
+        private final Optional<String> fileExtension;
+        private final Optional<String> contentDisposition;
+        private final List<ParsedMimePart> attachments;
+
+        public ParsedMimePart(HeaderCollection headerCollection, Optional<InputStream> bodyContent, Optional<Charset> charset,
+                              Optional<MediaType> mediaType,
+                              Optional<SubType> subType, Optional<String> fileName, Optional<String> fileExtension,
+                              Optional<String> contentDisposition, List<ParsedMimePart> attachments) {
+            this.headerCollection = headerCollection;
+            this.bodyContent = bodyContent.map(Throwing.function(IOUtils::toByteArray));
+            this.mediaType = mediaType;
+            this.subType = subType;
+            this.fileName = fileName;
+            this.fileExtension = fileExtension;
+            this.contentDisposition = contentDisposition;
+            this.attachments = attachments;
+            this.charset = charset;
         }
 
-        private ParsedContent extractText(TextExtractor textExtractor, InputStream bodyContent) throws Exception {
+        public Mono<MimePart> asMimePart(TextExtractor textExtractor) {
+            return Flux.fromIterable(attachments)
+                .concatMap(attachment -> attachment.asMimePart(textExtractor))
+                .collectList()
+                .flatMap(attachments -> extractText(textExtractor)
+                    .map(Optional::ofNullable)
+                    .switchIfEmpty(Mono.just(Optional.empty()))
+                    .onErrorResume(e -> {
+                        LOGGER.warn("Failure extracting text message for some attachments", e);
+                        return Mono.just(Optional.empty());
+                    })
+                    .map(text -> new MimePart(headerCollection, text.flatMap(ParsedContent::getTextualContent),
+                        mediaType, subType, fileName, fileExtension, contentDisposition, attachments)));
+        }
+
+        private Mono<ParsedContent> extractText(TextExtractor textExtractor) {
+            if (bodyContent.isEmpty()) {
+                return Mono.empty();
+            }
             if (shouldPerformTextExtraction()) {
-                return textExtractor.extractContent(
-                    bodyContent,
+                return textExtractor.extractContentReactive(
+                    new ByteArrayInputStream(bodyContent.get()),
                     computeContentType().orElse(null));
             }
-            return new ParsedContent(
-                Optional.ofNullable(IOUtils.toString(bodyContent, charset.orElse(StandardCharsets.UTF_8))),
-                ImmutableMap.of());
+            return Mono.fromCallable(() -> new ParsedContent(
+                Optional.ofNullable(IOUtils.toString(new ByteArrayInputStream(bodyContent.get()), charset.orElse(StandardCharsets.UTF_8))),
+                ImmutableMap.of()));
         }
 
         private boolean shouldPerformTextExtraction() {
@@ -185,7 +224,6 @@ public class MimePart {
                 return Optional.empty();
             }
         }
-
     }
     
     public static Builder builder() {
diff --git a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartContainerBuilder.java b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartContainerBuilder.java
index b5083f5377..d3d88c4f67 100644
--- a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartContainerBuilder.java
+++ b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartContainerBuilder.java
@@ -29,7 +29,7 @@ import org.apache.james.mime4j.stream.Field;
 
 public interface MimePartContainerBuilder {
 
-    MimePart build();
+    MimePart.ParsedMimePart build();
 
     MimePartContainerBuilder using(TextExtractor textExtractor);
 
@@ -37,7 +37,7 @@ public interface MimePartContainerBuilder {
 
     MimePartContainerBuilder addBodyContent(InputStream bodyContent);
 
-    MimePartContainerBuilder addChild(MimePart mimePart);
+    MimePartContainerBuilder addChild(MimePart.ParsedMimePart mimePart);
 
     MimePartContainerBuilder addFileName(String fileName);
 
diff --git a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartParser.java b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartParser.java
index f5091f3201..81a1a15e8a 100644
--- a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartParser.java
+++ b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartParser.java
@@ -50,7 +50,7 @@ public class MimePartParser {
     private final TextExtractor textExtractor;
     private final MimeTokenStream stream;
     private final Deque<MimePartContainerBuilder> builderStack;
-    private MimePart result;
+    private MimePart.ParsedMimePart result;
     private MimePartContainerBuilder currentlyBuildMimePart;
 
     public MimePartParser(Message message, TextExtractor textExtractor) {
@@ -63,7 +63,7 @@ public class MimePartParser {
             new DefaultBodyDescriptorBuilder(null, FIELD_PARSER, DecodeMonitor.SILENT));
     }
 
-    public MimePart parse() throws IOException, MimeException {
+    public MimePart.ParsedMimePart parse() throws IOException, MimeException {
         stream.parse(message.getFullContent());
         for (EntityState state = stream.getState(); state != EntityState.T_END_OF_STREAM; state = stream.next()) {
             processMimePart(stream, state);
@@ -107,7 +107,7 @@ public class MimePartParser {
     }
     
     private void closeMimePart() {
-        MimePart bodyMimePart = currentlyBuildMimePart.using(textExtractor).build();
+        MimePart.ParsedMimePart bodyMimePart = currentlyBuildMimePart.using(textExtractor).build();
         if (!builderStack.isEmpty()) {
             builderStack.peek().addChild(bodyMimePart);
         } else {
diff --git a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/RootMimePartContainerBuilder.java b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/RootMimePartContainerBuilder.java
index 48fadf28f5..93755a6790 100644
--- a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/RootMimePartContainerBuilder.java
+++ b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/RootMimePartContainerBuilder.java
@@ -33,10 +33,10 @@ public class RootMimePartContainerBuilder implements MimePartContainerBuilder {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(RootMimePartContainerBuilder.class);
 
-    private MimePart rootMimePart;
+    private MimePart.ParsedMimePart rootMimePart;
 
     @Override
-    public MimePart build() {
+    public MimePart.ParsedMimePart build() {
         return rootMimePart;
     }
 
@@ -57,7 +57,7 @@ public class RootMimePartContainerBuilder implements MimePartContainerBuilder {
     }
 
     @Override
-    public MimePartContainerBuilder addChild(MimePart mimePart) {
+    public MimePartContainerBuilder addChild(MimePart.ParsedMimePart mimePart) {
         if (rootMimePart == null) {
             rootMimePart = mimePart;
         } else {
diff --git a/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/IndexableMessageTest.java b/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/IndexableMessageTest.java
index d36b9691c7..62e4d85871 100644
--- a/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/IndexableMessageTest.java
+++ b/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/IndexableMessageTest.java
@@ -56,6 +56,8 @@ import org.junit.jupiter.api.extension.RegisterExtension;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 
+import reactor.core.publisher.Mono;
+
 class IndexableMessageTest {
     static final MessageUid MESSAGE_UID = MessageUid.of(154);
 
@@ -108,7 +110,8 @@ class IndexableMessageTest {
                 .extractor(new DefaultTextExtractor())
                 .zoneId(ZoneId.of("Europe/Paris"))
                 .indexAttachments(IndexAttachments.YES)
-                .build();
+                .build()
+                .block();
 
         // Then
         assertThat(indexableMessage.getHasAttachment()).isTrue();
@@ -140,7 +143,8 @@ class IndexableMessageTest {
                 .extractor(new DefaultTextExtractor())
                 .zoneId(ZoneId.of("Europe/Paris"))
                 .indexAttachments(IndexAttachments.NO)
-                .build();
+                .build()
+                .block();
 
         // Then
         assertThat(indexableMessage.getHasAttachment()).isFalse();
@@ -170,7 +174,8 @@ class IndexableMessageTest {
                 .extractor(new DefaultTextExtractor())
                 .zoneId(ZoneId.of("Europe/Paris"))
                 .indexAttachments(IndexAttachments.NO)
-                .build();
+                .build()
+                .block();
 
         // Then
         assertThat(indexableMessage.getAttachments()).isEmpty();
@@ -200,7 +205,8 @@ class IndexableMessageTest {
                 .extractor(new DefaultTextExtractor())
                 .zoneId(ZoneId.of("Europe/Paris"))
                 .indexAttachments(IndexAttachments.YES)
-                .build();
+                .build()
+                .block();
 
         // Then
         assertThat(indexableMessage.getAttachments()).isNotEmpty();
@@ -226,10 +232,10 @@ class IndexableMessageTest {
             .thenReturn(MESSAGE_UID);
 
         TextExtractor textExtractor = mock(TextExtractor.class);
-        when(textExtractor.extractContent(any(), any()))
-            .thenReturn(new ParsedContent(Optional.of("first attachment content"), ImmutableMap.of()))
-            .thenThrow(new RuntimeException("second cannot be parsed"))
-            .thenReturn(new ParsedContent(Optional.of("third attachment content"), ImmutableMap.of()));
+        when(textExtractor.extractContentReactive(any(), any()))
+            .thenReturn(Mono.just(new ParsedContent(Optional.of("first attachment content"), ImmutableMap.of())))
+            .thenReturn(Mono.error(new RuntimeException("second cannot be parsed")))
+            .thenReturn(Mono.just(new ParsedContent(Optional.of("third attachment content"), ImmutableMap.of())));
 
         // When
         IndexableMessage indexableMessage = IndexableMessage.builder()
@@ -237,7 +243,8 @@ class IndexableMessageTest {
                 .extractor(textExtractor)
                 .zoneId(ZoneId.of("Europe/Paris"))
                 .indexAttachments(IndexAttachments.YES)
-                .build();
+                .build()
+                .block();
 
         // Then
         String NO_TEXTUAL_BODY = "The textual body is not present";
@@ -273,7 +280,8 @@ class IndexableMessageTest {
                 .extractor(textExtractor)
                 .zoneId(ZoneId.of("Europe/Paris"))
                 .indexAttachments(IndexAttachments.YES)
-                .build();
+                .build()
+                .block();
 
         // Then
         assertThat(indexableMessage.getMessageId()).isNull();
@@ -306,7 +314,8 @@ class IndexableMessageTest {
             .extractor(textExtractor)
             .zoneId(ZoneId.of("Europe/Paris"))
             .indexAttachments(IndexAttachments.YES)
-            .build();
+            .build()
+            .block();
 
         // Then
         assertThat(indexableMessage.getThreadId()).isNull();
@@ -336,7 +345,8 @@ class IndexableMessageTest {
                 .extractor(textExtractor)
                 .zoneId(ZoneId.of("Europe/Paris"))
                 .indexAttachments(IndexAttachments.YES)
-                .build();
+                .build()
+                .block();
 
         // Then
         assertThat(indexableMessage.getMessageId()).isNull();
@@ -368,7 +378,8 @@ class IndexableMessageTest {
             .extractor(textExtractor)
             .zoneId(ZoneId.of("Europe/Paris"))
             .indexAttachments(IndexAttachments.YES)
-            .build();
+            .build()
+            .block();
 
         // Then
         assertThat(indexableMessage.getThreadId()).isNull();
@@ -401,7 +412,8 @@ class IndexableMessageTest {
             .extractor(new DefaultTextExtractor())
             .zoneId(ZoneId.of("Europe/Paris"))
             .indexAttachments(IndexAttachments.NO)
-            .build();
+            .build()
+            .block();
 
         // Then
         assertThat(indexableMessage.getThreadId()).isEqualTo("42");
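The stub above now makes the second extraction fail by returning `Mono.error(...)`. Before this change it threw from the call itself. The assertions are outside this excerpt, so the following is an assumption: the builder is meant to keep indexing the other attachments when one cannot be parsed. Under that assumption, per-attachment error isolation can be sketched with the JDK alone. `AttachmentExtraction.extractAll` and its extractor function are hypothetical stand-ins, not James APIs. In Reactor the same idea would be an `onErrorResume` applied to each attachment's `Mono` before the results are combined.

```java
import java.util.List;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;
import java.util.stream.Collectors;

// Hypothetical helper (not a James API): one extraction per attachment, failures isolated per item.
final class AttachmentExtraction {
    static List<Optional<String>> extractAll(List<String> attachments,
                                             Function<String, CompletableFuture<String>> extractor) {
        List<CompletableFuture<Optional<String>>> futures = attachments.stream()
            .map(attachment -> extractor.apply(attachment)
                .thenApply(Optional::of)
                // Recover per item: a failed extraction becomes Optional.empty() instead of failing the batch.
                .exceptionally(error -> Optional.empty()))
            .collect(Collectors.toList());
        // join() cannot fail here because every future already recovered; order follows the input list.
        return futures.stream()
            .map(CompletableFuture::join)
            .collect(Collectors.toList());
    }
}
```

Only failures delivered through the future are recovered. An extractor that throws before it returns a future would still abort the whole batch.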
diff --git a/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/MessageToElasticSearchJsonTest.java b/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/MessageToElasticSearchJsonTest.java
index e538721925..e9ac8a028e 100644
--- a/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/MessageToElasticSearchJsonTest.java
+++ b/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/MessageToElasticSearchJsonTest.java
@@ -107,7 +107,7 @@ class MessageToElasticSearchJsonTest {
                 MAILBOX_ID);
         spamMail.setUid(UID);
         spamMail.setModSeq(MOD_SEQ);
-        assertThatJson(messageToElasticSearchJson.convertToJson(spamMail))
+        assertThatJson(messageToElasticSearchJson.convertToJson(spamMail).block())
             .when(IGNORING_ARRAY_ORDER)
             .isEqualTo(ClassLoaderUtils.getSystemResourceAsString("eml/spamMail.json"));
     }
@@ -129,7 +129,7 @@ class MessageToElasticSearchJsonTest {
         mail.setUid(UID);
         mail.setModSeq(MOD_SEQ);
 
-        assertThatJson(messageToElasticSearchJson.convertToJson(mail))
+        assertThatJson(messageToElasticSearchJson.convertToJson(mail).block())
             .when(IGNORING_ARRAY_ORDER)
             .isEqualTo(ClassLoaderUtils.getSystemResourceAsString("eml/inlined-mixed.json"));
     }
@@ -151,7 +151,7 @@ class MessageToElasticSearchJsonTest {
         spamMail.setUid(UID);
         spamMail.setModSeq(MOD_SEQ);
 
-        String actual = messageToElasticSearchJson.convertToJson(spamMail);
+        String actual = messageToElasticSearchJson.convertToJson(spamMail).block();
         assertThatJson(actual)
             .when(IGNORING_ARRAY_ORDER)
             .isEqualTo(ClassLoaderUtils.getSystemResourceAsString("eml/invalidCharset.json"));
@@ -173,7 +173,7 @@ class MessageToElasticSearchJsonTest {
                 MAILBOX_ID);
         htmlMail.setModSeq(MOD_SEQ);
         htmlMail.setUid(UID);
-        assertThatJson(messageToElasticSearchJson.convertToJson(htmlMail))
+        assertThatJson(messageToElasticSearchJson.convertToJson(htmlMail).block())
             .when(IGNORING_ARRAY_ORDER)
             .isEqualTo(ClassLoaderUtils.getSystemResourceAsString("eml/htmlMail.json"));
     }
@@ -194,7 +194,7 @@ class MessageToElasticSearchJsonTest {
                 MAILBOX_ID);
         pgpSignedMail.setModSeq(MOD_SEQ);
         pgpSignedMail.setUid(UID);
-        assertThatJson(messageToElasticSearchJson.convertToJson(pgpSignedMail))
+        assertThatJson(messageToElasticSearchJson.convertToJson(pgpSignedMail).block())
             .when(IGNORING_ARRAY_ORDER)
             .isEqualTo(ClassLoaderUtils.getSystemResourceAsString("eml/pgpSignedMail.json"));
     }
@@ -215,7 +215,7 @@ class MessageToElasticSearchJsonTest {
                 MAILBOX_ID);
         mail.setModSeq(MOD_SEQ);
         mail.setUid(UID);
-        assertThatJson(messageToElasticSearchJson.convertToJson(mail))
+        assertThatJson(messageToElasticSearchJson.convertToJson(mail).block())
             .when(IGNORING_ARRAY_ORDER).when(IGNORING_VALUES)
             .isEqualTo(ClassLoaderUtils.getSystemResourceAsString("eml/mail.json"));
     }
@@ -236,7 +236,7 @@ class MessageToElasticSearchJsonTest {
                 MAILBOX_ID);
         recursiveMail.setModSeq(MOD_SEQ);
         recursiveMail.setUid(UID);
-        assertThatJson(messageToElasticSearchJson.convertToJson(recursiveMail))
+        assertThatJson(messageToElasticSearchJson.convertToJson(recursiveMail).block())
             .when(IGNORING_ARRAY_ORDER).when(IGNORING_VALUES)
             .isEqualTo(ClassLoaderUtils.getSystemResourceAsString("eml/recursiveMail.json"));
     }
@@ -257,7 +257,7 @@ class MessageToElasticSearchJsonTest {
                 MAILBOX_ID);
         mailWithNoInternalDate.setModSeq(MOD_SEQ);
         mailWithNoInternalDate.setUid(UID);
-        assertThatJson(messageToElasticSearchJson.convertToJson(mailWithNoInternalDate))
+        assertThatJson(messageToElasticSearchJson.convertToJson(mailWithNoInternalDate).block())
             .when(IGNORING_ARRAY_ORDER)
             .when(IGNORING_VALUES)
             .isEqualTo(ClassLoaderUtils.getSystemResourceAsString("eml/recursiveMail.json"));
@@ -283,7 +283,7 @@ class MessageToElasticSearchJsonTest {
             new DefaultTextExtractor(),
             ZoneId.of("Europe/Paris"),
             IndexAttachments.YES);
-        String convertToJson = messageToElasticSearchJson.convertToJson(mailWithNoInternalDate);
+        String convertToJson = messageToElasticSearchJson.convertToJson(mailWithNoInternalDate).block();
 
         // Then
         assertThatJson(convertToJson)
@@ -312,7 +312,7 @@ class MessageToElasticSearchJsonTest {
             new DefaultTextExtractor(),
             ZoneId.of("Europe/Paris"),
             IndexAttachments.NO);
-        String convertToJson = messageToElasticSearchJson.convertToJson(mailWithNoInternalDate);
+        String convertToJson = messageToElasticSearchJson.convertToJson(mailWithNoInternalDate).block();
 
         // Then
         assertThatJson(convertToJson)
@@ -388,7 +388,7 @@ class MessageToElasticSearchJsonTest {
         spamMail.setUid(UID);
         spamMail.setModSeq(MOD_SEQ);
 
-        assertThatJson(messageToElasticSearchJson.convertToJson(spamMail))
+        assertThatJson(messageToElasticSearchJson.convertToJson(spamMail).block())
             .when(IGNORING_ARRAY_ORDER)
             .isEqualTo(
                 ClassLoaderUtils.getSystemResourceAsString("eml/nonTextual.json", StandardCharsets.UTF_8));
@@ -414,7 +414,7 @@ class MessageToElasticSearchJsonTest {
                 new DefaultTextExtractor(),
                 ZoneId.of("Europe/Paris"),
                 IndexAttachments.NO);
-        String convertToJsonWithoutAttachment = messageToElasticSearchJson.convertToJsonWithoutAttachment(message);
+        String convertToJsonWithoutAttachment = messageToElasticSearchJson.convertToJsonWithoutAttachment(message).block();
 
         // Then
         assertThatJson(convertToJsonWithoutAttachment)
@@ -443,9 +443,7 @@ class MessageToElasticSearchJsonTest {
                 new JsoupTextExtractor(),
                 ZoneId.of("Europe/Paris"),
                 IndexAttachments.NO);
-        String convertToJsonWithoutAttachment = messageToElasticSearchJson.convertToJsonWithoutAttachment(message);
-
-        System.out.println(convertToJsonWithoutAttachment);
+        String convertToJsonWithoutAttachment = messageToElasticSearchJson.convertToJsonWithoutAttachment(message).block();
 
         // Then
         assertThatJson(convertToJsonWithoutAttachment)
diff --git a/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartTest.java b/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartTest.java
index 7a0112ed4f..f2afdd0793 100644
--- a/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartTest.java
+++ b/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartTest.java
@@ -23,6 +23,7 @@ import static org.assertj.core.api.Assertions.assertThat;
 import java.io.ByteArrayInputStream;
 import java.nio.charset.StandardCharsets;
 
+import org.apache.james.mailbox.extractor.ParsedContent;
 import org.apache.james.mailbox.model.ContentType.MediaType;
 import org.apache.james.mailbox.model.ContentType.SubType;
 import org.junit.jupiter.api.Test;
@@ -45,7 +46,9 @@ class MimePartTest {
             .addBodyContent(new ByteArrayInputStream(body.getBytes(StandardCharsets.UTF_8)))
             .addMediaType(MediaType.of("text"))
             .addSubType(SubType.of("plain"))
-            .build();
+            .build()
+            .asMimePart((in, contentType) -> ParsedContent.empty())
+            .block();
 
         assertThat(mimePart.getTextualBody()).contains(body);
     }
diff --git a/mailbox/tika/pom.xml b/mailbox/tika/pom.xml
index 0abe712a88..567bd92787 100644
--- a/mailbox/tika/pom.xml
+++ b/mailbox/tika/pom.xml
@@ -63,10 +63,18 @@
             <groupId>com.fasterxml.jackson.core</groupId>
             <artifactId>jackson-databind</artifactId>
         </dependency>
+        <dependency>
+            <groupId>com.github.ben-manes.caffeine</groupId>
+            <artifactId>caffeine</artifactId>
+        </dependency>
         <dependency>
             <groupId>com.google.guava</groupId>
             <artifactId>guava</artifactId>
         </dependency>
+        <dependency>
+            <groupId>io.projectreactor.netty</groupId>
+            <artifactId>reactor-netty</artifactId>
+        </dependency>
         <dependency>
             <groupId>org.apache.commons</groupId>
             <artifactId>commons-lang3</artifactId>
diff --git a/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/CachingTextExtractor.java b/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/CachingTextExtractor.java
index 30cd94f35b..562fa60cc8 100644
--- a/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/CachingTextExtractor.java
+++ b/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/CachingTextExtractor.java
@@ -20,11 +20,10 @@
 package org.apache.james.mailbox.tika;
 
 import java.io.ByteArrayInputStream;
+import java.io.IOException;
 import java.io.InputStream;
 import java.time.Duration;
 import java.util.Optional;
-import java.util.concurrent.ExecutionException;
-import java.util.concurrent.TimeUnit;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.james.mailbox.extractor.ParsedContent;
@@ -34,17 +33,18 @@ import org.apache.james.metrics.api.GaugeRegistry;
 import org.apache.james.metrics.api.Metric;
 import org.apache.james.metrics.api.MetricFactory;
 
+import com.github.benmanes.caffeine.cache.AsyncCache;
+import com.github.benmanes.caffeine.cache.Caffeine;
+import com.github.benmanes.caffeine.cache.RemovalListener;
+import com.github.benmanes.caffeine.cache.Weigher;
 import com.google.common.annotations.VisibleForTesting;
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.Weigher;
 import com.google.common.hash.Hashing;
-import com.google.common.util.concurrent.UncheckedExecutionException;
+
+import reactor.core.publisher.Mono;
 
 public class CachingTextExtractor implements TextExtractor {
     private final TextExtractor underlying;
-    private final Cache<String, ParsedContent> cache;
+    private final AsyncCache<String, ParsedContent> cache;
     private final Metric weightMetric;
 
     public CachingTextExtractor(TextExtractor underlying, Duration cacheEvictionPeriod, Long cacheWeightInBytes,
@@ -52,20 +52,22 @@ public class CachingTextExtractor implements TextExtractor {
         this.underlying = underlying;
         this.weightMetric = metricFactory.generate("textExtractor.cache.weight");
 
-        Weigher<String, ParsedContent> weigher =
-            (key, parsedContent) -> computeWeight(parsedContent);
         RemovalListener<String, ParsedContent> removalListener =
-            notification -> Optional.ofNullable(notification.getValue())
+            (key, value, removalCause) -> Optional.ofNullable(value)
                 .map(this::computeWeight)
                 .ifPresent(weightMetric::remove);
 
-        this.cache = CacheBuilder.newBuilder()
-            .expireAfterAccess(cacheEvictionPeriod.toMillis(), TimeUnit.MILLISECONDS)
+        Weigher<String, ParsedContent> weigher =
+            (key, parsedContent) -> computeWeight(parsedContent);
+
+        cache = Caffeine.newBuilder()
+            .expireAfterAccess(Duration.ofMillis(cacheEvictionPeriod.toMillis()))
             .maximumWeight(cacheWeightInBytes)
             .weigher(weigher)
+            .evictionListener(removalListener)
             .recordStats()
-            .removalListener(removalListener)
-            .build();
+            .buildAsync();
+
         recordStats(gaugeRegistry);
     }
 
@@ -73,28 +75,28 @@ public class CachingTextExtractor implements TextExtractor {
         gaugeRegistry
             .register(
                 "textExtractor.cache.hit.rate",
-                () -> cache.stats().hitRate())
+                () -> cache.synchronous().stats().hitRate())
             .register(
                 "textExtractor.cache.hit.count",
-                () -> cache.stats().hitCount());
+                () -> cache.synchronous().stats().hitCount());
             gaugeRegistry.register(
                 "textExtractor.cache.load.count",
-                () -> cache.stats().loadCount())
+                () -> cache.synchronous().stats().loadCount())
             .register(
                 "textExtractor.cache.eviction.count",
-                () -> cache.stats().evictionCount())
+                () -> cache.synchronous().stats().evictionCount())
             .register(
                 "textExtractor.cache.load.exception.rate",
-                () -> cache.stats().loadExceptionRate())
+                () -> cache.synchronous().stats().loadFailureRate())
             .register(
                 "textExtractor.cache.load.miss.rate",
-                () -> cache.stats().missRate())
+                () -> cache.synchronous().stats().missRate())
             .register(
                 "textExtractor.cache.load.miss.count",
-                () -> cache.stats().missCount())
+                () -> cache.synchronous().stats().missCount())
             .register(
                 "textExtractor.cache.size",
-                cache::size);
+                cache.synchronous()::estimatedSize);
     }
 
     private int computeWeight(ParsedContent parsedContent) {
@@ -109,21 +111,25 @@ public class CachingTextExtractor implements TextExtractor {
     }
 
     @Override
-    public ParsedContent extractContent(InputStream inputStream, ContentType contentType) throws Exception {
-        byte[] bytes = IOUtils.toByteArray(inputStream);
-        String key = Hashing.sha256().hashBytes(bytes).toString();
-
+    public Mono<ParsedContent> extractContentReactive(InputStream inputStream, ContentType contentType) {
         try {
-            return cache.get(key, () -> retrieveAndUpdateWeight(bytes, contentType));
-        } catch (ExecutionException | UncheckedExecutionException e) {
-            throw unwrap(e);
+            byte[] bytes = IOUtils.toByteArray(inputStream);
+            String key = Hashing.sha256().hashBytes(bytes).toString();
+
+            return Mono.fromFuture(cache.get(key, (a, b) -> retrieveAndUpdateWeight(bytes, contentType).toFuture()));
+        } catch (IOException e) {
+            throw new RuntimeException(e);
         }
     }
 
-    private ParsedContent retrieveAndUpdateWeight(byte[] bytes, ContentType contentType) throws Exception {
-        ParsedContent parsedContent = underlying.extractContent(new ByteArrayInputStream(bytes), contentType);
-        weightMetric.add(computeWeight(parsedContent));
-        return parsedContent;
+    @Override
+    public ParsedContent extractContent(InputStream inputStream, ContentType contentType) {
+        return extractContentReactive(inputStream, contentType).block();
+    }
+
+    private Mono<ParsedContent> retrieveAndUpdateWeight(byte[] bytes, ContentType contentType) {
+        return underlying.extractContentReactive(new ByteArrayInputStream(bytes), contentType)
+            .doOnNext(next -> weightMetric.add(computeWeight(next)));
     }
 
     private Exception unwrap(Exception e) {
@@ -135,6 +141,6 @@ public class CachingTextExtractor implements TextExtractor {
 
     @VisibleForTesting
     long size() {
-        return cache.size();
+        return cache.synchronous().estimatedSize();
     }
 }
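`CachingTextExtractor` now keys entries by the SHA-256 of the attachment bytes. It stores futures in a Caffeine `AsyncCache`, calling `cache.get(key, mappingFunction)` and wrapping the result in `Mono.fromFuture`. Concurrent requests for identical content can therefore share one in-flight extraction.

A rough JDK-only approximation of the idea is a `ConcurrentHashMap` of `CompletableFuture`s. This is not Caffeine's API. Unlike the real cache it has no weight limit, expiry, statistics or eviction listener, and `ContentHashCache` is a hypothetical name. Failed futures are removed so that the next call retries, which mirrors Caffeine's documented handling of failed asynchronous computations in an `AsyncCache`.

```java
import java.security.MessageDigest;
import java.security.NoSuchAlgorithmException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ConcurrentHashMap;
import java.util.function.Function;

// Hypothetical JDK-only sketch of an async, content-addressed cache (unbounded, no expiry).
final class ContentHashCache<V> {
    private final ConcurrentHashMap<String, CompletableFuture<V>> entries = new ConcurrentHashMap<>();
    private final Function<byte[], CompletableFuture<V>> extractor;

    ContentHashCache(Function<byte[], CompletableFuture<V>> extractor) {
        this.extractor = extractor;
    }

    CompletableFuture<V> get(byte[] content) {
        String key = sha256Hex(content);
        // Caching the future (not the value) lets concurrent callers for the same bytes share one computation.
        CompletableFuture<V> future = entries.computeIfAbsent(key, k -> extractor.apply(content));
        // Drop failed computations so a later call retries instead of replaying the cached error.
        future.whenComplete((value, error) -> {
            if (error != null) {
                entries.remove(key, future);
            }
        });
        return future;
    }

    int size() {
        return entries.size();
    }

    private static String sha256Hex(byte[] content) {
        try {
            byte[] digest = MessageDigest.getInstance("SHA-256").digest(content);
            StringBuilder hex = new StringBuilder(digest.length * 2);
            for (byte b : digest) {
                hex.append(String.format("%02x", b & 0xff));
            }
            return hex.toString();
        } catch (NoSuchAlgorithmException e) {
            // Every Java platform is required to provide SHA-256.
            throw new IllegalStateException(e);
        }
    }
}
```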
diff --git a/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/ContentTypeFilteringTextExtractor.java b/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/ContentTypeFilteringTextExtractor.java
index d846b75268..63cf46ee3f 100644
--- a/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/ContentTypeFilteringTextExtractor.java
+++ b/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/ContentTypeFilteringTextExtractor.java
@@ -28,6 +28,8 @@ import org.apache.james.mailbox.model.ContentType.MimeType;
 
 import com.google.common.collect.ImmutableSet;
 
+import reactor.core.publisher.Mono;
+
 public class ContentTypeFilteringTextExtractor implements TextExtractor {
 
     private final TextExtractor textExtractor;
@@ -46,6 +48,14 @@ public class ContentTypeFilteringTextExtractor implements TextExtractor {
         return textExtractor.extractContent(inputStream, contentType);
     }
 
+    @Override
+    public Mono<ParsedContent> extractContentReactive(InputStream inputStream, ContentType contentType) {
+        if (isBlacklisted(contentType.mimeType())) {
+            return Mono.just(ParsedContent.empty());
+        }
+        return textExtractor.extractContentReactive(inputStream, contentType);
+    }
+
     private boolean isBlacklisted(MimeType contentType) {
         return contentTypeBlacklist.contains(contentType);
     }
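The new reactive path of `ContentTypeFilteringTextExtractor` repeats the blacklist check from `extractContent`. A blacklisted MIME type completes immediately with `ParsedContent.empty()`, and the wrapped extractor is never invoked.

The JDK-only sketch below shows that short-circuiting decorator. It rests on two simplifications, both hypothetical:

- an `AsyncExtractor` interface stands in for `extractContentReactive`;
- plain strings stand in for `MimeType`.

```java
import java.io.InputStream;
import java.util.Optional;
import java.util.Set;
import java.util.concurrent.CompletableFuture;

// Hypothetical asynchronous contract standing in for TextExtractor.extractContentReactive.
interface AsyncExtractor {
    CompletableFuture<Optional<String>> extract(InputStream content, String mimeType);
}

// Decorator that skips extraction entirely for blacklisted MIME types.
final class FilteringExtractor implements AsyncExtractor {
    private final AsyncExtractor delegate;
    private final Set<String> blacklist;

    FilteringExtractor(AsyncExtractor delegate, Set<String> blacklist) {
        this.delegate = delegate;
        this.blacklist = Set.copyOf(blacklist);
    }

    @Override
    public CompletableFuture<Optional<String>> extract(InputStream content, String mimeType) {
        if (blacklist.contains(mimeType)) {
            // Already-completed empty result: the stream is not read and the delegate is not called.
            return CompletableFuture.completedFuture(Optional.<String>empty());
        }
        return delegate.extract(content, mimeType);
    }
}
```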
diff --git a/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaHttpClient.java b/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaHttpClient.java
index ceae8ffd33..434a109598 100644
--- a/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaHttpClient.java
+++ b/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaHttpClient.java
@@ -19,11 +19,12 @@
 package org.apache.james.mailbox.tika;
 
 import java.io.InputStream;
-import java.util.Optional;
 
 import org.apache.james.mailbox.model.ContentType;
 
+import reactor.core.publisher.Mono;
+
 public interface TikaHttpClient {
 
-    Optional<InputStream> recursiveMetaDataAsJson(InputStream inputStream, ContentType contentType);
+    Mono<InputStream> recursiveMetaDataAsJson(InputStream inputStream, ContentType contentType);
 }
diff --git a/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaHttpClientImpl.java b/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaHttpClientImpl.java
index 5f00ca9efd..e7afed7fb7 100644
--- a/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaHttpClientImpl.java
+++ b/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaHttpClientImpl.java
@@ -18,19 +18,24 @@
  ****************************************************************/
 package org.apache.james.mailbox.tika;
 
-import java.io.IOException;
 import java.io.InputStream;
 import java.net.URI;
 import java.net.URISyntaxException;
 import java.nio.charset.Charset;
-import java.util.Optional;
+import java.time.Duration;
 
-import org.apache.http.client.fluent.Request;
 import org.apache.http.client.utils.URIBuilder;
 import org.apache.http.entity.ContentType;
+import org.apache.james.util.ReactorUtils;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
+import io.netty.buffer.Unpooled;
+import io.netty.handler.codec.http.HttpHeaderNames;
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+import reactor.netty.http.client.HttpClient;
+
 public class TikaHttpClientImpl implements TikaHttpClient {
 
     private static final Logger LOGGER = LoggerFactory.getLogger(TikaHttpClientImpl.class);
@@ -38,10 +43,14 @@ public class TikaHttpClientImpl implements TikaHttpClient {
 
     private final TikaConfiguration tikaConfiguration;
     private final URI recursiveMetaData;
+    private final HttpClient httpClient;
 
     public TikaHttpClientImpl(TikaConfiguration tikaConfiguration) throws URISyntaxException {
         this.tikaConfiguration = tikaConfiguration;
         this.recursiveMetaData = buildURI(tikaConfiguration).resolve(RECURSIVE_METADATA_AS_TEXT_ENDPOINT);
+
+        httpClient = HttpClient.create()
+            .responseTimeout(Duration.ofMillis(tikaConfiguration.getTimeoutInMillis()));
     }
 
     private URI buildURI(TikaConfiguration tikaConfiguration) throws URISyntaxException {
@@ -53,23 +62,31 @@ public class TikaHttpClientImpl implements TikaHttpClient {
     }
 
     @Override
-    public Optional<InputStream> recursiveMetaDataAsJson(InputStream inputStream, org.apache.james.mailbox.model.ContentType contentType) {
-        try {
-            ContentType httpContentType = ContentType.create(contentType.mimeType().asString(),
-                contentType.charset()
-                    .map(Charset::name)
-                    .orElse(null));
-            return Optional.ofNullable(
-                    Request.Put(recursiveMetaData)
-                        .socketTimeout(tikaConfiguration.getTimeoutInMillis())
-                        .bodyStream(inputStream, httpContentType)
-                        .execute()
-                        .returnContent()
-                        .asStream());
-        } catch (IOException e) {
-            LOGGER.warn("Failing to call Tika for content type {}", contentType, e);
-            return Optional.empty();
-        }
+    public Mono<InputStream> recursiveMetaDataAsJson(InputStream inputStream, org.apache.james.mailbox.model.ContentType contentType) {
+        ContentType httpContentType = ContentType.create(contentType.mimeType().asString(),
+            contentType.charset()
+                .map(Charset::name)
+                .orElse(null));
+
+        return httpClient
+            .headers(headers -> headers.set(HttpHeaderNames.CONTENT_TYPE, httpContentType.toString()))
+            .put()
+            .uri(recursiveMetaData)
+            .send(ReactorUtils.toChunks(inputStream, 16 * 1024)
+                .map(Unpooled::wrappedBuffer)
+                .subscribeOn(Schedulers.elastic()))
+            .responseSingle((resp, content) -> {
+                if (resp.status().code() == 200) {
+                    return content.asInputStream();
+                } else {
+                    LOGGER.warn("Failing to call Tika for content type {} status {}", contentType, resp.status().code());
+                    return Mono.empty();
+                }
+            })
+            .onErrorResume(e -> {
+                LOGGER.warn("Failing to call Tika for content type {}", contentType, e);
+                return Mono.empty();
+            });
     }
 
 }
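`recursiveMetaDataAsJson` now streams the request body as 16 KiB chunks via `ReactorUtils.toChunks(inputStream, 16 * 1024)`. It subscribes to that chunk stream on `Schedulers.elastic()`, because `InputStream` reads block and should not run on a Netty event-loop thread.

The JDK-only sketch below covers only the chunking step. Some details are assumptions, not shown by this diff:

- `Chunker.readChunks` is a hypothetical name;
- `ByteBuffer.wrap` stands in for `Unpooled::wrappedBuffer`;
- `ReactorUtils.toChunks` may size the final chunk differently.

```java
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.ByteBuffer;
import java.util.ArrayList;
import java.util.List;

// Hypothetical JDK-only sketch of splitting a request body into fixed-size chunks.
final class Chunker {
    // Blocking: every readNBytes call waits on I/O, hence the need for a separate scheduler in the reactive code.
    static List<ByteBuffer> readChunks(InputStream in, int chunkSize) {
        if (chunkSize <= 0) {
            throw new IllegalArgumentException("chunkSize must be positive");
        }
        List<ByteBuffer> chunks = new ArrayList<>();
        try {
            while (true) {
                // readNBytes keeps reading until chunkSize bytes or end of stream (Java 11+).
                byte[] chunk = in.readNBytes(chunkSize);
                if (chunk.length == 0) {
                    return chunks;
                }
                chunks.add(ByteBuffer.wrap(chunk));
            }
        } catch (IOException e) {
            throw new UncheckedIOException(e);
        }
    }
}
```

For a 40,000-byte body this yields chunks of 16,384, 16,384 and 7,232 bytes.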
diff --git a/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaTextExtractor.java b/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaTextExtractor.java
index b7fc39fa50..8c639096b7 100644
--- a/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaTextExtractor.java
+++ b/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/TikaTextExtractor.java
@@ -52,6 +52,8 @@ import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableList;
 import com.google.common.collect.ImmutableMap;
 
+import reactor.core.publisher.Mono;
+
 public class TikaTextExtractor implements TextExtractor {
     private static final ContentType.MediaType TEXT = ContentType.MediaType.of("text");
 
@@ -77,24 +79,30 @@ public class TikaTextExtractor implements TextExtractor {
     }
 
     @Override
-    public ParsedContent extractContent(InputStream inputStream, ContentType contentType) throws Exception {
+    public Mono<ParsedContent> extractContentReactive(InputStream inputStream, ContentType contentType) {
         if (contentType.mediaType().equals(TEXT)) {
-            return jsoupTextExtractor.extractContent(inputStream, contentType);
+            return jsoupTextExtractor.extractContentReactive(inputStream, contentType);
         }
-        return metricFactory.decorateSupplierWithTimerMetric("tikaTextExtraction", Throwing.supplier(
-            () -> performContentExtraction(inputStream, contentType))
-            .sneakyThrow());
+        return Mono.from(metricFactory.decoratePublisherWithTimerMetric("tikaTextExtraction",
+            performContentExtraction(inputStream, contentType)));
+    }
+
+    @Override
+    public ParsedContent extractContent(InputStream inputStream, ContentType contentType) throws Exception {
+        return extractContentReactive(inputStream, contentType)
+            .block();
     }
 
-    public ParsedContent performContentExtraction(InputStream inputStream, ContentType contentType) throws IOException {
-        ContentAndMetadata contentAndMetadata = convert(tikaHttpClient.recursiveMetaDataAsJson(inputStream, contentType));
-        return new ParsedContent(contentAndMetadata.getContent(), contentAndMetadata.getMetadata());
+    public Mono<ParsedContent> performContentExtraction(InputStream inputStream, ContentType contentType) {
+        Mono<ContentAndMetadata> contentAndMetadata = convert(tikaHttpClient.recursiveMetaDataAsJson(inputStream, contentType));
+        return contentAndMetadata
+            .map(result -> new ParsedContent(result.getContent(), result.getMetadata()));
     }
 
-    private ContentAndMetadata convert(Optional<InputStream> maybeInputStream) throws IOException {
+    private Mono<ContentAndMetadata> convert(Mono<InputStream> maybeInputStream) {
         return maybeInputStream
                 .map(Throwing.function(inputStream -> objectMapper.readValue(inputStream, ContentAndMetadata.class)))
-                .orElse(ContentAndMetadata.empty());
+                .switchIfEmpty(Mono.just(ContentAndMetadata.empty()));
     }
 
     @VisibleForTesting
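With this change, `TikaHttpClientImpl` reports a failed call or a non-200 status as an empty `Mono`. `convert` maps that absence to `ContentAndMetadata.empty()` through `switchIfEmpty(Mono.just(...))`; Reactor's `defaultIfEmpty(...)` would express the same fallback. A JSON parsing error raised inside `map`, by contrast, still propagates as an error signal.

The JDK-only sketch below reproduces that distinction with `Optional` and `CompletableFuture`. `ParsedResult` and `ResponseConversion` are hypothetical stand-ins, not the James classes.

```java
import java.util.Map;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.function.Function;

// Hypothetical stand-in for TikaTextExtractor.ContentAndMetadata.
final class ParsedResult {
    static final ParsedResult EMPTY = new ParsedResult(Optional.empty(), Map.of());

    final Optional<String> content;
    final Map<String, String> metadata;

    ParsedResult(Optional<String> content, Map<String, String> metadata) {
        this.content = content;
        this.metadata = metadata;
    }
}

final class ResponseConversion {
    // An absent response (call failed or non-200 status) falls back to EMPTY,
    // while an exception thrown by the parser still completes the future exceptionally.
    static CompletableFuture<ParsedResult> convert(CompletableFuture<Optional<String>> response,
                                                   Function<String, ParsedResult> parser) {
        return response.thenApply(maybeBody -> maybeBody.map(parser).orElse(ParsedResult.EMPTY));
    }
}
```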
diff --git a/mailbox/tika/src/test/java/org/apache/james/mailbox/tika/CachingTextExtractorTest.java b/mailbox/tika/src/test/java/org/apache/james/mailbox/tika/CachingTextExtractorTest.java
index 6722733e87..54cea13a42 100644
--- a/mailbox/tika/src/test/java/org/apache/james/mailbox/tika/CachingTextExtractorTest.java
+++ b/mailbox/tika/src/test/java/org/apache/james/mailbox/tika/CachingTextExtractorTest.java
@@ -19,7 +19,6 @@
 
 package org.apache.james.mailbox.tika;
 
-import static org.assertj.core.api.Assertions.assertThat;
 import static org.assertj.core.api.Assertions.assertThatThrownBy;
 import static org.mockito.ArgumentMatchers.any;
 import static org.mockito.Mockito.mock;
@@ -54,7 +53,9 @@ import com.github.fge.lambdas.Throwing;
 import com.google.common.base.Strings;
 import com.google.common.collect.ImmutableMap;
 
-public class CachingTextExtractorTest {
+import reactor.core.publisher.Mono;
+
+class CachingTextExtractorTest {
 
     private static final ParsedContent RESULT = new ParsedContent(Optional.of("content"), ImmutableMap.of());
     public static final String BIG_STRING = Strings.repeat("0123456789", 103 * 1024);
@@ -69,7 +70,7 @@ public class CachingTextExtractorTest {
     private TextExtractor wrappedTextExtractor;
 
     @BeforeEach
-    void setUp() throws Exception {
+    void setUp() {
         wrappedTextExtractor = mock(TextExtractor.class);
         textExtractor = new CachingTextExtractor(wrappedTextExtractor,
             TikaConfiguration.DEFAULT_CACHE_EVICTION_PERIOD,
@@ -77,15 +78,15 @@ public class CachingTextExtractorTest {
             new RecordingMetricFactory(),
             new NoopGaugeRegistry());
 
-        when(wrappedTextExtractor.extractContent(any(), any()))
-            .thenReturn(RESULT);
+        when(wrappedTextExtractor.extractContentReactive(any(), any()))
+            .thenReturn(Mono.just(RESULT));
     }
 
     @Test
     void extractContentShouldCallUnderlyingTextExtractor() throws Exception {
         textExtractor.extractContent(INPUT_STREAM.get(), CONTENT_TYPE);
 
-        verify(wrappedTextExtractor, times(1)).extractContent(any(), any());
+        verify(wrappedTextExtractor, times(1)).extractContentReactive(any(), any());
         verifyNoMoreInteractions(wrappedTextExtractor);
     }
 
@@ -94,79 +95,30 @@ public class CachingTextExtractorTest {
         textExtractor.extractContent(INPUT_STREAM.get(), CONTENT_TYPE);
         textExtractor.extractContent(INPUT_STREAM.get(), CONTENT_TYPE);
 
-        verify(wrappedTextExtractor, times(1)).extractContent(any(), any());
+        verify(wrappedTextExtractor, times(1)).extractContentReactive(any(), any());
         verifyNoMoreInteractions(wrappedTextExtractor);
     }
 
     @Test
-    void extractContentShouldPropagateCheckedException() throws Exception {
+    void extractContentShouldPropagateCheckedException() {
         IOException ioException = new IOException("Any");
-        when(wrappedTextExtractor.extractContent(any(), any()))
-            .thenThrow(ioException);
+        when(wrappedTextExtractor.extractContentReactive(any(), any()))
+            .thenReturn(Mono.error(ioException));
 
         assertThatThrownBy(() -> textExtractor.extractContent(INPUT_STREAM.get(), CONTENT_TYPE))
-            .isEqualTo(ioException);
+            .hasCause(ioException);
     }
 
     @Test
-    void extractContentShouldPropagateRuntimeException() throws Exception {
+    void extractContentShouldPropagateRuntimeException() {
         RuntimeException runtimeException = new RuntimeException("Any");
-        when(wrappedTextExtractor.extractContent(any(), any()))
-            .thenThrow(runtimeException);
+        when(wrappedTextExtractor.extractContentReactive(any(), any()))
+            .thenReturn(Mono.error(runtimeException));
 
         assertThatThrownBy(() -> textExtractor.extractContent(INPUT_STREAM.get(), CONTENT_TYPE))
             .isEqualTo(runtimeException);
     }
 
-    @Test
-    void cacheShouldEvictEntriesWhenFull() throws Exception {
-        when(wrappedTextExtractor.extractContent(any(), any()))
-            .thenReturn(_2MiB_RESULT);
-
-        IntStream.range(0, 10)
-            .mapToObj(STREAM_GENERATOR::apply)
-            .forEach(Throwing.consumer(inputStream -> textExtractor.extractContent(inputStream, CONTENT_TYPE)));
-
-        assertThat(textExtractor.size())
-            .isLessThanOrEqualTo(5);
-    }
-
-    @Test
-    void olderEntriesShouldBeEvictedFirst() throws Exception {
-        when(wrappedTextExtractor.extractContent(any(), any()))
-            .thenReturn(_2MiB_RESULT);
-
-        IntStream.range(0, 10)
-            .mapToObj(STREAM_GENERATOR::apply)
-            .forEach(Throwing.consumer(inputStream -> textExtractor.extractContent(inputStream, CONTENT_TYPE)));
-
-        reset(wrappedTextExtractor);
-        when(wrappedTextExtractor.extractContent(any(), any()))
-            .thenReturn(_2MiB_RESULT);
-
-        textExtractor.extractContent(STREAM_GENERATOR.apply(1), CONTENT_TYPE);
-
-        verify(wrappedTextExtractor).extractContent(any(), any());
-    }
-
-    @Test
-    void youngerEntriesShouldBePreservedByEviction() throws Exception {
-        when(wrappedTextExtractor.extractContent(any(), any()))
-            .thenReturn(_2MiB_RESULT);
-
-        IntStream.range(0, 10)
-            .mapToObj(STREAM_GENERATOR::apply)
-            .forEach(Throwing.consumer(inputStream -> textExtractor.extractContent(inputStream, CONTENT_TYPE)));
-
-        reset(wrappedTextExtractor);
-        when(wrappedTextExtractor.extractContent(any(), any()))
-            .thenReturn(_2MiB_RESULT);
-
-        textExtractor.extractContent(STREAM_GENERATOR.apply(9), CONTENT_TYPE);
-
-        verifyZeroInteractions(wrappedTextExtractor);
-    }
-
     @Test
     void frequentlyAccessedEntriesShouldBePreservedByEviction() throws Exception {
         when(wrappedTextExtractor.extractContent(any(), any()))
@@ -191,7 +143,7 @@ public class CachingTextExtractorTest {
             .threadCount(10)
             .runSuccessfullyWithin(Duration.ofMinutes(1));
 
-        verify(wrappedTextExtractor, times(1)).extractContent(any(), any());
+        verify(wrappedTextExtractor, times(1)).extractContentReactive(any(), any());
     }
 
 }
\ No newline at end of file
diff --git a/mailbox/tika/src/test/java/org/apache/james/mailbox/tika/TikaTextExtractorTest.java b/mailbox/tika/src/test/java/org/apache/james/mailbox/tika/TikaTextExtractorTest.java
index e47cee2cee..6c0f76ac0f 100644
--- a/mailbox/tika/src/test/java/org/apache/james/mailbox/tika/TikaTextExtractorTest.java
+++ b/mailbox/tika/src/test/java/org/apache/james/mailbox/tika/TikaTextExtractorTest.java
@@ -26,7 +26,6 @@ import java.io.ByteArrayInputStream;
 import java.io.InputStream;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
-import java.util.Optional;
 
 import org.apache.commons.io.IOUtils;
 import org.apache.james.mailbox.extractor.ParsedContent;
@@ -42,6 +41,8 @@ import com.fasterxml.jackson.databind.node.ArrayNode;
 import com.fasterxml.jackson.databind.node.JsonNodeFactory;
 import com.fasterxml.jackson.databind.node.TextNode;
 
+import reactor.core.publisher.Mono;
+
 class TikaTextExtractorTest {
 
     TextExtractor textExtractor;
@@ -170,7 +171,7 @@ class TikaTextExtractorTest {
     void deserializerShouldNotThrowWhenMoreThanOneNode() throws Exception {
         TikaTextExtractor textExtractor = new TikaTextExtractor(
             new RecordingMetricFactory(),
-            (inputStream, contentType) -> Optional.of(new ByteArrayInputStream(("[{\"X-TIKA:content\": \"This is an awesome LibreOffice document !\"}, " +
+            (inputStream, contentType) -> Mono.just(new ByteArrayInputStream(("[{\"X-TIKA:content\": \"This is an awesome LibreOffice document !\"}, " +
                                                             "{\"Chroma BlackIsZero\": \"true\"}]")
                                                         .getBytes(StandardCharsets.UTF_8))));
 
@@ -183,7 +184,7 @@ class TikaTextExtractorTest {
         String expectedExtractedContent = "content A";
         TikaTextExtractor textExtractor = new TikaTextExtractor(
             new RecordingMetricFactory(),
-            (inputStream, contentType) -> Optional.of(new ByteArrayInputStream(("[{\"X-TIKA:content\": \"" + expectedExtractedContent + "\"}, " +
+            (inputStream, contentType) -> Mono.just(new ByteArrayInputStream(("[{\"X-TIKA:content\": \"" + expectedExtractedContent + "\"}, " +
                                                             "{\"X-TIKA:content\": \"content B\"}]")
                                                         .getBytes(StandardCharsets.UTF_8))));
 
@@ -198,7 +199,7 @@ class TikaTextExtractorTest {
     void deserializerShouldThrowWhenNodeIsNotAnObject() {
         TikaTextExtractor textExtractor = new TikaTextExtractor(
             new RecordingMetricFactory(),
-            (inputStream, contentType) -> Optional.of(new ByteArrayInputStream("[\"value1\"]"
+            (inputStream, contentType) -> Mono.just(new ByteArrayInputStream("[\"value1\"]"
                                                         .getBytes(StandardCharsets.UTF_8))));
 
         InputStream inputStream = new ByteArrayInputStream("toto".getBytes(StandardCharsets.UTF_8));
diff --git a/pom.xml b/pom.xml
index dce21c398c..0817a60cf4 100644
--- a/pom.xml
+++ b/pom.xml
@@ -2092,6 +2092,11 @@
                 <artifactId>jackson-datatype-jsr310</artifactId>
                 <version>${jackson.version}</version>
             </dependency>
+            <dependency>
+                <groupId>com.github.ben-manes.caffeine</groupId>
+                <artifactId>caffeine</artifactId>
+                <version>3.0.5</version>
+            </dependency>
             <dependency>
                 <groupId>com.github.dpaukov</groupId>
                 <artifactId>combinatoricslib3</artifactId>


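In the updated CachingTextExtractorTest, a checked IOException raised by the wrapped extractor is now expected as the cause of the thrown exception (`hasCause`), while a RuntimeException is still expected unchanged (`isEqualTo`). Reactor's `Mono.block()` reports errors the same way: unchecked exceptions are rethrown as-is and checked ones are wrapped. That suggests the synchronous `extractContent` now blocks on the reactive path. The sketch below reproduces that rule with `java.util.concurrent.CompletableFuture` instead of Reactor. `BlockingBridge` and `block` are hypothetical names, not James or Reactor APIs.

```java
import java.io.IOException;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutionException;

public class BlockingBridge {

    // Hypothetical helper: waits for the result and reports failures the way
    // Reactor's Mono.block() does (unchecked rethrown as-is, checked wrapped).
    public static <T> T block(CompletableFuture<T> future) {
        try {
            return future.get();
        } catch (InterruptedException e) {
            Thread.currentThread().interrupt();
            throw new IllegalStateException("Interrupted while blocking", e);
        } catch (ExecutionException e) {
            Throwable cause = e.getCause();
            if (cause instanceof RuntimeException) {
                throw (RuntimeException) cause; // same instance, so isEqualTo(...) holds
            }
            if (cause instanceof Error) {
                throw (Error) cause;
            }
            throw new RuntimeException(cause);  // wrapped, so hasCause(...) holds
        }
    }

    public static void main(String[] args) {
        IOException checked = new IOException("Any");
        try {
            block(CompletableFuture.failedFuture(checked));
        } catch (RuntimeException e) {
            System.out.println(e.getCause() == checked); // true
        }

        RuntimeException unchecked = new RuntimeException("Any");
        try {
            block(CompletableFuture.failedFuture(unchecked));
        } catch (RuntimeException e) {
            System.out.println(e == unchecked); // true
        }
    }
}
```
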
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org


[james-project] 03/03: JAMES-3719 Reading inputstream might be blocking

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit 14fb2f6d7c7df15459a87032df5c3a38adbbdb42
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed May 11 15:15:58 2022 +0700

    JAMES-3719 Reading inputstream might be blocking
    
    Subscribes on an elastic scheduler when blocking reads
    might be performed.
---
 .../james/mailbox/extractor/TextExtractor.java       |  4 +++-
 .../store/extractor/DefaultTextExtractor.java        | 14 ++++++++++++++
 .../mailbox/store/extractor/JsoupTextExtractor.java  | 20 ++++++++++++++++++++
 3 files changed, 37 insertions(+), 1 deletion(-)
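
With this commit, the default `extractContentReactive` no longer runs `extractContent` on whichever thread subscribes. Instead, it is shifted onto a Reactor scheduler with `subscribeOn(Schedulers.elastic())`, so a blocking `InputStream` read cannot stall a non-blocking thread. In recent Reactor 3.x releases, `Schedulers.elastic()` is deprecated in favour of `Schedulers.boundedElastic()`, which caps the number of threads. As a rough, Reactor-free illustration of the same pattern, the sketch below uses a `java.util.concurrent` executor. `AsyncDefaultBridge`, `Extractor`, `extract` and `extractAsync` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.InputStream;
import java.nio.charset.StandardCharsets;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.CompletionException;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class AsyncDefaultBridge {

    // Pool reserved for blocking I/O; daemon threads so it never keeps the JVM alive.
    static final ExecutorService BLOCKING_IO = Executors.newCachedThreadPool(runnable -> {
        Thread thread = new Thread(runnable, "blocking-io");
        thread.setDaemon(true);
        return thread;
    });

    // Hypothetical counterpart of TextExtractor: implementors only write the
    // synchronous, possibly blocking method...
    public interface Extractor {
        String extract(InputStream in) throws Exception;

        // ...and inherit an asynchronous variant that runs it on the I/O pool,
        // much like the default extractContentReactive(...).subscribeOn(...).
        default CompletableFuture<String> extractAsync(InputStream in) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    return extract(in);
                } catch (Exception e) {
                    throw new CompletionException(e);
                }
            }, BLOCKING_IO);
        }
    }

    public static void main(String[] args) {
        Extractor raw = in -> new String(in.readAllBytes(), StandardCharsets.UTF_8);
        InputStream body = new ByteArrayInputStream("hello".getBytes(StandardCharsets.UTF_8));
        System.out.println(raw.extractAsync(body).join()); // hello
    }
}
```

One difference to keep in mind: `Mono.fromCallable` is lazy and only runs when subscribed, whereas `CompletableFuture.supplyAsync` starts the work as soon as it is called.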

diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/extractor/TextExtractor.java b/mailbox/api/src/main/java/org/apache/james/mailbox/extractor/TextExtractor.java
index 2822ee02e8..7891557039 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/extractor/TextExtractor.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/extractor/TextExtractor.java
@@ -24,6 +24,7 @@ import java.io.InputStream;
 import org.apache.james.mailbox.model.ContentType;
 
 import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
 
 public interface TextExtractor {
     default boolean applicable(ContentType contentType) {
@@ -33,7 +34,8 @@ public interface TextExtractor {
     ParsedContent extractContent(InputStream inputStream, ContentType contentType) throws Exception;
 
     default Mono<ParsedContent> extractContentReactive(InputStream inputStream, ContentType contentType) {
-        return Mono.fromCallable(() -> extractContent(inputStream, contentType));
+        return Mono.fromCallable(() -> extractContent(inputStream, contentType))
+            .subscribeOn(Schedulers.elastic());
     }
 
 }
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/extractor/DefaultTextExtractor.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/extractor/DefaultTextExtractor.java
index 50cc8b68e3..2605f850e6 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/extractor/DefaultTextExtractor.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/extractor/DefaultTextExtractor.java
@@ -30,6 +30,9 @@ import org.apache.james.mailbox.extractor.ParsedContent;
 import org.apache.james.mailbox.extractor.TextExtractor;
 import org.apache.james.mailbox.model.ContentType;
 
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
 /**
  * A default text extractor that is directly based on the input file provided.
  * 
@@ -50,4 +53,15 @@ public class DefaultTextExtractor implements TextExtractor {
             return new ParsedContent(Optional.empty(), new HashMap<>());
         }
     }
+
+    @Override
+    public Mono<ParsedContent> extractContentReactive(InputStream inputStream, ContentType contentType) {
+        if (applicable(contentType)) {
+            Charset charset = contentType.charset().orElse(StandardCharsets.UTF_8);
+            return Mono.fromCallable(() -> new ParsedContent(Optional.ofNullable(IOUtils.toString(inputStream, charset)), new HashMap<>()))
+                .subscribeOn(Schedulers.elastic());
+        } else {
+            return Mono.just(new ParsedContent(Optional.empty(), new HashMap<>()));
+        }
+    }
 }
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/extractor/JsoupTextExtractor.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/extractor/JsoupTextExtractor.java
index b06f55ffc0..48988c1dd0 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/extractor/JsoupTextExtractor.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/extractor/JsoupTextExtractor.java
@@ -37,6 +37,9 @@ import org.jsoup.nodes.Document;
 
 import com.google.common.collect.ImmutableMap;
 
+import reactor.core.publisher.Mono;
+import reactor.core.scheduler.Schedulers;
+
 public class JsoupTextExtractor implements TextExtractor {
     private static final String TITLE_HTML_TAG = "title";
     private static final String NO_BASE_URI = "";
@@ -67,6 +70,23 @@ public class JsoupTextExtractor implements TextExtractor {
         return ParsedContent.empty();
     }
 
+    @Override
+    public Mono<ParsedContent> extractContentReactive(InputStream inputStream, ContentType contentType) {
+        if (inputStream == null || contentType == null) {
+            return Mono.just(ParsedContent.empty());
+        }
+        Charset charset = contentType.charset().orElse(StandardCharsets.UTF_8);
+        if (contentType.mimeType().equals(TEXT_HTML)) {
+            return Mono.fromCallable(() -> parseHtmlContent(inputStream, charset))
+                .subscribeOn(Schedulers.elastic());
+        }
+        if (contentType.mimeType().equals(TEXT_PLAIN)) {
+            return Mono.fromCallable(() -> parsePlainTextContent(inputStream, charset))
+                .subscribeOn(Schedulers.elastic());
+        }
+        return Mono.just(ParsedContent.empty());
+    }
+
     private ParsedContent parsePlainTextContent(InputStream inputStream, Charset charset) throws IOException {
         return new ParsedContent(Optional.ofNullable(IOUtils.toString(inputStream, charset)), EMPTY_METADATA);
     }


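`DefaultTextExtractor` and `JsoupTextExtractor` also override `extractContentReactive` so that the scheduler hop only happens when there is something to read. Unsupported content types and null inputs complete immediately with an empty result via `Mono.just(...)`. Below is a `CompletableFuture`-based sketch of that dispatch. It is not the James code: `DispatchingExtractor` is a hypothetical name, and HTML is read as raw text rather than parsed with Jsoup.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.concurrent.CompletableFuture;
import java.util.concurrent.ExecutorService;
import java.util.concurrent.Executors;

public class DispatchingExtractor {

    // Daemon threads so the pool never keeps the JVM alive.
    private static final ExecutorService BLOCKING_IO = Executors.newCachedThreadPool(runnable -> {
        Thread thread = new Thread(runnable, "blocking-io");
        thread.setDaemon(true);
        return thread;
    });

    // Mirrors the shape of JsoupTextExtractor.extractContentReactive: missing or
    // unsupported inputs complete immediately on the caller's thread, and only
    // parts that will actually be read are handed to the blocking-I/O pool.
    public static CompletableFuture<Optional<String>> extract(InputStream in, String mimeType) {
        if (in == null || mimeType == null) {
            return CompletableFuture.completedFuture(Optional.empty());
        }
        if (mimeType.equals("text/plain") || mimeType.equals("text/html")) {
            return CompletableFuture.supplyAsync(() -> {
                try {
                    // Both types are read as raw text here; no HTML parsing.
                    return Optional.of(new String(in.readAllBytes(), StandardCharsets.UTF_8));
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            }, BLOCKING_IO);
        }
        return CompletableFuture.completedFuture(Optional.empty());
    }

    public static void main(String[] args) {
        InputStream text = new ByteArrayInputStream("hi".getBytes(StandardCharsets.UTF_8));
        System.out.println(extract(text, "text/plain").join());              // Optional[hi]
        System.out.println(extract(text, "application/pdf").isDone());       // true, no thread hop
    }
}
```
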


[james-project] 02/03: JAMES-3719 Avoid copying mime part content when we are not to extract its content

Posted by bt...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit c9d4adf901ff1f575b5fcf3cbf4291c7cfca9e89
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Wed Mar 2 14:45:44 2022 +0700

    JAMES-3719 Avoid copying mime part content when we are not to extract its content
---
 .../james/mailbox/extractor/TextExtractor.java     |  3 ++
 .../mailbox/elasticsearch/v7/json/MimePart.java    | 52 +++++++++++-----------
 .../v7/json/MimePartContainerBuilder.java          |  3 --
 .../elasticsearch/v7/json/MimePartParser.java      |  4 +-
 .../v7/json/RootMimePartContainerBuilder.java      |  5 ---
 .../v7/json/IndexableMessageTest.java              |  1 +
 .../elasticsearch/v7/json/MimePartTest.java        |  4 +-
 .../store/extractor/DefaultTextExtractor.java      |  6 ++-
 .../store/extractor/JsoupTextExtractor.java        |  8 ++++
 .../tika/ContentTypeFilteringTextExtractor.java    |  5 +++
 10 files changed, 51 insertions(+), 40 deletions(-)

diff --git a/mailbox/api/src/main/java/org/apache/james/mailbox/extractor/TextExtractor.java b/mailbox/api/src/main/java/org/apache/james/mailbox/extractor/TextExtractor.java
index 2fdf7e555e..2822ee02e8 100644
--- a/mailbox/api/src/main/java/org/apache/james/mailbox/extractor/TextExtractor.java
+++ b/mailbox/api/src/main/java/org/apache/james/mailbox/extractor/TextExtractor.java
@@ -26,6 +26,9 @@ import org.apache.james.mailbox.model.ContentType;
 import reactor.core.publisher.Mono;
 
 public interface TextExtractor {
+    default boolean applicable(ContentType contentType) {
+        return true;
+    }
 
     ParsedContent extractContent(InputStream inputStream, ContentType contentType) throws Exception;
 
diff --git a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePart.java b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePart.java
index 918306fddf..8a286b057d 100644
--- a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePart.java
+++ b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePart.java
@@ -25,6 +25,7 @@ import java.nio.charset.Charset;
 import java.nio.charset.StandardCharsets;
 import java.util.List;
 import java.util.Optional;
+import java.util.function.Predicate;
 import java.util.stream.Stream;
 
 import org.apache.commons.io.FilenameUtils;
@@ -34,7 +35,6 @@ import org.apache.james.mailbox.extractor.TextExtractor;
 import org.apache.james.mailbox.model.ContentType;
 import org.apache.james.mailbox.model.ContentType.MediaType;
 import org.apache.james.mailbox.model.ContentType.SubType;
-import org.apache.james.mailbox.store.extractor.DefaultTextExtractor;
 import org.apache.james.mime4j.stream.Field;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -42,7 +42,6 @@ import org.slf4j.LoggerFactory;
 import com.fasterxml.jackson.annotation.JsonIgnore;
 import com.fasterxml.jackson.annotation.JsonProperty;
 import com.github.fge.lambdas.Throwing;
-import com.google.common.base.Preconditions;
 import com.google.common.collect.ImmutableMap;
 import com.google.common.collect.Lists;
 
@@ -62,9 +61,10 @@ public class MimePart {
         private Optional<String> fileExtension;
         private Optional<String> contentDisposition;
         private Optional<Charset> charset;
-        private TextExtractor textExtractor;
+        private Predicate<ContentType> shouldCaryOverContent;
 
-        private Builder() {
+        private Builder(Predicate<ContentType> shouldCaryOverContent) {
+            this.shouldCaryOverContent = shouldCaryOverContent;
             children = Lists.newArrayList();
             headerCollectionBuilder = HeaderCollection.builder();
             this.bodyContent = Optional.empty();
@@ -74,7 +74,6 @@ public class MimePart {
             this.fileExtension = Optional.empty();
             this.contentDisposition = Optional.empty();
             this.charset = Optional.empty();
-            this.textExtractor = new DefaultTextExtractor();
         }
 
         @Override
@@ -120,27 +119,32 @@ public class MimePart {
             return this;
         }
 
-        @Override
-        public MimePartContainerBuilder using(TextExtractor textExtractor) {
-            Preconditions.checkArgument(textExtractor != null, "Provided text extractor should not be null");
-            this.textExtractor = textExtractor;
-            return this;
-        }
-
         @Override
         public MimePartContainerBuilder charset(Charset charset) {
             this.charset = Optional.of(charset);
             return this;
         }
 
+        private Optional<ContentType> computeContentType() {
+            if (mediaType.isPresent() && subType.isPresent()) {
+                return Optional.of(ContentType.of(
+                    ContentType.MimeType.of(mediaType.get(), subType.get()),
+                    charset));
+            } else {
+                return Optional.empty();
+            }
+        }
+
         @Override
         public ParsedMimePart build() {
+            final Optional<ContentType> contentType = computeContentType();
             return new ParsedMimePart(
                 headerCollectionBuilder.build(),
-                bodyContent,
+                bodyContent.filter(any -> shouldCaryOverContent.test(contentType.orElse(null))),
                 charset,
                 mediaType,
                 subType,
+                contentType,
                 fileName,
                 fileExtension,
                 contentDisposition,
@@ -154,6 +158,7 @@ public class MimePart {
         private final Optional<Charset> charset;
         private final Optional<MediaType> mediaType;
         private final Optional<SubType> subType;
+        private Optional<ContentType> contentType;
         private final Optional<String> fileName;
         private final Optional<String> fileExtension;
         private final Optional<String> contentDisposition;
@@ -161,17 +166,19 @@ public class MimePart {
 
         public ParsedMimePart(HeaderCollection headerCollection, Optional<InputStream> bodyContent, Optional<Charset> charset,
                               Optional<MediaType> mediaType,
-                              Optional<SubType> subType, Optional<String> fileName, Optional<String> fileExtension,
+                              Optional<SubType> subType, Optional<ContentType> contentType, Optional<String> fileName, Optional<String> fileExtension,
                               Optional<String> contentDisposition, List<ParsedMimePart> attachments) {
             this.headerCollection = headerCollection;
-            this.bodyContent = bodyContent.map(Throwing.function(IOUtils::toByteArray));
             this.mediaType = mediaType;
             this.subType = subType;
+            this.contentType = contentType;
             this.fileName = fileName;
             this.fileExtension = fileExtension;
             this.contentDisposition = contentDisposition;
             this.attachments = attachments;
             this.charset = charset;
+
+            this.bodyContent = bodyContent.map(Throwing.function(IOUtils::toByteArray));
         }
 
         public Mono<MimePart> asMimePart(TextExtractor textExtractor) {
@@ -196,7 +203,7 @@ public class MimePart {
             if (shouldPerformTextExtraction()) {
                 return textExtractor.extractContentReactive(
                     new ByteArrayInputStream(bodyContent.get()),
-                    computeContentType().orElse(null));
+                    contentType.orElse(null));
             }
             return Mono.fromCallable(() -> new ParsedContent(
                 Optional.ofNullable(IOUtils.toString(new ByteArrayInputStream(bodyContent.get()), charset.orElse(StandardCharsets.UTF_8))),
@@ -215,19 +222,10 @@ public class MimePart {
             return isTextBody() && subType.map(SubType.of("html")::equals).orElse(false);
         }
 
-        private Optional<ContentType> computeContentType() {
-            if (mediaType.isPresent() && subType.isPresent()) {
-                return Optional.of(ContentType.of(
-                    ContentType.MimeType.of(mediaType.get(), subType.get()),
-                    charset));
-            } else {
-                return Optional.empty();
-            }
-        }
     }
     
-    public static Builder builder() {
-        return new Builder();
+    public static Builder builder(Predicate<ContentType> shouldCaryOverContent) {
+        return new Builder(shouldCaryOverContent);
     }
 
     private static final Logger LOGGER = LoggerFactory.getLogger(MimePart.class);
diff --git a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartContainerBuilder.java b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartContainerBuilder.java
index d3d88c4f67..5e4fb1ee89 100644
--- a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartContainerBuilder.java
+++ b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartContainerBuilder.java
@@ -22,7 +22,6 @@ package org.apache.james.mailbox.elasticsearch.v7.json;
 import java.io.InputStream;
 import java.nio.charset.Charset;
 
-import org.apache.james.mailbox.extractor.TextExtractor;
 import org.apache.james.mailbox.model.ContentType.MediaType;
 import org.apache.james.mailbox.model.ContentType.SubType;
 import org.apache.james.mime4j.stream.Field;
@@ -31,8 +30,6 @@ public interface MimePartContainerBuilder {
 
     MimePart.ParsedMimePart build();
 
-    MimePartContainerBuilder using(TextExtractor textExtractor);
-
     MimePartContainerBuilder addToHeaders(Field field);
 
     MimePartContainerBuilder addBodyContent(InputStream bodyContent);
diff --git a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartParser.java b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartParser.java
index 81a1a15e8a..ef6c18f50f 100644
--- a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartParser.java
+++ b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartParser.java
@@ -78,7 +78,7 @@ public class MimePartParser {
                 stackCurrent();
                 break;
             case T_START_HEADER:
-                currentlyBuildMimePart = MimePart.builder();
+                currentlyBuildMimePart = MimePart.builder(textExtractor::applicable);
                 break;
             case T_FIELD:
                 currentlyBuildMimePart.addToHeaders(stream.getField());
@@ -107,7 +107,7 @@ public class MimePartParser {
     }
     
     private void closeMimePart() {
-        MimePart.ParsedMimePart bodyMimePart = currentlyBuildMimePart.using(textExtractor).build();
+        MimePart.ParsedMimePart bodyMimePart = currentlyBuildMimePart.build();
         if (!builderStack.isEmpty()) {
             builderStack.peek().addChild(bodyMimePart);
         } else {
diff --git a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/RootMimePartContainerBuilder.java b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/RootMimePartContainerBuilder.java
index 93755a6790..fc394205ca 100644
--- a/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/RootMimePartContainerBuilder.java
+++ b/mailbox/elasticsearch-v7/src/main/java/org/apache/james/mailbox/elasticsearch/v7/json/RootMimePartContainerBuilder.java
@@ -22,7 +22,6 @@ package org.apache.james.mailbox.elasticsearch.v7.json;
 import java.io.InputStream;
 import java.nio.charset.Charset;
 
-import org.apache.james.mailbox.extractor.TextExtractor;
 import org.apache.james.mailbox.model.ContentType.MediaType;
 import org.apache.james.mailbox.model.ContentType.SubType;
 import org.apache.james.mime4j.stream.Field;
@@ -40,10 +39,6 @@ public class RootMimePartContainerBuilder implements MimePartContainerBuilder {
         return rootMimePart;
     }
 
-    @Override public MimePartContainerBuilder using(TextExtractor textExtractor) {
-        return this;
-    }
-
     @Override
     public MimePartContainerBuilder addToHeaders(Field field) {
         LOGGER.warn("Trying to add headers to the Root MimePart container");
diff --git a/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/IndexableMessageTest.java b/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/IndexableMessageTest.java
index 62e4d85871..c51cdb4929 100644
--- a/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/IndexableMessageTest.java
+++ b/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/IndexableMessageTest.java
@@ -232,6 +232,7 @@ class IndexableMessageTest {
             .thenReturn(MESSAGE_UID);
 
         TextExtractor textExtractor = mock(TextExtractor.class);
+        when(textExtractor.applicable(any())).thenReturn(true);
         when(textExtractor.extractContentReactive(any(), any()))
             .thenReturn(Mono.just(new ParsedContent(Optional.of("first attachment content"), ImmutableMap.of())))
             .thenReturn(Mono.error(new RuntimeException("second cannot be parsed")))
diff --git a/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartTest.java b/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartTest.java
index f2afdd0793..072e8d493b 100644
--- a/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartTest.java
+++ b/mailbox/elasticsearch-v7/src/test/java/org/apache/james/mailbox/elasticsearch/v7/json/MimePartTest.java
@@ -32,7 +32,7 @@ class MimePartTest {
 
     @Test
     void buildShouldWorkWhenTextualContentFromParserIsEmpty() {
-        MimePart.builder()
+        MimePart.builder(contentType -> true)
             .addBodyContent(new ByteArrayInputStream(new byte[] {}))
             .addMediaType(MediaType.of("text"))
             .addSubType(SubType.of("plain"))
@@ -42,7 +42,7 @@ class MimePartTest {
     @Test
     void buildShouldWorkWhenTextualContentFromParserIsNonEmpty() {
         String body = "text";
-        MimePart mimePart = MimePart.builder()
+        MimePart mimePart = MimePart.builder(contentType -> true)
             .addBodyContent(new ByteArrayInputStream(body.getBytes(StandardCharsets.UTF_8)))
             .addMediaType(MediaType.of("text"))
             .addSubType(SubType.of("plain"))
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/extractor/DefaultTextExtractor.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/extractor/DefaultTextExtractor.java
index 65c03bb625..50cc8b68e3 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/extractor/DefaultTextExtractor.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/extractor/DefaultTextExtractor.java
@@ -36,10 +36,14 @@ import org.apache.james.mailbox.model.ContentType;
  * Costs less calculations that TikaTextExtractor, but result is not that good.
  */
 public class DefaultTextExtractor implements TextExtractor {
+    @Override
+    public boolean applicable(ContentType contentType) {
+        return contentType != null && contentType.asString().startsWith("text/");
+    }
 
     @Override
     public ParsedContent extractContent(InputStream inputStream, ContentType contentType) throws Exception {
-        if (contentType != null && contentType.asString().startsWith("text/")) {
+        if (applicable(contentType)) {
             Charset charset = contentType.charset().orElse(StandardCharsets.UTF_8);
             return new ParsedContent(Optional.ofNullable(IOUtils.toString(inputStream, charset)), new HashMap<>());
         } else {
diff --git a/mailbox/store/src/main/java/org/apache/james/mailbox/store/extractor/JsoupTextExtractor.java b/mailbox/store/src/main/java/org/apache/james/mailbox/store/extractor/JsoupTextExtractor.java
index 45378aefb8..b06f55ffc0 100644
--- a/mailbox/store/src/main/java/org/apache/james/mailbox/store/extractor/JsoupTextExtractor.java
+++ b/mailbox/store/src/main/java/org/apache/james/mailbox/store/extractor/JsoupTextExtractor.java
@@ -44,6 +44,14 @@ public class JsoupTextExtractor implements TextExtractor {
     private static final MimeType TEXT_HTML = MimeType.of("text/html");
     private static final MimeType TEXT_PLAIN = MimeType.of("text/plain");
 
+    @Override
+    public boolean applicable(ContentType contentType) {
+        if (contentType == null) {
+            return false;
+        }
+        return contentType.mimeType().equals(TEXT_HTML) || contentType.mimeType().equals(TEXT_PLAIN);
+    }
+
     @Override
     public ParsedContent extractContent(InputStream inputStream, ContentType contentType) throws Exception {
         if (inputStream == null || contentType == null) {
diff --git a/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/ContentTypeFilteringTextExtractor.java b/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/ContentTypeFilteringTextExtractor.java
index 63cf46ee3f..80538174de 100644
--- a/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/ContentTypeFilteringTextExtractor.java
+++ b/mailbox/tika/src/main/java/org/apache/james/mailbox/tika/ContentTypeFilteringTextExtractor.java
@@ -40,6 +40,11 @@ public class ContentTypeFilteringTextExtractor implements TextExtractor {
         this.contentTypeBlacklist = contentTypeBlacklist;
     }
 
+    @Override
+    public boolean applicable(ContentType contentType) {
+        return !isBlacklisted(contentType.mimeType());
+    }
+
     @Override
     public ParsedContent extractContent(InputStream inputStream, ContentType contentType) throws Exception {
         if (isBlacklisted(contentType.mimeType())) {


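Commit 02/03 adds `TextExtractor.applicable(ContentType)` and passes `textExtractor::applicable` to `MimePart.builder(...)`. As a result, a part's body is only drained into a `byte[]` (in `ParsedMimePart`'s constructor) when the configured extractor will actually use it. The builder calls the predicate with `contentType.orElse(null)`, so the predicate may receive `null` when no media type and subtype were parsed. The built-in `DefaultTextExtractor` and `JsoupTextExtractor` implementations guard against that case. The following is a minimal stdlib sketch of the idea, with MIME types reduced to plain strings. `PartBuffer`, `TEXT_ONLY` and `carryOver` are hypothetical names.

```java
import java.io.ByteArrayInputStream;
import java.io.IOException;
import java.io.InputStream;
import java.io.UncheckedIOException;
import java.nio.charset.StandardCharsets;
import java.util.Optional;
import java.util.function.Predicate;

public class PartBuffer {

    // Null-safe applicability check in the spirit of DefaultTextExtractor.applicable:
    // only "text/*" parts are worth extracting.
    public static final Predicate<String> TEXT_ONLY =
        mimeType -> mimeType != null && mimeType.startsWith("text/");

    // Hypothetical analogue of MimePart.Builder#build(): the body stream is copied
    // into a byte[] only when the predicate accepts the part's MIME type.
    public static Optional<byte[]> carryOver(Optional<InputStream> body, String mimeType,
                                             Predicate<String> shouldCarryOverContent) {
        return body
            .filter(any -> shouldCarryOverContent.test(mimeType))
            .map(stream -> {
                try {
                    return stream.readAllBytes();
                } catch (IOException e) {
                    throw new UncheckedIOException(e);
                }
            });
    }

    public static void main(String[] args) {
        byte[] bytes = "some body".getBytes(StandardCharsets.UTF_8);
        System.out.println(carryOver(Optional.of(new ByteArrayInputStream(bytes)), "text/plain", TEXT_ONLY).isPresent()); // true
        System.out.println(carryOver(Optional.of(new ByteArrayInputStream(bytes)), "image/png", TEXT_ONLY).isPresent());  // false
        System.out.println(carryOver(Optional.of(new ByteArrayInputStream(bytes)), null, TEXT_ONLY).isPresent());         // false
    }
}
```

Because `Optional.filter` runs before `map`, the stream of a rejected part is never read, which is where the memory saving comes from.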