You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by or...@apache.org on 2023/07/27 10:57:29 UTC
[camel] branch main updated: CAMEL-19664: fix multiple concurrency issues in camel-test-infra-artemis (#10854)
This is an automated email from the ASF dual-hosted git repository.
orpiske pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/main by this push:
new 9a314406b6e CAMEL-19664: fix multiple concurrency issues in camel-test-infra-artemis (#10854)
9a314406b6e is described below
commit 9a314406b6e55a00acef957cbf5831da73f2a7e3
Author: Otavio Rodolfo Piske <or...@users.noreply.github.com>
AuthorDate: Thu Jul 27 12:57:22 2023 +0200
CAMEL-19664: fix multiple concurrency issues in camel-test-infra-artemis (#10854)
---
.../services/AbstractArtemisEmbeddedService.java | 38 ++++++++++++----------
.../infra/artemis/services/ArtemisAMQPService.java | 2 +-
.../infra/artemis/services/ArtemisMQTTService.java | 2 +-
.../services/ArtemisPersistentVMService.java | 3 +-
.../services/ArtemisTCPAllProtocolsService.java | 5 +--
.../infra/artemis/services/ArtemisVMService.java | 10 ++++--
6 files changed, 34 insertions(+), 26 deletions(-)
diff --git a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/AbstractArtemisEmbeddedService.java b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/AbstractArtemisEmbeddedService.java
index d2c33ecd586..8a8dc969013 100644
--- a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/AbstractArtemisEmbeddedService.java
+++ b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/AbstractArtemisEmbeddedService.java
@@ -41,37 +41,36 @@ import static org.junit.jupiter.api.Assertions.fail;
public abstract class AbstractArtemisEmbeddedService implements ArtemisService, ConnectionFactoryAware {
protected static final Logger LOG = LoggerFactory.getLogger(AbstractArtemisEmbeddedService.class);
- protected static final LongAdder BROKER_COUNT = new LongAdder();
+ private static final LongAdder BROKER_COUNT = new LongAdder();
- protected int tcpPort = AvailablePortFinder.getNextAvailable();
- protected EmbeddedActiveMQ embeddedBrokerService;
- private Configuration artemisConfiguration;
+ protected final EmbeddedActiveMQ embeddedBrokerService;
+ private final Configuration artemisConfiguration;
public AbstractArtemisEmbeddedService() {
- defaultConfiguration();
-
- embeddedBrokerService.setConfiguration(getConfiguration(artemisConfiguration, AvailablePortFinder.getNextAvailable()));
+ this(AvailablePortFinder.getNextAvailable());
}
- public AbstractArtemisEmbeddedService(int port) {
- defaultConfiguration();
+ protected AbstractArtemisEmbeddedService(int port) {
+ embeddedBrokerService = new EmbeddedActiveMQ();
+ artemisConfiguration = new ConfigurationImpl();
- embeddedBrokerService.setConfiguration(getConfiguration(artemisConfiguration, port));
+ embeddedBrokerService.setConfiguration(configure(port));
}
- private void defaultConfiguration() {
- embeddedBrokerService = new EmbeddedActiveMQ();
+ private synchronized Configuration configure(int port) {
+ final int brokerId = BROKER_COUNT.intValue();
+ BROKER_COUNT.increment();
// Base configuration
- artemisConfiguration = new ConfigurationImpl();
artemisConfiguration.setSecurityEnabled(false);
- BROKER_COUNT.increment();
- artemisConfiguration.setBrokerInstance(new File("target", "artemis-" + BROKER_COUNT.intValue()));
+ artemisConfiguration.setBrokerInstance(new File("target", "artemis-" + brokerId));
artemisConfiguration.setJMXManagementEnabled(false);
artemisConfiguration.setMaxDiskUsage(98);
+
+ return configure(artemisConfiguration, port, brokerId);
}
- protected abstract Configuration getConfiguration(Configuration artemisConfiguration, int port);
+ protected abstract Configuration configure(Configuration artemisConfiguration, int port, int brokerId);
public void customConfiguration(Consumer<Configuration> configuration) {
configuration.accept(artemisConfiguration);
@@ -103,8 +102,11 @@ public abstract class AbstractArtemisEmbeddedService implements ArtemisService,
@Override
public void initialize() {
try {
- embeddedBrokerService.start();
- embeddedBrokerService.getActiveMQServer().waitForActivation(20, TimeUnit.SECONDS);
+ if (embeddedBrokerService.getActiveMQServer() == null || !embeddedBrokerService.getActiveMQServer().isStarted()) {
+ embeddedBrokerService.start();
+
+ embeddedBrokerService.getActiveMQServer().waitForActivation(20, TimeUnit.SECONDS);
+ }
} catch (Exception e) {
LOG.warn("Unable to start embedded Artemis broker: {}", e.getMessage(), e);
fail(e.getMessage());
diff --git a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisAMQPService.java b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisAMQPService.java
index ab799052292..b5e9c023ae4 100644
--- a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisAMQPService.java
+++ b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisAMQPService.java
@@ -33,7 +33,7 @@ public class ArtemisAMQPService extends AbstractArtemisEmbeddedService {
}
@Override
- protected Configuration getConfiguration(Configuration artemisConfiguration, int port) {
+ protected Configuration configure(Configuration artemisConfiguration, int port, int brokerId) {
amqpPort = port;
brokerURL = "tcp://0.0.0.0:" + amqpPort
+ "?tcpSendBufferSize=1048576;tcpReceiveBufferSize=1048576;protocols=AMQP;useEpoll=true;amqpCredits=1000;amqpMinCredits=300";
diff --git a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisMQTTService.java b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisMQTTService.java
index e0d38e3c268..b0989bde725 100644
--- a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisMQTTService.java
+++ b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisMQTTService.java
@@ -34,7 +34,7 @@ public class ArtemisMQTTService extends AbstractArtemisEmbeddedService {
}
@Override
- protected Configuration getConfiguration(Configuration configuration, int port) {
+ protected Configuration configure(Configuration configuration, int port, int brokerId) {
this.port = port;
brokerURL = "tcp://0.0.0.0:" + port;
diff --git a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisPersistentVMService.java b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisPersistentVMService.java
index e329966889c..2260e5db8e4 100644
--- a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisPersistentVMService.java
+++ b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisPersistentVMService.java
@@ -29,8 +29,7 @@ public class ArtemisPersistentVMService extends AbstractArtemisEmbeddedService {
private String brokerURL;
@Override
- protected Configuration getConfiguration(Configuration configuration, int port) {
- final int brokerId = super.BROKER_COUNT.intValue();
+ protected Configuration configure(Configuration configuration, int port, int brokerId) {
brokerURL = "vm://" + brokerId;
configuration.setPersistenceEnabled(true);
diff --git a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisTCPAllProtocolsService.java b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisTCPAllProtocolsService.java
index fbbb8ec0af5..faf84ecb17d 100644
--- a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisTCPAllProtocolsService.java
+++ b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisTCPAllProtocolsService.java
@@ -32,8 +32,9 @@ public class ArtemisTCPAllProtocolsService extends AbstractArtemisEmbeddedServic
private int port;
@Override
- protected Configuration getConfiguration(Configuration configuration, int port) {
- final int brokerId = super.BROKER_COUNT.intValue();
+ protected Configuration configure(Configuration configuration, int port, int brokerId) {
+ this.port = port;
+
port = AvailablePortFinder.getNextAvailable();
brokerURL = "tcp://0.0.0.0:" + port;
diff --git a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisVMService.java b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisVMService.java
index 0f08dd9a64b..77e5d96bcf9 100644
--- a/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisVMService.java
+++ b/test-infra/camel-test-infra-artemis/src/test/java/org/apache/camel/test/infra/artemis/services/ArtemisVMService.java
@@ -20,19 +20,25 @@ import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.settings.impl.AddressFullMessagePolicy;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
import static org.junit.jupiter.api.Assertions.fail;
public class ArtemisVMService extends AbstractArtemisEmbeddedService {
+ private static final Logger LOG = LoggerFactory.getLogger(ArtemisVMService.class);
private String brokerURL;
@Override
- protected Configuration getConfiguration(Configuration configuration, int port) {
- final int brokerId = super.BROKER_COUNT.intValue();
+ protected Configuration configure(Configuration configuration, int port, int brokerId) {
brokerURL = "vm://" + brokerId;
+ LOG.info("Creating a new Artemis VM-based broker");
configuration.setPersistenceEnabled(false);
+ configuration.setJournalMinFiles(10);
+ configuration.setSecurityEnabled(false);
+
try {
configuration.addAcceptorConfiguration("in-vm", "vm://" + brokerId);
} catch (Exception e) {