You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by ma...@apache.org on 2018/08/01 12:34:26 UTC

[08/14] james-project git commit: JAMES-2502 merge util and util-java8

http://git-wip-us.apache.org/repos/asf/james-project/blob/630dcab1/server/container/util/src/main/java/org/apache/james/util/FluentFutureStream.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/FluentFutureStream.java b/server/container/util/src/main/java/org/apache/james/util/FluentFutureStream.java
new file mode 100644
index 0000000..9dcae7a
--- /dev/null
+++ b/server/container/util/src/main/java/org/apache/james/util/FluentFutureStream.java
@@ -0,0 +1,191 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.util;
+
+import java.util.Optional;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.BinaryOperator;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Collector;
+import java.util.stream.Stream;
+
+public class FluentFutureStream<T> {
+    // Wrapped future of Stream; every operation of this class derives its result from it.
+    private final CompletableFuture<Stream<T>> completableFuture;
+
+    /**
+     * Constructs a FluentFutureStream wrapping the given future of Stream.
+     */
+    public static <T> FluentFutureStream<T> of(CompletableFuture<Stream<T>> completableFuture) {
+        return new FluentFutureStream<>(completableFuture);
+    }
+
+    /**
+     * Constructs a FluentFutureStream from a Stream of futures, gathered with CompletableFutureUtil.allOf.
+     */
+    public static <T> FluentFutureStream<T> of(Stream<CompletableFuture<T>> completableFutureStream) {
+        return new FluentFutureStream<>(CompletableFutureUtil.allOf(completableFutureStream));
+    }
+
+    /**
+     * Constructs a FluentFutureStream from a Stream of futures of Stream.
+     *
+     * The nested streams are flatMapped into a single stream of values.
+     */
+    public static <T> FluentFutureStream<T> ofNestedStreams(Stream<CompletableFuture<Stream<T>>> completableFuture) {
+        return of(completableFuture)
+            .flatMap(Function.identity());
+    }
+
+    /**
+     * Constructs a FluentFutureStream from a Stream of futures of Optional.
+     *
+     * Present values are unwrapped, empty optionals contribute no element.
+     */
+    public static <T> FluentFutureStream<T> ofOptionals(Stream<CompletableFuture<Optional<T>>> completableFuture) {
+        return of(completableFuture)
+            .flatMapOptional(Function.identity());
+    }
+
+    /**
+     * Constructs a FluentFutureStream from the supplied futures, gathered with CompletableFutureUtil.allOfArray.
+     */
+    @SafeVarargs
+    public static <T> FluentFutureStream<T> ofFutures(CompletableFuture<T>... completableFutures) {
+        return new FluentFutureStream<>(CompletableFutureUtil.allOfArray(completableFutures));
+    }
+
+    private FluentFutureStream(CompletableFuture<Stream<T>> completableFuture) {
+        this.completableFuture = completableFuture;
+    }
+
+    /**
+     * Performs the supplied action for all values of the underlying stream, through CompletableFutureUtil.performOnAll.
+     */
+    public FluentFutureStream<T> performOnAll(Function<T, CompletableFuture<Void>> action) {
+        return FluentFutureStream.of(
+            CompletableFutureUtil.performOnAll(completableFuture(), action));
+    }
+
+    /**
+     * Applies a transformation to all values of the underlying stream.
+     */
+    public <U> FluentFutureStream<U> map(Function<T, U> function) {
+        return FluentFutureStream.of(
+            CompletableFutureUtil.map(completableFuture(), function));
+    }
+
+    /**
+     * Applies a transformation to all values of the underlying stream.
+     *
+     * As the supplied transformation produces streams, the results are flatMapped into a single stream.
+     */
+    public <U> FluentFutureStream<U> flatMap(Function<T, Stream<U>> function) {
+        return FluentFutureStream.of(completableFuture().thenApply(stream ->
+            stream.flatMap(function)));
+    }
+
+    /**
+     * Applies a transformation to all values of the underlying stream.
+     *
+     * As the supplied transformation produces optionals, present results are unwrapped and empty ones are dropped.
+     */
+    public <U> FluentFutureStream<U> flatMapOptional(Function<T, Optional<U>> function) {
+        return map(function)
+            .flatMap(OptionalUtils::toStream);
+    }
+
+    /**
+     * Applies a transformation to all values of the underlying stream.
+     *
+     * As the supplied transformation produces futures, we need to compose the returned values.
+     */
+    public <U> FluentFutureStream<U> thenComposeOnAll(Function<T, CompletableFuture<U>> function) {
+        return FluentFutureStream.of(
+            CompletableFutureUtil.thenComposeOnAll(completableFuture(), function));
+    }
+
+    /**
+     * Applies a transformation to all values of the underlying stream.
+     *
+     * As the supplied transformation produces futures of Stream, we need to compose then flatMap the returned values.
+     */
+    public <U> FluentFutureStream<U> thenFlatCompose(Function<T, CompletableFuture<Stream<U>>> function) {
+        return FluentFutureStream.of(
+            CompletableFutureUtil.thenComposeOnAll(completableFuture(), function))
+            .flatMap(Function.identity());
+    }
+
+    /**
+     * Applies a transformation to all values of the underlying stream.
+     *
+     * As the supplied transformation produces futures of Optional, we need to compose then unwrap the returned values.
+     */
+    public <U> FluentFutureStream<U> thenFlatComposeOnOptional(Function<T, CompletableFuture<Optional<U>>> function) {
+        return FluentFutureStream.of(
+            CompletableFutureUtil.thenComposeOnAll(completableFuture(), function))
+            .flatMapOptional(Function.identity());
+    }
+
+    /**
+     * Filters the values of the underlying stream, keeping those matching the supplied predicate.
+     */
+    public FluentFutureStream<T> filter(Predicate<T> predicate) {
+        return FluentFutureStream.of(completableFuture
+            .thenApply(stream -> stream.filter(predicate)));
+    }
+
+    /**
+     * Reduces the underlying stream. The reduced value is supplied as a future of Optional, as no empty value is supplied.
+     */
+    public CompletableFuture<Optional<T>> reduce(BinaryOperator<T> combiner) {
+        return CompletableFutureUtil.reduce(combiner, completableFuture);
+    }
+
+    /**
+     * Reduces the underlying stream. The reduced value is supplied as a plain future, as an empty value is specified.
+     */
+    public CompletableFuture<T> reduce(T emptyAccumulator, BinaryOperator<T> combiner) {
+        return CompletableFutureUtil.reduce(combiner, completableFuture, emptyAccumulator);
+    }
+
+    /**
+     * Returns the future of the underlying stream.
+     */
+    public CompletableFuture<Stream<T>> completableFuture() {
+        return this.completableFuture;
+    }
+
+    /**
+     * Returns a future of the underlying stream collected with the supplied collector.
+     */
+    public <C> CompletableFuture<C> collect(Collector<T, ?, C> collector) {
+        return this.completableFuture
+            .thenApply(stream -> stream.collect(collector));
+    }
+
+    /**
+     * Blocks until the underlying future completes, then returns its stream.
+     */
+    public Stream<T> join() {
+        return completableFuture().join();
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/630dcab1/server/container/util/src/main/java/org/apache/james/util/GuavaUtils.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/GuavaUtils.java b/server/container/util/src/main/java/org/apache/james/util/GuavaUtils.java
new file mode 100644
index 0000000..518ead1
--- /dev/null
+++ b/server/container/util/src/main/java/org/apache/james/util/GuavaUtils.java
@@ -0,0 +1,37 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.util;
+
+import java.util.List;
+import java.util.Map;
+
+import org.apache.commons.lang3.tuple.Pair;
+
+import com.github.steveash.guavate.Guavate;
+import com.google.common.collect.ImmutableListMultimap;
+
+public class GuavaUtils {
+    public static <K, V> ImmutableListMultimap<K, V> toMultimap(Map<K, List<V>> rights) { // flattens a map of lists into an immutable list multimap
+        return rights.entrySet()
+            .stream()
+            .flatMap(e -> e.getValue().stream().map(right -> Pair.of(e.getKey(), right))) // one (key, value) pair per list element; an empty list yields no pair
+            .collect(Guavate.toImmutableListMultimap(Pair::getKey, Pair::getValue));
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/630dcab1/server/container/util/src/main/java/org/apache/james/util/Host.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/Host.java b/server/container/util/src/main/java/org/apache/james/util/Host.java
new file mode 100644
index 0000000..cb6acce
--- /dev/null
+++ b/server/container/util/src/main/java/org/apache/james/util/Host.java
@@ -0,0 +1,137 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.util;
+
+import java.util.List;
+import java.util.Optional;
+
+import com.github.steveash.guavate.Guavate;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Objects;
+import com.google.common.base.Preconditions;
+import com.google.common.base.Splitter;
+import com.google.common.base.Strings;
+import com.google.common.collect.ImmutableList;
+
+public class Host {
+
+    public static ImmutableList<Host> parseHosts(String hostsString) { // no default port: every entry has to carry its own port
+        return parseHosts(hostsString, Optional.empty());
+    }
+
+    public static ImmutableList<Host> parseHosts(String hostsString, int defaultPort) { // defaultPort is used by entries without a port
+        return parseHosts(hostsString, Optional.of(defaultPort));
+    }
+
+    private static ImmutableList<Host> parseHosts(String hostsString, Optional<Integer> defaultPort) {
+        return Splitter.on(',') // hostsString is a comma separated list of "host" or "host:port" entries
+            .omitEmptyStrings() // empty entries (e.g. caused by a trailing comma) are skipped
+            .splitToList(hostsString)
+            .stream()
+            .map(string -> Host.parse(string, defaultPort))
+            .distinct() // duplicated hosts are kept once, as decided by equals/hashCode
+            .collect(Guavate.toImmutableList());
+    }
+
+    public static Host from(String hostname, int port) { // static factory: same validation as the constructor
+        return new Host(hostname, port);
+    }
+
+    public static Host parseConfString(String ipAndPort, int defaultPort) { // parses "host" or "host:port", defaultPort applies to the former
+        return parse(ipAndPort, Optional.of(defaultPort));
+    }
+
+    public static Host parseConfString(String ipAndPort) { // parses "host:port"; a missing port is rejected as no default is given
+        return parse(ipAndPort, Optional.empty());
+    }
+
+    public static Host parse(String ipAndPort, Optional<Integer> defaultPort) {
+        Preconditions.checkNotNull(ipAndPort); // NullPointerException on null input
+        Preconditions.checkArgument(!ipAndPort.isEmpty()); // IllegalArgumentException on empty input
+
+        List<String> parts = retrieveHostParts(ipAndPort);
+
+        String ip = parts.get(0); // stored as the host name of the resulting Host
+        int port = getPortFromConfPart(parts, defaultPort);
+
+        return new Host(ip, port);
+    }
+
+    private static List<String> retrieveHostParts(String ipAndPort) {
+        List<String> parts = Splitter.on(':')
+                .trimResults() // whitespace around the host and the port is ignored
+                .splitToList(ipAndPort);
+
+        if (parts.size() < 1 || parts.size() > 2) { // more than one ':' separator is rejected
+            throw new IllegalArgumentException(ipAndPort + " is not a valid cassandra node"); // NOTE(review): message mentions cassandra while nothing else in this class is Cassandra specific
+        }
+        return parts;
+    }
+
+    private static int getPortFromConfPart(List<String> parts, Optional<Integer> defaultPort) {
+        if (parts.size() == 2) {
+            return Integer.valueOf(parts.get(1)); // NumberFormatException when the port part is not an integer
+        }
+        if (parts.size() == 1) {
+            return defaultPort.orElseThrow(() -> new IllegalArgumentException("Host do not have port part but no default port provided"));
+        }
+        throw new RuntimeException("A host should be either a hostname or a hostname and a port separated by a ':'"); // not reached for lists validated by retrieveHostParts (1 or 2 parts)
+    }
+
+    private final String hostName;
+    private final int port;
+
+    @VisibleForTesting
+    Host(String hostName, int port) {
+        Preconditions.checkNotNull(hostName, "Hostname could not be null");
+        Preconditions.checkArgument(!Strings.isNullOrEmpty(hostName), "Hostname could not be empty"); // null was rejected above, so this only catches the empty string
+        Port.assertValid(port); // IllegalArgumentException unless the port is between 1 and 65535
+        this.hostName = hostName;
+        this.port = port;
+    }
+
+    public String getHostName() {
+        return hostName;
+    }
+
+    public int getPort() {
+        return port;
+    }
+
+    @Override
+    public final int hashCode() { // consistent with equals: based on host name and port
+        return Objects.hashCode(hostName, port);
+    }
+
+    @Override
+    public final boolean equals(Object object) { // two hosts are equal when both host name and port are equal
+        if (object instanceof Host) {
+            Host that = (Host) object;
+            return Objects.equal(this.hostName, that.hostName)
+                && Objects.equal(this.port, that.port);
+        }
+        return false;
+    }
+
+    @Override
+    public String toString() { // rendered as "hostName:port"
+        return this.hostName + ":" + this.port;
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/630dcab1/server/container/util/src/main/java/org/apache/james/util/MDCBuilder.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/MDCBuilder.java b/server/container/util/src/main/java/org/apache/james/util/MDCBuilder.java
new file mode 100644
index 0000000..e6709c8
--- /dev/null
+++ b/server/container/util/src/main/java/org/apache/james/util/MDCBuilder.java
@@ -0,0 +1,147 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.util;
+
+import java.io.Closeable;
+import java.io.IOException;
+import java.util.List;
+import java.util.Map;
+import java.util.Optional;
+import java.util.function.Supplier;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.slf4j.MDC;
+
+import com.github.steveash.guavate.Guavate;
+import com.google.common.annotations.VisibleForTesting;
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ImmutableList;
+import com.google.common.collect.ImmutableMap;
+
+public class MDCBuilder {
+
+    public interface VoidOperation { // a computation without result, to be run with the MDC context set
+        void perform();
+    }
+
+    public static <T> T withMdc(MDCBuilder mdcBuilder, Supplier<T> answerSupplier) {
+        try (Closeable closeable = mdcBuilder.build()) { // MDC entries are put here and their closeables are closed when the block exits
+            try {
+                return answerSupplier.get();
+            } catch (RuntimeException e) {
+                LOGGER.error("Got error, logging its context", e); // logged inside the try-with-resources, hence before the MDC closeables are closed
+                throw e;
+            }
+        } catch (IOException e) { // close() is declared to throw IOException; it is surfaced as an unchecked exception
+            throw new RuntimeException(e);
+        }
+    }
+
+    public static void withMdc(MDCBuilder mdcBuilder, VoidOperation logOperation) { // void flavour: adapts the operation to a Supplier returning null
+        withMdc(mdcBuilder, () -> {
+            logOperation.perform();
+            return null;
+        });
+    }
+
+    public static final String HOST = "host"; // this and the constants below are String keys made available to callers of addContext
+    public static final String IP = "ip";
+    public static final String PROTOCOL = "protocol";
+    public static final String USER = "user";
+    public static final String ACTION = "action";
+    public static final String SESSION_ID = "sessionId";
+    public static final String CHARSET = "charset";
+
+    private static final Logger LOGGER = LoggerFactory.getLogger(MDCBuilder.class);
+
+    public static class Closeables implements Closeable { // aggregates several Closeable into a single one
+        private final List<Closeable> closeables;
+
+        public Closeables(List<Closeable> closeables) {
+            Preconditions.checkNotNull(closeables);
+            this.closeables = ImmutableList.copyOf(closeables); // defensive immutable copy
+        }
+
+        @Override
+        public void close() throws IOException { // closes every element; IOExceptions are logged by closeQuietly and not propagated
+            closeables.forEach(this::closeQuietly);
+        }
+
+        private void closeQuietly(Closeable closeable) {
+            try {
+                closeable.close();
+            } catch (IOException e) {
+                LOGGER.warn("Failed to close Closeable", e);
+            }
+        }
+    }
+
+    public static MDCBuilder create() {
+        return new MDCBuilder();
+    }
+
+    private final ImmutableMap.Builder<String, String> contextMap = ImmutableMap.builder(); // entries added directly on this builder
+    private final ImmutableList.Builder<MDCBuilder> nestedBuilder = ImmutableList.builder(); // builders whose entries are merged in by buildContextMap
+
+    private MDCBuilder() {}
+
+    public MDCBuilder addContext(MDCBuilder nested) { // registers a nested builder; its entries are read when the context map is built
+        this.nestedBuilder.add(nested);
+        return this;
+    }
+
+    public MDCBuilder addContext(String key, Object value) { // null values are ignored, other values are stored as value.toString()
+        Preconditions.checkNotNull(key);
+        Optional.ofNullable(value)
+            .ifPresent(nonNullValue -> contextMap.put(key, nonNullValue.toString()));
+        return this;
+    }
+
+    @VisibleForTesting
+    Map<String, String> buildContextMap() { // entries of the nested builders first, then the entries of this builder
+        return ImmutableMap.<String, String>builder() // NOTE(review): ImmutableMap.Builder#build rejects duplicate keys, so a key added twice looks like it fails here - confirm
+            .putAll(nestedBuilder.build()
+                .stream()
+                .map(MDCBuilder::buildContextMap)
+                .flatMap(map -> map.entrySet().stream())
+                .collect(Guavate.toImmutableMap(Map.Entry::getKey, Map.Entry::getValue)))
+            .putAll(contextMap.build())
+            .build();
+    }
+
+    public <T> T execute(Supplier<T> supplier) { // runs the supplier right away with this context, see withMdc
+        return MDCBuilder.withMdc(this, supplier);
+    }
+
+    public <T> Supplier<T> wrapArround(Supplier<T> supplier) { // deferred flavour of execute: the context is applied each time the returned supplier is called
+        return () -> execute(supplier);
+    }
+
+    public Closeable build() { // puts every entry in the MDC right away; the returned Closeable closes the handles obtained from MDC.putCloseable
+        return new Closeables(
+            buildContextMap()
+                .entrySet()
+                .stream()
+                .map(entry -> MDC.putCloseable(entry.getKey(), entry.getValue()))
+                .collect(Guavate.toImmutableList()));
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/630dcab1/server/container/util/src/main/java/org/apache/james/util/MDCStructuredLogger.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/MDCStructuredLogger.java b/server/container/util/src/main/java/org/apache/james/util/MDCStructuredLogger.java
new file mode 100644
index 0000000..cf970ea
--- /dev/null
+++ b/server/container/util/src/main/java/org/apache/james/util/MDCStructuredLogger.java
@@ -0,0 +1,50 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.util;
+
+import java.util.function.Consumer;
+
+import org.slf4j.Logger;
+
+public class MDCStructuredLogger implements StructuredLogger {
+
+    public static MDCStructuredLogger forLogger(Logger logger) { // static factory, equivalent to the public constructor
+        return new MDCStructuredLogger(logger);
+    }
+
+    private final Logger logger; // target of the log operations
+    private final MDCBuilder mdcBuilder; // accumulates the fields added through addField
+
+    public MDCStructuredLogger(Logger logger) {
+        this.logger = logger;
+        this.mdcBuilder = MDCBuilder.create();
+    }
+
+    @Override
+    public StructuredLogger addField(String name, Object value) { // fields are kept in the MDCBuilder until log is called; returns this for chaining
+        mdcBuilder.addContext(name, value);
+        return this;
+    }
+
+    @Override
+    public void log(Consumer<Logger> logOperation) { // runs logOperation against the logger with the accumulated fields set in the MDC
+        MDCBuilder.withMdc(mdcBuilder, () -> logOperation.accept(logger));
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/630dcab1/server/container/util/src/main/java/org/apache/james/util/MemoizedSupplier.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/MemoizedSupplier.java b/server/container/util/src/main/java/org/apache/james/util/MemoizedSupplier.java
new file mode 100644
index 0000000..d0b4e3b
--- /dev/null
+++ b/server/container/util/src/main/java/org/apache/james/util/MemoizedSupplier.java
@@ -0,0 +1,30 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.util;
+
+import java.util.function.Supplier;
+
+import com.google.common.base.Suppliers;
+
+public class MemoizedSupplier {
+    public static <T> Supplier<T> of(Supplier<T> originalSupplier) { // wraps originalSupplier with Guava's Suppliers.memoize
+        return Suppliers.memoize(originalSupplier::get)::get; // method references bridge java.util.function.Supplier and the supplier handled by Guava
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/630dcab1/server/container/util/src/main/java/org/apache/james/util/OptionalUtils.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/OptionalUtils.java b/server/container/util/src/main/java/org/apache/james/util/OptionalUtils.java
new file mode 100644
index 0000000..4a3449c
--- /dev/null
+++ b/server/container/util/src/main/java/org/apache/james/util/OptionalUtils.java
@@ -0,0 +1,68 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.util;
+
+import java.util.Arrays;
+import java.util.Optional;
+import java.util.function.Supplier;
+import java.util.stream.Stream;
+
+public class OptionalUtils {
+
+    @FunctionalInterface
+    public interface Operation { // side effect to run, without argument nor result
+        void perform();
+    }
+
+    public static <T> Optional<T> executeIfEmpty(Optional<T> optional, Operation operation) { // runs operation only for an empty optional
+        if (!optional.isPresent()) {
+            operation.perform();
+        }
+        return optional; // the very same optional is handed back, allowing chaining
+    }
+
+    public static <T> Stream<T> toStream(Optional<T> optional) { // stream of one element when present, empty stream otherwise
+        return optional.map(Stream::of)
+            .orElse(Stream.of());
+    }
+
+    @SafeVarargs
+    public static <T> Optional<T> or(Optional<T>... optionals) { // first present optional in argument order, empty when none is present
+        return orStream(Arrays.stream(optionals));
+    }
+
+    @SafeVarargs
+    public static <T> Optional<T> orSuppliers(Supplier<Optional<T>>... suppliers) { // same as or, each optional being obtained from its supplier
+        return orStream(Arrays.stream(suppliers)
+            .map(Supplier::get)); // mapped lazily: findFirst in orStream is expected to stop the calls once a present value is found
+    }
+
+    private static <T> Optional<T> orStream(Stream<Optional<T>> stream) {
+        return stream
+            .filter(Optional::isPresent)
+            .findFirst()
+            .orElse(Optional.empty()); // findFirst yields an Optional of Optional, unwrapped here
+    }
+
+    public static <T> boolean containsDifferent(Optional<T> requestValue, T storeValue) { // true only when requestValue is present and not equal to storeValue
+        return requestValue
+            .filter(value -> !value.equals(storeValue))
+            .isPresent();
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/630dcab1/server/container/util/src/main/java/org/apache/james/util/Port.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/Port.java b/server/container/util/src/main/java/org/apache/james/util/Port.java
new file mode 100644
index 0000000..016c4b4
--- /dev/null
+++ b/server/container/util/src/main/java/org/apache/james/util/Port.java
@@ -0,0 +1,58 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.util;
+
+import java.util.Random;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.Range;
+
+/**
+ * A port number checked at construction time to lie in the inclusive range
+ * [1, {@value #MAX_PORT_VALUE}].
+ */
+public class Port {
+    public static final int MAX_PORT_VALUE = 65535;
+    public static final int PRIVILEGED_PORT_BOUND = 1024;
+    private static final Range<Integer> VALID_PORT_RANGE = Range.closed(1, MAX_PORT_VALUE);
+
+    /**
+     * Picks a pseudo-random port in the inclusive range
+     * [{@value #PRIVILEGED_PORT_BOUND}, {@value #MAX_PORT_VALUE}].
+     *
+     * @return a valid port number that is not below {@link #PRIVILEGED_PORT_BOUND}
+     */
+    public static int generateValidUnprivilegedPort() {
+        // Random.nextInt(bound) excludes its bound, hence the + 1 so that MAX_PORT_VALUE
+        // itself can be drawn.
+        int candidateCount = MAX_PORT_VALUE - PRIVILEGED_PORT_BOUND + 1;
+        return new Random().nextInt(candidateCount) + PRIVILEGED_PORT_BOUND;
+    }
+
+    /**
+     * Fails when {@code port} is not a valid port number.
+     *
+     * @throws IllegalArgumentException when {@code port} is outside [1, 65535]
+     */
+    public static void assertValid(int port) {
+        Preconditions.checkArgument(isValid(port), "Port should be between 1 and 65535");
+    }
+
+    /**
+     * @return {@code true} when {@code port} lies within [1, 65535]
+     */
+    public static boolean isValid(int port) {
+        return VALID_PORT_RANGE.contains(port);
+    }
+
+    private final int value;
+
+    /**
+     * @param value the port number, checked through {@link #validate(int)}
+     * @throws IllegalArgumentException when the default validation rejects {@code value}
+     */
+    public Port(int value) {
+        validate(value);
+        this.value = value;
+    }
+
+    /**
+     * Validation hook invoked by the constructor. As it runs before the instance is fully
+     * initialised, overriding implementations must not rely on subclass state.
+     */
+    protected void validate(int port) {
+        assertValid(port);
+    }
+
+    public int getValue() {
+        return value;
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/630dcab1/server/container/util/src/main/java/org/apache/james/util/Runnables.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/Runnables.java b/server/container/util/src/main/java/org/apache/james/util/Runnables.java
new file mode 100644
index 0000000..c199f31
--- /dev/null
+++ b/server/container/util/src/main/java/org/apache/james/util/Runnables.java
@@ -0,0 +1,40 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.util;
+
+import java.util.Arrays;
+import java.util.concurrent.CompletableFuture;
+import java.util.function.Supplier;
+
+public class Runnables {
+    /**
+     * Submits every runnable through {@link CompletableFuture#supplyAsync(Supplier)} and then
+     * calls {@code join()} on the {@link FluentFutureStream} wrapping the resulting futures.
+     * NOTE(review): presumably this blocks until all runnables have completed - confirm against
+     * FluentFutureStream.
+     */
+    public static void runParallel(Runnable... runnables) {
+        FluentFutureStream
+            .of(Arrays.stream(runnables)
+                .map(Runnables::submit))
+            .join();
+    }
+
+    // Starts the task asynchronously; the future carries no meaningful value.
+    private static CompletableFuture<Void> submit(Runnable task) {
+        Supplier<Void> valueLessTask = toVoidSupplier(task);
+        return CompletableFuture.supplyAsync(valueLessTask);
+    }
+
+    // Adapts a Runnable to the Supplier shape expected by supplyAsync.
+    private static Supplier<Void> toVoidSupplier(Runnable runnable) {
+        return () -> {
+            runnable.run();
+            return null;
+        };
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/630dcab1/server/container/util/src/main/java/org/apache/james/util/StreamUtils.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/StreamUtils.java b/server/container/util/src/main/java/org/apache/james/util/StreamUtils.java
new file mode 100644
index 0000000..e8a71bd
--- /dev/null
+++ b/server/container/util/src/main/java/org/apache/james/util/StreamUtils.java
@@ -0,0 +1,52 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.util;
+
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.stream.Stream;
+
+/**
+ * Static helpers building and combining {@link Stream}s.
+ */
+public class StreamUtils {
+
+    /**
+     * Streams the elements of {@code array}, treating a {@code null} array as empty.
+     */
+    public static <T> Stream<T> ofNullable(T[] array) {
+        if (array == null) {
+            return Stream.empty();
+        }
+        return Arrays.stream(array);
+    }
+
+    /**
+     * Streams the elements of the wrapped array, or nothing when the {@link Optional} is empty.
+     */
+    public static <T> Stream<T> ofOptional(Optional<T[]> array) {
+        if (array.isPresent()) {
+            return Arrays.stream(array.get());
+        }
+        return Stream.empty();
+    }
+
+    /**
+     * Concatenates the given streams, following the iteration order of the collection.
+     */
+    public static <T> Stream<T> flatten(Collection<Stream<T>> streams) {
+        Stream<Stream<T>> nested = streams.stream();
+        return flatten(nested);
+    }
+
+    /**
+     * Flattens a stream of streams into a single stream.
+     */
+    public static <T> Stream<T> flatten(Stream<Stream<T>> streams) {
+        Function<Stream<T>, Stream<T>> asIs = Function.identity();
+        return streams.flatMap(asIs);
+    }
+
+    /**
+     * Concatenates the given streams in argument order.
+     */
+    @SafeVarargs
+    public static <T> Stream<T> flatten(Stream<T>... streams) {
+        Stream<Stream<T>> nested = Arrays.stream(streams);
+        return flatten(nested);
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/630dcab1/server/container/util/src/main/java/org/apache/james/util/StructuredLogger.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/StructuredLogger.java b/server/container/util/src/main/java/org/apache/james/util/StructuredLogger.java
new file mode 100644
index 0000000..f6fcec4
--- /dev/null
+++ b/server/container/util/src/main/java/org/apache/james/util/StructuredLogger.java
@@ -0,0 +1,30 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.util;
+
+import java.util.function.Consumer;
+
+import org.slf4j.Logger;
+
+/**
+ * Contract for a logger that accepts named fields before a log operation is run against an
+ * SLF4J {@link Logger}.
+ */
+public interface StructuredLogger {
+    /**
+     * Registers a named field.
+     *
+     * @param name name of the field
+     * @param value value of the field
+     * @return a {@link StructuredLogger}, which allows calls to be chained; whether it is this
+     *         instance or another one is left to the implementation
+     */
+    StructuredLogger addField(String name, Object value);
+
+    /**
+     * Passes a {@link Logger} to {@code logOperation}.
+     * NOTE(review): presumably the fields registered through {@link #addField(String, Object)}
+     * are exposed to the logging backend while the operation runs - confirm against the
+     * implementations.
+     *
+     * @param logOperation the logging call to perform
+     */
+    void log(Consumer<Logger> logOperation);
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/630dcab1/server/container/util/src/main/java/org/apache/james/util/ValuePatch.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/ValuePatch.java b/server/container/util/src/main/java/org/apache/james/util/ValuePatch.java
new file mode 100644
index 0000000..5c3d01a
--- /dev/null
+++ b/server/container/util/src/main/java/org/apache/james/util/ValuePatch.java
@@ -0,0 +1,125 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.util;
+
+import java.util.NoSuchElementException;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Describes what should happen to a value: keep it, remove it, or replace it by a new one.
+ * Only the "modify" flavour carries a value; the two others hold {@code null}.
+ */
+public class ValuePatch<T> {
+
+    private enum State {
+        KEEP,
+        REMOVE,
+        MODIFY
+    }
+
+    /**
+     * @param value the replacement value, must not be {@code null}
+     * @return a patch replacing the current value by {@code value}
+     */
+    public static <T> ValuePatch<T> modifyTo(T value) {
+        Preconditions.checkNotNull(value);
+        return new ValuePatch<>(value, State.MODIFY);
+    }
+
+    /**
+     * @return a removal when {@code value} is {@code null}, a modification otherwise
+     */
+    public static <T> ValuePatch<T> ofNullable(T value) {
+        if (value == null) {
+            return remove();
+        }
+        return modifyTo(value);
+    }
+
+    /**
+     * @param value must not be {@code null}
+     * @return a removal when {@code value} is empty, a modification otherwise
+     */
+    public static <T> ValuePatch<T> ofOptional(Optional<T> value) {
+        Preconditions.checkNotNull(value);
+        if (value.isPresent()) {
+            return modifyTo(value.get());
+        }
+        return remove();
+    }
+
+    public static <T> ValuePatch<T> remove() {
+        return new ValuePatch<>(null, State.REMOVE);
+    }
+
+    public static <T> ValuePatch<T> keep() {
+        return new ValuePatch<>(null, State.KEEP);
+    }
+
+    private final T value;
+    private final State state;
+
+    private ValuePatch(T value, State state) {
+        this.value = value;
+        this.state = state;
+    }
+
+    public boolean isRemoved() {
+        return State.REMOVE == state;
+    }
+
+    public boolean isModified() {
+        return State.MODIFY == state;
+    }
+
+    public boolean isKept() {
+        return State.KEEP == state;
+    }
+
+    /**
+     * Applies {@code updateTransformation} unless this patch keeps the value.
+     *
+     * @param updateTransformation receives the new value, or an empty {@link Optional} for a
+     *        removal; it must not return {@code null}
+     * @return the transformation result, or an empty {@link Optional} for a kept value
+     */
+    public <S> Optional<S> mapNotKeptToOptional(Function<Optional<T>, S> updateTransformation) {
+        if (isKept()) {
+            return Optional.empty();
+        }
+        Optional<T> patched = Optional.ofNullable(value);
+        S transformed = updateTransformation.apply(patched);
+        return Optional.of(transformed);
+    }
+
+    /**
+     * @return the new value of a modification
+     * @throws NoSuchElementException when this patch is not a modification
+     */
+    public T get() {
+        if (!isModified()) {
+            throw new NoSuchElementException();
+        }
+        return value;
+    }
+
+    /**
+     * @return {@code replacement} for a kept value, otherwise the new value (empty for a removal)
+     */
+    public Optional<T> notKeptOrElse(Optional<T> replacement) {
+        return isKept()
+            ? replacement
+            : Optional.ofNullable(value);
+    }
+
+    /**
+     * @return the new value for a modification, an empty {@link Optional} otherwise
+     */
+    public Optional<T> toOptional() {
+        return Optional.ofNullable(value);
+    }
+
+    /**
+     * @return the new value for a modification, {@code replacement} otherwise
+     */
+    public T getOrElse(T replacement) {
+        if (value != null) {
+            return value;
+        }
+        return replacement;
+    }
+
+    @Override
+    public boolean equals(Object o) {
+        if (!(o instanceof ValuePatch)) {
+            return false;
+        }
+        ValuePatch<?> other = (ValuePatch<?>) o;
+        return Objects.equals(this.value, other.value)
+            && Objects.equals(this.state, other.state);
+    }
+
+    @Override
+    public int hashCode() {
+        return Objects.hash(value, state);
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/630dcab1/server/container/util/src/main/java/org/apache/james/util/date/DefaultZonedDateTimeProvider.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/date/DefaultZonedDateTimeProvider.java b/server/container/util/src/main/java/org/apache/james/util/date/DefaultZonedDateTimeProvider.java
new file mode 100644
index 0000000..47b776a
--- /dev/null
+++ b/server/container/util/src/main/java/org/apache/james/util/date/DefaultZonedDateTimeProvider.java
@@ -0,0 +1,31 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.util.date;
+
+import java.time.ZoneOffset;
+import java.time.ZonedDateTime;
+
+/**
+ * {@link ZonedDateTimeProvider} backed by the system clock.
+ */
+public class DefaultZonedDateTimeProvider implements ZonedDateTimeProvider {
+
+    /**
+     * @return the current date-time, expressed in UTC
+     */
+    @Override
+    public ZonedDateTime get() {
+        ZoneOffset utc = ZoneOffset.UTC;
+        return ZonedDateTime.now(utc);
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/630dcab1/server/container/util/src/main/java/org/apache/james/util/date/ImapDateTimeFormatter.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/date/ImapDateTimeFormatter.java b/server/container/util/src/main/java/org/apache/james/util/date/ImapDateTimeFormatter.java
new file mode 100644
index 0000000..0a0d633
--- /dev/null
+++ b/server/container/util/src/main/java/org/apache/james/util/date/ImapDateTimeFormatter.java
@@ -0,0 +1,94 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.util.date;
+
+import static java.time.temporal.ChronoField.DAY_OF_MONTH;
+import static java.time.temporal.ChronoField.DAY_OF_WEEK;
+import static java.time.temporal.ChronoField.HOUR_OF_DAY;
+import static java.time.temporal.ChronoField.MINUTE_OF_HOUR;
+import static java.time.temporal.ChronoField.MONTH_OF_YEAR;
+import static java.time.temporal.ChronoField.SECOND_OF_MINUTE;
+import static java.time.temporal.ChronoField.YEAR;
+
+import java.time.format.DateTimeFormatter;
+import java.time.format.DateTimeFormatterBuilder;
+import java.time.format.SignStyle;
+
+import com.google.common.collect.ImmutableMap;
+
+/**
+ * Provides the {@link DateTimeFormatter} used for RFC 5322 style date-times such as
+ * {@code "Wed, 1 Aug 2018 12:34:26 +0000"}.
+ */
+public class ImapDateTimeFormatter {
+
+    private static final int INITIAL_YEAR = 1970;
+
+    // DateTimeFormatter is immutable and thread-safe: build it once instead of re-creating the
+    // builder and both lookup maps on every call.
+    private static final DateTimeFormatter RFC_5322 = buildRfc5322();
+
+    /**
+     * @return the shared, case-insensitive and lenient formatter; the leading day of week and
+     *         the seconds are optional, and a zero offset is written as {@code GMT}
+     */
+    public static DateTimeFormatter rfc5322() {
+        return RFC_5322;
+    }
+
+    private static DateTimeFormatter buildRfc5322() {
+        return new DateTimeFormatterBuilder()
+                .parseCaseInsensitive()
+                .parseLenient()
+                .optionalStart()
+                    .appendText(DAY_OF_WEEK, dayOfWeek())
+                    .appendLiteral(", ")
+                .optionalEnd()
+                .appendValue(DAY_OF_MONTH, 1, 2, SignStyle.NOT_NEGATIVE)
+                .appendLiteral(' ')
+                .appendText(MONTH_OF_YEAR, monthOfYear())
+                .appendLiteral(' ')
+                // Years may come with 2 to 4 digits; shortened years are resolved from INITIAL_YEAR.
+                .appendValueReduced(YEAR, 2, 4, INITIAL_YEAR)
+                .appendLiteral(' ')
+                .appendValue(HOUR_OF_DAY, 2)
+                .appendLiteral(':')
+                .appendValue(MINUTE_OF_HOUR, 2)
+                .optionalStart()
+                    .appendLiteral(':')
+                    .appendValue(SECOND_OF_MINUTE, 2)
+                .optionalEnd()
+                .appendLiteral(' ')
+                .appendOffset("+HHMM", "GMT")
+                .toFormatter();
+    }
+
+    // English three-letter month names, keyed by month number (1 = January).
+    private static ImmutableMap<Long, String> monthOfYear() {
+        return ImmutableMap.<Long, String>builder()
+                .put(1L, "Jan")
+                .put(2L, "Feb")
+                .put(3L, "Mar")
+                .put(4L, "Apr")
+                .put(5L, "May")
+                .put(6L, "Jun")
+                .put(7L, "Jul")
+                .put(8L, "Aug")
+                .put(9L, "Sep")
+                .put(10L, "Oct")
+                .put(11L, "Nov")
+                .put(12L, "Dec")
+                .build();
+    }
+
+    // English three-letter day names, keyed by ISO day of week (1 = Monday).
+    private static ImmutableMap<Long, String> dayOfWeek() {
+        return ImmutableMap.<Long, String>builder()
+                .put(1L, "Mon")
+                .put(2L, "Tue")
+                .put(3L, "Wed")
+                .put(4L, "Thu")
+                .put(5L, "Fri")
+                .put(6L, "Sat")
+                .put(7L, "Sun")
+                .build();
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/630dcab1/server/container/util/src/main/java/org/apache/james/util/date/ZonedDateTimeProvider.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/date/ZonedDateTimeProvider.java b/server/container/util/src/main/java/org/apache/james/util/date/ZonedDateTimeProvider.java
new file mode 100644
index 0000000..6f80e76
--- /dev/null
+++ b/server/container/util/src/main/java/org/apache/james/util/date/ZonedDateTimeProvider.java
@@ -0,0 +1,28 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.util.date;
+
+import java.time.ZonedDateTime;
+
+import javax.inject.Provider;
+
+/**
+ * {@link Provider} specialised for {@link ZonedDateTime} values. It declares no member of its
+ * own: implementations only have to supply {@link Provider#get()}.
+ */
+public interface ZonedDateTimeProvider extends Provider<ZonedDateTime> {
+
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/630dcab1/server/container/util/src/main/java/org/apache/james/util/mime/MessageContentExtractor.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/mime/MessageContentExtractor.java b/server/container/util/src/main/java/org/apache/james/util/mime/MessageContentExtractor.java
new file mode 100644
index 0000000..74bb660
--- /dev/null
+++ b/server/container/util/src/main/java/org/apache/james/util/mime/MessageContentExtractor.java
@@ -0,0 +1,233 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.util.mime;
+
+import java.io.IOException;
+import java.nio.charset.Charset;
+import java.util.Objects;
+import java.util.Optional;
+import java.util.function.Function;
+import java.util.function.Predicate;
+import java.util.stream.Stream;
+
+import javax.mail.internet.MimeMessage;
+
+import org.apache.commons.io.IOUtils;
+import org.apache.james.mime4j.dom.Body;
+import org.apache.james.mime4j.dom.Entity;
+import org.apache.james.mime4j.dom.Multipart;
+import org.apache.james.mime4j.dom.TextBody;
+
+import com.github.fge.lambdas.Throwing;
+import com.github.fge.lambdas.functions.ThrowingFunction;
+
+/**
+ * Extracts the plain text and HTML bodies of a mime4j message, walking through nested
+ * multiparts when needed.
+ */
+public class MessageContentExtractor {
+
+    public static final String CONTENT_ID = "Content-ID";
+    public static final String MULTIPART_ALTERNATIVE = "multipart/alternative";
+    public static final String TEXT_HTML = "text/html";
+    public static final String TEXT_PLAIN = "text/plain";
+
+    /**
+     * Extracts the readable content of {@code message}.
+     *
+     * @return the text and/or HTML bodies found; {@link MessageContent#empty()} when the body
+     *         is neither a {@link TextBody} nor a {@link Multipart}
+     * @throws IOException when a body can not be read
+     */
+    public MessageContent extract(org.apache.james.mime4j.dom.Message message) throws IOException {
+        Body body = message.getBody();
+        if (body instanceof TextBody) {
+            return parseTextBody(message, (TextBody)body);
+        }
+        if (body instanceof Multipart) {
+            return parseMultipart(message, (Multipart)body);
+        }
+        return MessageContent.empty();
+    }
+
+    // A single-part body is HTML only when declared as text/html; anything else is handled as text.
+    private MessageContent parseTextBody(Entity entity, TextBody textBody) throws IOException {
+        Optional<String> bodyContent = asString(textBody);
+        if (TEXT_HTML.equals(entity.getMimeType())) {
+            return MessageContent.ofHtmlOnly(bodyContent);
+        }
+        return MessageContent.ofTextOnly(bodyContent);
+    }
+
+    // Looks at the direct children first, then falls back to the first nested multipart.
+    private MessageContent parseMultipart(Entity entity, Multipart multipart) throws IOException {
+        MessageContent messageContent = parseMultipartContent(entity, multipart);
+        if (!messageContent.isEmpty()) {
+            return messageContent;
+        }
+        return parseFirstFoundMultipart(multipart);
+    }
+
+    // multipart/alternative may carry both flavours; other multiparts yield the first readable part.
+    private MessageContent parseMultipartContent(Entity entity, Multipart multipart) throws IOException {
+        switch (entity.getMimeType()) {
+        case MULTIPART_ALTERNATIVE:
+            return retrieveHtmlAndPlainTextContent(multipart);
+        default:
+            return retrieveFirstReadablePart(multipart);
+        }
+    }
+
+    // Recurses into the first direct child that is itself a multipart. The IOException of the
+    // recursive call is rethrown as is through sneakyThrow.
+    private MessageContent parseFirstFoundMultipart(Multipart multipart) throws IOException {
+        ThrowingFunction<Entity, MessageContent> parseMultipart = firstPart -> parseMultipart(firstPart, (Multipart)firstPart.getBody());
+        return multipart.getBodyParts()
+            .stream()
+            .filter(part -> part.getBody() instanceof Multipart)
+            .findFirst()
+            .map(Throwing.function(parseMultipart).sneakyThrow())
+            .orElse(MessageContent.empty());
+    }
+
+    // Decodes the body with its declared charset, or the mime4j default one when none is declared.
+    private Optional<String> asString(TextBody textBody) throws IOException {
+        return Optional.ofNullable(IOUtils.toString(textBody.getInputStream(), charset(Optional.ofNullable(textBody.getMimeCharset()))));
+    }
+
+    private Charset charset(Optional<String> charset) {
+        return charset
+                .map(Charset::forName)
+                .orElse(org.apache.james.mime4j.Charsets.DEFAULT_CHARSET);
+    }
+
+    // Collects text/plain and text/html among the direct children; a missing flavour is looked
+    // up in the first nested multipart.
+    private MessageContent retrieveHtmlAndPlainTextContent(Multipart multipart) throws IOException {
+        Optional<String> textBody = getFirstMatchingTextBody(multipart, TEXT_PLAIN);
+        Optional<String> htmlBody = getFirstMatchingTextBody(multipart, TEXT_HTML);
+        MessageContent directChildTextBodies = new MessageContent(textBody, htmlBody);
+        if (!directChildTextBodies.isComplete()) {
+            MessageContent fromInnerMultipart = parseFirstFoundMultipart(multipart);
+            return directChildTextBodies.merge(fromInnerMultipart);
+        }
+        return directChildTextBodies;
+    }
+
+    // Parts without disposition are preferred; inlined parts without Content-ID are the fallback.
+    private MessageContent retrieveFirstReadablePart(Multipart multipart) throws IOException {
+        return retrieveFirstReadablePartMatching(multipart, this::isNotAttachment)
+            .orElseGet(() -> retrieveFirstReadablePartMatching(multipart, this::isInlinedWithoutCid)
+                .orElse(MessageContent.empty()));
+    }
+
+    private Optional<MessageContent> retrieveFirstReadablePartMatching(Multipart multipart, Predicate<Entity> predicate) {
+        return multipart.getBodyParts()
+            .stream()
+            .filter(predicate)
+            .flatMap(Throwing.function(this::extractContentIfReadable).sneakyThrow())
+            .findFirst();
+    }
+
+    // Yields one element for a text/html part, a text/plain part or a non-empty nested
+    // multipart, and nothing otherwise.
+    private Stream<MessageContent> extractContentIfReadable(Entity entity) throws IOException {
+        if (TEXT_HTML.equals(entity.getMimeType()) && entity.getBody() instanceof TextBody) {
+            return Stream.of(
+                    MessageContent.ofHtmlOnly(asString((TextBody)entity.getBody())));
+        }
+        if (TEXT_PLAIN.equals(entity.getMimeType()) && entity.getBody() instanceof TextBody) {
+            return Stream.of(
+                    MessageContent.ofTextOnly(asString((TextBody)entity.getBody())));
+        }
+        if (entity.isMultipart() && entity.getBody() instanceof Multipart) {
+            MessageContent innerMultipartContent = parseMultipart(entity, (Multipart)entity.getBody());
+            if (!innerMultipartContent.isEmpty()) {
+                return Stream.of(innerMultipartContent);
+            }
+        }
+        return Stream.empty();
+    }
+
+    // Same preference order as retrieveFirstReadablePart, restricted to the given mime type.
+    private Optional<String> getFirstMatchingTextBody(Multipart multipart, String mimeType) throws IOException {
+        Optional<String> firstMatchingTextBody = getFirstMatchingTextBody(multipart, mimeType, this::isNotAttachment);
+        if (firstMatchingTextBody.isPresent()) {
+            return firstMatchingTextBody;
+        }
+        Optional<String> fallBackInlinedBodyWithoutCid = getFirstMatchingTextBody(multipart, mimeType, this::isInlinedWithoutCid);
+        return fallBackInlinedBodyWithoutCid;
+    }
+
+    private Optional<String> getFirstMatchingTextBody(Multipart multipart, String mimeType, Predicate<Entity> condition) {
+        Function<TextBody, Optional<String>> textBodyOptionalFunction = Throwing
+            .function(this::asString).sneakyThrow();
+
+        return multipart.getBodyParts()
+            .stream()
+            .filter(part -> mimeType.equals(part.getMimeType()))
+            .filter(condition)
+            .map(Entity::getBody)
+            .filter(TextBody.class::isInstance)
+            .map(TextBody.class::cast)
+            .findFirst()
+            .flatMap(textBodyOptionalFunction);
+    }
+
+    // A part declaring no disposition type is not considered an attachment.
+    private boolean isNotAttachment(Entity part) {
+        return part.getDispositionType() == null;
+    }
+
+    // Inlined parts carrying a Content-ID are excluded.
+    private boolean isInlinedWithoutCid(Entity part) {
+        return Objects.equals(part.getDispositionType(), MimeMessage.INLINE)
+            && part.getHeader().getField(CONTENT_ID) == null;
+    }
+
+    /**
+     * Holds the text and HTML bodies extracted from a message; either can be absent.
+     */
+    public static class MessageContent {
+        private final Optional<String> textBody;
+        private final Optional<String> htmlBody;
+
+        public MessageContent(Optional<String> textBody, Optional<String> htmlBody) {
+            this.textBody = textBody;
+            this.htmlBody = htmlBody;
+        }
+
+        public static MessageContent ofTextOnly(Optional<String> textBody) {
+            return new MessageContent(textBody, Optional.empty());
+        }
+
+        public static MessageContent ofHtmlOnly(Optional<String> htmlBody) {
+            return new MessageContent(Optional.empty(), htmlBody);
+        }
+
+        public static MessageContent empty() {
+            return new MessageContent(Optional.empty(), Optional.empty());
+        }
+
+        public Optional<String> getTextBody() {
+            return textBody;
+        }
+
+        public Optional<String> getHtmlBody() {
+            return htmlBody;
+        }
+
+        /**
+         * @return {@code true} when neither a text body nor an HTML body is present
+         */
+        public boolean isEmpty() {
+            return equals(empty());
+        }
+
+        /**
+         * @return {@code true} when both the text body and the HTML body are present
+         */
+        public boolean isComplete() {
+            return textBody.isPresent() && htmlBody.isPresent();
+        }
+
+        /**
+         * Fills the bodies missing from this content with those of {@code fromInnerMultipart}.
+         * Bodies already present in this content take precedence.
+         */
+        public MessageContent merge(MessageContent fromInnerMultipart) {
+            return new MessageContent(
+                    textBody.map(Optional::of).orElse(fromInnerMultipart.getTextBody()),
+                    htmlBody.map(Optional::of).orElse(fromInnerMultipart.getHtmlBody()));
+        }
+
+        @Override
+        public boolean equals(Object other) {
+            // instanceof already rejects null
+            if (!(other instanceof MessageContent)) {
+                return false;
+            }
+            MessageContent otherMessageContent = (MessageContent)other;
+            return Objects.equals(this.textBody, otherMessageContent.textBody)
+                    && Objects.equals(this.htmlBody, otherMessageContent.htmlBody);
+        }
+
+        // Kept consistent with equals so that equal contents behave correctly in hash-based
+        // collections.
+        @Override
+        public int hashCode() {
+            return Objects.hash(textBody, htmlBody);
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/630dcab1/server/container/util/src/main/java/org/apache/james/util/streams/Iterators.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/streams/Iterators.java b/server/container/util/src/main/java/org/apache/james/util/streams/Iterators.java
new file mode 100644
index 0000000..ba3a06b
--- /dev/null
+++ b/server/container/util/src/main/java/org/apache/james/util/streams/Iterators.java
@@ -0,0 +1,32 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.util.streams;
+
+import java.util.Iterator;
+import java.util.stream.Stream;
+import java.util.stream.StreamSupport;
+
+/**
+ * Helpers bridging {@link Iterator} and {@link Stream}.
+ */
+public class Iterators {
+
+    /**
+     * Exposes the elements of an iterator as a sequential stream.
+     * The stream pulls from {@code iterator} itself, so the iterator is consumed as the
+     * stream is traversed.
+     *
+     * @param iterator source of the elements
+     * @param <T> type of the elements
+     * @return a non-parallel stream over the elements of {@code iterator}
+     */
+    public static <T> Stream<T> toStream(Iterator<T> iterator) {
+        boolean parallel = false;
+        // An Iterable is a single-method interface, so a lambda handing back the iterator is enough.
+        return StreamSupport.stream(((Iterable<T>) () -> iterator).spliterator(), parallel);
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/630dcab1/server/container/util/src/main/java/org/apache/james/util/streams/JamesCollectors.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/streams/JamesCollectors.java b/server/container/util/src/main/java/org/apache/james/util/streams/JamesCollectors.java
new file mode 100644
index 0000000..e705063
--- /dev/null
+++ b/server/container/util/src/main/java/org/apache/james/util/streams/JamesCollectors.java
@@ -0,0 +1,80 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.util.streams;
+
+import java.util.Collection;
+import java.util.Set;
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.function.BiConsumer;
+import java.util.function.BinaryOperator;
+import java.util.function.Function;
+import java.util.function.Supplier;
+import java.util.stream.Collector;
+import java.util.stream.Stream;
+
+import com.google.common.base.Preconditions;
+import com.google.common.collect.ArrayListMultimap;
+import com.google.common.collect.ImmutableSet;
+import com.google.common.collect.Multimap;
+
+/**
+ * Factory for custom {@link Collector} implementations.
+ */
+public class JamesCollectors {
+    /**
+     * Creates a collector that groups the collected elements into chunks of {@code chunkSize}.
+     *
+     * @param chunkSize number of elements per chunk, must be strictly positive
+     * @param <D> type of the collected elements
+     * @return a collector producing a stream of chunks
+     * @throws IllegalArgumentException if {@code chunkSize} is not strictly positive
+     */
+    public static <D> Collector<D, ?, Stream<Collection<D>>> chunker(int chunkSize) {
+        return new ChunkCollector<>(chunkSize);
+    }
+
+    /**
+     * Collector that files each element under the key {@code index / chunkSize}, where
+     * {@code index} comes from a counter owned by the collector instance.
+     * Because the counter lives in the collector and not in the accumulation container,
+     * an instance keeps counting from where it stopped if it is used for a second collection.
+     *
+     * @param <D> type of the collected elements
+     */
+    public static class ChunkCollector<D> implements Collector<D, Multimap<Integer, D>, Stream<Collection<D>>> {
+        private final int chunkSize;
+        // Starts at -1 so that the first incrementAndGet() yields index 0.
+        private final AtomicInteger counter;
+
+        private ChunkCollector(int chunkSize) {
+            Preconditions.checkArgument(chunkSize > 0, "ChunkSize should be strictly positive");
+            this.counter = new AtomicInteger(-1);
+            this.chunkSize = chunkSize;
+        }
+
+        /** Supplies a fresh, empty multimap used as the accumulation container. */
+        @Override
+        public Supplier<Multimap<Integer, D>> supplier() {
+            return () -> ArrayListMultimap.create();
+        }
+
+        /** Stores each element under its chunk number, i.e. its running index divided by the chunk size. */
+        @Override
+        public BiConsumer<Multimap<Integer, D>, D> accumulator() {
+            return (chunksByIndex, element) -> {
+                int chunkIndex = counter.incrementAndGet() / chunkSize;
+                chunksByIndex.put(chunkIndex, element);
+            };
+        }
+
+        /** Merges two partial containers by pouring the second one into the first. */
+        @Override
+        public BinaryOperator<Multimap<Integer, D>> combiner() {
+            return (target, extra) -> {
+                target.putAll(extra);
+                return target;
+            };
+        }
+
+        /** Turns the container into a stream with one collection per chunk key. */
+        @Override
+        public Function<Multimap<Integer, D>, Stream<Collection<D>>> finisher() {
+            return chunksByIndex -> {
+                Collection<Collection<D>> chunks = chunksByIndex.asMap().values();
+                return chunks.stream();
+            };
+        }
+
+        /** Declares no collector characteristic. */
+        @Override
+        public Set<Characteristics> characteristics() {
+            Set<Characteristics> none = ImmutableSet.of();
+            return none;
+        }
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/630dcab1/server/container/util/src/main/java/org/apache/james/util/streams/Limit.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/streams/Limit.java b/server/container/util/src/main/java/org/apache/james/util/streams/Limit.java
new file mode 100644
index 0000000..268ed5e
--- /dev/null
+++ b/server/container/util/src/main/java/org/apache/james/util/streams/Limit.java
@@ -0,0 +1,81 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.util.streams;
+
+import java.util.Objects;
+import java.util.Optional;
+import java.util.stream.Stream;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Optional upper bound on a number of elements; an absent value stands for "no limit".
+ */
+public class Limit {
+
+    private final Optional<Integer> limit;
+
+    private Limit(Optional<Integer> limit) {
+        this.limit = limit;
+    }
+
+    /**
+     * @return a {@code Limit} holding no bound at all
+     */
+    public static Limit unlimited() {
+        Optional<Integer> noBound = Optional.empty();
+        return new Limit(noBound);
+    }
+
+    /**
+     * Lenient factory: any value that is not strictly positive is treated as "no limit".
+     *
+     * @param limit the requested bound
+     * @return a bounded {@code Limit} when {@code limit > 0}, {@link #unlimited()} otherwise
+     */
+    public static Limit from(int limit) {
+        if (limit <= 0) {
+            return unlimited();
+        }
+        return new Limit(Optional.of(limit));
+    }
+
+    /**
+     * Lenient factory for an optional bound.
+     *
+     * @param limit the requested bound, possibly absent
+     * @return the result of {@link #from(int)} when a value is present, {@link #unlimited()} otherwise
+     */
+    public static Limit from(Optional<Integer> limit) {
+        if (limit.isPresent()) {
+            return from(limit.get().intValue());
+        }
+        return unlimited();
+    }
+
+    /**
+     * Strict factory: rejects values that are not strictly positive.
+     *
+     * @param limit the requested bound
+     * @return a bounded {@code Limit}
+     * @throws IllegalArgumentException if {@code limit} is zero or negative
+     */
+    public static Limit limit(int limit) {
+        Preconditions.checkArgument(limit > 0, "limit should be positive");
+        Optional<Integer> bound = Optional.of(limit);
+        return new Limit(bound);
+    }
+
+    /**
+     * @return the bound, or an empty optional when unlimited
+     */
+    public Optional<Integer> getLimit() {
+        return limit;
+    }
+
+    /**
+     * Truncates a stream to this limit.
+     *
+     * @param stream the stream to bound
+     * @param <T> type of the stream elements
+     * @return {@code stream.limit(bound)} when a bound is set, the given stream itself otherwise
+     */
+    public <T> Stream<T> applyOnStream(Stream<T> stream) {
+        if (!limit.isPresent()) {
+            return stream;
+        }
+        return stream.limit(limit.get());
+    }
+
+    @Override
+    public final boolean equals(Object o) {
+        if (!(o instanceof Limit)) {
+            return false;
+        }
+        Limit that = (Limit) o;
+        return Objects.equals(limit, that.limit);
+    }
+
+    @Override
+    public final int hashCode() {
+        return Objects.hash(limit);
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/630dcab1/server/container/util/src/main/java/org/apache/james/util/streams/Offset.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/main/java/org/apache/james/util/streams/Offset.java b/server/container/util/src/main/java/org/apache/james/util/streams/Offset.java
new file mode 100644
index 0000000..109ecae
--- /dev/null
+++ b/server/container/util/src/main/java/org/apache/james/util/streams/Offset.java
@@ -0,0 +1,66 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.util.streams;
+
+import java.util.Objects;
+import java.util.Optional;
+
+import com.google.common.base.Preconditions;
+
+/**
+ * Number of leading elements to skip; zero means nothing is skipped.
+ */
+public class Offset {
+
+    private final int offset;
+
+    private Offset(int offset) {
+        this.offset = offset;
+    }
+
+    /**
+     * @return an {@code Offset} of zero
+     */
+    public static Offset none() {
+        return new Offset(0);
+    }
+
+    /**
+     * Builds an offset from a concrete value.
+     *
+     * @param offset the offset, zero or greater
+     * @return the matching {@code Offset}
+     * @throws IllegalArgumentException if {@code offset} is negative
+     */
+    public static Offset from(int offset) {
+        Preconditions.checkArgument(offset >= 0, "offset should be positive");
+        return new Offset(offset);
+    }
+
+    /**
+     * Builds an offset from an optional value.
+     *
+     * @param offset the offset, possibly absent
+     * @return the result of {@link #from(int)} when a value is present, {@link #none()} otherwise
+     * @throws IllegalArgumentException if a negative value is present
+     */
+    public static Offset from(Optional<Integer> offset) {
+        if (offset.isPresent()) {
+            return from(offset.get().intValue());
+        }
+        return none();
+    }
+
+    /**
+     * @return the offset value
+     */
+    public int getOffset() {
+        return offset;
+    }
+
+    @Override
+    public final boolean equals(Object o) {
+        if (!(o instanceof Offset)) {
+            return false;
+        }
+        Offset that = (Offset) o;
+        return this.offset == that.offset;
+    }
+
+    @Override
+    public final int hashCode() {
+        return Objects.hash(offset);
+    }
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/630dcab1/server/container/util/src/test/java/org/apache/james/util/CommutativityChecker.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/test/java/org/apache/james/util/CommutativityChecker.java b/server/container/util/src/test/java/org/apache/james/util/CommutativityChecker.java
new file mode 100644
index 0000000..15976e9
--- /dev/null
+++ b/server/container/util/src/test/java/org/apache/james/util/CommutativityChecker.java
@@ -0,0 +1,58 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.util;
+
+import java.util.Set;
+import java.util.function.BinaryOperator;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.paukov.combinatorics3.Generator;
+
+import com.github.steveash.guavate.Guavate;
+import com.google.common.base.Preconditions;
+
+/**
+ * Test helper that looks for pairs of values on which a binary operation is not commutative.
+ *
+ * @param <T> type of the values and of the operation result
+ */
+public class CommutativityChecker<T> {
+    private final Set<T> valuesToTest;
+    private final BinaryOperator<T> operationToTest;
+
+    /**
+     * @param valuesToTest candidate values, at least two of them
+     * @param operationToTest operation whose commutativity is checked
+     * @throws NullPointerException if either argument is {@code null}
+     * @throws IllegalArgumentException if fewer than two values are given
+     */
+    public CommutativityChecker(Set<T> valuesToTest, BinaryOperator<T> operationToTest) {
+        Preconditions.checkNotNull(valuesToTest);
+        Preconditions.checkNotNull(operationToTest);
+        Preconditions.checkArgument(valuesToTest.size() > 1, "You must to pass more than one value to check commutativity");
+        this.valuesToTest = valuesToTest;
+        this.operationToTest = operationToTest;
+    }
+
+    /**
+     * Applies the operation, in both argument orders, to each 2-element combination of the
+     * values and keeps the combinations whose two results differ.
+     *
+     * @return the pairs on which the operation is not commutative; empty when none is found
+     */
+    public Set<Pair<T, T>> findNonCommutativeInput() {
+        return Generator.combination(valuesToTest)
+            .simple(2)
+            .stream()
+            .map(list -> Pair.of(list.get(0), list.get(1)))
+            .filter(this::isNotCommutative)
+            .collect(Guavate.toImmutableSet());
+    }
+
+    /**
+     * Compares {@code op(left, right)} with {@code op(right, left)}.
+     * The comparison is null-safe: two {@code null} results count as equal, and a single
+     * {@code null} result counts as a difference instead of raising a NullPointerException.
+     */
+    private boolean isNotCommutative(Pair<T, T> pair) {
+        T leftThenRight = operationToTest.apply(pair.getLeft(), pair.getRight());
+        T rightThenLeft = operationToTest.apply(pair.getRight(), pair.getLeft());
+        if (leftThenRight == null) {
+            return rightThenLeft != null;
+        }
+        return !leftThenRight.equals(rightThenLeft);
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/james-project/blob/630dcab1/server/container/util/src/test/java/org/apache/james/util/CommutativityCheckerTest.java
----------------------------------------------------------------------
diff --git a/server/container/util/src/test/java/org/apache/james/util/CommutativityCheckerTest.java b/server/container/util/src/test/java/org/apache/james/util/CommutativityCheckerTest.java
new file mode 100644
index 0000000..ec0802a
--- /dev/null
+++ b/server/container/util/src/test/java/org/apache/james/util/CommutativityCheckerTest.java
@@ -0,0 +1,102 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.util;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+
+import java.util.Set;
+import java.util.function.BinaryOperator;
+
+import org.apache.commons.lang3.tuple.Pair;
+import org.junit.Test;
+
+import com.google.common.collect.ImmutableSet;
+
+/**
+ * Tests for {@link CommutativityChecker}: constructor argument validation and detection of
+ * non-commutative pairs.
+ */
+public class CommutativityCheckerTest {
+    // a * a + b is not commutative in general, yet 0 and 1 give the same result in both orders.
+    private static final BinaryOperator<Integer> SQUARE_LEFT_THEN_ADD = (a, b) -> a * a + b;
+
+    @Test
+    public void constructorShouldThrowWhenNullValuesToTest() throws Exception {
+        assertThatThrownBy(() -> new CommutativityChecker<>(null, SQUARE_LEFT_THEN_ADD))
+            .isInstanceOf(NullPointerException.class);
+    }
+
+    @Test
+    public void constructorShouldThrowWhenEmptyValuesToTest() throws Exception {
+        assertThatThrownBy(() -> new CommutativityChecker<>(ImmutableSet.of(), SQUARE_LEFT_THEN_ADD))
+            .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    public void constructorShouldThrowWhenSingleValueToTest() throws Exception {
+        assertThatThrownBy(() -> new CommutativityChecker<>(ImmutableSet.of(0), SQUARE_LEFT_THEN_ADD))
+            .isInstanceOf(IllegalArgumentException.class);
+    }
+
+    @Test
+    public void constructorShouldThrowWhenNullOperation() throws Exception {
+        assertThatThrownBy(() -> new CommutativityChecker<>(ImmutableSet.of(0, 1), null))
+            .isInstanceOf(NullPointerException.class);
+    }
+
+    @Test
+    public void findNonCommutativeInputShouldReturnEmptyWhenCommutativeOperation() throws Exception {
+        BinaryOperator<Integer> addition = Integer::sum;
+        Set<Integer> values = ImmutableSet.of(5, 4, 3, 2, 1);
+        CommutativityChecker<Integer> testee = new CommutativityChecker<>(values, addition);
+
+        assertThat(testee.findNonCommutativeInput()).isEmpty();
+    }
+
+    @Test
+    public void findNonCommutativeInputShouldReturnDataWhenNonCommutativeOperation() throws Exception {
+        BinaryOperator<Integer> doubleLeftThenAdd = (a, b) -> 2 * a + b;
+        Set<Integer> values = ImmutableSet.of(2, 1);
+        CommutativityChecker<Integer> testee = new CommutativityChecker<>(values, doubleLeftThenAdd);
+
+        assertThat(testee.findNonCommutativeInput())
+            .containsOnly(Pair.of(2, 1));
+    }
+
+    @Test
+    public void findNonCommutativeInputShouldNotReturnStableValues() throws Exception {
+        Set<Integer> values = ImmutableSet.of(0, 1, 2);
+        CommutativityChecker<Integer> testee = new CommutativityChecker<>(values, SQUARE_LEFT_THEN_ADD);
+
+        // (0, 1) is left out: 0 * 0 + 1 and 1 * 1 + 0 are both 1.
+        assertThat(testee.findNonCommutativeInput())
+            .containsOnly(Pair.of(1, 2),
+                Pair.of(0, 2));
+    }
+
+    @Test
+    public void findNonCommutativeInputShouldReturnEmptyWhenNonCommutativeOperationButOnlyStableValues() throws Exception {
+        Set<Integer> stableValues = ImmutableSet.of(0, 1);
+        CommutativityChecker<Integer> testee = new CommutativityChecker<>(stableValues, SQUARE_LEFT_THEN_ADD);
+
+        assertThat(testee.findNonCommutativeInput()).isEmpty();
+    }
+}
\ No newline at end of file


---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org