You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2022/03/15 07:17:35 UTC

[james-project] 02/03: JAMES-3724 - Implement LeakAware

This is an automated email from the ASF dual-hosted git repository.

rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git

commit daf3d4bba15ecde6f42e235236204e98523a545e
Author: Tung Van TRAN <vt...@linagora.com>
AuthorDate: Wed Mar 9 17:04:00 2022 +0700

    JAMES-3724 - Implement LeakAware
---
 .../core/BufferedDeferredFileOutputStream.java     |   9 +-
 .../server/core/MimeMessageInputStreamSource.java  | 128 +++++++------
 .../james/server/core/MimeMessageWrapper.java      |   2 +-
 .../core/MimeMessageInputStreamSourceTest.java     |   6 +-
 .../james/server/core/MimeMessageWrapperTest.java  |   6 +-
 server/container/lifecycle-api/pom.xml             |   5 +
 .../org/apache/james/lifecycle/api/Disposable.java | 177 ++++++++++++++++++
 .../apache/james/lifecycle/api/LeakAwareTest.java  | 200 +++++++++++++++++++++
 .../james/smtpserver/JamesDataCmdHandler.java      |   2 +-
 9 files changed, 470 insertions(+), 65 deletions(-)

diff --git a/server/container/core/src/main/java/org/apache/james/server/core/BufferedDeferredFileOutputStream.java b/server/container/core/src/main/java/org/apache/james/server/core/BufferedDeferredFileOutputStream.java
index 761f9b1..b5e3f1e 100644
--- a/server/container/core/src/main/java/org/apache/james/server/core/BufferedDeferredFileOutputStream.java
+++ b/server/container/core/src/main/java/org/apache/james/server/core/BufferedDeferredFileOutputStream.java
@@ -30,6 +30,7 @@ import org.apache.commons.io.IOUtils;
 import org.apache.commons.io.output.ByteArrayOutputStream;
 import org.apache.commons.io.output.DeferredFileOutputStream;
 import org.apache.commons.io.output.ThresholdingOutputStream;
+import org.apache.james.lifecycle.api.Disposable;
 
 /**
  * An almost copy of {@link DeferredFileOutputStream} with buffered file stream.
@@ -46,7 +47,7 @@ import org.apache.commons.io.output.ThresholdingOutputStream;
   *
   * @link https://issues.apache.org/jira/browse/JAMES-2343
  */
-public class BufferedDeferredFileOutputStream extends ThresholdingOutputStream {
+public class BufferedDeferredFileOutputStream extends ThresholdingOutputStream implements Disposable {
 
     /**
      * The output stream to which data will be written prior to the theshold
@@ -261,4 +262,10 @@ public class BufferedDeferredFileOutputStream extends ThresholdingOutputStream {
             }
         }
     }
+
+    @Override
+    public void dispose() {
+        // Drop the reference to the in-memory stream to hasten GC of its big byte array
+        memoryOutputStream = null;
+    }
 }
\ No newline at end of file
diff --git a/server/container/core/src/main/java/org/apache/james/server/core/MimeMessageInputStreamSource.java b/server/container/core/src/main/java/org/apache/james/server/core/MimeMessageInputStreamSource.java
index aec61fa..4419909 100644
--- a/server/container/core/src/main/java/org/apache/james/server/core/MimeMessageInputStreamSource.java
+++ b/server/container/core/src/main/java/org/apache/james/server/core/MimeMessageInputStreamSource.java
@@ -44,7 +44,7 @@ import org.apache.james.util.SizeFormat;
  *
  * This class is not thread safe!
  */
-public class MimeMessageInputStreamSource extends Disposable.LeakAware implements MimeMessageSource, Disposable {
+public class MimeMessageInputStreamSource extends Disposable.LeakAware<MimeMessageInputStreamSource.Resource> implements MimeMessageSource {
     /**
      * 100kb threshold for the stream.
      */
@@ -58,14 +58,6 @@ public class MimeMessageInputStreamSource extends Disposable.LeakAware implement
     }
 
     private static final int THRESHOLD = threshold();
-
-    private final Set<InputStream> streams = new HashSet<>();
-
-    /**
-     * A temporary file used to hold the message stream
-     */
-    private BufferedDeferredFileOutputStream out;
-
     /**
      * The full path of the temporary file
      */
@@ -76,6 +68,62 @@ public class MimeMessageInputStreamSource extends Disposable.LeakAware implement
      */
     private static final File TMPDIR = new File(System.getProperty("java.io.tmpdir"));
 
+    static class Resource extends LeakAware.Resource {
+        private final BufferedDeferredFileOutputStream out;
+        private final Set<InputStream> streams;
+
+        Resource(BufferedDeferredFileOutputStream out, Set<InputStream> streams) {
+            super(() -> {
+                // explicit close all streams
+                for (InputStream stream : streams) {
+                    try {
+                        stream.close();
+                    } catch (IOException e) {
+                        //ignore exception during close
+                    }
+                }
+
+                if (out != null) {
+                    try {
+                        out.close();
+                    } catch (IOException e) {
+                        //ignore exception during close
+                    }
+                    File file = out.getFile();
+                    if (file != null) {
+                        FileUtils.deleteQuietly(file);
+                        file = null;
+                    }
+                    out.dispose();
+                }
+            });
+            this.out = out;
+            this.streams = streams;
+        }
+
+        public BufferedDeferredFileOutputStream getOut() {
+            return out;
+        }
+
+
+    }
+
+    public static MimeMessageInputStreamSource create(String key, InputStream in) throws MessagingException {
+        Disposable.LeakAware.track();
+        BufferedDeferredFileOutputStream out = new BufferedDeferredFileOutputStream(THRESHOLD, "mimemessage-" + key, ".m64", TMPDIR);
+        Resource resource = new Resource(out, new HashSet<>());
+
+        return new MimeMessageInputStreamSource(resource, key, in);
+    }
+
+    public static MimeMessageInputStreamSource create(String key) {
+        Disposable.LeakAware.track();
+        BufferedDeferredFileOutputStream out = new BufferedDeferredFileOutputStream(THRESHOLD, "mimemessage-" + key, ".m64", TMPDIR);
+        Resource resource = new Resource(out, new HashSet<>());
+
+        return new MimeMessageInputStreamSource(resource, key);
+    }
+
     /**
      * Construct a new MimeMessageInputStreamSource from an
      * <code>InputStream</code> that contains the bytes of a MimeMessage.
@@ -84,24 +132,23 @@ public class MimeMessageInputStreamSource extends Disposable.LeakAware implement
      * @param in  the stream containing the MimeMessage
      * @throws MessagingException if an error occurs while trying to store the stream
      */
-    public MimeMessageInputStreamSource(String key, InputStream in) throws MessagingException {
-        super();
+    private MimeMessageInputStreamSource(Resource resource, String key, InputStream in) throws MessagingException {
+        super(resource);
         // We want to immediately read this into a temporary file
         // Create a temp file and channel the input stream into it
         try {
-            out = new BufferedDeferredFileOutputStream(THRESHOLD, "mimemessage-" + key, ".m64", TMPDIR);
-            IOUtils.copy(in, out);
+            IOUtils.copy(in, resource.out);
             sourceId = key;
         } catch (IOException ioe) {
-            File file = out.getFile();
+            File file = resource.out.getFile();
             if (file != null) {
                 FileUtils.deleteQuietly(file);
             }
             throw new MessagingException("Unable to retrieve the data: " + ioe.getMessage(), ioe);
         } finally {
             try {
-                if (out != null) {
-                    out.close();
+                if (resource.out != null) {
+                    resource.out.close();
                 }
             } catch (IOException ioe) {
                 // Ignored - logging unavailable to log this non-fatal error.
@@ -118,9 +165,8 @@ public class MimeMessageInputStreamSource extends Disposable.LeakAware implement
         }
     }
 
-    public MimeMessageInputStreamSource(String key) {
-        super();
-        out = new BufferedDeferredFileOutputStream(THRESHOLD, key, ".m64", TMPDIR);
+    private MimeMessageInputStreamSource(Resource resource, String key) {
+        super(resource);
         sourceId = key;
     }
 
@@ -142,12 +188,12 @@ public class MimeMessageInputStreamSource extends Disposable.LeakAware implement
     @Override
     public InputStream getInputStream() throws IOException {
         InputStream in;
-        if (out.isInMemory()) {
-            in = new SharedByteArrayInputStream(out.getData());
+        if (getResource().getOut().isInMemory()) {
+            in = new SharedByteArrayInputStream(getResource().getOut().getData());
         } else {
-            in = new SharedFileInputStream(out.getFile());
+            in = new SharedFileInputStream(getResource().getOut().getFile());
         }
-        streams.add(in);
+        getResource().streams.add(in);
         return in;
     }
 
@@ -155,43 +201,13 @@ public class MimeMessageInputStreamSource extends Disposable.LeakAware implement
      * Get the size of the temp file
      *
      * @return the size of the temp file
-     * @throws IOException if an error is encoutered while computing the size of the
-     *                     message
      */
     @Override
-    public long getMessageSize() throws IOException {
-        return out.getByteCount();
+    public long getMessageSize() {
+        return getResource().getOut().getByteCount();
     }
 
     public OutputStream getWritableOutputStream() {
-        return out;
-    }
-
-    @Override
-    public void dispose() {
-        // explicit close all streams
-        for (InputStream stream : streams) {
-            try {
-                stream.close();
-            } catch (IOException e) {
-                //ignore exception during close
-            }
-        }
-
-        if (out != null) {
-            try {
-                out.close();
-            } catch (IOException e) {
-                //ignore exception during close
-            }
-            File file = out.getFile();
-            if (file != null) {
-                FileUtils.deleteQuietly(file);
-                file = null;
-            }
-            out = null;
-        }
-        disposed();
+        return getResource().getOut();
     }
-
 }
diff --git a/server/container/core/src/main/java/org/apache/james/server/core/MimeMessageWrapper.java b/server/container/core/src/main/java/org/apache/james/server/core/MimeMessageWrapper.java
index 5255387..99bd591 100644
--- a/server/container/core/src/main/java/org/apache/james/server/core/MimeMessageWrapper.java
+++ b/server/container/core/src/main/java/org/apache/james/server/core/MimeMessageWrapper.java
@@ -153,7 +153,7 @@ public class MimeMessageWrapper extends MimeMessage implements Disposable {
                     in.close();
                     saved = true;
                 } else {
-                    MimeMessageInputStreamSource src = new MimeMessageInputStreamSource("MailCopy-" + UUID.randomUUID().toString());
+                    MimeMessageInputStreamSource src = MimeMessageInputStreamSource.create("MailCopy-" + UUID.randomUUID().toString());
                     OutputStream out = src.getWritableOutputStream();
                     original.writeTo(out);
                     out.close();
diff --git a/server/container/core/src/test/java/org/apache/james/server/core/MimeMessageInputStreamSourceTest.java b/server/container/core/src/test/java/org/apache/james/server/core/MimeMessageInputStreamSourceTest.java
index e98f597..b870aa7 100644
--- a/server/container/core/src/test/java/org/apache/james/server/core/MimeMessageInputStreamSourceTest.java
+++ b/server/container/core/src/test/java/org/apache/james/server/core/MimeMessageInputStreamSourceTest.java
@@ -41,20 +41,20 @@ public class MimeMessageInputStreamSourceTest {
     
     @Test
     public void streamWith1MBytesShouldBeReadable() throws MessagingException, IOException {
-        testee = new MimeMessageInputStreamSource("myKey", new ZeroedInputStream(_1M));
+        testee = MimeMessageInputStreamSource.create("myKey", new ZeroedInputStream(_1M));
         assertThat(testee.getInputStream()).hasSameContentAs(new ZeroedInputStream(_1M));
     }
     
     @Test
     public void streamWith10KBytesShouldBeReadable() throws MessagingException, IOException {
-        testee = new MimeMessageInputStreamSource("myKey", new ZeroedInputStream(_10KB));
+        testee = MimeMessageInputStreamSource.create("myKey", new ZeroedInputStream(_10KB));
         assertThat(testee.getInputStream()).hasSameContentAs(new ZeroedInputStream(_10KB));
     }
 
     @Test
     public void streamWithVeryShortNameShouldWork() throws MessagingException, IOException {
         String veryShortName = "1";
-        testee = new MimeMessageInputStreamSource(veryShortName, new ZeroedInputStream(_1M));
+        testee = MimeMessageInputStreamSource.create(veryShortName, new ZeroedInputStream(_1M));
         assertThat(testee.getInputStream()).isNotNull();
     }
 }
diff --git a/server/container/core/src/test/java/org/apache/james/server/core/MimeMessageWrapperTest.java b/server/container/core/src/test/java/org/apache/james/server/core/MimeMessageWrapperTest.java
index 9686fe2..5c5ca59 100644
--- a/server/container/core/src/test/java/org/apache/james/server/core/MimeMessageWrapperTest.java
+++ b/server/container/core/src/test/java/org/apache/james/server/core/MimeMessageWrapperTest.java
@@ -92,7 +92,7 @@ public class MimeMessageWrapperTest extends MimeMessageFromStreamTest {
 
     @Override
     protected TestableMimeMessageWrapper getMessageFromSources(String sources) throws Exception {
-        MimeMessageInputStreamSource mmis = new MimeMessageInputStreamSource("test", new SharedByteArrayInputStream(sources.getBytes()));
+        MimeMessageInputStreamSource mmis = MimeMessageInputStreamSource.create("test", new SharedByteArrayInputStream(sources.getBytes()));
         return new TestableMimeMessageWrapper(mmis);
     }
 
@@ -348,7 +348,7 @@ public class MimeMessageWrapperTest extends MimeMessageFromStreamTest {
 
     @Test
     public void getMessageSizeShouldBeAccurateWhenHeadersAreModified() throws Exception {
-        MimeMessageWrapper wrapper = new MimeMessageWrapper(new MimeMessageInputStreamSource(MailImpl.getId(),
+        MimeMessageWrapper wrapper = new MimeMessageWrapper(MimeMessageInputStreamSource.create(MailImpl.getId(),
             ClassLoaderUtils.getSystemResourceAsSharedStream("JAMES-1593.eml")));
         wrapper.setHeader("header", "vss");
 
@@ -358,7 +358,7 @@ public class MimeMessageWrapperTest extends MimeMessageFromStreamTest {
 
     @Test
     public void getMessageSizeShouldBeAccurateWhenHeadersAreModifiedAndOtherEncoding() throws Exception {
-        MimeMessageWrapper wrapper = new MimeMessageWrapper(new MimeMessageInputStreamSource(MailImpl.getId(),
+        MimeMessageWrapper wrapper = new MimeMessageWrapper(MimeMessageInputStreamSource.create(MailImpl.getId(),
             ClassLoaderUtils.getSystemResourceAsSharedStream("mail-containing-unicode-characters.eml")));
         wrapper.setHeader("header", "vss");
 
diff --git a/server/container/lifecycle-api/pom.xml b/server/container/lifecycle-api/pom.xml
index 8a56000..15fc676 100644
--- a/server/container/lifecycle-api/pom.xml
+++ b/server/container/lifecycle-api/pom.xml
@@ -47,6 +47,11 @@
             <artifactId>commons-configuration2</artifactId>
         </dependency>
         <dependency>
+            <groupId>org.awaitility</groupId>
+            <artifactId>awaitility</artifactId>
+            <scope>test</scope>
+        </dependency>
+        <dependency>
             <groupId>org.slf4j</groupId>
             <artifactId>slf4j-api</artifactId>
         </dependency>
diff --git a/server/container/lifecycle-api/src/main/java/org/apache/james/lifecycle/api/Disposable.java b/server/container/lifecycle-api/src/main/java/org/apache/james/lifecycle/api/Disposable.java
index 1e99a40..bd69789 100644
--- a/server/container/lifecycle-api/src/main/java/org/apache/james/lifecycle/api/Disposable.java
+++ b/server/container/lifecycle-api/src/main/java/org/apache/james/lifecycle/api/Disposable.java
@@ -19,6 +19,18 @@
 
 package org.apache.james.lifecycle.api;
 
+import java.lang.ref.PhantomReference;
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * Classes which implement this interface need some special handling on destroy.
  * So the {@link #dispose()} method need to get called
@@ -29,4 +41,169 @@ public interface Disposable {
      * Dispose the object
      */
     void dispose();
+
+    abstract class LeakAware<T extends LeakAware.Resource> implements Disposable {
+        public static class Resource implements Disposable {
+            private final AtomicBoolean isDisposed = new AtomicBoolean(false);
+            private final Disposable cleanup;
+
+            public Resource(Disposable cleanup) {
+                this.cleanup = cleanup;
+            }
+
+            public boolean isDisposed() {
+                return isDisposed.get();
+            }
+
+            @Override
+            public void dispose() {
+                isDisposed.set(true);
+                cleanup.dispose();
+            }
+        }
+
+        public static class LeakDetectorException extends RuntimeException {
+            public LeakDetectorException() {
+                super();
+            }
+        }
+
+        public enum Level {
+            NONE,
+            SIMPLE,
+            ADVANCED,
+            TESTING;
+
+            static Level parse(String input) {
+                for (Level level : values()) {
+                    if (level.name().equalsIgnoreCase(input)) {
+                        return level;
+                    }
+                }
+                throw new IllegalArgumentException(String.format("Unknown level `%s`", input));
+            }
+        }
+
+        public static final ReferenceQueue<LeakAware<?>> REFERENCE_QUEUE = new ReferenceQueue<>();
+        public static final ConcurrentHashMap<LeakAwareFinalizer, Boolean> REFERENCES_IN_USE = new ConcurrentHashMap<>();
+        public static final Level LEVEL = Optional.ofNullable(System.getProperty("james.lifecycle.leak.detection.mode"))
+            .map(Level::parse).orElse(Level.SIMPLE); // unset property => SIMPLE; unknown value => IllegalArgumentException from parse
+
+        public static void track() {
+            Reference<?> referenceFromQueue;
+            while ((referenceFromQueue = REFERENCE_QUEUE.poll()) != null) {
+                if (leakDetectorEnabled()) {
+                    ((LeakAwareFinalizer) referenceFromQueue).detectLeak();
+                }
+                referenceFromQueue.clear();
+            }
+        }
+
+        private static boolean leakDetectorEnabled() {
+            return LEVEL != Level.NONE;
+        }
+
+        public static boolean tracedEnabled() {
+            return LEVEL == Level.ADVANCED || LEVEL == Level.TESTING;
+        }
+
+        private final T resource;
+        private LeakAwareFinalizer finalizer;
+
+        protected LeakAware(T resource) {
+            this.resource = resource;
+            if (leakDetectorEnabled()) {
+                this.finalizer = new LeakAwareFinalizer(this, resource, REFERENCE_QUEUE);
+                REFERENCES_IN_USE.put(finalizer, true);
+            }
+        }
+
+        @Override
+        public void dispose() {
+            if (finalizer != null) {
+                REFERENCES_IN_USE.remove(finalizer);
+            }
+            resource.dispose();
+        }
+
+        public T getResource() {
+            return resource;
+        }
+    }
+
+    class TraceRecord {
+        private final List<StackWalker.StackFrame> stackFrames;
+
+        TraceRecord(List<StackWalker.StackFrame> stackFrames) {
+            this.stackFrames = stackFrames;
+        }
+
+        @Override
+        public String toString() {
+            StringBuilder buf = new StringBuilder();
+            // skip(3) drops the three innermost frames (presumably the tracking constructors); unlike
+            // subList(3, size) it yields an empty trace instead of throwing when fewer frames exist
+            this.stackFrames.stream().skip(3)
+                .forEach(stackFrame -> buf.append("\t")
+                    .append(stackFrame.getClassName())
+                    .append("#")
+                    .append(stackFrame.getMethodName())
+                    .append(":")
+                    .append(stackFrame.getLineNumber())
+                    .append("\n"));
+            return buf.toString();
+        }
+    }
+
+    class LeakAwareFinalizer extends PhantomReference<LeakAware<?>> {
+        private static final Logger LOGGER = LoggerFactory.getLogger(LeakAwareFinalizer.class);
+
+        private final LeakAware.Resource resource;
+        private TraceRecord traceRecord;
+
+        public LeakAwareFinalizer(LeakAware<?> referent, LeakAware.Resource resource, ReferenceQueue<? super LeakAware<?>> q) {
+            super(referent, q);
+            this.resource = resource;
+            if (LeakAware.tracedEnabled()) { // creation stack is only captured in ADVANCED/TESTING modes
+                traceRecord = new TraceRecord(StackWalker.getInstance().walk(s -> s.collect(Collectors.toList())));
+            }
+        }
+
+        public void detectLeak() {
+            switch (LeakAware.LEVEL) {
+                case NONE: // nothing
+                    break;
+                case SIMPLE:
+                case ADVANCED: {
+                    if (isNotDisposed()) {
+                        errorLog();
+                        resource.dispose();
+                        LeakAware.REFERENCES_IN_USE.remove(this);
+                    }
+                    break;
+                }
+                case TESTING: {
+                    if (isNotDisposed()) {
+                        errorLog();
+                        throw new LeakAware.LeakDetectorException();
+                    }
+                }
+            }
+        }
+
+        public void errorLog() {
+            if (LeakAware.tracedEnabled()) {
+                LOGGER.error("Leak detected! Resource {} was not released before its referent was garbage-collected. \n" +
+                    "This resource was instanced at: \n{}", resource, traceRecord.toString());
+            } else {
+                LOGGER.error("Leak detected! Resource {} was not released before its referent was garbage-collected. \n" +
+                    "Resource management needs to be reviewed: ensure to always call dispose() for disposable objects you work with. \n" +
+                    "Consider enabling advanced leak detection to further identify the problem.", resource);
+            }
+        }
+
+        private boolean isNotDisposed() {
+            return !resource.isDisposed();
+        }
+    }
 }
diff --git a/server/container/lifecycle-api/src/test/java/org/apache/james/lifecycle/api/LeakAwareTest.java b/server/container/lifecycle-api/src/test/java/org/apache/james/lifecycle/api/LeakAwareTest.java
new file mode 100644
index 0000000..b774024
--- /dev/null
+++ b/server/container/lifecycle-api/src/test/java/org/apache/james/lifecycle/api/LeakAwareTest.java
@@ -0,0 +1,200 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+
+package org.apache.james.lifecycle.api;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS;
+import static org.awaitility.Durations.TEN_SECONDS;
+import static org.apache.james.lifecycle.api.Disposable.LeakAware;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionFactory;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.slf4j.LoggerFactory;
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.Logger;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.read.ListAppender;
+
+class LeakAwareTest {
+
+    private static final class LeakResourceSample extends LeakAware<LeakResourceSample.TestResource> {
+        static class TestResource extends LeakAware.Resource {
+            public TestResource(Disposable cleanup) {
+                super(cleanup);
+            }
+        }
+
+        public static LeakResourceSample create(AtomicBoolean atomicBoolean) {
+            return new LeakResourceSample(new TestResource(() -> atomicBoolean.set(true)));
+        }
+
+        LeakResourceSample(TestResource resource) {
+            super(resource);
+        }
+    }
+
+    private static final ConditionFactory awaitAtMostTenSeconds = Awaitility
+        .with().pollInterval(ONE_HUNDRED_MILLISECONDS)
+        .and().pollDelay(ONE_HUNDRED_MILLISECONDS)
+        .await()
+        .atMost(TEN_SECONDS);
+
+    public static ListAppender<ILoggingEvent> getListAppenderForClass(Class clazz) {
+        Logger logger = (Logger) LoggerFactory.getLogger(clazz);
+
+        ListAppender<ILoggingEvent> loggingEventListAppender = new ListAppender<>();
+        loggingEventListAppender.start();
+
+        logger.addAppender(loggingEventListAppender);
+        return loggingEventListAppender;
+    }
+
+    private void forceChangeLevel(String level) throws NoSuchFieldException, IllegalAccessException {
+        forceChangeLevel(LeakAware.Level.parse(level));
+    }
+
+    // using reflect to change LeakAware.LEVEL value
+    private static void forceChangeLevel(LeakAware.Level level) throws NoSuchFieldException, IllegalAccessException {
+        final Field field = LeakAware.class.getDeclaredField("LEVEL");
+        field.setAccessible(true);
+        final Field modifiersField = Field.class.getDeclaredField("modifiers");
+        modifiersField.setAccessible(true);
+        modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+        field.set(null, level);
+    }
+
+    @Test
+    void leakDetectionShouldCloseUnclosedResources() {
+        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+        LeakResourceSample resourceSample = LeakResourceSample.create(atomicBoolean);
+        resourceSample = null;
+
+        System.gc();
+        awaitAtMostTenSeconds.until(() -> {
+            LeakAware.track();
+            return atomicBoolean.get();
+        });
+    }
+
+    @Test
+    void leakDetectionShouldNotReportClosedObjects() {
+        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+        LeakResourceSample resourceSample = LeakResourceSample.create(atomicBoolean);
+        resourceSample.dispose();
+        atomicBoolean.set(false);
+        resourceSample = null;
+
+        System.gc();
+        awaitAtMostTenSeconds.until(() -> {
+            LeakAware.track();
+            return !atomicBoolean.get();
+        });
+    }
+
+    @Test
+    void resourceShouldNotBeDetectedLeakWhenLevelIsNone() throws InterruptedException, NoSuchFieldException, IllegalAccessException {
+        forceChangeLevel(LeakAware.Level.NONE);
+        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+        LeakResourceSample resourceSample = LeakResourceSample.create(atomicBoolean);
+        resourceSample = null;
+
+        System.gc();
+        Thread.sleep(500);
+        LeakAware.track();
+        assertThat(atomicBoolean.get()).isFalse();
+    }
+
+    @ParameterizedTest
+    @ValueSource(strings = {"simple", "advanced"})
+    void leakDetectionShouldLogWhenDetected(String level) throws NoSuchFieldException, IllegalAccessException {
+        forceChangeLevel(level);
+        ListAppender<ILoggingEvent> loggingEvents = getListAppenderForClass(Disposable.LeakAwareFinalizer.class);
+
+        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+        LeakResourceSample resourceSample = LeakResourceSample.create(atomicBoolean);
+        resourceSample = null;
+
+        System.gc();
+        awaitAtMostTenSeconds.untilAsserted(() -> {
+            LeakAware.track();
+            assertThat(loggingEvents.list).hasSize(1)
+                .allSatisfy(loggingEvent -> {
+                    assertThat(loggingEvent.getLevel()).isEqualTo(Level.ERROR);
+                    assertThat(loggingEvent.getFormattedMessage()).contains("Leak detected", "TestResource");
+                });
+        });
+    }
+
+    @Test
+    void leakDetectionShouldLogTraceRecordWhenLevelIsAdvanced() throws NoSuchFieldException, IllegalAccessException {
+        forceChangeLevel(LeakAware.Level.ADVANCED);
+        ListAppender<ILoggingEvent> loggingEvents = getListAppenderForClass(Disposable.LeakAwareFinalizer.class);
+        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+        LeakResourceSample resourceSample = LeakResourceSample.create(atomicBoolean);
+        resourceSample = null;
+
+        System.gc();
+        awaitAtMostTenSeconds.untilAsserted(() -> {
+            LeakAware.track();
+            assertThat(loggingEvents.list).hasSize(1)
+                .allSatisfy(loggingEvent -> {
+                    assertThat(loggingEvent.getLevel()).isEqualTo(Level.ERROR);
+                    assertThat(loggingEvent.getFormattedMessage()).contains("This resource was instanced at", "LeakAwareTest#leakDetectionShouldLogTraceRecordWhenLevelIsAdvanced");
+                });
+        });
+    }
+
+    @Test
+    void leakDetectionShouldThrowWhenDetectedAndLevelIsTesting() throws NoSuchFieldException, IllegalAccessException, InterruptedException {
+        forceChangeLevel(LeakAware.Level.TESTING);
+        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+        LeakResourceSample resourceSample = LeakResourceSample.create(atomicBoolean);
+        resourceSample = null;
+
+        System.gc();
+        Thread.sleep(500);
+        assertThatThrownBy(LeakAware::track)
+            .isInstanceOf(LeakAware.LeakDetectorException.class);
+    }
+
+    @Test
+    void leakDetectionShouldNotLogWhenLevelIsNone() throws InterruptedException, NoSuchFieldException, IllegalAccessException {
+        forceChangeLevel(LeakAware.Level.NONE);
+        ListAppender<ILoggingEvent> loggingEvents = getListAppenderForClass(Disposable.LeakAwareFinalizer.class);
+        AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+        LeakResourceSample resourceSample = LeakResourceSample.create(atomicBoolean);
+        resourceSample = null;
+
+        System.gc();
+        Thread.sleep(500);
+        assertThat(loggingEvents.list).isEmpty();
+    }
+
+}
diff --git a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/JamesDataCmdHandler.java b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/JamesDataCmdHandler.java
index 8c6eaaa..0ffa698 100644
--- a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/JamesDataCmdHandler.java
+++ b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/JamesDataCmdHandler.java
@@ -52,7 +52,7 @@ public class JamesDataCmdHandler extends DataCmdHandler {
     @Override
     protected SMTPResponse doDATA(SMTPSession session, String argument) {
         try {
-            MimeMessageInputStreamSource mmiss = new MimeMessageInputStreamSource(MailImpl.getId());
+            MimeMessageInputStreamSource mmiss = MimeMessageInputStreamSource.create(MailImpl.getId());
             session.setAttachment(SMTPConstants.DATA_MIMEMESSAGE_STREAMSOURCE, mmiss, State.Transaction);
         } catch (Exception e) {
             LOGGER.warn("Error creating mimemessagesource for incoming data", e);

---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org