You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ha...@apache.org on 2017/12/21 03:16:31 UTC

activemq git commit: AMQ-6861 Allow customisation of network bridge creation logic.

Repository: activemq
Updated Branches:
  refs/heads/master ab2711abb -> a8a032af0


AMQ-6861 Allow customisation of network bridge creation logic.


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/a8a032af
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/a8a032af
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/a8a032af

Branch: refs/heads/master
Commit: a8a032af091ae2f6d1affe95b37bb424214d1990
Parents: ab2711a
Author: Ɓukasz Dywicki <lu...@code-house.org>
Authored: Mon Nov 13 15:06:22 2017 +0100
Committer: Hadrian Zbarcea <ha...@apache.org>
Committed: Wed Dec 20 22:00:54 2017 -0500

----------------------------------------------------------------------
 .../activemq/broker/TransportConnection.java    |   2 +-
 .../apache/activemq/network/BridgeFactory.java  |  23 ++
 .../network/DiscoveryNetworkConnector.java      |   2 +-
 .../network/NetworkBridgeConfiguration.java     |  13 ++
 .../activemq/network/NetworkBridgeFactory.java  |  42 ++--
 .../activemq/network/BaseNetworkTest.java       |  96 ++++++++
 .../network/CustomBridgeFactoryTest.java        | 217 +++++++++++++++++++
 .../activemq/network/SimpleNetworkTest.java     |  98 +--------
 .../network/localBroker-custom-factory.xml      |  40 ++++
 9 files changed, 428 insertions(+), 105 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/a8a032af/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
index b18881d..967e977 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/TransportConnection.java
@@ -1442,7 +1442,7 @@ public class TransportConnection implements Connection, Task, CommandVisitor {
                 }
                 MBeanNetworkListener listener = new MBeanNetworkListener(brokerService, config, brokerService.createDuplexNetworkConnectorObjectName(duplexName));
                 listener.setCreatedByDuplex(true);
-                duplexBridge = NetworkBridgeFactory.createBridge(config, localTransport, remoteBridgeTransport, listener);
+                duplexBridge = config.getBridgeFactory().createNetworkBridge(config, localTransport, remoteBridgeTransport, listener);
                 duplexBridge.setBrokerService(brokerService);
                 //Need to set durableDestinations to properly restart subs when dynamicOnly=false
                 duplexBridge.setDurableDestinations(NetworkConnector.getDurableTopicDestinations(

http://git-wip-us.apache.org/repos/asf/activemq/blob/a8a032af/activemq-broker/src/main/java/org/apache/activemq/network/BridgeFactory.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/BridgeFactory.java b/activemq-broker/src/main/java/org/apache/activemq/network/BridgeFactory.java
new file mode 100644
index 0000000..af6a6db
--- /dev/null
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/BridgeFactory.java
@@ -0,0 +1,23 @@
+package org.apache.activemq.network;
+
+import org.apache.activemq.transport.Transport;
+
+/**
+ * Encapsulation of bridge creation logic.
+ *
+ * This SPI interface is intended to customize or decorate existing bridge implementations.
+ */
+public interface BridgeFactory {
+
+    /**
+     * Create a network bridge between two specified transports.
+     *
+     * @param configuration Bridge configuration.
+     * @param localTransport Local side of bridge.
+     * @param remoteTransport Remote side of bridge.
+     * @param listener Bridge listener.
+     * @return the NetworkBridge
+     */
+    DemandForwardingBridge createNetworkBridge(NetworkBridgeConfiguration configuration, Transport localTransport, Transport remoteTransport, final NetworkBridgeListener listener);
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/a8a032af/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java b/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
index e05c42c..3850da5 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/DiscoveryNetworkConnector.java
@@ -256,7 +256,7 @@ public class DiscoveryNetworkConnector extends NetworkConnector implements Disco
         }
         NetworkBridgeListener listener = new DiscoverNetworkBridgeListener(getBrokerService(), getObjectName());
 
-        DemandForwardingBridge result = NetworkBridgeFactory.createBridge(this, localTransport, remoteTransport, listener);
+        DemandForwardingBridge result = getBridgeFactory().createNetworkBridge(this, localTransport, remoteTransport, listener);
         result.setBrokerService(getBrokerService());
         return configureBridge(result);
     }

http://git-wip-us.apache.org/repos/asf/activemq/blob/a8a032af/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
index 3c64758..1adff09 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeConfiguration.java
@@ -78,6 +78,11 @@ public class NetworkBridgeConfiguration {
     private boolean checkDuplicateMessagesOnDuplex = false;
 
     /**
+     * Bridge factory implementation - by default backed by static factory, which is default implementation and will rely change.
+     */
+    private BridgeFactory bridgeFactory = NetworkBridgeFactory.INSTANCE;
+
+    /**
      * @return the conduitSubscriptions
      */
     public boolean isConduitSubscriptions() {
@@ -541,6 +546,14 @@ public class NetworkBridgeConfiguration {
         return useVirtualDestSubs;
     }
 
+    public BridgeFactory getBridgeFactory() {
+        return bridgeFactory;
+    }
+
+    public void setBridgeFactory(BridgeFactory bridgeFactory) {
+        this.bridgeFactory = bridgeFactory;
+    }
+
     /**
      * This was a typo, so this is deprecated as of 5.13.1
      */

http://git-wip-us.apache.org/repos/asf/activemq/blob/a8a032af/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
index 32a7853..0ac56dd 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/network/NetworkBridgeFactory.java
@@ -18,7 +18,10 @@ package org.apache.activemq.network;
 
 import java.net.URI;
 import java.util.HashMap;
-import org.apache.activemq.broker.Broker;
+import java.util.LinkedHashSet;
+import java.util.ServiceLoader;
+import java.util.Set;
+
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFactory;
 import org.apache.activemq.util.URISupport;
@@ -28,13 +31,32 @@ import org.apache.activemq.util.URISupport;
  * 
  * 
  */
-public final class NetworkBridgeFactory {
+public final class NetworkBridgeFactory implements BridgeFactory {
+
+    public final static BridgeFactory INSTANCE = new NetworkBridgeFactory();
 
     private NetworkBridgeFactory() {
+
+    }
+
+    @Override
+    public DemandForwardingBridge createNetworkBridge(NetworkBridgeConfiguration configuration, Transport localTransport, Transport remoteTransport, NetworkBridgeListener listener) {
+        if (configuration.isConduitSubscriptions()) {
+            // dynamicOnly determines whether durables are auto bridged
+            return attachListener(new DurableConduitBridge(configuration, localTransport, remoteTransport), listener);
+        }
+        return attachListener(new DemandForwardingBridge(configuration, localTransport, remoteTransport), listener);
+    }
+
+    private DemandForwardingBridge attachListener(DemandForwardingBridge bridge, NetworkBridgeListener listener) {
+        if (listener != null) {
+            bridge.setNetworkBridgeListener(listener);
+        }
+        return bridge;
     }
 
     /**
-     * create a network bridge
+     * Create a network bridge
      * 
      * @param configuration
      * @param localTransport
@@ -42,20 +64,11 @@ public final class NetworkBridgeFactory {
      * @param listener
      * @return the NetworkBridge
      */
+    @Deprecated
     public static DemandForwardingBridge createBridge(NetworkBridgeConfiguration configuration,
                                                       Transport localTransport, Transport remoteTransport,
                                                       final NetworkBridgeListener listener) {
-        DemandForwardingBridge result = null;
-        if (configuration.isConduitSubscriptions()) {
-            // dynamicOnly determines whether durables are auto bridged
-            result = new DurableConduitBridge(configuration, localTransport, remoteTransport);
-        } else {
-            result = new DemandForwardingBridge(configuration, localTransport, remoteTransport);
-        }
-        if (listener != null) {
-            result.setNetworkBridgeListener(listener);
-        }
-        return result;
+        return INSTANCE.createNetworkBridge(configuration, localTransport, remoteTransport, listener);
     }
 
     public static Transport createLocalTransport(NetworkBridgeConfiguration configuration, URI uri) throws Exception {
@@ -74,4 +87,5 @@ public final class NetworkBridgeFactory {
         uri = URISupport.createURIWithQuery(uri, URISupport.createQueryString(map));
         return TransportFactory.connect(uri);
     }
+
 }

http://git-wip-us.apache.org/repos/asf/activemq/blob/a8a032af/activemq-unit-tests/src/test/java/org/apache/activemq/network/BaseNetworkTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/BaseNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/BaseNetworkTest.java
new file mode 100644
index 0000000..6bb9384
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/BaseNetworkTest.java
@@ -0,0 +1,96 @@
+package org.apache.activemq.network;
+
+import java.net.URI;
+
+import javax.jms.Connection;
+import javax.jms.Session;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.xbean.BrokerFactoryBean;
+import org.junit.After;
+import org.junit.Before;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.springframework.core.io.ClassPathResource;
+import org.springframework.core.io.Resource;
+
+public class BaseNetworkTest {
+
+    protected final Logger LOG = LoggerFactory.getLogger(getClass());
+
+    protected Connection localConnection;
+    protected Connection remoteConnection;
+    protected BrokerService localBroker;
+    protected BrokerService remoteBroker;
+    protected Session localSession;
+    protected Session remoteSession;
+
+    @Before
+    public final void setUp() throws Exception {
+        doSetUp(true);
+    }
+
+    @After
+    public final void tearDown() throws Exception {
+        doTearDown();
+    }
+
+    protected void doTearDown() throws Exception {
+        localConnection.close();
+        remoteConnection.close();
+        localBroker.stop();
+        remoteBroker.stop();
+    }
+
+    protected void doSetUp(boolean deleteAllMessages) throws Exception {
+        remoteBroker = createRemoteBroker();
+        remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+        remoteBroker.start();
+        remoteBroker.waitUntilStarted();
+        localBroker = createLocalBroker();
+        localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
+        localBroker.start();
+        localBroker.waitUntilStarted();
+        URI localURI = localBroker.getVmConnectorURI();
+        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
+        fac.setAlwaysSyncSend(true);
+        fac.setDispatchAsync(false);
+        localConnection = fac.createConnection();
+        localConnection.setClientID("clientId");
+        localConnection.start();
+        URI remoteURI = remoteBroker.getVmConnectorURI();
+        fac = new ActiveMQConnectionFactory(remoteURI);
+        remoteConnection = fac.createConnection();
+        remoteConnection.setClientID("clientId");
+        remoteConnection.start();
+        localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+    }
+
+    protected String getRemoteBrokerURI() {
+        return "org/apache/activemq/network/remoteBroker.xml";
+    }
+
+    protected String getLocalBrokerURI() {
+        return "org/apache/activemq/network/localBroker.xml";
+    }
+
+    protected BrokerService createBroker(String uri) throws Exception {
+        Resource resource = new ClassPathResource(uri);
+        BrokerFactoryBean factory = new BrokerFactoryBean(resource);
+        resource = new ClassPathResource(uri);
+        factory = new BrokerFactoryBean(resource);
+        factory.afterPropertiesSet();
+        BrokerService result = factory.getBroker();
+        return result;
+    }
+
+    protected BrokerService createLocalBroker() throws Exception {
+        return createBroker(getLocalBrokerURI());
+    }
+
+    protected BrokerService createRemoteBroker() throws Exception {
+        return createBroker(getRemoteBrokerURI());
+    }
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/a8a032af/activemq-unit-tests/src/test/java/org/apache/activemq/network/CustomBridgeFactoryTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/CustomBridgeFactoryTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/CustomBridgeFactoryTest.java
new file mode 100644
index 0000000..3340584
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/CustomBridgeFactoryTest.java
@@ -0,0 +1,217 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.network;
+
+import static org.junit.Assert.*;
+import static org.mockito.Matchers.any;
+import static org.mockito.Mockito.verify;
+import static org.mockito.Mockito.verifyNoMoreInteractions;
+
+import java.util.Arrays;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+import java.util.function.Supplier;
+
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.Message;
+import org.apache.activemq.transport.Transport;
+import org.junit.Test;
+import org.mockito.Mockito;
+
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+/**
+ * Basic test which verify if custom bridge factory receives any interactions when configured.
+ */
+public class CustomBridgeFactoryTest extends BaseNetworkTest {
+
+    private ActiveMQQueue outgoing = new ActiveMQQueue("outgoing");
+
+    /**
+     * Verification of outgoing communication - from local broker (with customized bridge configured) to remote one.
+     */
+    @Test
+    public void verifyOutgoingCommunication() throws JMSException {
+        CustomNetworkBridgeFactory bridgeFactory = getCustomNetworkBridgeFactory();
+        NetworkBridgeListener listener = bridgeFactory.getListener();
+
+        verify(listener).onStart(any(NetworkBridge.class));
+        verifyNoMoreInteractions(listener);
+
+        send(localSession, outgoing, localSession.createTextMessage("test message"));
+        assertNotNull("Message didn't arrive", receive(remoteSession, outgoing));
+
+        verify(listener).onOutboundMessage(any(NetworkBridge.class), any(Message.class));
+        verifyNoMoreInteractions(listener);
+    }
+
+    /**
+     * Additional test which makes sure that custom bridge receives notification about broker shutdown.
+     */
+    @Test
+    public void verifyBrokerShutdown() {
+        shutdownTest(() -> {
+            try {
+                localBroker.stop();
+            } catch (Exception e) {
+                return e;
+            }
+            return null;
+        });
+    }
+
+    /**
+     * Verification of network connector shutdown.
+     */
+    @Test
+    public void verifyConnectorShutdown() {
+        shutdownTest(() -> {
+            try {
+                getLocalConnector(0).stop();
+            } catch (Exception e) {
+                return e;
+            }
+            return null;
+        });
+    }
+
+    private void shutdownTest(Supplier<Throwable> callback) {
+        CustomNetworkBridgeFactory bridgeFactory = getCustomNetworkBridgeFactory();
+        NetworkBridgeListener listener = bridgeFactory.getListener();
+
+        verify(listener).onStart(any(NetworkBridge.class));
+        verifyNoMoreInteractions(listener);
+
+        Throwable throwable = callback.get();
+        assertNull("Unexpected error", throwable);
+
+        verify(listener).onStop(any(NetworkBridge.class));
+        verifyNoMoreInteractions(listener);
+    }
+
+    // helper methods
+    private void send(Session session, ActiveMQQueue destination, TextMessage message) throws JMSException {
+        MessageProducer producer = session.createProducer(destination);
+        try {
+            producer.send(message);
+        } finally {
+            producer.close();
+        }
+    }
+
+    private javax.jms.Message receive(Session session, ActiveMQQueue destination) throws JMSException {
+        MessageConsumer consumer = session.createConsumer(destination);
+        try {
+            return consumer.receive(TimeUnit.SECONDS.toMillis(5));
+        } finally {
+            consumer.close();
+        }
+    }
+
+    // infrastructure operations digging for connectors in running broker
+    private CustomNetworkBridgeFactory getCustomNetworkBridgeFactory() {
+        NetworkConnector connector = getLocalConnector(0);
+
+        assertTrue(connector.getBridgeFactory() instanceof CustomNetworkBridgeFactory);
+
+        return (CustomNetworkBridgeFactory) connector.getBridgeFactory();
+    }
+
+    private NetworkConnector getLocalConnector(int index) {
+        return localBroker.getNetworkConnectors().get(index);
+    }
+
+    // customizations
+    protected String getLocalBrokerURI() {
+        return "org/apache/activemq/network/localBroker-custom-factory.xml";
+    }
+
+    // test classes
+    static class CustomNetworkBridgeFactory implements BridgeFactory {
+
+        private final NetworkBridgeListener listener;
+
+        CustomNetworkBridgeFactory() {
+            this(Mockito.mock(NetworkBridgeListener.class));
+        }
+
+        CustomNetworkBridgeFactory(NetworkBridgeListener listener) {
+            this.listener = listener;
+        }
+
+        public NetworkBridgeListener getListener() {
+            return listener;
+        }
+
+        @Override
+        public DemandForwardingBridge createNetworkBridge(NetworkBridgeConfiguration configuration, Transport localTransport, Transport remoteTransport, NetworkBridgeListener listener) {
+            DemandForwardingBridge bridge = new DemandForwardingBridge(configuration, localTransport, remoteTransport);
+            bridge.setNetworkBridgeListener(new CompositeNetworkBridgeListener(this.listener, listener));
+            return bridge;
+        }
+
+    }
+
+    static class CompositeNetworkBridgeListener implements NetworkBridgeListener {
+
+        private final List<NetworkBridgeListener> listeners;
+
+        public CompositeNetworkBridgeListener(NetworkBridgeListener ... wrapped) {
+            this.listeners = Arrays.asList(wrapped);
+        }
+
+        @Override
+        public void bridgeFailed() {
+            for (NetworkBridgeListener listener : listeners) {
+                listener.bridgeFailed();
+            }
+        }
+
+        @Override
+        public void onStart(NetworkBridge bridge) {
+            for (NetworkBridgeListener listener : listeners) {
+                listener.onStart(bridge);
+            }
+        }
+
+        @Override
+        public void onStop(NetworkBridge bridge) {
+            for (NetworkBridgeListener listener : listeners) {
+                listener.onStop(bridge);
+            }
+        }
+
+        @Override
+        public void onOutboundMessage(NetworkBridge bridge, Message message) {
+            for (NetworkBridgeListener listener : listeners) {
+                listener.onOutboundMessage(bridge, message);
+            }
+        }
+
+        @Override
+        public void onInboundMessage(NetworkBridge bridge, Message message) {
+            for (NetworkBridgeListener listener : listeners) {
+                listener.onInboundMessage(bridge, message);
+            }
+        }
+    }
+
+}

http://git-wip-us.apache.org/repos/asf/activemq/blob/a8a032af/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
index 2df48dd..c030db5 100644
--- a/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/network/SimpleNetworkTest.java
@@ -21,11 +21,9 @@ import static org.junit.Assert.assertNotNull;
 import static org.junit.Assert.assertNull;
 import static org.junit.Assert.assertTrue;
 
-import java.net.URI;
 import java.util.Arrays;
 import java.util.concurrent.ConcurrentMap;
 
-import javax.jms.Connection;
 import javax.jms.DeliveryMode;
 import javax.jms.Destination;
 import javax.jms.JMSException;
@@ -33,13 +31,11 @@ import javax.jms.Message;
 import javax.jms.MessageConsumer;
 import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
-import javax.jms.Session;
 import javax.jms.TextMessage;
 import javax.jms.TopicRequestor;
 import javax.jms.TopicSession;
 
 import org.apache.activemq.ActiveMQConnection;
-import org.apache.activemq.ActiveMQConnectionFactory;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.command.ActiveMQDestination;
 import org.apache.activemq.command.ActiveMQMessage;
@@ -48,33 +44,27 @@ import org.apache.activemq.command.ActiveMQTopic;
 import org.apache.activemq.command.ConsumerId;
 import org.apache.activemq.util.Wait;
 import org.apache.activemq.util.Wait.Condition;
-import org.apache.activemq.xbean.BrokerFactoryBean;
-import org.junit.After;
-import org.junit.Before;
 import org.junit.Ignore;
 import org.junit.Test;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
 import org.springframework.context.support.AbstractApplicationContext;
-import org.springframework.core.io.ClassPathResource;
-import org.springframework.core.io.Resource;
 
-public class SimpleNetworkTest {
+public class SimpleNetworkTest extends BaseNetworkTest {
 
     protected static final int MESSAGE_COUNT = 10;
-    private static final Logger LOG = LoggerFactory.getLogger(SimpleNetworkTest.class);
 
     protected AbstractApplicationContext context;
-    protected Connection localConnection;
-    protected Connection remoteConnection;
-    protected BrokerService localBroker;
-    protected BrokerService remoteBroker;
-    protected Session localSession;
-    protected Session remoteSession;
     protected ActiveMQTopic included;
     protected ActiveMQTopic excluded;
     protected String consumerName = "durableSubs";
 
+    @Override
+    protected void doSetUp(boolean deleteAllMessages) throws Exception {
+        super.doSetUp(deleteAllMessages);
+
+        included = new ActiveMQTopic("include.test.bar");
+        excluded = new ActiveMQTopic("exclude.test.bar");
+    }
+
     // works b/c of non marshaling vm transport, the connection
     // ref from the client is used during the forward
     @Test(timeout = 60 * 1000)
@@ -364,76 +354,6 @@ public class SimpleNetworkTest {
         }
     }
 
-    @Before
-    public void setUp() throws Exception {
-        doSetUp(true);
-    }
-
-    @After
-    public void tearDown() throws Exception {
-        doTearDown();
-    }
-
-    protected void doTearDown() throws Exception {
-        localConnection.close();
-        remoteConnection.close();
-        localBroker.stop();
-        remoteBroker.stop();
-    }
-
-    protected void doSetUp(boolean deleteAllMessages) throws Exception {
-        remoteBroker = createRemoteBroker();
-        remoteBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
-        remoteBroker.start();
-        remoteBroker.waitUntilStarted();
-        localBroker = createLocalBroker();
-        localBroker.setDeleteAllMessagesOnStartup(deleteAllMessages);
-        localBroker.start();
-        localBroker.waitUntilStarted();
-        URI localURI = localBroker.getVmConnectorURI();
-        ActiveMQConnectionFactory fac = new ActiveMQConnectionFactory(localURI);
-        fac.setAlwaysSyncSend(true);
-        fac.setDispatchAsync(false);
-        localConnection = fac.createConnection();
-        localConnection.setClientID("clientId");
-        localConnection.start();
-        URI remoteURI = remoteBroker.getVmConnectorURI();
-        fac = new ActiveMQConnectionFactory(remoteURI);
-        remoteConnection = fac.createConnection();
-        remoteConnection.setClientID("clientId");
-        remoteConnection.start();
-        included = new ActiveMQTopic("include.test.bar");
-        excluded = new ActiveMQTopic("exclude.test.bar");
-        localSession = localConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-        remoteSession = remoteConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
-    }
-
-    protected String getRemoteBrokerURI() {
-        return "org/apache/activemq/network/remoteBroker.xml";
-    }
-
-    protected String getLocalBrokerURI() {
-        return "org/apache/activemq/network/localBroker.xml";
-    }
-
-    protected BrokerService createBroker(String uri) throws Exception {
-        Resource resource = new ClassPathResource(uri);
-        BrokerFactoryBean factory = new BrokerFactoryBean(resource);
-        resource = new ClassPathResource(uri);
-        factory = new BrokerFactoryBean(resource);
-        factory.afterPropertiesSet();
-        BrokerService result = factory.getBroker();
-        return result;
-    }
-
-    protected BrokerService createLocalBroker() throws Exception {
-        return createBroker(getLocalBrokerURI());
-    }
-
-    protected BrokerService createRemoteBroker() throws Exception {
-        return createBroker(getRemoteBrokerURI());
-    }
-
     protected void assertNetworkBridgeStatistics(final long expectedLocalSent, final long expectedRemoteSent) throws Exception {
 
         final NetworkBridge localBridge = localBroker.getNetworkConnectors().get(0).activeBridges().iterator().next();

http://git-wip-us.apache.org/repos/asf/activemq/blob/a8a032af/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-custom-factory.xml
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-custom-factory.xml b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-custom-factory.xml
new file mode 100644
index 0000000..9dc8c61
--- /dev/null
+++ b/activemq-unit-tests/src/test/resources/org/apache/activemq/network/localBroker-custom-factory.xml
@@ -0,0 +1,40 @@
+<?xml version="1.0" encoding="UTF-8"?>
+<!--
+    Licensed to the Apache Software Foundation (ASF) under one or more
+    contributor license agreements.  See the NOTICE file distributed with
+    this work for additional information regarding copyright ownership.
+    The ASF licenses this file to You under the Apache License, Version 2.0
+    (the "License"); you may not use this file except in compliance with
+    the License.  You may obtain a copy of the License at
+
+    http://www.apache.org/licenses/LICENSE-2.0
+
+    Unless required by applicable law or agreed to in writing, software
+    distributed under the License is distributed on an "AS IS" BASIS,
+    WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+    See the License for the specific language governing permissions and
+    limitations under the License.
+-->
+<beans
+  xmlns="http://www.springframework.org/schema/beans"
+  xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+  xsi:schemaLocation="http://www.springframework.org/schema/beans http://www.springframework.org/schema/beans/spring-beans-2.0.xsd
+  http://activemq.apache.org/schema/core http://activemq.apache.org/schema/core/activemq-core.xsd
+  http://camel.apache.org/schema/spring http://camel.apache.org/schema/spring/camel-spring.xsd">
+  <bean class="org.springframework.beans.factory.config.PropertyPlaceholderConfigurer"/>
+
+<broker brokerName="localBroker" start="false" deleteAllMessagesOnStartup="true" persistent="true" useShutdownHook="false" monitorConnectionSplits="true" xmlns="http://activemq.apache.org/schema/core">
+    <networkConnectors>
+      <networkConnector uri="static:(tcp://localhost:61617)" name="networkConnector">
+          <bridgeFactory>
+              <bean xmlns="http://www.springframework.org/schema/beans" class="org.apache.activemq.network.CustomBridgeFactoryTest.CustomNetworkBridgeFactory" />
+          </bridgeFactory>
+      </networkConnector>
+    </networkConnectors>
+
+    <transportConnectors>
+      <transportConnector uri="tcp://localhost:61616"/>
+    </transportConnectors>
+
+  </broker>
+</beans>
\ No newline at end of file