You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by gn...@apache.org on 2021/03/23 16:14:57 UTC
[camel] branch master updated: Make camel-netty tests run in
parallel
This is an automated email from the ASF dual-hosted git repository.
gnodet pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new ae372c3 Make camel-netty tests run in parallel
ae372c3 is described below
commit ae372c39a0807250ffc304bc86c57c8a64be7868
Author: Guillaume Nodet <gn...@gmail.com>
AuthorDate: Tue Mar 23 17:14:11 2021 +0100
Make camel-netty tests run in parallel
---
components/camel-netty/pom.xml | 22 ++++--
.../camel/component/netty/BaseNettyTest.java | 21 ++---
.../netty/ErrorDuringGracefullShutdownTest.java | 3 +
.../camel/component/netty/LogCaptureTest.java | 2 +
.../camel/component/netty/NettyProxyTest.java | 17 ++--
...ttyUseSharedWorkerThreadPoolManyRoutesTest.java | 20 ++++-
.../netty/NettyUseSharedWorkerThreadPoolTest.java | 28 +++----
.../netty/UnsharableCodecsConflictsTest.java | 17 ++--
.../org/apache/camel/test/AvailablePortFinder.java | 91 ++++++++++++++++++++--
9 files changed, 159 insertions(+), 62 deletions(-)
diff --git a/components/camel-netty/pom.xml b/components/camel-netty/pom.xml
index 5245142..a1a6169 100644
--- a/components/camel-netty/pom.xml
+++ b/components/camel-netty/pom.xml
@@ -32,6 +32,11 @@
<name>Camel :: Netty</name>
<description>Camel Netty NIO based socket communication component</description>
+ <properties>
+ <camel.surefire.parallel>true</camel.surefire.parallel>
+ <io.netty.leakDetection.level>DISABLED</io.netty.leakDetection.level>
+ </properties>
+
<dependencies>
<dependency>
<groupId>org.apache.camel</groupId>
@@ -134,20 +139,25 @@
</execution>
</executions>
</plugin>
-
<plugin>
<artifactId>maven-surefire-plugin</artifactId>
<configuration>
- <forkCount>1</forkCount>
- <reuseForks>false</reuseForks>
- <forkedProcessTimeoutInSeconds>5000</forkedProcessTimeoutInSeconds>
<systemPropertyVariables>
- <!-- can use PARANOID for checking every access -->
- <io.netty.leakDetection.level>ADVANCED</io.netty.leakDetection.level>
+ <io.netty.leakDetection.level>${io.netty.leakDetection.level}</io.netty.leakDetection.level>
</systemPropertyVariables>
</configuration>
</plugin>
</plugins>
</build>
+ <profiles>
+ <profile>
+ <id>leaks</id>
+ <properties>
+ <camel.surefire.parallel>false</camel.surefire.parallel>
+ <io.netty.leakDetection.level>PARANOID</io.netty.leakDetection.level>
+ </properties>
+ </profile>
+ </profiles>
+
</project>
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java
index 8d40832..7236730 100644
--- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java
@@ -28,6 +28,7 @@ import org.apache.camel.test.junit5.CamelTestSupport;
import org.apache.logging.log4j.core.LogEvent;
import org.junit.jupiter.api.AfterAll;
import org.junit.jupiter.api.BeforeAll;
+import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -38,19 +39,15 @@ public class BaseNettyTest extends CamelTestSupport {
private static final Logger LOG = LoggerFactory.getLogger(BaseNettyTest.class);
- private static volatile int port;
-
- @BeforeAll
- public static void initPort() throws Exception {
- port = AvailablePortFinder.getNextAvailable();
- }
+ @RegisterExtension
+ protected AvailablePortFinder.Port port = AvailablePortFinder.find();
@BeforeAll
public static void startLeakDetection() {
System.setProperty("io.netty.leakDetection.maxRecords", "100");
System.setProperty("io.netty.leakDetection.acquireAndReleaseOnly", "true");
System.setProperty("io.netty.leakDetection.targetRecords", "100");
- ResourceLeakDetector.setLevel(ResourceLeakDetector.Level.PARANOID);
+ LogCaptureAppender.reset();
}
@AfterAll
@@ -64,9 +61,8 @@ public class BaseNettyTest extends CamelTestSupport {
String message = "Leaks detected while running tests: " + events;
// Just write the message into log to help debug
for (LogEvent event : events) {
- LOG.info(event.getMessage().getFormattedMessage());
+ LOG.info(event.getMessage().toString());
}
- LogCaptureAppender.reset();
throw new AssertionError(message);
}
}
@@ -87,13 +83,8 @@ public class BaseNettyTest extends CamelTestSupport {
return prop;
}
- protected int getNextPort() {
- port = AvailablePortFinder.getNextAvailable();
- return port;
- }
-
protected int getPort() {
- return port;
+ return port.getPort();
}
protected String byteArrayToHex(byte[] bytes) {
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/ErrorDuringGracefullShutdownTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/ErrorDuringGracefullShutdownTest.java
index 6771cc5..0b31b10 100644
--- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/ErrorDuringGracefullShutdownTest.java
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/ErrorDuringGracefullShutdownTest.java
@@ -21,13 +21,16 @@ import org.apache.camel.ServiceStatus;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.processor.errorhandler.DefaultErrorHandler;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.Isolated;
import static org.assertj.core.api.Assertions.assertThat;
/**
* Regression test for CAMEL-9527
*/
+@Isolated
class ErrorDuringGracefullShutdownTest extends BaseNettyTest {
+
@Override
protected RoutesBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureTest.java
index e9656ff..40c3df8 100644
--- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureTest.java
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureTest.java
@@ -20,6 +20,7 @@ import io.netty.util.ResourceLeakDetector;
import io.netty.util.internal.logging.InternalLoggerFactory;
import org.apache.camel.processor.errorhandler.DefaultErrorHandler;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.parallel.Isolated;
import static org.junit.jupiter.api.Assertions.assertFalse;
import static org.junit.jupiter.api.Assertions.assertTrue;
@@ -27,6 +28,7 @@ import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* This test ensures LogCaptureAppender is configured properly
*/
+@Isolated
public class LogCaptureTest {
@Test
public void testCapture() {
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyProxyTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyProxyTest.java
index 3057d5f..2e8ad5e 100644
--- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyProxyTest.java
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyProxyTest.java
@@ -17,7 +17,9 @@
package org.apache.camel.component.netty;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.AvailablePortFinder;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -26,8 +28,8 @@ import static org.junit.jupiter.api.Assertions.assertEquals;
*/
public class NettyProxyTest extends BaseNettyTest {
- private int port1;
- private int port2;
+ @RegisterExtension
+ protected AvailablePortFinder.Port port2 = AvailablePortFinder.find();
@Test
public void testNettyProxy() throws Exception {
@@ -35,7 +37,7 @@ public class NettyProxyTest extends BaseNettyTest {
getMockEndpoint("mock:proxy").expectedBodiesReceived("Camel");
getMockEndpoint("mock:after").expectedBodiesReceived("Bye Camel");
- Object body = template.requestBody("netty:tcp://localhost:" + port1 + "?sync=true&textline=true", "Camel\n");
+ Object body = template.requestBody("netty:tcp://localhost:" + port.getPort() + "?sync=true&textline=true", "Camel\n");
assertEquals("Bye Camel", body);
assertMockEndpointsSatisfied();
@@ -46,15 +48,12 @@ public class NettyProxyTest extends BaseNettyTest {
return new RouteBuilder() {
@Override
public void configure() throws Exception {
- port1 = getPort();
- port2 = getNextPort();
-
- fromF("netty:tcp://localhost:%s?sync=true&textline=true", port1)
+ fromF("netty:tcp://localhost:%s?sync=true&textline=true", port.getPort())
.to("mock:before")
- .toF("netty:tcp://localhost:%s?sync=true&textline=true", port2)
+ .toF("netty:tcp://localhost:%s?sync=true&textline=true", port2.getPort())
.to("mock:after");
- fromF("netty:tcp://localhost:%s?sync=true&textline=true", port2)
+ fromF("netty:tcp://localhost:%s?sync=true&textline=true", port2.getPort())
.to("mock:proxy")
.transform().simple("Bye ${body}\n");
}
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolManyRoutesTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolManyRoutesTest.java
index 09ea897..6c81982 100644
--- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolManyRoutesTest.java
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolManyRoutesTest.java
@@ -19,6 +19,8 @@ package org.apache.camel.component.netty;
import io.netty.channel.EventLoopGroup;
import org.apache.camel.BindToRegistry;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.AvailablePortFinder;
+import org.junit.jupiter.api.AfterEach;
import org.junit.jupiter.api.BeforeEach;
import org.junit.jupiter.api.Test;
import org.slf4j.Logger;
@@ -35,6 +37,7 @@ public class NettyUseSharedWorkerThreadPoolManyRoutesTest extends BaseNettyTest
@BindToRegistry("sharedBoss")
private EventLoopGroup sharedWorkerGroup = new NettyServerBossPoolBuilder().withBossCount(20).build();
private int before;
+ protected AvailablePortFinder.Port[] ports;
@Override
protected boolean useJmx() {
@@ -45,9 +48,22 @@ public class NettyUseSharedWorkerThreadPoolManyRoutesTest extends BaseNettyTest
@BeforeEach
public void setUp() throws Exception {
before = Thread.activeCount();
+ ports = new AvailablePortFinder.Port[60];
+ for (int i = 0; i < ports.length; i++) {
+ ports[i] = AvailablePortFinder.find();
+ }
super.setUp();
}
+ @Override
+ @AfterEach
+ public void tearDown() throws Exception {
+ super.tearDown();
+ for (AvailablePortFinder.Port port : ports) {
+ port.release();
+ }
+ }
+
@Test
public void testSharedThreadPool() throws Exception {
int delta = Thread.activeCount() - before;
@@ -65,8 +81,8 @@ public class NettyUseSharedWorkerThreadPoolManyRoutesTest extends BaseNettyTest
@Override
public void configure() throws Exception {
- for (int i = 0; i < 60; i++) {
- from("netty:tcp://localhost:" + getNextPort() + "?textline=true&sync=true&usingExecutorService=false"
+ for (AvailablePortFinder.Port port : ports) {
+ from("netty:tcp://localhost:" + port.getPort() + "?textline=true&sync=true&usingExecutorService=false"
+ "&bossGroup=#sharedBoss&workerGroup=#sharedWorker")
.validate(body().isInstanceOf(String.class)).to("log:result").to("mock:result")
.transform(body().regexReplaceAll("Hello", "Bye"));
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolTest.java
index 931ba90..8266abc 100644
--- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolTest.java
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyUseSharedWorkerThreadPoolTest.java
@@ -19,7 +19,9 @@ package org.apache.camel.component.netty;
import io.netty.channel.EventLoopGroup;
import org.apache.camel.BindToRegistry;
import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.test.AvailablePortFinder;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
import static org.junit.jupiter.api.Assertions.assertEquals;
@@ -31,9 +33,10 @@ public class NettyUseSharedWorkerThreadPoolTest extends BaseNettyTest {
@BindToRegistry("sharedClientPool")
private EventLoopGroup sharedWorkerClientGroup
= new NettyWorkerPoolBuilder().withWorkerCount(3).withName("NettyClient").build();
- private int port;
- private int port2;
- private int port3;
+ @RegisterExtension
+ protected AvailablePortFinder.Port port2 = AvailablePortFinder.find();
+ @RegisterExtension
+ protected AvailablePortFinder.Port port3 = AvailablePortFinder.find();
@Override
protected boolean useJmx() {
@@ -46,17 +49,20 @@ public class NettyUseSharedWorkerThreadPoolTest extends BaseNettyTest {
for (int i = 0; i < 10; i++) {
String reply = template.requestBody(
- "netty:tcp://localhost:" + port + "?textline=true&sync=true&workerGroup=#sharedClientPool", "Hello World",
+ "netty:tcp://localhost:" + port.getPort() + "?textline=true&sync=true&workerGroup=#sharedClientPool",
+ "Hello World",
String.class);
assertEquals("Bye World", reply);
reply = template.requestBody(
- "netty:tcp://localhost:" + port2 + "?textline=true&sync=true&workerGroup=#sharedClientPool", "Hello Camel",
+ "netty:tcp://localhost:" + port2.getPort() + "?textline=true&sync=true&workerGroup=#sharedClientPool",
+ "Hello Camel",
String.class);
assertEquals("Hi Camel", reply);
reply = template.requestBody(
- "netty:tcp://localhost:" + port3 + "?textline=true&sync=true&workerGroup=#sharedClientPool", "Hello Claus",
+ "netty:tcp://localhost:" + port3.getPort() + "?textline=true&sync=true&workerGroup=#sharedClientPool",
+ "Hello Claus",
String.class);
assertEquals("Hej Claus", reply);
}
@@ -73,21 +79,17 @@ public class NettyUseSharedWorkerThreadPoolTest extends BaseNettyTest {
@Override
public void configure() throws Exception {
- port = getPort();
- port2 = getNextPort();
- port3 = getNextPort();
-
- from("netty:tcp://localhost:" + port
+ from("netty:tcp://localhost:" + port.getPort()
+ "?textline=true&sync=true&workerGroup=#sharedServerPool&usingExecutorService=false")
.validate(body().isInstanceOf(String.class)).to("log:result").to("mock:result")
.transform(body().regexReplaceAll("Hello", "Bye"));
- from("netty:tcp://localhost:" + port2
+ from("netty:tcp://localhost:" + port2.getPort()
+ "?textline=true&sync=true&workerGroup=#sharedServerPool&usingExecutorService=false")
.validate(body().isInstanceOf(String.class)).to("log:result").to("mock:result")
.transform(body().regexReplaceAll("Hello", "Hi"));
- from("netty:tcp://localhost:" + port3
+ from("netty:tcp://localhost:" + port3.getPort()
+ "?textline=true&sync=true&workerGroup=#sharedServerPool&usingExecutorService=false")
.validate(body().isInstanceOf(String.class)).to("log:result").to("mock:result")
.transform(body().regexReplaceAll("Hello", "Hej"));
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflictsTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflictsTest.java
index a829ca3..884fbae 100644
--- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflictsTest.java
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/UnsharableCodecsConflictsTest.java
@@ -26,8 +26,10 @@ import org.apache.camel.Exchange;
import org.apache.camel.Processor;
import org.apache.camel.builder.RouteBuilder;
import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.test.AvailablePortFinder;
import org.apache.camel.util.IOHelper;
import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.extension.RegisterExtension;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -42,8 +44,8 @@ public class UnsharableCodecsConflictsTest extends BaseNettyTest {
private Processor processor = new P();
- private int port1;
- private int port2;
+ @RegisterExtension
+ protected AvailablePortFinder.Port port2 = AvailablePortFinder.find();
@BindToRegistry("length-decoder")
private ChannelHandlerFactory decoder = ChannelHandlerFactories.newLengthFieldBasedFrameDecoder(1048576, 0, 4, 0, 4);
@@ -63,8 +65,8 @@ public class UnsharableCodecsConflictsTest extends BaseNettyTest {
MockEndpoint mock = getMockEndpoint("mock:result");
mock.expectedBodiesReceived(new String(sPort2) + "9");
- Socket server1 = getSocket("localhost", port1);
- Socket server2 = getSocket("localhost", port2);
+ Socket server1 = getSocket("localhost", port.getPort());
+ Socket server2 = getSocket("localhost", port2.getPort());
try {
sendSopBuffer(bodyPort2, server2);
@@ -84,12 +86,9 @@ public class UnsharableCodecsConflictsTest extends BaseNettyTest {
protected RouteBuilder createRouteBuilder() throws Exception {
return new RouteBuilder() {
public void configure() throws Exception {
- port1 = getPort();
- port2 = getNextPort();
+ from("netty:tcp://localhost:" + port.getPort() + "?decoders=#length-decoder&sync=false").process(processor);
- from("netty:tcp://localhost:" + port1 + "?decoders=#length-decoder&sync=false").process(processor);
-
- from("netty:tcp://localhost:" + port2 + "?decoders=#length-decoder2&sync=false").process(processor)
+ from("netty:tcp://localhost:" + port2.getPort() + "?decoders=#length-decoder2&sync=false").process(processor)
.to("mock:result");
}
};
diff --git a/components/camel-test/camel-test-junit5/src/main/java/org/apache/camel/test/AvailablePortFinder.java b/components/camel-test/camel-test-junit5/src/main/java/org/apache/camel/test/AvailablePortFinder.java
index 3adc24d..0073685 100644
--- a/components/camel-test/camel-test-junit5/src/main/java/org/apache/camel/test/AvailablePortFinder.java
+++ b/components/camel-test/camel-test-junit5/src/main/java/org/apache/camel/test/AvailablePortFinder.java
@@ -20,7 +20,11 @@ import java.io.IOException;
import java.net.InetAddress;
import java.net.InetSocketAddress;
import java.net.ServerSocket;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import org.junit.jupiter.api.extension.AfterAllCallback;
+import org.junit.jupiter.api.extension.ExtensionContext;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -31,6 +35,25 @@ public final class AvailablePortFinder {
private static final Logger LOG = LoggerFactory.getLogger(AvailablePortFinder.class);
+ private static final AvailablePortFinder INSTANCE = new AvailablePortFinder();
+
+ public interface Port extends AfterAllCallback, AutoCloseable {
+ int getPort();
+
+ void release();
+
+ default void afterAll(ExtensionContext context) throws Exception {
+ release();
+ }
+
+ @Override
+ default void close() {
+ release();
+ }
+ }
+
+ private final Map<Integer, Port> portMapping = new ConcurrentHashMap<>();
+
/**
* Creates a new instance.
*/
@@ -38,6 +61,61 @@ public final class AvailablePortFinder {
// Do nothing
}
+ public static Port find() {
+ return INSTANCE.findPort();
+ }
+
+ synchronized Port findPort() {
+ while (true) {
+ final int port = probePort(0);
+ Port p = new Port() {
+ @Override
+ public int getPort() {
+ return port;
+ }
+
+ @Override
+ public void release() {
+ AvailablePortFinder.this.release(this);
+ }
+ };
+ Port prv = INSTANCE.portMapping.putIfAbsent(port, p);
+ if (prv == null) {
+ return p;
+ }
+ }
+ }
+
+ synchronized Port findPort(int fromPort, int toPort) {
+ for (int i = fromPort; i <= toPort; i++) {
+ try {
+ final int port = probePort(i);
+ Port p = new Port() {
+ @Override
+ public int getPort() {
+ return port;
+ }
+
+ @Override
+ public void release() {
+ AvailablePortFinder.this.release(this);
+ }
+ };
+ Port prv = INSTANCE.portMapping.putIfAbsent(port, p);
+ if (prv == null) {
+ return p;
+ }
+ } catch (IllegalStateException e) {
+ // do nothing, let's try the next port
+ }
+ }
+ throw new IllegalStateException("Cannot find free port");
+ }
+
+ synchronized void release(Port port) {
+ INSTANCE.portMapping.remove(port.getPort(), port);
+ }
+
/**
* Gets the next available port.
*
@@ -45,7 +123,9 @@ public final class AvailablePortFinder {
* @return the available port
*/
public static int getNextAvailable() {
- return probePort(0);
+ try (Port port = INSTANCE.findPort()) {
+ return port.getPort();
+ }
}
/**
@@ -58,14 +138,9 @@ public final class AvailablePortFinder {
* @return the available port
*/
public static int getNextAvailable(int fromPort, int toPort) {
- for (int i = fromPort; i <= toPort; i++) {
- try {
- return probePort(i);
- } catch (IllegalStateException e) {
- // do nothing, let's try the next port
- }
+ try (Port port = INSTANCE.findPort(fromPort, toPort)) {
+ return port.getPort();
}
- throw new IllegalStateException("Cannot find free port");
}
/**