You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2022/12/17 16:10:17 UTC
[activemq-artemis] branch main updated: ARTEMIS-4016 Bridges created by management operations are removed on restart and config reload
This is an automated email from the ASF dual-hosted git repository.
jbertram pushed a commit to branch main
in repository https://gitbox.apache.org/repos/asf/activemq-artemis.git
The following commit(s) were added to refs/heads/main by this push:
new 04ddeb647c ARTEMIS-4016 Bridges created by management operations are removed on restart and config reload
04ddeb647c is described below
commit 04ddeb647c9c57ac48c2dc31cb161c8458f8441b
Author: AntonRoskvist <an...@volvo.com>
AuthorDate: Mon Sep 26 17:26:36 2022 +0200
ARTEMIS-4016 Bridges created by management operations are removed on restart and config reload
---
.../artemis/core/config/BridgeConfiguration.java | 158 +++++++++++++++++++++
.../management/impl/ActiveMQServerControlImpl.java | 5 +-
.../artemis/core/persistence/StorageManager.java | 7 +
.../config/PersistedBridgeConfiguration.java | 72 ++++++++++
.../journal/AbstractJournalStorageManager.java | 39 +++++
.../persistence/impl/journal/DescribeJournal.java | 8 ++
.../persistence/impl/journal/JournalRecordIds.java | 2 +
.../impl/nullpm/NullStorageManager.java | 14 ++
.../core/server/impl/ActiveMQServerImpl.java | 37 ++++-
.../core/config/BridgeConfigurationTest.java | 1 +
.../core/transaction/impl/TransactionImplTest.java | 14 ++
.../tests/integration/client/SendAckFailTest.java | 14 ++
.../BridgeConfigurationStorageTest.java | 124 ++++++++++++++++
.../integration/persistence/ConfigChangeTest.java | 51 +++++++
14 files changed, 539 insertions(+), 7 deletions(-)
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java
index dd964eadc1..d9bc9ff182 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/config/BridgeConfiguration.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.config;
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.json.JsonArray;
import org.apache.activemq.artemis.json.JsonArrayBuilder;
import org.apache.activemq.artemis.json.JsonObject;
@@ -31,6 +32,8 @@ import java.util.Map;
import org.apache.activemq.artemis.api.config.ActiveMQDefaultConfiguration;
import org.apache.activemq.artemis.api.core.client.ActiveMQClient;
import org.apache.activemq.artemis.core.server.ComponentConfigurationRoutingType;
+import org.apache.activemq.artemis.utils.BufferHelper;
+import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.artemis.utils.JsonLoader;
public final class BridgeConfiguration implements Serializable {
@@ -62,6 +65,7 @@ public final class BridgeConfiguration implements Serializable {
public static String CALL_TIMEOUT = "call-timeout";
public static String ROUTING_TYPE = "routing-type";
public static String CONCURRENCY = "concurrency";
+ public static String CONFIGURATION_MANAGED = "configuration-managed";
private String name = null;
@@ -118,6 +122,8 @@ public final class BridgeConfiguration implements Serializable {
private String parentName = null;
+ private boolean configurationManaged = true;
+
public BridgeConfiguration() {
}
@@ -148,6 +154,7 @@ public final class BridgeConfiguration implements Serializable {
callTimeout = other.callTimeout;
routingType = other.routingType;
concurrency = other.concurrency;
+ configurationManaged = other.configurationManaged;
}
public BridgeConfiguration(String name) {
@@ -527,6 +534,15 @@ public final class BridgeConfiguration implements Serializable {
return this;
}
+ public boolean isConfigurationManaged() { // Flag defaults to true; the management JSON bridge-creation path sets it to false before deploying.
+ return configurationManaged;
+ }
+
+ public BridgeConfiguration setConfigurationManaged(boolean configurationManaged) { // Fluent setter: stores the flag and returns this configuration for chaining.
+ this.configurationManaged = configurationManaged;
+ return this;
+ }
+
public ComponentConfigurationRoutingType getRoutingType() {
return routingType;
}
@@ -611,6 +627,7 @@ public final class BridgeConfiguration implements Serializable {
builder.add(MIN_LARGE_MESSAGE_SIZE, getMinLargeMessageSize());
builder.add(CALL_TIMEOUT, getCallTimeout());
builder.add(CONCURRENCY, getConcurrency());
+ builder.add(CONFIGURATION_MANAGED, isConfigurationManaged());
// complex fields (only serialize if value is not null)
@@ -705,6 +722,7 @@ public final class BridgeConfiguration implements Serializable {
result = prime * result + (useDuplicateDetection ? 1231 : 1237);
result = prime * result + ((user == null) ? 0 : user.hashCode());
result = prime * result + concurrency;
+ result = prime * result + (configurationManaged ? 1231 : 1237);
return result;
}
@@ -790,7 +808,147 @@ public final class BridgeConfiguration implements Serializable {
return false;
if (concurrency != other.concurrency)
return false;
+ if (configurationManaged != other.configurationManaged)
+ return false;
return true;
}
+ /**
+  * Returns the number of bytes to reserve for {@link #encode(ActiveMQBuffer)}.
+  * <p>
+  * Every section that {@code encode} writes unconditionally is also counted unconditionally here:
+  * the transformer section costs one null-marker byte even when no transformer is configured, and
+  * the static connector section always carries its int element count.
+  *
+  * @return the size, in bytes, needed to hold the encoded form of this configuration
+  */
+ public int getEncodeSize() {
+    int transformerSize;
+    if (transformerConfiguration != null) {
+       // The class name is sized as a nullable string, which is the form decode() reads back.
+       transformerSize = BufferHelper.sizeOfNullableString(transformerConfiguration.getClassName());
+       // The property count is written with writeInt, so it occupies SIZE_INT bytes
+       // (DataConstants.INT is a value-type marker, not a size).
+       transformerSize += DataConstants.SIZE_INT;
+       Map<String, String> properties = transformerConfiguration.getProperties();
+       for (Map.Entry<String, String> entry : properties.entrySet()) {
+          transformerSize += BufferHelper.sizeOfNullableString(entry.getKey());
+          transformerSize += BufferHelper.sizeOfNullableString(entry.getValue());
+       }
+    } else {
+       // encode() still emits a single null marker through writeNullableString(null).
+       transformerSize = DataConstants.SIZE_BYTE;
+    }
+    // encode() writes the connector count as a plain int, including a 0 when the list is null.
+    int staticConnectorSize = DataConstants.SIZE_INT;
+    if (staticConnectors != null) {
+       for (String connector : staticConnectors) {
+          staticConnectorSize += BufferHelper.sizeOfNullableString(connector);
+       }
+    }
+    // Fixed part: listed in the same order as the writes in encode().
+    int size = BufferHelper.sizeOfNullableString(name) +
+       BufferHelper.sizeOfNullableString(parentName) +
+       BufferHelper.sizeOfNullableString(queueName) +
+       BufferHelper.sizeOfNullableString(forwardingAddress) +
+       BufferHelper.sizeOfNullableString(filterString) +
+       BufferHelper.sizeOfNullableString(discoveryGroupName) +
+       BufferHelper.sizeOfNullableBoolean(ha) +
+       BufferHelper.sizeOfNullableLong(retryInterval) +
+       BufferHelper.sizeOfNullableDouble(retryIntervalMultiplier) +
+       BufferHelper.sizeOfNullableInteger(initialConnectAttempts) +
+       BufferHelper.sizeOfNullableInteger(reconnectAttempts) +
+       BufferHelper.sizeOfNullableInteger(reconnectAttemptsOnSameNode) +
+       BufferHelper.sizeOfNullableBoolean(useDuplicateDetection) +
+       BufferHelper.sizeOfNullableInteger(confirmationWindowSize) +
+       BufferHelper.sizeOfNullableInteger(producerWindowSize) +
+       BufferHelper.sizeOfNullableLong(clientFailureCheckPeriod) +
+       BufferHelper.sizeOfNullableString(user) +
+       BufferHelper.sizeOfNullableString(password) +
+       BufferHelper.sizeOfNullableLong(connectionTTL) +
+       BufferHelper.sizeOfNullableLong(maxRetryInterval) +
+       BufferHelper.sizeOfNullableInteger(minLargeMessageSize) +
+       BufferHelper.sizeOfNullableLong(callTimeout) +
+       BufferHelper.sizeOfNullableInteger(concurrency) +
+       BufferHelper.sizeOfNullableBoolean(configurationManaged) +
+       DataConstants.SIZE_BYTE + // routing type, written with writeByte
+       transformerSize +
+       staticConnectorSize;
+    return size;
+ }
+
+ /**
+  * Writes this configuration to {@code buffer} in the field order that
+  * {@link #decode(ActiveMQBuffer)} reads it back.
+  * <p>
+  * Layout: the scalar fields as nullable values, one routing-type byte, the transformer section
+  * (nullable class name, then, only if a transformer exists, an int property count followed by
+  * nullable key/value pairs), and finally an int connector count followed by that many nullable
+  * connector names.
+  *
+  * @param buffer the buffer to append the encoded configuration to
+  */
+ public void encode(ActiveMQBuffer buffer) {
+    buffer.writeNullableString(name);
+    buffer.writeNullableString(parentName);
+    buffer.writeNullableString(queueName);
+    buffer.writeNullableString(forwardingAddress);
+    buffer.writeNullableString(filterString);
+    buffer.writeNullableString(discoveryGroupName);
+    buffer.writeNullableBoolean(ha);
+    buffer.writeNullableLong(retryInterval);
+    BufferHelper.writeNullableDouble(buffer, retryIntervalMultiplier);
+    buffer.writeNullableInt(initialConnectAttempts);
+    buffer.writeNullableInt(reconnectAttempts);
+    buffer.writeNullableInt(reconnectAttemptsOnSameNode);
+    buffer.writeNullableBoolean(useDuplicateDetection);
+    buffer.writeNullableInt(confirmationWindowSize);
+    buffer.writeNullableInt(producerWindowSize);
+    buffer.writeNullableLong(clientFailureCheckPeriod);
+    buffer.writeNullableString(user);
+    buffer.writeNullableString(password);
+    buffer.writeNullableLong(connectionTTL);
+    buffer.writeNullableLong(maxRetryInterval);
+    buffer.writeNullableInt(minLargeMessageSize);
+    buffer.writeNullableLong(callTimeout);
+    buffer.writeNullableInt(concurrency);
+    buffer.writeNullableBoolean(configurationManaged);
+    // NOTE(review): the fallback uses the *divert* default routing type; confirm whether a
+    // bridge-specific default was intended here.
+    buffer.writeByte(routingType != null ? routingType.getType() : ComponentConfigurationRoutingType.valueOf(ActiveMQDefaultConfiguration.getDefaultDivertRoutingType()).getType());
+    if (transformerConfiguration != null) {
+       // Must be the nullable form: decode() reads the class name with readNullableString(), so a
+       // plain writeString() here would leave the reader misaligned for the rest of the record.
+       buffer.writeNullableString(transformerConfiguration.getClassName());
+       Map<String, String> properties = transformerConfiguration.getProperties();
+       buffer.writeInt(properties.size());
+       for (Map.Entry<String, String> entry : properties.entrySet()) {
+          buffer.writeNullableString(entry.getKey());
+          buffer.writeNullableString(entry.getValue());
+       }
+    } else {
+       // A null class name tells decode() that no transformer section follows.
+       buffer.writeNullableString(null);
+    }
+    if (staticConnectors != null) {
+       buffer.writeInt(staticConnectors.size());
+       for (String connector : staticConnectors) {
+          buffer.writeNullableString(connector);
+       }
+    } else {
+       // The count is always present so decode() can read it unconditionally.
+       buffer.writeInt(0);
+    }
+ }
+
+ public void decode(ActiveMQBuffer buffer) { // Populates this configuration from buffer; the reads below follow the same field order as the writes in encode().
+ name = buffer.readNullableString();
+ parentName = buffer.readNullableString();
+ queueName = buffer.readNullableString();
+ forwardingAddress = buffer.readNullableString();
+ filterString = buffer.readNullableString();
+ discoveryGroupName = buffer.readNullableString();
+ ha = buffer.readNullableBoolean();
+ retryInterval = buffer.readNullableLong();
+ retryIntervalMultiplier = BufferHelper.readNullableDouble(buffer);
+ initialConnectAttempts = buffer.readNullableInt();
+ reconnectAttempts = buffer.readNullableInt();
+ reconnectAttemptsOnSameNode = buffer.readNullableInt();
+ useDuplicateDetection = buffer.readNullableBoolean();
+ confirmationWindowSize = buffer.readNullableInt();
+ producerWindowSize = buffer.readNullableInt();
+ clientFailureCheckPeriod = buffer.readNullableLong();
+ user = buffer.readNullableString();
+ password = buffer.readNullableString();
+ connectionTTL = buffer.readNullableLong();
+ maxRetryInterval = buffer.readNullableLong();
+ minLargeMessageSize = buffer.readNullableInt();
+ callTimeout = buffer.readNullableLong();
+ concurrency = buffer.readNullableInt();
+ configurationManaged = buffer.readNullableBoolean();
+ routingType = ComponentConfigurationRoutingType.getType(buffer.readByte()); // routing type is stored as a single byte
+ String transformerClassName = buffer.readNullableString(); // a null class name means no transformer properties follow
+ if (transformerClassName != null) {
+ transformerConfiguration = new TransformerConfiguration(transformerClassName);
+ int propsSize = buffer.readInt(); // number of key/value pairs that follow
+ for (int i = 0; i < propsSize; i++) {
+ transformerConfiguration.getProperties().put(buffer.readNullableString(), buffer.readNullableString());
+ }
+ } // when the class name is null, the existing transformerConfiguration field is left as it was
+ int numStaticConnectors = buffer.readInt(); // the count is always present, even when it is 0
+ if (numStaticConnectors > 0) {
+ staticConnectors = new ArrayList<>();
+ for (int i = 0; i < numStaticConnectors; i++) {
+ staticConnectors.add(buffer.readNullableString());
+ }
+ } // a count of 0 leaves the existing staticConnectors field untouched
+ }
+
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
index 6d1f4341da..8e5ce81e08 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/management/impl/ActiveMQServerControlImpl.java
@@ -3847,7 +3847,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
try {
TransformerConfiguration transformerConfiguration = transformerClassName == null || transformerClassName.isEmpty() ? null : new TransformerConfiguration(transformerClassName).setProperties(transformerProperties);
- BridgeConfiguration config = new BridgeConfiguration().setName(name).setQueueName(queueName).setForwardingAddress(forwardingAddress).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setClientFailureCheckPeriod(clientFailureCheckPeriod).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setUseDuplicateDetection(useDuplicateD [...]
+ BridgeConfiguration config = new BridgeConfiguration().setName(name).setQueueName(queueName).setForwardingAddress(forwardingAddress).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setClientFailureCheckPeriod(clientFailureCheckPeriod).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setUseDuplicateDetection(useDuplicateD [...]
if (useDiscoveryGroup) {
config.setDiscoveryGroupName(staticConnectorsOrDiscoveryGroup);
@@ -3891,7 +3891,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
try {
TransformerConfiguration transformerConfiguration = transformerClassName == null || transformerClassName.isEmpty() ? null : new TransformerConfiguration(transformerClassName);
- BridgeConfiguration config = new BridgeConfiguration().setName(name).setQueueName(queueName).setForwardingAddress(forwardingAddress).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setClientFailureCheckPeriod(clientFailureCheckPeriod).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setUseDuplicateDetection(useDuplicateD [...]
+ BridgeConfiguration config = new BridgeConfiguration().setName(name).setQueueName(queueName).setForwardingAddress(forwardingAddress).setFilterString(filterString).setTransformerConfiguration(transformerConfiguration).setClientFailureCheckPeriod(clientFailureCheckPeriod).setRetryInterval(retryInterval).setRetryIntervalMultiplier(retryIntervalMultiplier).setInitialConnectAttempts(initialConnectAttempts).setReconnectAttempts(reconnectAttempts).setUseDuplicateDetection(useDuplicateD [...]
if (useDiscoveryGroup) {
config.setDiscoveryGroupName(staticConnectorsOrDiscoveryGroup);
@@ -3920,6 +3920,7 @@ public class ActiveMQServerControlImpl extends AbstractControl implements Active
if (bridgeConfiguration == null) {
throw ActiveMQMessageBundle.BUNDLE.failedToParseJson(bridgeConfigurationAsJson);
}
+ bridgeConfiguration.setConfigurationManaged(false);
server.deployBridge(bridgeConfiguration);
} catch (ActiveMQException e) {
throw new IllegalStateException(e.getMessage());
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
index 259d6fdb57..b20feb86e4 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/StorageManager.java
@@ -39,6 +39,7 @@ import org.apache.activemq.artemis.core.paging.PagingManager;
import org.apache.activemq.artemis.core.paging.PagingStore;
import org.apache.activemq.artemis.core.paging.cursor.PagePosition;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
+import org.apache.activemq.artemis.core.persistence.config.PersistedBridgeConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
@@ -347,6 +348,12 @@ public interface StorageManager extends IDGenerator, ActiveMQComponent {
List<PersistedDivertConfiguration> recoverDivertConfigurations();
+ void storeBridgeConfiguration(PersistedBridgeConfiguration persistedBridgeConfiguration) throws Exception;
+
+ void deleteBridgeConfiguration(String bridgeName) throws Exception;
+
+ List<PersistedBridgeConfiguration> recoverBridgeConfigurations();
+
void storeUser(PersistedUser persistedUser) throws Exception;
void deleteUser(String username) throws Exception;
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedBridgeConfiguration.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedBridgeConfiguration.java
new file mode 100644
index 0000000000..c7dc6cc017
--- /dev/null
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/config/PersistedBridgeConfiguration.java
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ * <p>
+ * http://www.apache.org/licenses/LICENSE-2.0
+ * <p>
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.core.persistence.config;
+
+import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
+import org.apache.activemq.artemis.core.config.BridgeConfiguration;
+import org.apache.activemq.artemis.core.journal.EncodingSupport;
+
+public class PersistedBridgeConfiguration implements EncodingSupport {
+
+ private long storeId;
+ private BridgeConfiguration bridgeConfiguration;
+
+ @Override
+ public String toString() {
+ return "PersistedBridgeConfiguration{" + "storeId=" + storeId + ", bridgeConfiguration=" + bridgeConfiguration + '}';
+ }
+
+ public PersistedBridgeConfiguration(BridgeConfiguration bridgeConfiguration) {
+ this.bridgeConfiguration = bridgeConfiguration;
+ }
+
+ public PersistedBridgeConfiguration() {
+ bridgeConfiguration = new BridgeConfiguration();
+ }
+
+ public void setStoreId(long id) {
+ this.storeId = id;
+ }
+
+ public long getStoreId() {
+ return storeId;
+ }
+
+ @Override
+ public int getEncodeSize() {
+ return bridgeConfiguration.getEncodeSize();
+ }
+
+ @Override
+ public void encode(ActiveMQBuffer buffer) {
+ bridgeConfiguration.encode(buffer);
+ }
+
+ @Override
+ public void decode(ActiveMQBuffer buffer) {
+ bridgeConfiguration.decode(buffer);
+ }
+
+ public String getName() {
+ return bridgeConfiguration.getParentName();
+ }
+
+ public BridgeConfiguration getBridgeConfiguration() {
+ return bridgeConfiguration;
+ }
+}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
index 5157103ff9..ebbe5f7199 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/AbstractJournalStorageManager.java
@@ -73,6 +73,7 @@ import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
+import org.apache.activemq.artemis.core.persistence.config.PersistedBridgeConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
@@ -223,6 +224,8 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
protected final Map<String, PersistedDivertConfiguration> mapPersistedDivertConfigurations = new ConcurrentHashMap<>();
+ protected final Map<String, PersistedBridgeConfiguration> mapPersistedBridgeConfigurations = new ConcurrentHashMap<>();
+
protected final Map<String, PersistedUser> mapPersistedUsers = new ConcurrentHashMap<>();
protected final Map<String, PersistedRole> mapPersistedRoles = new ConcurrentHashMap<>();
@@ -768,6 +771,32 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
return new ArrayList<>(mapPersistedDivertConfigurations.values());
}
+ /**
+  * Persists a bridge configuration in the bindings journal, replacing any record previously
+  * stored under the same name, and caches it in the in-memory map.
+  */
+ @Override
+ public void storeBridgeConfiguration(PersistedBridgeConfiguration persistedBridgeConfiguration) throws Exception {
+    final String bridgeName = persistedBridgeConfiguration.getName();
+    // Remove the old record first so only one entry per bridge name is kept.
+    deleteBridgeConfiguration(bridgeName);
+    try (ArtemisCloseable lock = closeableReadLock()) {
+       final long recordId = idGenerator.generateID();
+       persistedBridgeConfiguration.setStoreId(recordId);
+       bindingsJournal.appendAddRecord(recordId, JournalRecordIds.BRIDGE_RECORD, persistedBridgeConfiguration, true);
+       mapPersistedBridgeConfigurations.put(bridgeName, persistedBridgeConfiguration);
+    }
+ }
+
+ /**
+  * Forgets the bridge configuration stored under {@code bridgeName}, if there is one, and appends
+  * a delete record for it to the bindings journal.
+  */
+ @Override
+ public void deleteBridgeConfiguration(String bridgeName) throws Exception {
+    final PersistedBridgeConfiguration removed = mapPersistedBridgeConfigurations.remove(bridgeName);
+    if (removed == null) {
+       // Nothing was stored under this name, so there is no journal record to delete.
+       return;
+    }
+    try (ArtemisCloseable lock = closeableReadLock()) {
+       bindingsJournal.tryAppendDeleteRecord(removed.getStoreId(), this::recordNotFoundCallback, false);
+    }
+ }
+
+ /**
+  * @return a new list holding the bridge configurations currently in the in-memory map
+  */
+ @Override
+ public List<PersistedBridgeConfiguration> recoverBridgeConfigurations() {
+    final List<PersistedBridgeConfiguration> snapshot = new ArrayList<>(mapPersistedBridgeConfigurations.values());
+    return snapshot;
+ }
+
@Override
public void storeUser(PersistedUser persistedUser) throws Exception {
deleteUser(persistedUser.getUsername());
@@ -1565,6 +1594,9 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
} else if (rec == JournalRecordIds.DIVERT_RECORD) {
PersistedDivertConfiguration divertConfiguration = newDivertEncoding(id, buffer);
mapPersistedDivertConfigurations.put(divertConfiguration.getName(), divertConfiguration);
+ } else if (rec == JournalRecordIds.BRIDGE_RECORD) {
+ PersistedBridgeConfiguration bridgeConfiguration = newBridgeEncoding(id, buffer);
+ mapPersistedBridgeConfigurations.put(bridgeConfiguration.getName(), bridgeConfiguration);
} else if (rec == JournalRecordIds.USER_RECORD) {
PersistedUser user = newUserEncoding(id, buffer);
mapPersistedUsers.put(user.getUsername(), user);
@@ -2047,6 +2079,13 @@ public abstract class AbstractJournalStorageManager extends CriticalComponentImp
return persistedDivertConfiguration;
}
+ static PersistedBridgeConfiguration newBridgeEncoding(long id, ActiveMQBuffer buffer) {
+ PersistedBridgeConfiguration persistedBridgeConfiguration = new PersistedBridgeConfiguration();
+ persistedBridgeConfiguration.decode(buffer);
+ persistedBridgeConfiguration.setStoreId(id);
+ return persistedBridgeConfiguration;
+ }
+
static PersistedUser newUserEncoding(long id, ActiveMQBuffer buffer) {
PersistedUser persistedUser = new PersistedUser();
persistedUser.decode(buffer);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
index 19840061b1..f2acfcb971 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/DescribeJournal.java
@@ -45,6 +45,7 @@ import org.apache.activemq.artemis.core.journal.impl.JournalImpl;
import org.apache.activemq.artemis.core.journal.impl.JournalReaderCallback;
import org.apache.activemq.artemis.core.paging.cursor.impl.PageSubscriptionCounterImpl;
import org.apache.activemq.artemis.core.paging.impl.PageTransactionInfoImpl;
+import org.apache.activemq.artemis.core.persistence.config.PersistedBridgeConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.impl.journal.BatchingIDGenerator.IDCounterEncoding;
import org.apache.activemq.artemis.core.persistence.impl.journal.codec.CursorAckRecordEncoding;
@@ -99,6 +100,7 @@ import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalR
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.SET_SCHEDULED_DELIVERY_TIME;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.UPDATE_DELIVERY_COUNT;
import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.USER_RECORD;
+import static org.apache.activemq.artemis.core.persistence.impl.journal.JournalRecordIds.BRIDGE_RECORD;
/**
* Outputs a String description of the Journals contents.
@@ -604,6 +606,12 @@ public final class DescribeJournal {
persistedDivertConfiguration.decode(buffer);
return persistedDivertConfiguration;
+ case BRIDGE_RECORD: {
+ PersistedBridgeConfiguration persistedBridgeConfiguration = new PersistedBridgeConfiguration();
+ persistedBridgeConfiguration.decode(buffer);
+ return persistedBridgeConfiguration;
+ }
+
case ADD_LARGE_MESSAGE_PENDING: {
PendingLargeMessageEncoding lmEncoding = new PendingLargeMessageEncoding();
lmEncoding.decode(buffer);
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
index d11c61948b..1e4bea0477 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/journal/JournalRecordIds.java
@@ -49,6 +49,8 @@ public final class JournalRecordIds {
public static final byte DIVERT_RECORD = 27;
+ public static final byte BRIDGE_RECORD = 28;
+
// Message journal record types
/**
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
index e0b85b8521..186fd4b56c 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/persistence/impl/nullpm/NullStorageManager.java
@@ -47,6 +47,7 @@ import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
+import org.apache.activemq.artemis.core.persistence.config.PersistedBridgeConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
@@ -466,6 +467,19 @@ public class NullStorageManager implements StorageManager {
return null;
}
+ @Override
+ public void storeBridgeConfiguration(PersistedBridgeConfiguration persistedBridgeConfiguration) throws Exception { // intentionally empty: this implementation stores nothing
+ }
+
+ @Override
+ public void deleteBridgeConfiguration(String bridgeName) throws Exception { // intentionally empty: nothing is ever stored, so there is nothing to delete
+ }
+
+ @Override
+ public List<PersistedBridgeConfiguration> recoverBridgeConfigurations() { // always returns null rather than an empty list, so callers must null-check
+ return null;
+ }
+
@Override
public void storeUser(PersistedUser persistedUser) throws Exception {
}
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
index 561c60ab32..4c30573233 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ActiveMQServerImpl.java
@@ -93,6 +93,7 @@ import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
+import org.apache.activemq.artemis.core.persistence.config.PersistedBridgeConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedSecuritySetting;
import org.apache.activemq.artemis.core.persistence.impl.PageCountPending;
@@ -2945,8 +2946,17 @@ public class ActiveMQServerImpl implements ActiveMQServer {
@Override
public boolean deployBridge(BridgeConfiguration config) throws Exception {
- if (clusterManager != null) {
- return clusterManager.deployBridge(config);
+ if (clusterManager != null && clusterManager.deployBridge(config)) {
+ //copying and modifying bridgeConfig before storing to deal with "concurrency > 1" bridges
+ for (Bridge bridge : clusterManager.getBridges().values()) {
+ BridgeConfiguration copyConfig = new BridgeConfiguration(bridge.getConfiguration());
+ if (copyConfig.getConcurrency() > 1 && !copyConfig.getName().endsWith("-0")) {
+ continue;
+ }
+ copyConfig.setName(copyConfig.getParentName());
+ storageManager.storeBridgeConfiguration(new PersistedBridgeConfiguration(copyConfig));
+ }
+ return true;
}
return false;
}
@@ -2954,7 +2964,13 @@ public class ActiveMQServerImpl implements ActiveMQServer {
@Override
public void destroyBridge(String name) throws Exception {
if (clusterManager != null) {
- clusterManager.destroyBridge(name);
+ //Iterating through all bridges to catch "concurrency > 1" bridges matching supplied name
+ for (Bridge bridge : clusterManager.getBridges().values()) {
+ if (bridge.getConfiguration().getParentName().equals(name)) {
+ clusterManager.destroyBridge(bridge.getConfiguration().getName());
+ }
+ }
+ storageManager.deleteBridgeConfiguration(name);
}
}
@@ -3431,6 +3447,8 @@ public class ActiveMQServerImpl implements ActiveMQServer {
postOffice.startExpiryScanner();
postOffice.startAddressQueueScanner();
+
+ recoverStoredBridges();
}
if (configuration.getMaxDiskUsage() != -1) {
@@ -4236,6 +4254,14 @@ public class ActiveMQServerImpl implements ActiveMQServer {
}
}
+ private void recoverStoredBridges() throws Exception {
+ if (storageManager.recoverBridgeConfigurations() != null) {
+ for (PersistedBridgeConfiguration persistedBridgeConfiguration : storageManager.recoverBridgeConfigurations()) {
+ deployBridge(persistedBridgeConfiguration.getBridgeConfiguration());
+ }
+ }
+ }
+
private void deployGroupingHandlerConfiguration(final GroupingHandlerConfiguration config) throws Exception {
if (config != null) {
GroupingHandler groupingHandler1;
@@ -4544,7 +4570,7 @@ public class ActiveMQServerImpl implements ActiveMQServer {
for (BridgeConfiguration newBridgeConfig : configuration.getBridgeConfigurations()) {
newBridgeConfig.setParentName(newBridgeConfig.getName());
Bridge existingBridge = clusterManager.getBridges().get(newBridgeConfig.getParentName());
- if (existingBridge != null && !existingBridge.getConfiguration().equals(newBridgeConfig)) {
+ if (existingBridge != null && !existingBridge.getConfiguration().equals(newBridgeConfig) && existingBridge.getConfiguration().isConfigurationManaged()) {
// this is an existing bridge but the config changed so stop the current bridge and deploy the new one
destroyBridge(existingBridge.getName().toString());
deployBridge(newBridgeConfig);
@@ -4557,11 +4583,12 @@ public class ActiveMQServerImpl implements ActiveMQServer {
List<BridgeConfiguration> newConfig = configuration.getBridgeConfigurations();
BridgeConfiguration running = new BridgeConfiguration(runningBridge.getConfiguration());
running.set("name", running.getParentName());
- if (!configuration.getBridgeConfigurations().contains(running)) {
+ if (!configuration.getBridgeConfigurations().contains(running) && running.isConfigurationManaged()) {
// this bridge is running but it isn't in the new config which means it was removed so destroy it
destroyBridge(runningBridge.getName().toString());
}
}
+ recoverStoredBridges();
}
}
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/BridgeConfigurationTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/BridgeConfigurationTest.java
index af84a501ca..cbadbd84c8 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/BridgeConfigurationTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/config/BridgeConfigurationTest.java
@@ -192,6 +192,7 @@ public class BridgeConfigurationTest {
objectBuilder.add(BridgeConfiguration.CALL_TIMEOUT, 12);
objectBuilder.add(BridgeConfiguration.ROUTING_TYPE, "MULTICAST");
objectBuilder.add(BridgeConfiguration.CONCURRENCY, 1);
+ objectBuilder.add(BridgeConfiguration.CONFIGURATION_MANAGED, true);
return objectBuilder.build();
}
diff --git a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
index a988606786..d511c17859 100644
--- a/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
+++ b/artemis-server/src/test/java/org/apache/activemq/artemis/core/transaction/impl/TransactionImplTest.java
@@ -45,6 +45,7 @@ import org.apache.activemq.artemis.core.persistence.OperationContext;
import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
+import org.apache.activemq.artemis.core.persistence.config.PersistedBridgeConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
@@ -622,6 +623,19 @@ public class TransactionImplTest extends ActiveMQTestBase {
return null;
}
+ @Override
+ public void storeBridgeConfiguration(PersistedBridgeConfiguration persistedBridgeConfiguration) throws Exception {
+    // Empty stub: the supplied bridge configuration is not stored.
+ }
+
+ @Override
+ public void deleteBridgeConfiguration(String bridgeName) throws Exception {
+    // Empty stub: nothing is deleted.
+ }
+
+ @Override
+ public List<PersistedBridgeConfiguration> recoverBridgeConfigurations() {
+    // Stub: always returns null rather than a list, so callers must handle null.
+ return null;
+ }
+
@Override
public void storeUser(PersistedUser persistedUser) throws Exception {
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
index c166ea8213..aaf0e0906b 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/client/SendAckFailTest.java
@@ -61,6 +61,7 @@ import org.apache.activemq.artemis.core.persistence.QueueBindingInfo;
import org.apache.activemq.artemis.core.persistence.AddressQueueStatus;
import org.apache.activemq.artemis.core.persistence.StorageManager;
import org.apache.activemq.artemis.core.persistence.config.PersistedAddressSetting;
+import org.apache.activemq.artemis.core.persistence.config.PersistedBridgeConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedDivertConfiguration;
import org.apache.activemq.artemis.core.persistence.config.PersistedKeyValuePair;
import org.apache.activemq.artemis.core.persistence.config.PersistedRole;
@@ -716,6 +717,19 @@ public class SendAckFailTest extends SpawnedTestBase {
return null;
}
+ @Override
+ public void storeBridgeConfiguration(PersistedBridgeConfiguration persistedBridgeConfiguration) throws Exception {
+    // Empty stub: the supplied bridge configuration is not stored.
+ }
+
+ @Override
+ public void deleteBridgeConfiguration(String bridgeName) throws Exception {
+    // Empty stub: nothing is deleted.
+ }
+
+ @Override
+ public List<PersistedBridgeConfiguration> recoverBridgeConfigurations() {
+    // Stub: always returns null rather than a list, so callers must handle null.
+ return null;
+ }
+
@Override
public void storeUser(PersistedUser persistedUser) throws Exception {
manager.storeUser(persistedUser);
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/BridgeConfigurationStorageTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/BridgeConfigurationStorageTest.java
new file mode 100644
index 0000000000..ff387077f0
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/BridgeConfigurationStorageTest.java
@@ -0,0 +1,124 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.artemis.tests.integration.persistence;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.activemq.artemis.core.config.BridgeConfiguration;
+import org.apache.activemq.artemis.core.config.StoreConfiguration;
+import org.apache.activemq.artemis.core.config.TransformerConfiguration;
+import org.apache.activemq.artemis.core.persistence.config.PersistedBridgeConfiguration;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+
+/**
+ * Checks that a {@link BridgeConfiguration} stored through the storage manager is returned by
+ * {@code recoverBridgeConfigurations()} after the storage manager has been stopped and started again.
+ */
+public class BridgeConfigurationStorageTest extends StorageManagerTestBase {
+
+   public BridgeConfigurationStorageTest(StoreConfiguration.StoreType storeType) {
+      super(storeType);
+   }
+
+   @Override
+   @Before
+   public void setUp() throws Exception {
+      super.setUp();
+   }
+
+   /**
+    * Stores and recovers a configuration that carries a transformer with three properties.
+    */
+   @Test
+   public void testStoreBridgeConfiguration() throws Exception {
+      createStorage();
+
+      BridgeConfiguration expected = new BridgeConfiguration();
+      expected.setName("name");
+      expected.setParentName("name");
+      expected.setQueueName("QueueName");
+      expected.setConcurrency(2);
+      expected.setForwardingAddress("forward");
+      expected.setProducerWindowSize(123123);
+      expected.setConfirmationWindowSize(123123);
+      expected.setStaticConnectors(Arrays.asList("connector1", "connector2"));
+      TransformerConfiguration transformer = new TransformerConfiguration("mytransformer");
+      transformer.getProperties().put("key1", "prop1");
+      transformer.getProperties().put("key2", "prop2");
+      transformer.getProperties().put("key3", "prop3");
+      expected.setTransformerConfiguration(transformer);
+
+      BridgeConfiguration actual = storeRestartAndRecoverSingle(expected);
+      assertCoreFieldsMatch(expected, actual);
+
+      TransformerConfiguration recoveredTransformer = actual.getTransformerConfiguration();
+      Assert.assertNotNull(recoveredTransformer);
+      Assert.assertEquals("mytransformer", recoveredTransformer.getClassName());
+      Map<String, String> recoveredProperties = recoveredTransformer.getProperties();
+      Assert.assertEquals(3, recoveredProperties.size());
+      Assert.assertEquals("prop1", recoveredProperties.get("key1"));
+      Assert.assertEquals("prop2", recoveredProperties.get("key2"));
+      Assert.assertEquals("prop3", recoveredProperties.get("key3"));
+
+      journal.stop();
+      journal = null;
+   }
+
+   /**
+    * Stores and recovers a configuration without a transformer and expects none after recovery.
+    */
+   @Test
+   public void testStoreBridgeConfigurationNoTransformer() throws Exception {
+      createStorage();
+
+      BridgeConfiguration expected = new BridgeConfiguration();
+      expected.setName("name");
+      expected.setQueueName("QueueName");
+      expected.setConcurrency(2);
+      expected.setForwardingAddress("forward");
+      expected.setStaticConnectors(Arrays.asList("connector1", "connector2"));
+
+      BridgeConfiguration actual = storeRestartAndRecoverSingle(expected);
+      assertCoreFieldsMatch(expected, actual);
+      Assert.assertNull(actual.getTransformerConfiguration());
+
+      journal.stop();
+      journal = null;
+   }
+
+   /**
+    * Stores the configuration, stops and starts the storage manager, and returns the single
+    * recovered bridge configuration (asserting that exactly one was recovered).
+    */
+   private BridgeConfiguration storeRestartAndRecoverSingle(BridgeConfiguration configuration) throws Exception {
+      journal.storeBridgeConfiguration(new PersistedBridgeConfiguration(configuration));
+
+      journal.stop();
+      journal.start();
+
+      List<PersistedBridgeConfiguration> recovered = journal.recoverBridgeConfigurations();
+      Assert.assertEquals(1, recovered.size());
+      return recovered.get(0).getBridgeConfiguration();
+   }
+
+   /**
+    * Asserts that name, queue name, concurrency, forwarding address and static connectors agree.
+    */
+   private static void assertCoreFieldsMatch(BridgeConfiguration expected, BridgeConfiguration actual) {
+      Assert.assertEquals(expected.getName(), actual.getName());
+      Assert.assertEquals(expected.getQueueName(), actual.getQueueName());
+      Assert.assertEquals(expected.getConcurrency(), actual.getConcurrency());
+      Assert.assertEquals(expected.getForwardingAddress(), actual.getForwardingAddress());
+      Assert.assertEquals(expected.getStaticConnectors(), actual.getStaticConnectors());
+   }
+
+}
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java
index a04bc4ee29..1fc0675a34 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/persistence/ConfigChangeTest.java
@@ -21,17 +21,23 @@ import javax.jms.JMSContext;
import javax.jms.Message;
import javax.jms.TextMessage;
import java.util.ArrayList;
+import java.util.Arrays;
import java.util.List;
import org.apache.activemq.artemis.api.core.QueueConfiguration;
import org.apache.activemq.artemis.api.core.RoutingType;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.api.core.client.ClientSession;
+import org.apache.activemq.artemis.api.core.client.ClientSessionFactory;
+import org.apache.activemq.artemis.api.core.client.ServerLocator;
+import org.apache.activemq.artemis.core.config.BridgeConfiguration;
import org.apache.activemq.artemis.core.config.Configuration;
import org.apache.activemq.artemis.core.config.CoreAddressConfiguration;
import org.apache.activemq.artemis.core.server.ActiveMQServer;
import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory;
import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
+import org.junit.Assert;
import org.junit.Test;
public class ConfigChangeTest extends ActiveMQTestBase {
@@ -142,4 +148,49 @@ public class ConfigChangeTest extends ActiveMQTestBase {
server.stop();
}
+
+ /**
+  * Creates a bridge with concurrency 2 through the management control, then checks the number of
+  * bridge names reported before and after a server restart, and again after the bridge is destroyed.
+  */
+ @Test
+ public void bridgeConfigChagesPersist() throws Exception {
+
+    server = createServer(true);
+    server.start();
+
+    String bridgeName = "bridgeName";
+    String queue = "Q1";
+    String forward = "Q2";
+
+    // Create the source and target queues, then release the session.
+    ServerLocator locator = createInVMNonHALocator();
+    ClientSessionFactory sessionFactory = createSessionFactory(locator);
+    ClientSession session = addClientSession(sessionFactory.createSession(false, true, true));
+    session.createQueue(new QueueConfiguration(queue).setAddress(queue).setRoutingType(RoutingType.ANYCAST).setAutoDelete(false));
+    session.createQueue(new QueueConfiguration(forward).setAddress(forward).setRoutingType(RoutingType.ANYCAST).setAutoDelete(false));
+    session.close();
+
+    BridgeConfiguration bridgeConfiguration = new BridgeConfiguration().setName(bridgeName)
+                                                                       .setQueueName(queue)
+                                                                       .setConcurrency(2)
+                                                                       .setForwardingAddress(forward)
+                                                                       .setProducerWindowSize(1234)
+                                                                       .setConfirmationWindowSize(1234)
+                                                                       .setStaticConnectors(Arrays.asList("connector1", "connector2"));
+
+    server.getActiveMQServerControl().addConnector("connector1", "tcp://localhost:61616");
+    server.getActiveMQServerControl().addConnector("connector2", "tcp://localhost:61616");
+    server.getActiveMQServerControl().createBridge(bridgeConfiguration.toJSON());
+
+    // Two bridge names are expected for the concurrency-2 bridge, before and after a restart.
+    Assert.assertEquals(2, server.getActiveMQServerControl().getBridgeNames().length);
+    server.stop();
+    server.start();
+    Assert.assertEquals(2, server.getActiveMQServerControl().getBridgeNames().length);
+
+    // After destroying by the original name, no bridge names are expected, also after a restart.
+    server.getActiveMQServerControl().destroyBridge(bridgeName);
+    Assert.assertEquals(0, server.getActiveMQServerControl().getBridgeNames().length);
+    server.stop();
+    server.start();
+    Assert.assertEquals(0, server.getActiveMQServerControl().getBridgeNames().length);
+    server.stop();
+
+ }
+
}