You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ha...@apache.org on 2017/12/21 03:16:31 UTC
activemq git commit: AMQ-6861 Allow customisation of network bridge
creation logic.
Repository: activemq
Updated Branches:
refs/heads/master ab2711abb -> a8a032af0
AMQ-6861 Allow customisation of network bridge creation logic.
Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a8a032af
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a8a032af
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a8a032af
Branch: refs/heads/master
Commit: a8a032af091ae2f6d1affe95b37bb424214d1990
Parents: ab2711a
Author: Ćukasz Dywicki <lu...@code-house.org>
Authored: Mon Nov 13 15:06:22 2017 +0100
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 20 22:00:54 2017 -0500
----------------------------------------------------------------------
.../activemq/broker/TransportConnection.java | 2 +-
.../apache/activemq/network/BridgeFactory.java | 23 ++
.../network/DiscoveryNetworkConnector.java | 2 +-
.../network/NetworkBridgeConfiguration.java | 13 ++
.../activemq/network/NetworkBridgeFactory.java | 42 ++--
.../activemq/network/BaseNetworkTest.java | 96 ++++++++
.../network/CustomBridgeFactoryTest.java | 217 +++++++++++++++++++
.../activemq/network/SimpleNetworkTest.java | 98 +--------
.../network/localBroker-custom-factory.xml | 40 ++++
9 files changed, 428 insertions(+), 105 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/activemq/blob/a8a032af/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index b18881d..967e977 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -1442,7 +1442,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
}
MBeanNetworkListener listener = new MBeanNetworkListener(brokerService, config, brokerService.createDuplexNetworkConnectorObjectName(duplexName));
listener.setCreatedByDuplex(true);
- duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener);
+ duplexBridge = config.getBridgeFactory().createNetworkBridge(config, localTransport, remoteBridgeTransport, listener);
duplexBridge.setBrokerService(brokerService);
//Need to set durableDestinations to properly restart subs when dynamicOnly=false
duplexBridge.setDurableDestinations(NetworkConnector.getDurableTopicDestinations(
http://git-wip-us.apache.org/repos/asf/activemq/blob/a8a032af/activemq-broker/src/main/java/org/apache/activemq/network/BridgeFactory.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/BridgeFactory.java b/activemq-broker/src/main/java/org/apache/activemq/network/BridgeFactory.java
new file mode 100644
index 0000000..af6a6db
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/BridgeFactory.java
@@ -0,0 +1,23 @@
+package org.apache.activemq.network;
+
+import org.apache.activemq.transport.Transport;
+
+/**
+ * Encapsulation of bridge creation logic.
+ *
+ * This SPI interface is intended to customize or decorate existing bridge implementations.
+ */
+public interface BridgeFactory {
+
+ /**
+ * Create a network bridge between two specified transports.
+ *
+ * @param configuration Bridge configuration.
+ * @param localTransport Local side of bridge.
+ * @param remoteTransport Remote side of bridge.
+ * @param listener Bridge listener.
+ * @return the NetworkBridge
+ */
+ DemandForwardingBridge createNetworkBridge(NetworkBridgeConfiguration configuration, Transport localTransport, Transport remoteTransport, final NetworkBridgeListener listener);
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/a8a032af/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java b/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
index e05c42c..3850da5 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
@@ -256,7 +256,7 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
}
NetworkBridgeListener listener = new DiscoverNetworkBridgeListener(getBrokerService(), getObjectName());
- DemandForwardingBridge result = NetworkBridgeFactory.createBridge(this, localTransport, remoteTransport, listener);
+ DemandForwardingBridge result = getBridgeFactory().createNetworkBridge(this, localTransport, remoteTransport, listener);
result.setBrokerService(getBrokerService());
return configureBridge(result);
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/a8a032af/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
index 3c64758..1adff09 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
@@ -78,6 +78,11 @@ public class NetworkBridgeConfiguration {
private boolean checkDuplicateMessagesOnDuplex = false;
/**
+ * Bridge factory implementation - by default backed by static factory, which is default implementation and will rely change.
+ */
+ private BridgeFactory bridgeFactory = NetworkBridgeFactory.INSTANCE;
+
+ /**
* @return the conduitSubscriptions
*/
public boolean isConduitSubscriptions() {
@@ -541,6 +546,14 @@ public class NetworkBridgeConfiguration {
return useVirtualDestSubs;
}
+ public BridgeFactory getBridgeFactory() {
+ return bridgeFactory;
+ }
+
+ public void setBridgeFactory(BridgeFactory bridgeFactory) {
+ this.bridgeFactory = bridgeFactory;
+ }
+
/**
* This was a typo, so this is deprecated as of 5.13.1
*/
http://git-wip-us.apache.org/repos/asf/activemq/blob/a8a032af/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
index 32a7853..0ac56dd 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
@@ -18,7 +18,10 @@ package org.apache.activemq.network;
import java.net.URI;
import java.util.HashMap;
-import org.apache.activemq.broker.Broker;
+import java.util.LinkedHashSet;
+import java.util.ServiceLoader;
+import java.util.Set;
+
import org.apache.activemq.transport.Transport;
import org.apache.activemq.transport.TransportFactory;
import org.apache.activemq.util.URISupport;
@@ -28,13 +31,32 @@ import org.apache.activemq.util.URISupport;
*
*
*/
-public final class NetworkBridgeFactory {
+public final class NetworkBridgeFactory implements BridgeFactory {
+
+ public final static BridgeFactory INSTANCE = new NetworkBridgeFactory();
private NetworkBridgeFactory() {
+
+ }
+
+ @Override
+ public DemandForwardingBridge createNetworkBridge(NetworkBridgeConfiguration configuration, Transport localTransport, Transport remoteTransport, NetworkBridgeListener listener) {
+ if (configuration.isConduitSubscriptions()) {
+ // dynamicOnly determines whether durables are auto bridged
+ return attachListener(new DurableConduitBridge(configuration, localTransport, remoteTransport), listener);
+ }
+ return attachListener(new DemandForwardingBridge(configuration, localTransport, remoteTransport), listener);
+ }
+
+ private DemandForwardingBridge attachListener(DemandForwardingBridge bridge, NetworkBridgeListener listener) {
+ if (listener != null) {
+ bridge.setNetworkBridgeListener(listener);
+ }
+ return bridge;
}
/**
- * create a network bridge
+ * Create a network bridge
*
* @param configuration
* @param localTransport
@@ -42,20 +64,11 @@ public final class NetworkBridgeFactory {
* @param listener
* @return the NetworkBridge
*/
+ @Deprecated
public static DemandForwardingBridge createBridge(NetworkBridgeConfiguration configuration,
Transport localTransport, Transport remoteTransport,
final NetworkBridgeListener listener) {
- DemandForwardingBridge result = null;
- if (configuration.isConduitSubscriptions()) {
- // dynamicOnly determines whether durables are auto bridged
- result = new DurableConduitBridge(configuration, localTransport, remoteTransport);
- } else {
- result = new DemandForwardingBridge(configuration, localTransport, remoteTransport);
- }
- if (listener != null) {
- result.setNetworkBridgeListener(listener);
- }
- return result;
+ return INSTANCE.createNetworkBridge(configuration, localTransport, remoteTransport, listener);
}
public static Transport createLocalTransport(NetworkBridgeConfiguration configuration, URI uri) throws Exception {
@@ -74,4 +87,5 @@ public final class NetworkBridgeFactory {
uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
return TransportFactory.connect(uri);
}
+
}
http://git-wip-us.apache.org/repos/asf/activemq/blob/a8a032af/activemq-unit-tests/src/test/java/org/apache/activemq/network/BaseNetworkTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/BaseNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/BaseNetworkTest.java
new file mode 100644
index 0000000..6bb9384
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/BaseNetworkTest.java
@@ -0,0 +1,96 @@
+package org.apache.activemq.network;
+
+import java.net.URI;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.xbean.BrokerFactoryBean;
+import org.junit.After;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.core.io.Resource;
+
+public class BaseNetworkTest {
+
+ protected final Logger LOG = LoggerFactory.getLogger(getClass());
+
+ protected Connection localConnection;
+ protected Connection remoteConnection;
+ protected BrokerService localBroker;
+ protected BrokerService remoteBroker;
+ protected Session localSession;
+ protected Session remoteSession;
+
+ @Before
+ public final void setUp() throws Exception {
+ doSetUp(true);
+ }
+
+ @After
+ public final void tearDown() throws Exception {
+ doTearDown();
+ }
+
+ protected void doTearDown() throws Exception {
+ localConnection.close();
+ remoteConnection.close();
+ localBroker.stop();
+ remoteBroker.stop();
+ }
+
+ protected void doSetUp(boolean deleteAllMessages) throws Exception {
+ remoteBroker = createRemoteBroker();
+ remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+ remoteBroker.start();
+ remoteBroker.waitUntilStarted();
+ localBroker = createLocalBroker();
+ localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+ localBroker.start();
+ localBroker.waitUntilStarted();
+ URI localURI = localBroker.getVmConnectorURI();
+ ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
+ fac.setAlwaysSyncSend(true);
+ fac.setDispatchAsync(false);
+ localConnection = fac.createConnection();
+ localConnection.setClientID("clientId");
+ localConnection.start();
+ URI remoteURI = remoteBroker.getVmConnectorURI();
+ fac = new ActiveMQConnectionFactory(remoteURI);
+ remoteConnection = fac.createConnection();
+ remoteConnection.setClientID("clientId");
+ remoteConnection.start();
+ localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ protected String getRemoteBrokerURI() {
+ return "org/apache/activemq/network/remoteBroker.xml";
+ }
+
+ protected String getLocalBrokerURI() {
+ return "org/apache/activemq/network/localBroker.xml";
+ }
+
+ protected BrokerService createBroker(String uri) throws Exception {
+ Resource resource = new ClassPathResource(uri);
+ BrokerFactoryBean factory = new BrokerFactoryBean(resource);
+ resource = new ClassPathResource(uri);
+ factory = new BrokerFactoryBean(resource);
+ factory.afterPropertiesSet();
+ BrokerService result = factory.getBroker();
+ return result;
+ }
+
+ protected BrokerService createLocalBroker() throws Exception {
+ return createBroker(getLocalBrokerURI());
+ }
+
+ protected BrokerService createRemoteBroker() throws Exception {
+ return createBroker(getRemoteBrokerURI());
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/a8a032af/activemq-unit-tests/src/test/java/org/apache/activemq/network/CustomBridgeFactoryTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/CustomBridgeFactoryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/CustomBridgeFactoryTest.java
new file mode 100644
index 0000000..3340584
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/CustomBridgeFactoryTest.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.transport.Transport;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+/**
+ * Basic test which verify if custom bridge factory receives any interactions when configured.
+ */
+public class CustomBridgeFactoryTest extends BaseNetworkTest {
+
+ private ActiveMQQueue outgoing = new ActiveMQQueue("outgoing");
+
+ /**
+ * Verification of outgoing communication - from local broker (with customized bridge configured) to remote one.
+ */
+ @Test
+ public void verifyOutgoingCommunication() throws JMSException {
+ CustomNetworkBridgeFactory bridgeFactory = getCustomNetworkBridgeFactory();
+ NetworkBridgeListener listener = bridgeFactory.getListener();
+
+ verify(listener).onStart(any(NetworkBridge.class));
+ verifyNoMoreInteractions(listener);
+
+ send(localSession, outgoing, localSession.createTextMessage("test message"));
+ assertNotNull("Message didn't arrive", receive(remoteSession, outgoing));
+
+ verify(listener).onOutboundMessage(any(NetworkBridge.class), any(Message.class));
+ verifyNoMoreInteractions(listener);
+ }
+
+ /**
+ * Additional test which makes sure that custom bridge receives notification about broker shutdown.
+ */
+ @Test
+ public void verifyBrokerShutdown() {
+ shutdownTest(() -> {
+ try {
+ localBroker.stop();
+ } catch (Exception e) {
+ return e;
+ }
+ return null;
+ });
+ }
+
+ /**
+ * Verification of network connector shutdown.
+ */
+ @Test
+ public void verifyConnectorShutdown() {
+ shutdownTest(() -> {
+ try {
+ getLocalConnector(0).stop();
+ } catch (Exception e) {
+ return e;
+ }
+ return null;
+ });
+ }
+
+ private void shutdownTest(Supplier<Throwable> callback) {
+ CustomNetworkBridgeFactory bridgeFactory = getCustomNetworkBridgeFactory();
+ NetworkBridgeListener listener = bridgeFactory.getListener();
+
+ verify(listener).onStart(any(NetworkBridge.class));
+ verifyNoMoreInteractions(listener);
+
+ Throwable throwable = callback.get();
+ assertNull("Unexpected error", throwable);
+
+ verify(listener).onStop(any(NetworkBridge.class));
+ verifyNoMoreInteractions(listener);
+ }
+
+ // helper methods
+ private void send(Session session, ActiveMQQueue destination, TextMessage message) throws JMSException {
+ MessageProducer producer = session.createProducer(destination);
+ try {
+ producer.send(message);
+ } finally {
+ producer.close();
+ }
+ }
+
+ private javax.jms.Message receive(Session session, ActiveMQQueue destination) throws JMSException {
+ MessageConsumer consumer = session.createConsumer(destination);
+ try {
+ return consumer.receive(TimeUnit.SECONDS.toMillis(5));
+ } finally {
+ consumer.close();
+ }
+ }
+
+ // infrastructure operations digging for connectors in running broker
+ private CustomNetworkBridgeFactory getCustomNetworkBridgeFactory() {
+ NetworkConnector connector = getLocalConnector(0);
+
+ assertTrue(connector.getBridgeFactory() instanceof CustomNetworkBridgeFactory);
+
+ return (CustomNetworkBridgeFactory) connector.getBridgeFactory();
+ }
+
+ private NetworkConnector getLocalConnector(int index) {
+ return localBroker.getNetworkConnectors().get(index);
+ }
+
+ // customizations
+ protected String getLocalBrokerURI() {
+ return "org/apache/activemq/network/localBroker-custom-factory.xml";
+ }
+
+ // test classes
+ static class CustomNetworkBridgeFactory implements BridgeFactory {
+
+ private final NetworkBridgeListener listener;
+
+ CustomNetworkBridgeFactory() {
+ this(Mockito.mock(NetworkBridgeListener.class));
+ }
+
+ CustomNetworkBridgeFactory(NetworkBridgeListener listener) {
+ this.listener = listener;
+ }
+
+ public NetworkBridgeListener getListener() {
+ return listener;
+ }
+
+ @Override
+ public DemandForwardingBridge createNetworkBridge(NetworkBridgeConfiguration configuration, Transport localTransport, Transport remoteTransport, NetworkBridgeListener listener) {
+ DemandForwardingBridge bridge = new DemandForwardingBridge(configuration, localTransport, remoteTransport);
+ bridge.setNetworkBridgeListener(new CompositeNetworkBridgeListener(this.listener, listener));
+ return bridge;
+ }
+
+ }
+
+ static class CompositeNetworkBridgeListener implements NetworkBridgeListener {
+
+ private final List<NetworkBridgeListener> listeners;
+
+ public CompositeNetworkBridgeListener(NetworkBridgeListener ... wrapped) {
+ this.listeners = Arrays.asList(wrapped);
+ }
+
+ @Override
+ public void bridgeFailed() {
+ for (NetworkBridgeListener listener : listeners) {
+ listener.bridgeFailed();
+ }
+ }
+
+ @Override
+ public void onStart(NetworkBridge bridge) {
+ for (NetworkBridgeListener listener : listeners) {
+ listener.onStart(bridge);
+ }
+ }
+
+ @Override
+ public void onStop(NetworkBridge bridge) {
+ for (NetworkBridgeListener listener : listeners) {
+ listener.onStop(bridge);
+ }
+ }
+
+ @Override
+ public void onOutboundMessage(NetworkBridge bridge, Message message) {
+ for (NetworkBridgeListener listener : listeners) {
+ listener.onOutboundMessage(bridge, message);
+ }
+ }
+
+ @Override
+ public void onInboundMessage(NetworkBridge bridge, Message message) {
+ for (NetworkBridgeListener listener : listeners) {
+ listener.onInboundMessage(bridge, message);
+ }
+ }
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq/blob/a8a032af/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
index 2df48dd..c030db5 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
@@ -21,11 +21,9 @@ import static org.junit.Assert.assertNotNull;
import static org.junit.Assert.assertNull;
import static org.junit.Assert.assertTrue;
-import java.net.URI;
import java.util.Arrays;
import java.util.concurrent.ConcurrentMap;
-import javax.jms.Connection;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.JMSException;
@@ -33,13 +31,11 @@ import javax.jms.Message;
import javax.jms.MessageConsumer;
import javax.jms.MessageListener;
import javax.jms.MessageProducer;
-import javax.jms.Session;
import javax.jms.TextMessage;
import javax.jms.TopicRequestor;
import javax.jms.TopicSession;
import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
@@ -48,33 +44,27 @@ import org.apache.activemq.command.ActiveMQTopic;
import org.apache.activemq.command.ConsumerId;
import org.apache.activemq.util.Wait;
import org.apache.activemq.util.Wait.Condition;
-import org.apache.activemq.xbean.BrokerFactoryBean;
-import org.junit.After;
-import org.junit.Before;
import org.junit.Ignore;
import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
import org.springframework.context.support.AbstractApplicationContext;
-import org.springframework.core.io.ClassPathResource;
-import org.springframework.core.io.Resource;
-public class SimpleNetworkTest {
+public class SimpleNetworkTest extends BaseNetworkTest {
protected static final int MESSAGE_COUNT = 10;
- private static final Logger LOG = LoggerFactory.getLogger(SimpleNetworkTest.class);
protected AbstractApplicationContext context;
- protected Connection localConnection;
- protected Connection remoteConnection;
- protected BrokerService localBroker;
- protected BrokerService remoteBroker;
- protected Session localSession;
- protected Session remoteSession;
protected ActiveMQTopic included;
protected ActiveMQTopic excluded;
protected String consumerName = "durableSubs";
+ @Override
+ protected void doSetUp(boolean deleteAllMessages) throws Exception {
+ super.doSetUp(deleteAllMessages);
+
+ included = new ActiveMQTopic("include.test.bar");
+ excluded = new ActiveMQTopic("exclude.test.bar");
+ }
+
// works b/c of non marshaling vm transport, the connection
// ref from the client is used during the forward
@Test(timeout = 60 * 1000)
@@ -364,76 +354,6 @@ public class SimpleNetworkTest {
}
}
- @Before
- public void setUp() throws Exception {
- doSetUp(true);
- }
-
- @After
- public void tearDown() throws Exception {
- doTearDown();
- }
-
- protected void doTearDown() throws Exception {
- localConnection.close();
- remoteConnection.close();
- localBroker.stop();
- remoteBroker.stop();
- }
-
- protected void doSetUp(boolean deleteAllMessages) throws Exception {
- remoteBroker = createRemoteBroker();
- remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
- remoteBroker.start();
- remoteBroker.waitUntilStarted();
- localBroker = createLocalBroker();
- localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
- localBroker.start();
- localBroker.waitUntilStarted();
- URI localURI = localBroker.getVmConnectorURI();
- ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
- fac.setAlwaysSyncSend(true);
- fac.setDispatchAsync(false);
- localConnection = fac.createConnection();
- localConnection.setClientID("clientId");
- localConnection.start();
- URI remoteURI = remoteBroker.getVmConnectorURI();
- fac = new ActiveMQConnectionFactory(remoteURI);
- remoteConnection = fac.createConnection();
- remoteConnection.setClientID("clientId");
- remoteConnection.start();
- included = new ActiveMQTopic("include.test.bar");
- excluded = new ActiveMQTopic("exclude.test.bar");
- localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- }
-
- protected String getRemoteBrokerURI() {
- return "org/apache/activemq/network/remoteBroker.xml";
- }
-
- protected String getLocalBrokerURI() {
- return "org/apache/activemq/network/localBroker.xml";
- }
-
- protected BrokerService createBroker(String uri) throws Exception {
- Resource resource = new ClassPathResource(uri);
- BrokerFactoryBean factory = new BrokerFactoryBean(resource);
- resource = new ClassPathResource(uri);
- factory = new BrokerFactoryBean(resource);
- factory.afterPropertiesSet();
- BrokerService result = factory.getBroker();
- return result;
- }
-
- protected BrokerService createLocalBroker() throws Exception {
- return createBroker(getLocalBrokerURI());
- }
-
- protected BrokerService createRemoteBroker() throws Exception {
- return createBroker(getRemoteBrokerURI());
- }
-
protected void assertNetworkBridgeStatistics(final long expectedLocalSent, final long expectedRemoteSent) throws Exception {
final NetworkBridge localBridge = localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();
http://git-wip-us.apache.org/repos/asf/activemq/blob/a8a032af/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-custom-factory.xml
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-custom-factory.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-custom-factory.xml
new file mode 100644
index 0000000..9dc8c61
--- /dev/null
+++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-custom-factory.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+<beans
+ xmlns="http://www.springframework.org/schema/beans"
+ xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+ http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
+ http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
+ <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+<broker brokerName="localBroker" start="false" deleteAllMessagesOnStartup="true" persistent="true" useShutdownHook="false" monitorConnectionSplits="true" xmlns="http://activemq.apache.org/schema/core">
+ <networkConnectors>
+ <networkConnector uri="static:(tcp://localhost:61617)" name="networkConnector">
+ <bridgeFactory>
+ <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.network.CustomBridgeFactoryTest.CustomNetworkBridgeFactory" />
+ </bridgeFactory>
+ </networkConnector>
+ </networkConnectors>
+
+ <transportConnectors>
+ <transportConnector uri="tcp://localhost:61616"/>
+ </transportConnectors>
+
+ </broker>
+</beans>
\ No newline at end of file