You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2011/01/13 15:18:15 UTC

svn commit: r1058577 - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/broker/ main/java/org/apache/activemq/broker/region/ main/java/org/apache/activemq/command/ main/java/org/apache/activemq/network/ main/java/org/apache/activemq/...

Author: gtully
Date: Thu Jan 13 14:18:14 2011
New Revision: 1058577

URL: http://svn.apache.org/viewvc?rev=1058577&view=rev
Log:
https://issues.apache.org/jira/browse/AMQ-3077 - ArraysIndexOutOfBoundsException : -32768 in "BrokerService[xxx] Task" thread - brokerInfo and peerBroker infro explosion problems. A peer is a oneway relationship with networks, broker infos were being accumulated in duplicate for each connector and for multiple connectors. The peer broker info was maintained for each which caused the problem marshalling. re: https://issues.apache.org/jira/browse/AMQ-2632 - the configuration is now respected so it can be selectively enabled and rebalance only occurs if we randomly choose an alternative. The nested peer broker info is not propagated in a connection control

Added:
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkOfTwentyBrokersTest.java   (with props)
Modified:
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
    activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
    activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java?rev=1058577&r1=1058576&r2=1058577&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/TransportConnection.java Thu Jan 13 14:18:14 2011
@@ -1015,9 +1015,6 @@ public class TransportConnection impleme
                     ignore.printStackTrace();
                 }
             }
-            if (brokerInfo != null) {
-                broker.removeBroker(this, brokerInfo);
-            }
         }
         LOG.debug("Connection Stopped: " + getRemoteAddress());
     }
@@ -1182,7 +1179,7 @@ public class TransportConnection impleme
             // so this TransportConnection is the rear end of a network bridge
             // We have been requested to create a two way pipe ...
             try {
-                // We first look if existing network connection already exists for the same broker Id
+                // We first look if existing network connection already exists for the same broker Id and network connector name
                 // It's possible in case of brief network fault to have this transport connector side of the connection always active
                 // and the duplex network connector side wanting to open a new one
                 // In this case, the old connection must be broken
@@ -1234,7 +1231,6 @@ public class TransportConnection impleme
             LOG.warn("Unexpected extra broker info command received: " + info);
         }
         this.brokerInfo = info;
-        broker.addBroker(this, info);
         networkConnection = true;
         List<TransportConnectionState> connectionStates = listConnectionStates();
         for (TransportConnectionState cs : connectionStates) {

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java?rev=1058577&r1=1058576&r2=1058577&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/broker/region/RegionBroker.java Thu Jan 13 14:18:14 2011
@@ -94,7 +94,7 @@ public class RegionBroker extends EmptyB
 
     private final CopyOnWriteArrayList<Connection> connections = new CopyOnWriteArrayList<Connection>();
     private final Map<ActiveMQDestination, Destination> destinations = new ConcurrentHashMap<ActiveMQDestination, Destination>();
-    private final CopyOnWriteArrayList<BrokerInfo> brokerInfos = new CopyOnWriteArrayList<BrokerInfo>();
+    private final Map<BrokerId, BrokerInfo> brokerInfos = new HashMap<BrokerId, BrokerInfo>();
 
     private final LongSequenceGenerator sequenceGenerator = new LongSequenceGenerator();
     private BrokerId brokerId;
@@ -640,14 +640,25 @@ public class RegionBroker extends EmptyB
 
     @Override
     public synchronized void addBroker(Connection connection, BrokerInfo info) {
-        brokerInfos.add(info);
+        BrokerInfo existing = brokerInfos.get(info.getBrokerId());
+        if (existing == null) {
+            existing = info.copy();
+            existing.setPeerBrokerInfos(null);
+            brokerInfos.put(info.getBrokerId(), existing);
+        }
+        existing.incrementRefCount();
+        LOG.debug(getBrokerName() + " addBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size());
         addBrokerInClusterUpdate();
     }
 
     @Override
     public synchronized void removeBroker(Connection connection, BrokerInfo info) {
         if (info != null) {
-            brokerInfos.remove(info);
+            BrokerInfo existing = brokerInfos.get(info.getBrokerId());
+            if (existing != null && existing.decrementRefCount() == 0) {
+               brokerInfos.remove(info.getBrokerId());
+            }
+            LOG.debug(getBrokerName() + " removeBroker:" + info.getBrokerName() + " brokerInfo size : " + brokerInfos.size());
             removeBrokerInClusterUpdate();
         }
     }
@@ -655,7 +666,7 @@ public class RegionBroker extends EmptyB
     @Override
     public synchronized BrokerInfo[] getPeerBrokerInfos() {
         BrokerInfo[] result = new BrokerInfo[brokerInfos.size()];
-        result = brokerInfos.toArray(result);
+        result = brokerInfos.values().toArray(result);
         return result;
     }
 

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java?rev=1058577&r1=1058576&r2=1058577&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/command/BrokerInfo.java Thu Jan 13 14:18:14 2011
@@ -45,6 +45,7 @@ public class BrokerInfo extends BaseComm
     long connectionId;
     String brokerUploadUrl;
     String networkProperties;
+    transient int refCount = 0;
     
     public BrokerInfo copy() {
         BrokerInfo copy = new BrokerInfo();
@@ -265,4 +266,15 @@ public class BrokerInfo extends BaseComm
         }
         return result;
     }
+
+    public int getRefCount() {
+        return refCount;
+    }
+
+    public void incrementRefCount() {
+        refCount++;
+    }
+    public int decrementRefCount() {
+        return --refCount;
+    }
 }

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java?rev=1058577&r1=1058576&r2=1058577&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/network/DemandForwardingBridgeSupport.java Thu Jan 13 14:18:14 2011
@@ -228,20 +228,12 @@ public abstract class DemandForwardingBr
 
             localBroker.start();
             remoteBroker.start();
-            if (configuration.isDuplex() && duplexInitiatingConnection == null) {
-                // initiator side of duplex network
-                remoteBrokerNameKnownLatch.await();
-            }
             if (!disposed.get()) {
                 try {
                     triggerRemoteStartBridge();
                 } catch (IOException e) {
                     LOG.warn("Caught exception from remote start", e);
                 }
-                NetworkBridgeListener l = this.networkBridgeListener;
-                if (l != null) {
-                    l.onStart(this);
-                }
     	    } else {
                 LOG.warn ("Bridge was disposed before the start() method was fully executed.");
                 throw new TransportDisposedIOException();
@@ -309,6 +301,10 @@ public abstract class DemandForwardingBr
                     localSessionInfo = new SessionInfo(localConnectionInfo, 1);
                     localBroker.oneway(localSessionInfo);
                     brokerService.getBroker().networkBridgeStarted(remoteBrokerInfo, this.createdByDuplex);
+                    NetworkBridgeListener l = this.networkBridgeListener;
+                    if (l != null) {
+                        l.onStart(this);
+                    }
                     LOG.info("Network connection between " + localBroker + " and " + remoteBroker + "(" + remoteBrokerName + ") has been established.");
 
                 } else {
@@ -419,6 +415,7 @@ public abstract class DemandForwardingBr
                     ss.throwFirstException();
                 }
             }
+            brokerService.getBroker().removeBroker(null, remoteBrokerInfo);
             brokerService.getBroker().networkBridgeStopped(remoteBrokerInfo);
             LOG.info(configuration.getBrokerName() + " bridge to " + remoteBrokerName + " stopped");
             remoteBrokerNameKnownLatch.countDown();
@@ -480,6 +477,8 @@ public abstract class DemandForwardingBr
                     serviceRemoteBrokerInfo(command);
                     // Let the local broker know the remote broker's ID.
                     localBroker.oneway(command);
+                    // new peer broker (a consumer can work with remote broker also)
+                    brokerService.getBroker().addBroker(null, remoteBrokerInfo);
                 } else if (command.getClass() == ConnectionError.class) {
                     ConnectionError ce = (ConnectionError) command;
                     serviceRemoteException(ce.getException());

Modified: activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java?rev=1058577&r1=1058576&r2=1058577&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java (original)
+++ activemq/trunk/activemq-core/src/main/java/org/apache/activemq/transport/failover/FailoverTransport.java Thu Jan 13 14:18:14 2011
@@ -120,6 +120,7 @@ public class FailoverTransport implement
     private SslContext brokerSslContext;
     private String updateURIsURL = null;
     private boolean rebalanceUpdateURIs=true;
+    private boolean doRebalance = false;
 
     public FailoverTransport() throws InterruptedIOException {
         brokerSslContext = SslContext.getCurrentSslContext();
@@ -131,7 +132,7 @@ public class FailoverTransport implement
                 boolean buildBackup = true;
                 boolean doReconnect = !disposed;
                 synchronized (backupMutex) {
-                    if (connectedTransport.get() == null && !disposed) {
+                    if ((connectedTransport.get() == null || doRebalance) && !disposed) {
                         result = doReconnect();
                         buildBackup = false;
                     }
@@ -623,7 +624,7 @@ public class FailoverTransport implement
         for (int i = 0; i < u.length; i++) {
             uris.remove(u[i]);
         }
-        reconnect(rebalance);
+        // rebalance is automatic if any connected to removed/stopped broker
     }
 
     public void add(boolean rebalance, String u) {
@@ -643,15 +644,7 @@ public class FailoverTransport implement
         synchronized (reconnectMutex) {
             if (started) {
                 if (rebalance) {
-                    Transport transport = this.connectedTransport.getAndSet(null);
-                    if (transport != null) {
-                        try {
-                            transport.stop();
-                        } catch (Exception e) {
-                            LOG.debug("Caught an exception stopping existing transport", e);
-                        }
-                    }
-
+                    doRebalance = true;
                 }
                 LOG.debug("Waking up reconnect task");
                 try {
@@ -683,7 +676,7 @@ public class FailoverTransport implement
         if (removed) {
             l.add(failedConnectTransportURI);
         }
-        LOG.debug("urlList connectionList:" + l);
+        LOG.debug("urlList connectionList:" + l + ", from: " + uris);
         return l;
     }
 
@@ -798,13 +791,31 @@ public class FailoverTransport implement
                 reconnectMutex.notifyAll();
             }
 
-            if (connectedTransport.get() != null || disposed || connectionFailure != null) {
+            if ((connectedTransport.get() != null && !doRebalance) || disposed || connectionFailure != null) {
                 return false;
             } else {
                 List<URI> connectList = getConnectList();
                 if (connectList.isEmpty()) {
                     failure = new IOException("No uris available to connect to.");
                 } else {
+                    if (doRebalance) {
+                        if (connectList.get(0).equals(connectedTransportURI)) {
+                            // already connected to first in the list, no need to rebalance
+                            doRebalance = false;
+                            return false;
+                        } else {
+                            LOG.debug("Doing rebalance from: " + connectedTransportURI + " to " + connectList);
+                            try {
+                                Transport transport = this.connectedTransport.getAndSet(null);
+                                if (transport != null) {
+                                    transport.stop();
+                                }
+                            } catch (Exception e) {
+                                LOG.debug("Caught an exception stopping existing transport for rebalance", e);
+                            }
+                        }
+                        doRebalance = false;
+                    }
                     if (!useExponentialBackOff || reconnectDelay == DEFAULT_INITIAL_RECONNECT_DELAY) {
                         reconnectDelay = initialReconnectDelay;
                     }

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java?rev=1058577&r1=1058576&r2=1058577&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/JmsMultipleBrokersTestSupport.java Thu Jan 13 14:18:14 2011
@@ -152,19 +152,23 @@ public class JmsMultipleBrokersTestSuppo
         maxSetupTime = 8000;
     }
 
-    
-    protected void waitForBridgeFormation() throws Exception {
+
+    protected void waitForBridgeFormation(final int min) throws Exception {
         for (BrokerItem brokerItem : brokers.values()) {
             final BrokerService broker = brokerItem.broker;
             if (!broker.getNetworkConnectors().isEmpty()) {
                 Wait.waitFor(new Wait.Condition() {
                     public boolean isSatisified() throws Exception {
-                        return !broker.getNetworkConnectors().get(0).activeBridges().isEmpty();
+                        return (broker.getNetworkConnectors().get(0).activeBridges().size() >= min);
                     }});
             }
         }
     }
 
+    protected void waitForBridgeFormation() throws Exception {
+        waitForBridgeFormation(1);
+    }
+
     protected void startAllBrokers() throws Exception {
         Collection<BrokerItem> brokerList = brokers.values();
         for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
@@ -517,6 +521,7 @@ public class JmsMultipleBrokersTestSuppo
             }
 
             broker.stop();
+            broker.waitUntilStopped();
             consumers.clear();
 
             broker = null;

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java?rev=1058577&r1=1058576&r2=1058577&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/transport/failover/FailoverClusterTest.java Thu Jan 13 14:18:14 2011
@@ -35,31 +35,32 @@ import org.apache.activemq.network.Netwo
 
 public class FailoverClusterTest extends TestCase {
 
-private static final int NUMBER = 10;
-private static final String BROKER_A_BIND_ADDRESS = "tcp://0.0.0.0:61616";
-private static final String BROKER_B_BIND_ADDRESS = "tcp://0.0.0.0:61617";
-private static final String CLIENT_URL = "failover://("+BROKER_A_BIND_ADDRESS+")";
-private static final String BROKER_A_NAME = "BROKERA";
-private static final String BROKER_B_NAME = "BROKERB";
-private BrokerService brokerA;
-private BrokerService brokerB;
-private final List<ActiveMQConnection>connections = new ArrayList<ActiveMQConnection>();
-
-
-  public void testClusterConnectedAfterClients() throws Exception{
-      createClients();
-      if (brokerB == null) {
-          brokerB = createBrokerB(BROKER_B_BIND_ADDRESS);
-      }
-      Thread.sleep(3000);
-      Set<String> set = new HashSet<String>();
-      for (ActiveMQConnection c:connections) {
-          set.add(c.getTransportChannel().getRemoteAddress());
-      }
-      assertTrue(set.size() > 1);
-  }
+    private static final int NUMBER = 10;
+    private static final String BROKER_A_BIND_ADDRESS = "tcp://0.0.0.0:61616";
+    private static final String BROKER_B_BIND_ADDRESS = "tcp://0.0.0.0:61617";
+    private static final String BROKER_A_NAME = "BROKERA";
+    private static final String BROKER_B_NAME = "BROKERB";
+    private BrokerService brokerA;
+    private BrokerService brokerB;
+    private String clientUrl;
 
-    public void testClusterURIOptionsStrip() throws Exception{
+    private final List<ActiveMQConnection> connections = new ArrayList<ActiveMQConnection>();
+
+
+    public void testClusterConnectedAfterClients() throws Exception {
+        createClients();
+        if (brokerB == null) {
+            brokerB = createBrokerB(BROKER_B_BIND_ADDRESS);
+        }
+        Thread.sleep(3000);
+        Set<String> set = new HashSet<String>();
+        for (ActiveMQConnection c : connections) {
+            set.add(c.getTransportChannel().getRemoteAddress());
+        }
+        assertTrue(set.size() > 1);
+    }
+
+    public void testClusterURIOptionsStrip() throws Exception {
         createClients();
         if (brokerB == null) {
             // add in server side only url param, should not be propagated
@@ -67,45 +68,44 @@ private final List<ActiveMQConnection>co
         }
         Thread.sleep(3000);
         Set<String> set = new HashSet<String>();
-        for (ActiveMQConnection c:connections) {
+        for (ActiveMQConnection c : connections) {
             set.add(c.getTransportChannel().getRemoteAddress());
         }
         assertTrue(set.size() > 1);
     }
 
-  
-  public void testClusterConnectedBeforeClients() throws Exception{
-      
-      if (brokerB == null) {
-          brokerB = createBrokerB(BROKER_B_BIND_ADDRESS);
-      }
-      Thread.sleep(5000);
-      createClients();
-      Thread.sleep(2000);
-      brokerA.stop();
-      Thread.sleep(2000);
-     
-      URI brokerBURI = new URI(BROKER_B_BIND_ADDRESS);
-      for (ActiveMQConnection c:connections) {
-          String addr = c.getTransportChannel().getRemoteAddress();    
-          assertTrue(addr.indexOf(""+brokerBURI.getPort()) > 0);
-      }
-  }
+
+    public void testClusterConnectedBeforeClients() throws Exception {
+
+        if (brokerB == null) {
+            brokerB = createBrokerB(BROKER_B_BIND_ADDRESS);
+        }
+        Thread.sleep(5000);
+        createClients();
+        Thread.sleep(2000);
+        brokerA.stop();
+        Thread.sleep(2000);
+
+        URI brokerBURI = new URI(BROKER_B_BIND_ADDRESS);
+        for (ActiveMQConnection c : connections) {
+            String addr = c.getTransportChannel().getRemoteAddress();
+            assertTrue(addr.indexOf("" + brokerBURI.getPort()) > 0);
+        }
+    }
 
     @Override
     protected void setUp() throws Exception {
         if (brokerA == null) {
-           brokerA = createBrokerA(BROKER_A_BIND_ADDRESS + "?transport.closeAsync=false");
+            brokerA = createBrokerA(BROKER_A_BIND_ADDRESS + "?transport.closeAsync=false");
+            clientUrl = "failover://(" + brokerA.getTransportConnectors().get(0).getPublishableConnectString() + ")";
         }
-        
-        
     }
 
     @Override
     protected void tearDown() throws Exception {
-        for (Connection c:connections) {
+        for (Connection c : connections) {
             c.close();
-        }     
+        }
         if (brokerB != null) {
             brokerB.stop();
             brokerB = null;
@@ -115,16 +115,16 @@ private final List<ActiveMQConnection>co
             brokerA = null;
         }
     }
-    
+
     protected BrokerService createBrokerA(String uri) throws Exception {
         BrokerService answer = new BrokerService();
         answer.setUseJmx(false);
-        configureConsumerBroker(answer,uri);
+        configureConsumerBroker(answer, uri);
         answer.start();
         return answer;
     }
-    
-    protected void configureConsumerBroker(BrokerService answer,String uri) throws Exception {
+
+    protected void configureConsumerBroker(BrokerService answer, String uri) throws Exception {
         answer.setBrokerName(BROKER_A_NAME);
         answer.setPersistent(false);
         TransportConnector connector = answer.addConnector(uri);
@@ -132,33 +132,33 @@ private final List<ActiveMQConnection>co
         connector.setUpdateClusterClients(true);
         answer.setUseShutdownHook(false);
     }
-    
+
     protected BrokerService createBrokerB(String uri) throws Exception {
         BrokerService answer = new BrokerService();
         answer.setUseJmx(false);
-        configureNetwork(answer,uri);
+        configureNetwork(answer, uri);
         answer.start();
         return answer;
     }
-    
-    protected void configureNetwork(BrokerService answer,String uri) throws Exception {
+
+    protected void configureNetwork(BrokerService answer, String uri) throws Exception {
         answer.setBrokerName(BROKER_B_NAME);
         answer.setPersistent(false);
-        NetworkConnector network = answer.addNetworkConnector("static://"+BROKER_A_BIND_ADDRESS);
+        NetworkConnector network = answer.addNetworkConnector("static://" + BROKER_A_BIND_ADDRESS);
         network.setDuplex(true);
-        TransportConnector connector =answer.addConnector(uri);
+        TransportConnector connector = answer.addConnector(uri);
         connector.setRebalanceClusterClients(true);
         connector.setUpdateClusterClients(true);
         answer.setUseShutdownHook(false);
     }
-    
+
     protected void createClients() throws Exception {
-        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(CLIENT_URL);
-        for (int i =0;i < NUMBER; i++) {
+        ActiveMQConnectionFactory factory = new ActiveMQConnectionFactory(clientUrl);
+        for (int i = 0; i < NUMBER; i++) {
             ActiveMQConnection c = (ActiveMQConnection) factory.createConnection();
             c.start();
             Session s = c.createSession(false, Session.AUTO_ACKNOWLEDGE);
-            Queue queue  = s.createQueue(getClass().getName());
+            Queue queue = s.createQueue(getClass().getName());
             MessageConsumer consumer = s.createConsumer(queue);
             connections.add(c);
         }

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkOfTwentyBrokersTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkOfTwentyBrokersTest.java?rev=1058577&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkOfTwentyBrokersTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkOfTwentyBrokersTest.java Thu Jan 13 14:18:14 2011
@@ -0,0 +1,197 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.usecases;
+
+import java.net.URI;
+import java.util.Collection;
+import java.util.Iterator;
+import java.util.List;
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.util.ThreadTracker;
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+public class NetworkOfTwentyBrokersTest extends JmsMultipleBrokersTestSupport {
+    private static final Log LOG = LogFactory.getLog(NetworkOfTwentyBrokersTest.class);
+
+    // This will interconnect all brokers using multicast
+    protected void bridgeAllBrokers() throws Exception {
+        bridgeAllBrokers("TwentyBrokersTest", 1, false, false);
+    }
+
+    protected void bridgeAllBrokers(String groupName, int ttl, boolean suppressduplicateQueueSubs) throws Exception {
+        bridgeAllBrokers(groupName, ttl, suppressduplicateQueueSubs, false);
+    }
+
+    protected void bridgeAllBrokers(String groupName, int ttl, boolean suppressduplicateQueueSubs, boolean decreasePriority) throws Exception {
+        Collection<BrokerItem> brokerList = brokers.values();
+        for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
+            BrokerService broker = i.next().broker;
+            List<TransportConnector> transportConnectors = broker.getTransportConnectors();
+
+            if (transportConnectors.isEmpty()) {
+                broker.addConnector(new URI(AUTO_ASSIGN_TRANSPORT));
+                transportConnectors = broker.getTransportConnectors();
+            }
+
+            TransportConnector transport = transportConnectors.get(0);
+            if (transport.getDiscoveryUri() == null) {
+                transport.setDiscoveryUri(new URI("multicast://default?group=" + groupName));
+            }
+
+            List<NetworkConnector> networkConnectors = broker.getNetworkConnectors();
+            if (networkConnectors.isEmpty()) {
+                broker.addNetworkConnector("multicast://default?group=" + groupName);
+                networkConnectors = broker.getNetworkConnectors();
+            }
+
+            NetworkConnector nc = networkConnectors.get(0);
+            nc.setNetworkTTL(ttl);
+            nc.setSuppressDuplicateQueueSubscriptions(suppressduplicateQueueSubs);
+            nc.setDecreaseNetworkConsumerPriority(decreasePriority);
+        }
+
+        // Multicasting may take longer to setup
+        maxSetupTime = 8000;
+    }
+
+    protected BrokerService createBroker(String brokerName) throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(false);
+        broker.setUseJmx(false);
+        broker.setBrokerName(brokerName);
+        broker.addConnector(new URI(AUTO_ASSIGN_TRANSPORT));
+        brokers.put(brokerName, new BrokerItem(broker));
+
+        return broker;
+    }
+
+    /* AMQ-3077 Bug */
+    public void testBrokers() throws Exception {
+        int X = 20;
+        int i;
+
+        LOG.info("Creating X Brokers");
+        for (i = 0; i < X; i++) {
+            createBroker("Broker" + i);
+        }
+
+        bridgeAllBrokers();
+        startAllBrokers();
+        waitForBridgeFormation(X-1);
+
+        verifyPeerBrokerInfos(X-1);
+
+        LOG.info("Stopping half the brokers");
+        for (i = 0; i < X/2; i++) {
+            destroyBroker("Broker" + i);
+        }
+
+        LOG.info("Waiting for complete stop");
+        try {
+            Thread.sleep(10000);
+        } catch (Exception e) {
+        }
+
+        verifyPeerBrokerInfos((X/2) -1);
+
+        LOG.info("Recreating first half");
+        for (i = 0; i < X/2; i++) {
+            createBroker("Broker" + i);
+        }
+
+        bridgeAllBrokers();
+        startAllBrokers();
+        waitForBridgeFormation(X-1);
+
+        verifyPeerBrokerInfos(X-1);
+    }
+
+    public void testPeerBrokerCountHalfPeer() throws Exception {
+        createBroker("A");
+        createBroker("B");
+        bridgeBrokers("A", "B");
+        startAllBrokers();
+        verifyPeerBrokerInfo(brokers.get("A"), 1);
+        verifyPeerBrokerInfo(brokers.get("B"), 0);
+    }
+
+    public void testPeerBrokerCountHalfPeerTwice() throws Exception {
+        createBroker("A");
+        createBroker("B");
+        bridgeBrokers("A", "B");
+        bridgeBrokers("A", "B");
+        startAllBrokers();
+        verifyPeerBrokerInfo(brokers.get("A"), 1);
+        verifyPeerBrokerInfo(brokers.get("B"), 0);
+    }
+
+    public void testPeerBrokerCountFullPeer() throws Exception {
+        createBroker("A");
+        createBroker("B");
+        bridgeBrokers("A", "B");
+        bridgeBrokers("B", "A");
+        startAllBrokers();
+        verifyPeerBrokerInfo(brokers.get("A"), 1);
+        verifyPeerBrokerInfo(brokers.get("B"), 1);
+    }
+
+    public void testPeerBrokerCountFullPeerDuplex() throws Exception {
+        createBroker("A");
+        createBroker("B");
+        NetworkConnector nc = bridgeBrokers("A", "B");
+        nc.setDuplex(true);
+        startAllBrokers();
+        verifyPeerBrokerInfo(brokers.get("A"), 1);
+        verifyPeerBrokerInfo(brokers.get("B"), 1);
+    }
+
+
+    private void verifyPeerBrokerInfo(BrokerItem brokerItem, final int max) {
+        BrokerService broker = brokerItem.broker;
+        RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
+        LOG.info("verify infos " + broker.getBrokerName() + ", len: " + regionBroker.getPeerBrokerInfos().length);
+        for (BrokerInfo info : regionBroker.getPeerBrokerInfos()) {
+            LOG.info(info.getBrokerName());
+        }
+        assertEquals(broker.getBrokerName(), max, regionBroker.getPeerBrokerInfos().length);
+    }
+
+    private void verifyPeerBrokerInfos(final int max) {
+        Collection<BrokerItem> brokerList = brokers.values();
+        for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext();) {
+            verifyPeerBrokerInfo(i.next(), max);
+        }
+    }
+
+    @Override
+    public void setUp() throws Exception {
+        super.setAutoFail(true);
+        super.setUp();
+    }
+
+    @Override
+    public void tearDown() throws Exception {
+        super.tearDown();
+        ThreadTracker.result();
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkOfTwentyBrokersTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/usecases/NetworkOfTwentyBrokersTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date