You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by rc...@apache.org on 2022/03/15 07:17:35 UTC
[james-project] 02/03: JAMES-3724 - Implement LeakAware
This is an automated email from the ASF dual-hosted git repository.
rcordier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit daf3d4bba15ecde6f42e235236204e98523a545e
Author: Tung Van TRAN <vt...@linagora.com>
AuthorDate: Wed Mar 9 17:04:00 2022 +0700
JAMES-3724 - Implement LeakAware
---
.../core/BufferedDeferredFileOutputStream.java | 9 +-
.../server/core/MimeMessageInputStreamSource.java | 128 +++++++------
.../james/server/core/MimeMessageWrapper.java | 2 +-
.../core/MimeMessageInputStreamSourceTest.java | 6 +-
.../james/server/core/MimeMessageWrapperTest.java | 6 +-
server/container/lifecycle-api/pom.xml | 5 +
.../org/apache/james/lifecycle/api/Disposable.java | 177 ++++++++++++++++++
.../apache/james/lifecycle/api/LeakAwareTest.java | 200 +++++++++++++++++++++
.../james/smtpserver/JamesDataCmdHandler.java | 2 +-
9 files changed, 470 insertions(+), 65 deletions(-)
diff --git a/server/container/core/src/main/java/org/apache/james/server/core/BufferedDeferredFileOutputStream.java b/server/container/core/src/main/java/org/apache/james/server/core/BufferedDeferredFileOutputStream.java
index 761f9b1..b5e3f1e 100644
--- a/server/container/core/src/main/java/org/apache/james/server/core/BufferedDeferredFileOutputStream.java
+++ b/server/container/core/src/main/java/org/apache/james/server/core/BufferedDeferredFileOutputStream.java
@@ -30,6 +30,7 @@ import org.apache.commons.io.IOUtils;
import org.apache.commons.io.output.ByteArrayOutputStream;
import org.apache.commons.io.output.DeferredFileOutputStream;
import org.apache.commons.io.output.ThresholdingOutputStream;
+import org.apache.james.lifecycle.api.Disposable;
/**
* An almost copy of {@link DeferredFileOutputStream} with buffered file stream.
@@ -46,7 +47,7 @@ import org.apache.commons.io.output.ThresholdingOutputStream;
*
* @link https://issues.apache.org/jira/browse/JAMES-2343
*/
-public class BufferedDeferredFileOutputStream extends ThresholdingOutputStream {
+public class BufferedDeferredFileOutputStream extends ThresholdingOutputStream implements Disposable {
/**
* The output stream to which data will be written prior to the theshold
@@ -261,4 +262,10 @@ public class BufferedDeferredFileOutputStream extends ThresholdingOutputStream {
}
}
}
+
+ @Override
+ public void dispose() {
+ // Drop the reference to the in-memory stream so that its (potentially large) byte array can be garbage-collected sooner
+ memoryOutputStream = null;
+ }
}
\ No newline at end of file
diff --git a/server/container/core/src/main/java/org/apache/james/server/core/MimeMessageInputStreamSource.java b/server/container/core/src/main/java/org/apache/james/server/core/MimeMessageInputStreamSource.java
index aec61fa..4419909 100644
--- a/server/container/core/src/main/java/org/apache/james/server/core/MimeMessageInputStreamSource.java
+++ b/server/container/core/src/main/java/org/apache/james/server/core/MimeMessageInputStreamSource.java
@@ -44,7 +44,7 @@ import org.apache.james.util.SizeFormat;
*
* This class is not thread safe!
*/
-public class MimeMessageInputStreamSource extends Disposable.LeakAware implements MimeMessageSource, Disposable {
+public class MimeMessageInputStreamSource extends Disposable.LeakAware<MimeMessageInputStreamSource.Resource> implements MimeMessageSource {
/**
* 100kb threshold for the stream.
*/
@@ -58,14 +58,6 @@ public class MimeMessageInputStreamSource extends Disposable.LeakAware implement
}
private static final int THRESHOLD = threshold();
-
- private final Set<InputStream> streams = new HashSet<>();
-
- /**
- * A temporary file used to hold the message stream
- */
- private BufferedDeferredFileOutputStream out;
-
/**
* The full path of the temporary file
*/
@@ -76,6 +68,62 @@ public class MimeMessageInputStreamSource extends Disposable.LeakAware implement
*/
private static final File TMPDIR = new File(System.getProperty("java.io.tmpdir"));
+    /**
+     * Releasable state of a {@link MimeMessageInputStreamSource}: the backing output stream and
+     * the input streams opened on top of it. It holds no reference to the source itself.
+     */
+    static class Resource extends LeakAware.Resource {
+        private final BufferedDeferredFileOutputStream out;
+        private final Set<InputStream> streams;
+
+        /**
+         * @param out     stream holding the message content, may be {@code null}
+         * @param streams set collecting the input streams to close upon disposal
+         */
+        Resource(BufferedDeferredFileOutputStream out, Set<InputStream> streams) {
+            super(() -> release(out, streams));
+            this.out = out;
+            this.streams = streams;
+        }
+
+        /**
+         * Closes every registered input stream, then closes the output stream, deletes its
+         * temporary file (if any) and disposes it. Failures while closing are ignored on purpose.
+         */
+        private static void release(BufferedDeferredFileOutputStream out, Set<InputStream> streams) {
+            for (InputStream stream : streams) {
+                try {
+                    stream.close();
+                } catch (IOException ignored) {
+                    // best effort: a failing close must not prevent the remaining cleanup
+                }
+            }
+
+            if (out != null) {
+                try {
+                    out.close();
+                } catch (IOException ignored) {
+                    // best effort: the backing storage is discarded right below
+                }
+                File file = out.getFile();
+                if (file != null) {
+                    FileUtils.deleteQuietly(file);
+                }
+                out.dispose();
+            }
+        }
+
+        public BufferedDeferredFileOutputStream getOut() {
+            return out;
+        }
+    }
+
+    /**
+     * Creates a source and copies the content of the given stream into it.
+     * Leak tracking is run before the new source is built.
+     *
+     * @param key identifier of the source, also part of the prefix handed to the backing stream
+     * @param in  the stream containing the MimeMessage
+     * @throws MessagingException if the stream cannot be stored
+     */
+    public static MimeMessageInputStreamSource create(String key, InputStream in) throws MessagingException {
+        Disposable.LeakAware.track();
+        String prefix = "mimemessage-" + key;
+        BufferedDeferredFileOutputStream buffer = new BufferedDeferredFileOutputStream(THRESHOLD, prefix, ".m64", TMPDIR);
+        return new MimeMessageInputStreamSource(new Resource(buffer, new HashSet<>()), key, in);
+    }
+
+    /**
+     * Creates an empty source; its content is expected to be written afterwards through
+     * {@link #getWritableOutputStream()}. Leak tracking is run before the new source is built.
+     *
+     * @param key identifier of the source, also part of the prefix handed to the backing stream
+     */
+    public static MimeMessageInputStreamSource create(String key) {
+        Disposable.LeakAware.track();
+        String prefix = "mimemessage-" + key;
+        BufferedDeferredFileOutputStream buffer = new BufferedDeferredFileOutputStream(THRESHOLD, prefix, ".m64", TMPDIR);
+        return new MimeMessageInputStreamSource(new Resource(buffer, new HashSet<>()), key);
+    }
+
/**
* Construct a new MimeMessageInputStreamSource from an
* <code>InputStream</code> that contains the bytes of a MimeMessage.
@@ -84,24 +132,23 @@ public class MimeMessageInputStreamSource extends Disposable.LeakAware implement
* @param in the stream containing the MimeMessage
* @throws MessagingException if an error occurs while trying to store the stream
*/
- public MimeMessageInputStreamSource(String key, InputStream in) throws MessagingException {
- super();
+ private MimeMessageInputStreamSource(Resource resource, String key, InputStream in) throws MessagingException {
+ super(resource);
// We want to immediately read this into a temporary file
// Create a temp file and channel the input stream into it
try {
- out = new BufferedDeferredFileOutputStream(THRESHOLD, "mimemessage-" + key, ".m64", TMPDIR);
- IOUtils.copy(in, out);
+ IOUtils.copy(in, resource.out);
sourceId = key;
} catch (IOException ioe) {
- File file = out.getFile();
+ File file = resource.out.getFile();
if (file != null) {
FileUtils.deleteQuietly(file);
}
throw new MessagingException("Unable to retrieve the data: " + ioe.getMessage(), ioe);
} finally {
try {
- if (out != null) {
- out.close();
+ if (resource.out != null) {
+ resource.out.close();
}
} catch (IOException ioe) {
// Ignored - logging unavailable to log this non-fatal error.
@@ -118,9 +165,8 @@ public class MimeMessageInputStreamSource extends Disposable.LeakAware implement
}
}
- public MimeMessageInputStreamSource(String key) {
- super();
- out = new BufferedDeferredFileOutputStream(THRESHOLD, key, ".m64", TMPDIR);
+ private MimeMessageInputStreamSource(Resource resource, String key) {
+ super(resource);
sourceId = key;
}
@@ -142,12 +188,12 @@ public class MimeMessageInputStreamSource extends Disposable.LeakAware implement
@Override
public InputStream getInputStream() throws IOException {
InputStream in;
- if (out.isInMemory()) {
- in = new SharedByteArrayInputStream(out.getData());
+ if (getResource().getOut().isInMemory()) {
+ in = new SharedByteArrayInputStream(getResource().getOut().getData());
} else {
- in = new SharedFileInputStream(out.getFile());
+ in = new SharedFileInputStream(getResource().getOut().getFile());
}
- streams.add(in);
+ getResource().streams.add(in);
return in;
}
@@ -155,43 +201,13 @@ public class MimeMessageInputStreamSource extends Disposable.LeakAware implement
* Get the size of the temp file
*
* @return the size of the temp file
- * @throws IOException if an error is encoutered while computing the size of the
- * message
*/
@Override
- public long getMessageSize() throws IOException {
- return out.getByteCount();
+ public long getMessageSize() {
+ return getResource().getOut().getByteCount();
}
public OutputStream getWritableOutputStream() {
- return out;
- }
-
- @Override
- public void dispose() {
- // explicit close all streams
- for (InputStream stream : streams) {
- try {
- stream.close();
- } catch (IOException e) {
- //ignore exception during close
- }
- }
-
- if (out != null) {
- try {
- out.close();
- } catch (IOException e) {
- //ignore exception during close
- }
- File file = out.getFile();
- if (file != null) {
- FileUtils.deleteQuietly(file);
- file = null;
- }
- out = null;
- }
- disposed();
+ return getResource().getOut();
}
-
}
diff --git a/server/container/core/src/main/java/org/apache/james/server/core/MimeMessageWrapper.java b/server/container/core/src/main/java/org/apache/james/server/core/MimeMessageWrapper.java
index 5255387..99bd591 100644
--- a/server/container/core/src/main/java/org/apache/james/server/core/MimeMessageWrapper.java
+++ b/server/container/core/src/main/java/org/apache/james/server/core/MimeMessageWrapper.java
@@ -153,7 +153,7 @@ public class MimeMessageWrapper extends MimeMessage implements Disposable {
in.close();
saved = true;
} else {
- MimeMessageInputStreamSource src = new MimeMessageInputStreamSource("MailCopy-" + UUID.randomUUID().toString());
+ MimeMessageInputStreamSource src = MimeMessageInputStreamSource.create("MailCopy-" + UUID.randomUUID().toString());
OutputStream out = src.getWritableOutputStream();
original.writeTo(out);
out.close();
diff --git a/server/container/core/src/test/java/org/apache/james/server/core/MimeMessageInputStreamSourceTest.java b/server/container/core/src/test/java/org/apache/james/server/core/MimeMessageInputStreamSourceTest.java
index e98f597..b870aa7 100644
--- a/server/container/core/src/test/java/org/apache/james/server/core/MimeMessageInputStreamSourceTest.java
+++ b/server/container/core/src/test/java/org/apache/james/server/core/MimeMessageInputStreamSourceTest.java
@@ -41,20 +41,20 @@ public class MimeMessageInputStreamSourceTest {
@Test
public void streamWith1MBytesShouldBeReadable() throws MessagingException, IOException {
- testee = new MimeMessageInputStreamSource("myKey", new ZeroedInputStream(_1M));
+ testee = MimeMessageInputStreamSource.create("myKey", new ZeroedInputStream(_1M));
assertThat(testee.getInputStream()).hasSameContentAs(new ZeroedInputStream(_1M));
}
@Test
public void streamWith10KBytesShouldBeReadable() throws MessagingException, IOException {
- testee = new MimeMessageInputStreamSource("myKey", new ZeroedInputStream(_10KB));
+ testee = MimeMessageInputStreamSource.create("myKey", new ZeroedInputStream(_10KB));
assertThat(testee.getInputStream()).hasSameContentAs(new ZeroedInputStream(_10KB));
}
@Test
public void streamWithVeryShortNameShouldWork() throws MessagingException, IOException {
String veryShortName = "1";
- testee = new MimeMessageInputStreamSource(veryShortName, new ZeroedInputStream(_1M));
+ testee = MimeMessageInputStreamSource.create(veryShortName, new ZeroedInputStream(_1M));
assertThat(testee.getInputStream()).isNotNull();
}
}
diff --git a/server/container/core/src/test/java/org/apache/james/server/core/MimeMessageWrapperTest.java b/server/container/core/src/test/java/org/apache/james/server/core/MimeMessageWrapperTest.java
index 9686fe2..5c5ca59 100644
--- a/server/container/core/src/test/java/org/apache/james/server/core/MimeMessageWrapperTest.java
+++ b/server/container/core/src/test/java/org/apache/james/server/core/MimeMessageWrapperTest.java
@@ -92,7 +92,7 @@ public class MimeMessageWrapperTest extends MimeMessageFromStreamTest {
@Override
protected TestableMimeMessageWrapper getMessageFromSources(String sources) throws Exception {
- MimeMessageInputStreamSource mmis = new MimeMessageInputStreamSource("test", new SharedByteArrayInputStream(sources.getBytes()));
+ MimeMessageInputStreamSource mmis = MimeMessageInputStreamSource.create("test", new SharedByteArrayInputStream(sources.getBytes()));
return new TestableMimeMessageWrapper(mmis);
}
@@ -348,7 +348,7 @@ public class MimeMessageWrapperTest extends MimeMessageFromStreamTest {
@Test
public void getMessageSizeShouldBeAccurateWhenHeadersAreModified() throws Exception {
- MimeMessageWrapper wrapper = new MimeMessageWrapper(new MimeMessageInputStreamSource(MailImpl.getId(),
+ MimeMessageWrapper wrapper = new MimeMessageWrapper(MimeMessageInputStreamSource.create(MailImpl.getId(),
ClassLoaderUtils.getSystemResourceAsSharedStream("JAMES-1593.eml")));
wrapper.setHeader("header", "vss");
@@ -358,7 +358,7 @@ public class MimeMessageWrapperTest extends MimeMessageFromStreamTest {
@Test
public void getMessageSizeShouldBeAccurateWhenHeadersAreModifiedAndOtherEncoding() throws Exception {
- MimeMessageWrapper wrapper = new MimeMessageWrapper(new MimeMessageInputStreamSource(MailImpl.getId(),
+ MimeMessageWrapper wrapper = new MimeMessageWrapper(MimeMessageInputStreamSource.create(MailImpl.getId(),
ClassLoaderUtils.getSystemResourceAsSharedStream("mail-containing-unicode-characters.eml")));
wrapper.setHeader("header", "vss");
diff --git a/server/container/lifecycle-api/pom.xml b/server/container/lifecycle-api/pom.xml
index 8a56000..15fc676 100644
--- a/server/container/lifecycle-api/pom.xml
+++ b/server/container/lifecycle-api/pom.xml
@@ -47,6 +47,11 @@
<artifactId>commons-configuration2</artifactId>
</dependency>
<dependency>
+ <groupId>org.awaitility</groupId>
+ <artifactId>awaitility</artifactId>
+ <scope>test</scope>
+ </dependency>
+ <dependency>
<groupId>org.slf4j</groupId>
<artifactId>slf4j-api</artifactId>
</dependency>
diff --git a/server/container/lifecycle-api/src/main/java/org/apache/james/lifecycle/api/Disposable.java b/server/container/lifecycle-api/src/main/java/org/apache/james/lifecycle/api/Disposable.java
index 1e99a40..bd69789 100644
--- a/server/container/lifecycle-api/src/main/java/org/apache/james/lifecycle/api/Disposable.java
+++ b/server/container/lifecycle-api/src/main/java/org/apache/james/lifecycle/api/Disposable.java
@@ -19,6 +19,18 @@
package org.apache.james.lifecycle.api;
+import java.lang.ref.PhantomReference;
+import java.lang.ref.Reference;
+import java.lang.ref.ReferenceQueue;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.stream.Collectors;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* Classes which implement this interface need some special handling on destroy.
* So the {@link #dispose()} method need to get called
@@ -29,4 +41,169 @@ public interface Disposable {
* Dispose the object
*/
void dispose();
+
+    /**
+     * Base class for {@link Disposable}s that must not be garbage-collected before being disposed.
+     * <p>
+     * Unless leak detection is disabled, every instance is tracked through a {@link LeakAwareFinalizer}
+     * phantom reference. Instances collected without a prior {@link #dispose()} call are handled,
+     * according to {@link #LEVEL}, the next time {@link #track()} runs.
+     *
+     * @param <T> type of the resource released upon disposal
+     */
+    abstract class LeakAware<T extends LeakAware.Resource> implements Disposable {
+        /**
+         * What has to be released, along with a flag telling whether disposal was requested.
+         * The {@link LeakAwareFinalizer} keeps a reference to it so that a leaked resource can
+         * still be inspected once its owner is gone.
+         */
+        public static class Resource implements Disposable {
+            private final AtomicBoolean isDisposed = new AtomicBoolean(false);
+            private final Disposable cleanup;
+
+            /**
+             * @param cleanup action run each time {@link #dispose()} is called
+             */
+            public Resource(Disposable cleanup) {
+                this.cleanup = cleanup;
+            }
+
+            public boolean isDisposed() {
+                return isDisposed.get();
+            }
+
+            @Override
+            public void dispose() {
+                // Flag first, so the resource counts as disposed even when the cleanup fails
+                isDisposed.set(true);
+                cleanup.dispose();
+            }
+        }
+
+        /**
+         * Thrown by leak detection when running at {@link Level#TESTING}.
+         */
+        public static class LeakDetectorException extends RuntimeException {
+            public LeakDetectorException() {
+                super();
+            }
+        }
+
+        /**
+         * Leak detection modes.
+         * <ul>
+         *   <li>{@code NONE}: instances are not tracked</li>
+         *   <li>{@code SIMPLE}: leaks are logged and the leaked resource is disposed</li>
+         *   <li>{@code ADVANCED}: as {@code SIMPLE}, the stack trace of the instantiation being logged too</li>
+         *   <li>{@code TESTING}: leaks are logged with the instantiation stack trace, then a
+         *   {@link LeakDetectorException} is thrown</li>
+         * </ul>
+         */
+        public enum Level {
+            NONE,
+            SIMPLE,
+            ADVANCED,
+            TESTING;
+
+            /**
+             * Resolves a level from its name, ignoring case.
+             *
+             * @throws IllegalArgumentException when no level carries that name
+             */
+            static Level parse(String input) {
+                for (Level level : values()) {
+                    if (level.name().equalsIgnoreCase(input)) {
+                        return level;
+                    }
+                }
+                throw new IllegalArgumentException(String.format("Unknown level `%s`", input));
+            }
+        }
+
+        private static final String LEVEL_PROPERTY = "james.lifecycle.leak.detection.mode";
+        // Misspelled name the property was first released under; still honoured for compatibility
+        private static final String LEGACY_LEVEL_PROPERTY = "james.ligecycle.leak.detection.mode";
+
+        public static final ReferenceQueue<LeakAware<?>> REFERENCE_QUEUE = new ReferenceQueue<>();
+        public static final ConcurrentHashMap<LeakAwareFinalizer, Boolean> REFERENCES_IN_USE = new ConcurrentHashMap<>();
+        /**
+         * Mode in effect, read from the system properties; {@link Level#SIMPLE} when unset.
+         */
+        public static final Level LEVEL = Optional.ofNullable(System.getProperty(LEVEL_PROPERTY))
+            .or(() -> Optional.ofNullable(System.getProperty(LEGACY_LEVEL_PROPERTY)))
+            .map(Level::parse)
+            .orElse(Level.SIMPLE);
+
+        /**
+         * Drains the reference queue and runs leak detection on each collected instance.
+         *
+         * @throws LeakDetectorException when a leak is found while running at {@link Level#TESTING}
+         */
+        public static void track() {
+            Reference<?> referenceFromQueue;
+            while ((referenceFromQueue = REFERENCE_QUEUE.poll()) != null) {
+                try {
+                    if (leakDetectorEnabled()) {
+                        ((LeakAwareFinalizer) referenceFromQueue).detectLeak();
+                    }
+                } finally {
+                    // The reference was already dequeued: clear it even when detection throws
+                    referenceFromQueue.clear();
+                }
+            }
+        }
+
+        private static boolean leakDetectorEnabled() {
+            return LEVEL != Level.NONE;
+        }
+
+        /**
+         * @return whether instantiation stack traces are recorded ({@code ADVANCED} and {@code TESTING})
+         */
+        public static boolean tracedEnabled() {
+            return LEVEL == Level.ADVANCED || LEVEL == Level.TESTING;
+        }
+
+        private final T resource;
+        // null when leak detection was disabled at construction time
+        private final LeakAwareFinalizer finalizer;
+
+        protected LeakAware(T resource) {
+            this.resource = resource;
+            if (leakDetectorEnabled()) {
+                this.finalizer = new LeakAwareFinalizer(this, resource, REFERENCE_QUEUE);
+                // Keeps the phantom reference strongly reachable until disposal or leak detection
+                REFERENCES_IN_USE.put(finalizer, true);
+            } else {
+                this.finalizer = null;
+            }
+        }
+
+        @Override
+        public void dispose() {
+            if (finalizer != null) {
+                REFERENCES_IN_USE.remove(finalizer);
+            }
+            resource.dispose();
+        }
+
+        public T getResource() {
+            return resource;
+        }
+    }
+
+    /**
+     * Snapshot of a call stack, rendered as one {@code class#method:line} entry per line.
+     */
+    class TraceRecord {
+        // Leading frames left out of the rendering; presumably the leak-detection plumbing itself - TODO confirm
+        private static final int SKIPPED_FRAMES = 3;
+
+        private final List<StackWalker.StackFrame> stackFrames;
+
+        TraceRecord(List<StackWalker.StackFrame> stackFrames) {
+            this.stackFrames = stackFrames;
+        }
+
+        /**
+         * Renders the recorded frames, leaving out the first {@value #SKIPPED_FRAMES}.
+         *
+         * @return the rendered frames, or an empty string when no more than
+         *         {@value #SKIPPED_FRAMES} frames were recorded
+         */
+        @Override
+        public String toString() {
+            StringBuilder buf = new StringBuilder();
+            // Clamp the start index: subList would throw on a record shorter than the skipped prefix
+            int firstReported = Math.min(SKIPPED_FRAMES, stackFrames.size());
+            for (StackWalker.StackFrame stackFrame : stackFrames.subList(firstReported, stackFrames.size())) {
+                buf.append("\t")
+                    .append(stackFrame.getClassName())
+                    .append("#")
+                    .append(stackFrame.getMethodName())
+                    .append(":")
+                    .append(stackFrame.getLineNumber())
+                    .append("\n");
+            }
+            return buf.toString();
+        }
+    }
+
+    /**
+     * Phantom reference on a {@link LeakAware}, carrying the resource that instance was expected
+     * to release and, when tracing is enabled, the stack trace of its instantiation.
+     */
+    class LeakAwareFinalizer extends PhantomReference<LeakAware<?>> {
+        private static final Logger LOGGER = LoggerFactory.getLogger(LeakAwareFinalizer.class);
+
+        private final LeakAware.Resource resource;
+        // null unless tracing was enabled when this reference was created
+        private final TraceRecord traceRecord;
+
+        /**
+         * @param referent the tracked instance
+         * @param resource the resource the referent has to dispose
+         * @param q        queue this reference is registered with
+         */
+        public LeakAwareFinalizer(LeakAware<?> referent, LeakAware.Resource resource, ReferenceQueue<? super LeakAware<?>> q) {
+            super(referent, q);
+            this.resource = resource;
+            if (LeakAware.tracedEnabled()) {
+                traceRecord = new TraceRecord(StackWalker.getInstance().walk(s -> s.collect(Collectors.toList())));
+            } else {
+                traceRecord = null;
+            }
+        }
+
+        /**
+         * Handles the collection of the referent: nothing happens when the resource was disposed or
+         * when detection is off. Otherwise the leak is logged, then the resource is disposed
+         * ({@code SIMPLE}, {@code ADVANCED}) or an exception is thrown ({@code TESTING}).
+         *
+         * @throws LeakAware.LeakDetectorException when a leak is found at level {@code TESTING}
+         */
+        public void detectLeak() {
+            // The referent is gone: whatever the outcome, this reference needs no more tracking
+            LeakAware.REFERENCES_IN_USE.remove(this);
+
+            LeakAware.Level level = LeakAware.LEVEL;
+            if (level == LeakAware.Level.NONE || !isNotDisposed()) {
+                return;
+            }
+
+            errorLog();
+            if (level == LeakAware.Level.TESTING) {
+                throw new LeakAware.LeakDetectorException();
+            }
+            resource.dispose();
+        }
+
+        /**
+         * Logs the leak at error level, with the instantiation stack trace when one was recorded.
+         */
+        public void errorLog() {
+            // The record may be missing if tracing was off when this reference was created
+            if (LeakAware.tracedEnabled() && traceRecord != null) {
+                LOGGER.error("Leak detected! Resource {} was not released before its referent was garbage-collected. \n" +
+                    "This resource was instanced at: \n{}", resource, traceRecord.toString());
+            } else {
+                LOGGER.error("Leak detected! Resource {} was not released before its referent was garbage-collected. \n" +
+                    "Resource management needs to be reviewed: ensure to always call dispose() for disposable objects you work with. \n" +
+                    "Consider enabling advanced leak detection to further identify the problem.", resource);
+            }
+        }
+
+        private boolean isNotDisposed() {
+            return !resource.isDisposed();
+        }
+    }
}
diff --git a/server/container/lifecycle-api/src/test/java/org/apache/james/lifecycle/api/LeakAwareTest.java b/server/container/lifecycle-api/src/test/java/org/apache/james/lifecycle/api/LeakAwareTest.java
new file mode 100644
index 0000000..b774024
--- /dev/null
+++ b/server/container/lifecycle-api/src/test/java/org/apache/james/lifecycle/api/LeakAwareTest.java
@@ -0,0 +1,200 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+
+package org.apache.james.lifecycle.api;
+
+import static org.assertj.core.api.Assertions.assertThat;
+import static org.assertj.core.api.Assertions.assertThatThrownBy;
+import static org.awaitility.Durations.ONE_HUNDRED_MILLISECONDS;
+import static org.awaitility.Durations.TEN_SECONDS;
+import static org.apache.james.lifecycle.api.Disposable.LeakAware;
+
+import java.lang.reflect.Field;
+import java.lang.reflect.Modifier;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import org.awaitility.Awaitility;
+import org.awaitility.core.ConditionFactory;
+
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.params.ParameterizedTest;
+import org.junit.jupiter.params.provider.ValueSource;
+import org.slf4j.LoggerFactory;
+
+import ch.qos.logback.classic.Level;
+import ch.qos.logback.classic.Logger;
+import ch.qos.logback.classic.spi.ILoggingEvent;
+import ch.qos.logback.core.read.ListAppender;
+
+class LeakAwareTest {
+
+ private static final class LeakResourceSample extends LeakAware<LeakResourceSample.TestResource> {
+ static class TestResource extends LeakAware.Resource {
+ public TestResource(Disposable cleanup) {
+ super(cleanup);
+ }
+ }
+
+ public static LeakResourceSample create(AtomicBoolean atomicBoolean) {
+ return new LeakResourceSample(new TestResource(() -> atomicBoolean.set(true)));
+ }
+
+ LeakResourceSample(TestResource resource) {
+ super(resource);
+ }
+ }
+
+ private static final ConditionFactory awaitAtMostTenSeconds = Awaitility
+ .with().pollInterval(ONE_HUNDRED_MILLISECONDS)
+ .and().pollDelay(ONE_HUNDRED_MILLISECONDS)
+ .await()
+ .atMost(TEN_SECONDS);
+
+ public static ListAppender<ILoggingEvent> getListAppenderForClass(Class clazz) {
+ Logger logger = (Logger) LoggerFactory.getLogger(clazz);
+
+ ListAppender<ILoggingEvent> loggingEventListAppender = new ListAppender<>();
+ loggingEventListAppender.start();
+
+ logger.addAppender(loggingEventListAppender);
+ return loggingEventListAppender;
+ }
+
+ private void forceChangeLevel(String level) throws NoSuchFieldException, IllegalAccessException {
+ forceChangeLevel(LeakAware.Level.parse(level));
+ }
+
+ // Overwrites the static final LeakAware.LEVEL through reflection, after clearing the FINAL bit of the field's modifiers. The previous level is not restored here, so the new one stays in effect for tests running afterwards. NOTE(review): looking up the "modifiers" field of java.lang.reflect.Field is presumably rejected by recent JDKs (12+) - confirm before upgrading the build JDK.
+ private static void forceChangeLevel(LeakAware.Level level) throws NoSuchFieldException, IllegalAccessException {
+ final Field field = LeakAware.class.getDeclaredField("LEVEL");
+ field.setAccessible(true);
+ final Field modifiersField = Field.class.getDeclaredField("modifiers");
+ modifiersField.setAccessible(true);
+ modifiersField.setInt(field, field.getModifiers() & ~Modifier.FINAL);
+ field.set(null, level);
+ }
+
+ @Test
+ void leakDetectionShouldCloseUnclosedResources() {
+ AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+ LeakResourceSample resourceSample = LeakResourceSample.create(atomicBoolean);
+ resourceSample = null;
+
+ System.gc();
+ awaitAtMostTenSeconds.until(() -> {
+ LeakAware.track();
+ return atomicBoolean.get();
+ });
+ }
+
+ @Test
+ void leakDetectionShouldNotReportClosedObjects() {
+ AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+ LeakResourceSample resourceSample = LeakResourceSample.create(atomicBoolean);
+ resourceSample.dispose();
+ atomicBoolean.set(false);
+ resourceSample = null;
+
+ System.gc();
+ awaitAtMostTenSeconds.until(() -> {
+ LeakAware.track();
+ return !atomicBoolean.get();
+ });
+ }
+
+ @Test
+ void resourceShouldNotBeDetectedLeakWhenLevelIsNone() throws InterruptedException, NoSuchFieldException, IllegalAccessException {
+ forceChangeLevel(LeakAware.Level.NONE);
+ AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+ LeakResourceSample resourceSample = LeakResourceSample.create(atomicBoolean);
+ resourceSample = null;
+
+ System.gc();
+ Thread.sleep(500);
+ LeakAware.track();
+ assertThat(atomicBoolean.get()).isFalse();
+ }
+
+ @ParameterizedTest
+ @ValueSource(strings = {"simple", "advanced"})
+ void leakDetectionShouldLogWhenDetected(String level) throws NoSuchFieldException, IllegalAccessException {
+ forceChangeLevel(level);
+ ListAppender<ILoggingEvent> loggingEvents = getListAppenderForClass(Disposable.LeakAwareFinalizer.class);
+
+ AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+ LeakResourceSample resourceSample = LeakResourceSample.create(atomicBoolean);
+ resourceSample = null;
+
+ System.gc();
+ awaitAtMostTenSeconds.untilAsserted(() -> {
+ LeakAware.track();
+ assertThat(loggingEvents.list).hasSize(1)
+ .allSatisfy(loggingEvent -> {
+ assertThat(loggingEvent.getLevel()).isEqualTo(Level.ERROR);
+ assertThat(loggingEvent.getFormattedMessage()).contains("Leak detected", "TestResource");
+ });
+ });
+ }
+
+ @Test
+ void leakDetectionShouldLogTraceRecordWhenLevelIsAdvanced() throws NoSuchFieldException, IllegalAccessException {
+ forceChangeLevel(LeakAware.Level.ADVANCED);
+ ListAppender<ILoggingEvent> loggingEvents = getListAppenderForClass(Disposable.LeakAwareFinalizer.class);
+ AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+ LeakResourceSample resourceSample = LeakResourceSample.create(atomicBoolean);
+ resourceSample = null;
+
+ System.gc();
+ awaitAtMostTenSeconds.untilAsserted(() -> {
+ LeakAware.track();
+ assertThat(loggingEvents.list).hasSize(1)
+ .allSatisfy(loggingEvent -> {
+ assertThat(loggingEvent.getLevel()).isEqualTo(Level.ERROR);
+ assertThat(loggingEvent.getFormattedMessage()).contains("This resource was instanced at", "LeakAwareTest#leakDetectionShouldLogTraceRecordWhenLevelIsAdvanced");
+ });
+ });
+ }
+
+ @Test
+ void leakDetectionShouldThrowWhenDetectedAndLevelIsTesting() throws NoSuchFieldException, IllegalAccessException, InterruptedException {
+ forceChangeLevel(LeakAware.Level.TESTING);
+ AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+ LeakResourceSample resourceSample = LeakResourceSample.create(atomicBoolean);
+ resourceSample = null;
+
+ System.gc();
+ Thread.sleep(500);
+ assertThatThrownBy(LeakAware::track)
+ .isInstanceOf(LeakAware.LeakDetectorException.class);
+ }
+
+ @Test
+ void leakDetectionShouldNotLogWhenLevelIsNone() throws InterruptedException, NoSuchFieldException, IllegalAccessException {
+ forceChangeLevel(LeakAware.Level.NONE);
+ ListAppender<ILoggingEvent> loggingEvents = getListAppenderForClass(Disposable.LeakAwareFinalizer.class);
+ AtomicBoolean atomicBoolean = new AtomicBoolean(false);
+ LeakResourceSample resourceSample = LeakResourceSample.create(atomicBoolean);
+ resourceSample = null;
+
+ System.gc();
+ Thread.sleep(500);
+ assertThat(loggingEvents.list).isEmpty();
+ }
+
+}
diff --git a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/JamesDataCmdHandler.java b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/JamesDataCmdHandler.java
index 8c6eaaa..0ffa698 100644
--- a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/JamesDataCmdHandler.java
+++ b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/JamesDataCmdHandler.java
@@ -52,7 +52,7 @@ public class JamesDataCmdHandler extends DataCmdHandler {
@Override
protected SMTPResponse doDATA(SMTPSession session, String argument) {
try {
- MimeMessageInputStreamSource mmiss = new MimeMessageInputStreamSource(MailImpl.getId());
+ MimeMessageInputStreamSource mmiss = MimeMessageInputStreamSource.create(MailImpl.getId());
session.setAttachment(SMTPConstants.DATA_MIMEMESSAGE_STREAMSOURCE, mmiss, State.Transaction);
} catch (Exception e) {
LOGGER.warn("Error creating mimemessagesource for incoming data", e);
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org