You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by gt...@apache.org on 2013/11/18 14:06:37 UTC

git commit: https://issues.apache.org/jira/browse/AMQ-4485 - resolve - non transacitonal work interleaved with transactional work needs to check for and integrate with cursor update ordering. Key difference in latest test is 3/4 of messages getting produ

Updated Branches:
  refs/heads/trunk 030c2cca3 -> 511b60c6f


https://issues.apache.org/jira/browse/AMQ-4485 - resolve - non transacitonal work interleaved with transactional work needs to check for and integrate with cursor update ordering. Key difference in latest test is 3/4 of messages getting produced via network connectors, hense not in transactions


Project: http://git-wip-us.apache.org/repos/asf/activemq/repo
Commit: http://git-wip-us.apache.org/repos/asf/activemq/commit/511b60c6
Tree: http://git-wip-us.apache.org/repos/asf/activemq/tree/511b60c6
Diff: http://git-wip-us.apache.org/repos/asf/activemq/diff/511b60c6

Branch: refs/heads/trunk
Commit: 511b60c6f650650b24df34ac55dd174006b4cd43
Parents: 030c2cc
Author: gtully <ga...@gmail.com>
Authored: Mon Nov 18 13:04:44 2013 +0000
Committer: gtully <ga...@gmail.com>
Committed: Mon Nov 18 13:05:57 2013 +0000

----------------------------------------------------------------------
 .../apache/activemq/broker/region/Queue.java    |  95 +++--
 .../region/cursors/AbstractStoreCursor.java     |   9 +-
 ...XBrokersWithNDestsFanoutTransactionTest.java | 353 +++++++++++++++++++
 3 files changed, 427 insertions(+), 30 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/activemq/blob/511b60c6/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
index 5b50f1c..31b60b8 100755
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/Queue.java
@@ -35,6 +35,7 @@ import java.util.concurrent.locks.ReentrantReadWriteLock;
 import javax.jms.InvalidSelectorException;
 import javax.jms.JMSException;
 import javax.jms.ResourceAllocationException;
+import javax.transaction.xa.XAException;
 import org.apache.activemq.broker.BrokerService;
 import org.apache.activemq.broker.ConnectionContext;
 import org.apache.activemq.broker.ProducerBrokerExchange;
@@ -714,7 +715,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
     }
 
     final ConcurrentHashMap<Transaction, SendSync> sendSyncs = new ConcurrentHashMap<Transaction, SendSync>();
-    private volatile LinkedList<Transaction> orderIndexUpdates = new LinkedList<Transaction>();
+    private LinkedList<Transaction> orderIndexUpdates = new LinkedList<Transaction>();
 
     // roll up all message sends
     class SendSync extends Synchronization {
@@ -742,40 +743,31 @@ public class Queue extends BaseDestination implements Task, UsageListener {
 
         @Override
         public void beforeCommit() throws Exception {
-            synchronized (sendLock) {
+            synchronized (orderIndexUpdates) {
                 orderIndexUpdates.addLast(transaction);
             }
         }
 
         @Override
         public void afterCommit() throws Exception {
-            LinkedList<Transaction> orderedWork = new LinkedList<Transaction>();;
-            // use existing object to sync orderIndexUpdates that can be reassigned
-            synchronized (sendLock) {
-                Transaction next = orderIndexUpdates.peek();
-                while( next!=null && next.isCommitted() ) {
-                    orderedWork.addLast(orderIndexUpdates.removeFirst());
-                    next = orderIndexUpdates.peek();
-                }
-            }
-            // do the ordered work
-            if (!orderedWork.isEmpty()) {
-
-                ArrayList<SendSync> syncs = new ArrayList<SendSync>(orderedWork.size());;
-                for (Transaction tx : orderedWork) {
-                    syncs.add(sendSyncs.remove(tx));
-                }
-                sendLock.lockInterruptibly();
-                try {
-                    for (SendSync sync : syncs) {
-                        sync.processSend();
+            ArrayList<SendSync> syncs = new ArrayList<SendSync>(200);
+            sendLock.lockInterruptibly();
+            try {
+                synchronized (orderIndexUpdates) {
+                    Transaction next = orderIndexUpdates.peek();
+                    while( next!=null && next.isCommitted() ) {
+                        syncs.add(sendSyncs.remove(orderIndexUpdates.removeFirst()));
+                        next = orderIndexUpdates.peek();
                     }
-                } finally {
-                    sendLock.unlock();
                 }
                 for (SendSync sync : syncs) {
-                    sync.processSent();
+                    sync.processSend();
                 }
+            } finally {
+                sendLock.unlock();
+            }
+            for (SendSync sync : syncs) {
+                sync.processSent();
             }
         }
 
@@ -815,9 +807,51 @@ public class Queue extends BaseDestination implements Task, UsageListener {
         }
     }
 
+    class OrderedNonTransactionWorkTx extends Transaction {
+
+        @Override
+        public void commit(boolean onePhase) throws XAException, IOException {
+        }
+
+        @Override
+        public void rollback() throws XAException, IOException {
+        }
+
+        @Override
+        public int prepare() throws XAException, IOException {
+            return 0;
+        }
+
+        @Override
+        public TransactionId getTransactionId() {
+            return null;
+        }
+
+        @Override
+        public Logger getLog() {
+            return null;
+        }
+
+        @Override
+        public boolean isCommitted() {
+            return true;
+        }
+
+        @Override
+        public void addSynchronization(Synchronization s) {
+            try {
+                s.beforeCommit();
+            } catch (Exception e) {
+                LOG.error("Failed to add not transactional message to orderedWork", e);
+            }
+        }
+    }
+
     // called while holding the sendLock
     private void registerSendSync(Message message, ConnectionContext context) {
-        final Transaction transaction = context.getTransaction();
+        final Transaction transaction =
+                message.isInTransaction() ? context.getTransaction()
+                        : new OrderedNonTransactionWorkTx();
         Queue.SendSync currentSync = sendSyncs.get(transaction);
         if (currentSync == null) {
             currentSync = new Queue.SendSync(transaction);
@@ -831,6 +865,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
             Exception {
         final ConnectionContext context = producerExchange.getConnectionContext();
         Future<Object> result = null;
+        boolean needsOrderingWithTransactions = context.isInTransaction();
 
         producerExchange.incrementSend();
         checkUsage(context, producerExchange, message);
@@ -847,7 +882,11 @@ public class Queue extends BaseDestination implements Task, UsageListener {
                     message.clearMarshalledState();
                 }
             }
-            if (context.isInTransaction()) {
+            // did a transaction commit beat us to the index?
+            synchronized (orderIndexUpdates) {
+                needsOrderingWithTransactions |= !orderIndexUpdates.isEmpty();
+            }
+            if (needsOrderingWithTransactions ) {
                 // If this is a transacted message.. increase the usage now so that
                 // a big TX does not blow up
                 // our memory. This increment is decremented once the tx finishes..
@@ -862,7 +901,7 @@ public class Queue extends BaseDestination implements Task, UsageListener {
         } finally {
             sendLock.unlock();
         }
-        if (!context.isInTransaction()) {
+        if (!needsOrderingWithTransactions) {
             messageSent(context, message);
         }
         if (result != null && !result.isCancelled()) {

http://git-wip-us.apache.org/repos/asf/activemq/blob/511b60c6/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
----------------------------------------------------------------------
diff --git a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
index 624c4f3..495d365 100644
--- a/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
+++ b/activemq-broker/src/main/java/org/apache/activemq/broker/region/cursors/AbstractStoreCursor.java
@@ -21,6 +21,7 @@ import org.apache.activemq.broker.region.Destination;
 import org.apache.activemq.broker.region.MessageReference;
 import org.apache.activemq.command.Message;
 import org.apache.activemq.command.MessageId;
+import org.apache.activemq.command.TransactionId;
 import org.apache.activemq.store.MessageRecoveryListener;
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
@@ -38,6 +39,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     private boolean storeHasMessages = false;
     protected int size;
     private MessageId lastCachedId;
+    private TransactionId lastTx;
     protected boolean hadSpace = false;
 
     protected AbstractStoreCursor(Destination destination) {
@@ -176,6 +178,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
             if (isCacheEnabled()) {
                 if (recoverMessage(node.getMessage(),true)) {
                     lastCachedId = node.getMessageId();
+                    lastTx = node.getMessage().getTransactionId();
                 } else {
                     // failed to recover, possible duplicate from concurrent dispatchPending,
                     // lets not recover further in case of out of order
@@ -190,9 +193,11 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
             setCacheEnabled(false);
             // sync with store on disabling the cache
             if (lastCachedId != null) {
-                LOG.trace(this + "{} - disabling cache, lastCachedId: {} current node Id: {} batchList size: {}", new Object[]{ this, lastCachedId, node.getMessageId(), batchList.size() });
+                LOG.debug("{} - disabling cache, lastCachedId: {} last-tx: {} current node Id: {} node-tx: {} batchList size: {}",
+                        new Object[]{ this, lastCachedId, lastTx, node.getMessageId(), node.getMessage().getTransactionId(), batchList.size() });
                 setBatch(lastCachedId);
                 lastCachedId = null;
+                lastTx = null;
             }
         }
         this.storeHasMessages = true;
@@ -287,7 +292,7 @@ public abstract class AbstractStoreCursor extends AbstractPendingMessageCursor i
     public String toString() {
         return super.toString() + ":" + regionDestination.getActiveMQDestination().getPhysicalName() + ",batchResetNeeded=" + batchResetNeeded
                     + ",storeHasMessages=" + this.storeHasMessages + ",size=" + this.size + ",cacheEnabled=" + isCacheEnabled()
-                    + ",maxBatchSize:" + maxBatchSize;
+                    + ",maxBatchSize:" + maxBatchSize + ",hasSpace:" + hasSpace();
     }
     
     protected abstract void doFillBatch() throws Exception;

http://git-wip-us.apache.org/repos/asf/activemq/blob/511b60c6/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.java
----------------------------------------------------------------------
diff --git a/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.java b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.java
new file mode 100644
index 0000000..c2cf53a
--- /dev/null
+++ b/activemq-unit-tests/src/test/java/org/apache/activemq/bugs/AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.java
@@ -0,0 +1,353 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.bugs;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.ArrayList;
+import java.util.Arrays;
+import java.util.Collection;
+import java.util.HashMap;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Vector;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.QueueConnection;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.JmsMultipleBrokersTestSupport;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.region.RegionBroker;
+import org.apache.activemq.broker.region.policy.PolicyEntry;
+import org.apache.activemq.broker.region.policy.PolicyMap;
+import org.apache.activemq.command.ActiveMQDestination;
+import org.apache.activemq.command.ActiveMQMessage;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.apache.activemq.command.BrokerInfo;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
+import org.apache.activemq.network.NetworkConnector;
+import org.apache.activemq.store.kahadb.KahaDBPersistenceAdapter;
+import org.apache.activemq.util.TimeUtils;
+import org.apache.activemq.util.Wait;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest extends JmsMultipleBrokersTestSupport {
+    static final String payload = new String(new byte[10 * 1024]);
+    private static final Logger LOG = LoggerFactory.getLogger(AMQ4485NetworkOfXBrokersWithNDestsFanoutTransactionTest.class);
+    final int portBase = 61600;
+    final int numBrokers = 4;
+    final int numProducers = 10;
+    final int numMessages = 800;
+    final int consumerSleepTime = 20;
+    StringBuilder brokersUrl = new StringBuilder();
+    HashMap<ActiveMQQueue, AtomicInteger> accumulators = new HashMap<ActiveMQQueue, AtomicInteger>();
+    private ArrayList<Throwable> exceptions = new ArrayList<Throwable>();
+
+    protected void buildUrlList() throws Exception {
+        for (int i = 0; i < numBrokers; i++) {
+            brokersUrl.append("tcp://localhost:" + (portBase + i));
+            if (i != numBrokers - 1) {
+                brokersUrl.append(',');
+            }
+        }
+    }
+
+    protected BrokerService createBroker(int brokerid) throws Exception {
+        BrokerService broker = new BrokerService();
+        broker.setPersistent(true);
+        broker.setDeleteAllMessagesOnStartup(true);
+        broker.getManagementContext().setCreateConnector(false);
+
+
+        broker.setUseJmx(true);
+        broker.setBrokerName("B" + brokerid);
+        broker.addConnector(new URI("tcp://localhost:" + (portBase + brokerid)));
+
+        addNetworkConnector(broker);
+        broker.setSchedulePeriodForDestinationPurge(0);
+        broker.getSystemUsage().setSendFailIfNoSpace(true);
+        broker.getSystemUsage().getMemoryUsage().setLimit(512 * 1024 * 1024);
+
+
+        PolicyMap policyMap = new PolicyMap();
+        PolicyEntry policyEntry = new PolicyEntry();
+        policyEntry.setExpireMessagesPeriod(0);
+        policyEntry.setQueuePrefetch(1000);
+        policyEntry.setMemoryLimit(1024 * 1024l);
+        policyEntry.setOptimizedDispatch(false);
+        policyEntry.setProducerFlowControl(false);
+        policyEntry.setEnableAudit(true);
+        policyEntry.setUseCache(true);
+        policyMap.put(new ActiveMQQueue("GW.>"), policyEntry);
+        broker.setDestinationPolicy(policyMap);
+
+        KahaDBPersistenceAdapter kahaDBPersistenceAdapter = (KahaDBPersistenceAdapter) broker.getPersistenceAdapter();
+        kahaDBPersistenceAdapter.setConcurrentStoreAndDispatchQueues(false);
+
+        brokers.put(broker.getBrokerName(), new BrokerItem(broker));
+        return broker;
+    }
+
+    private void addNetworkConnector(BrokerService broker) throws Exception {
+        StringBuilder networkConnectorUrl = new StringBuilder("static:(").append(brokersUrl.toString());
+        networkConnectorUrl.append(')');
+
+        for (int i = 0; i < 2; i++) {
+            NetworkConnector nc = new DiscoveryNetworkConnector(new URI(networkConnectorUrl.toString()));
+            nc.setName("Bridge-" + i);
+            nc.setNetworkTTL(1);
+            nc.setDecreaseNetworkConsumerPriority(true);
+            nc.setDynamicOnly(true);
+            nc.setPrefetchSize(100);
+            nc.setDynamicallyIncludedDestinations(
+                    Arrays.asList(new ActiveMQDestination[]{new ActiveMQQueue("GW.*")}));
+            broker.addNetworkConnector(nc);
+        }
+    }
+
+    public void testBrokers() throws Exception {
+
+        buildUrlList();
+
+        for (int i = 0; i < numBrokers; i++) {
+            createBroker(i);
+        }
+
+        startAllBrokers();
+        waitForBridgeFormation(numBrokers - 1);
+
+        verifyPeerBrokerInfos(numBrokers - 1);
+
+
+        final List<ConsumerState> consumerStates = startAllGWConsumers(numBrokers);
+
+        startAllGWFanoutConsumers(numBrokers);
+
+        LOG.info("Waiting for percolation of consumers..");
+        TimeUnit.SECONDS.sleep(5);
+
+        LOG.info("Produce mesages..");
+        long startTime = System.currentTimeMillis();
+
+        // produce
+        produce(numMessages);
+
+        assertTrue("Got all sent", Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                for (ConsumerState tally : consumerStates) {
+                    final int expected = numMessages * (tally.destination.isComposite() ? tally.destination.getCompositeDestinations().length : 1);
+                    LOG.info("Tally for: " + tally.brokerName + ", dest: " + tally.destination + " - " + tally.accumulator.get());
+                    if (tally.accumulator.get() != expected) {
+                        LOG.info("Tally for: " + tally.brokerName + ", dest: " + tally.destination + " - " + tally.accumulator.get() + " != " + expected + ", " + tally.expected);
+                        return false;
+                    }
+                    LOG.info("got tally on " + tally.brokerName);
+                }
+                return true;
+            }
+        }, 1000 * 60 * 1000l));
+
+        assertTrue("No exceptions:" + exceptions, exceptions.isEmpty());
+
+        LOG.info("done");
+        long duration = System.currentTimeMillis() - startTime;
+        LOG.info("Duration:" + TimeUtils.printDuration(duration));
+    }
+
+    private void startAllGWFanoutConsumers(int nBrokers) throws Exception {
+
+        StringBuffer compositeDest = new StringBuffer();
+        for (int k = 0; k < nBrokers; k++) {
+            compositeDest.append("GW." + k);
+            if (k + 1 != nBrokers) {
+                compositeDest.append(',');
+            }
+        }
+        ActiveMQQueue compositeQ = new ActiveMQQueue(compositeDest.toString());
+
+        for (int id = 0; id < nBrokers; id++) {
+            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")");
+            connectionFactory.setWatchTopicAdvisories(false);
+
+            QueueConnection queueConnection = connectionFactory.createQueueConnection();
+            queueConnection.start();
+
+            final QueueSession queueSession = queueConnection.createQueueSession(true, Session.SESSION_TRANSACTED);
+
+            final MessageProducer producer = queueSession.createProducer(compositeQ);
+            queueSession.createReceiver(new ActiveMQQueue("IN")).setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    try {
+                        producer.send(message);
+                        queueSession.commit();
+                    } catch (Exception e) {
+                        LOG.error("Failed to fanout to GW: " + message, e);
+                    }
+
+                }
+            });
+        }
+    }
+
+    private List<ConsumerState> startAllGWConsumers(int nBrokers) throws Exception {
+        List<ConsumerState> consumerStates = new LinkedList<ConsumerState>();
+        for (int id = 0; id < nBrokers; id++) {
+            ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")");
+            connectionFactory.setWatchTopicAdvisories(false);
+
+            QueueConnection queueConnection = connectionFactory.createQueueConnection();
+            queueConnection.start();
+
+            final QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            ActiveMQQueue destination = new ActiveMQQueue("GW." + id);
+            QueueReceiver queueReceiver = queueSession.createReceiver(destination);
+
+            final ConsumerState consumerState = new ConsumerState();
+            consumerState.brokerName = ((ActiveMQConnection) queueConnection).getBrokerName();
+            consumerState.receiver = queueReceiver;
+            consumerState.destination = destination;
+            for (int j = 0; j < numMessages * (consumerState.destination.isComposite() ? consumerState.destination.getCompositeDestinations().length : 1); j++) {
+                consumerState.expected.add(j);
+            }
+
+            if (!accumulators.containsKey(destination)) {
+                accumulators.put(destination, new AtomicInteger(0));
+            }
+            consumerState.accumulator = accumulators.get(destination);
+
+            queueReceiver.setMessageListener(new MessageListener() {
+                @Override
+                public void onMessage(Message message) {
+                    try {
+                        if (consumerSleepTime > 0) {
+                            TimeUnit.MILLISECONDS.sleep(consumerSleepTime);
+                        }
+                    } catch (InterruptedException e) {
+                        e.printStackTrace();
+                    }
+                    try {
+                        consumerState.accumulator.incrementAndGet();
+                        try {
+                            consumerState.expected.remove(((ActiveMQMessage) message).getProperty("NUM"));
+                        } catch (IOException e) {
+                            e.printStackTrace();
+                        }
+                    } catch (Exception e) {
+                        LOG.error("Failed to commit slow receipt of " + message, e);
+                    }
+                }
+            });
+
+            consumerStates.add(consumerState);
+
+        }
+        return consumerStates;
+    }
+
+    private void produce(int numMessages) throws Exception {
+        ExecutorService executorService = Executors.newFixedThreadPool(numProducers);
+        final AtomicInteger toSend = new AtomicInteger(numMessages);
+        for (int i = 1; i <= numProducers; i++) {
+            final int id = i % numBrokers;
+            executorService.execute(new Runnable() {
+                @Override
+                public void run() {
+                    try {
+                        ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory("failover:(tcp://localhost:" + (portBase + id) + ")");
+                        connectionFactory.setWatchTopicAdvisories(false);
+                        QueueConnection queueConnection = connectionFactory.createQueueConnection();
+                        queueConnection.start();
+                        QueueSession queueSession = queueConnection.createQueueSession(false, Session.AUTO_ACKNOWLEDGE);
+                        MessageProducer producer = queueSession.createProducer(null);
+                        int val = 0;
+                        while ((val = toSend.decrementAndGet()) >= 0) {
+
+                            ActiveMQQueue compositeQ = new ActiveMQQueue("IN");
+                            LOG.info("Send to: " + ((ActiveMQConnection) queueConnection).getBrokerName() + ", " + val + ", dest:" + compositeQ);
+                            Message textMessage = queueSession.createTextMessage(((ActiveMQConnection) queueConnection).getBrokerName() + "->" + val + " payload:" + payload);
+                            textMessage.setIntProperty("NUM", val);
+                            producer.send(compositeQ, textMessage);
+                        }
+                        queueConnection.close();
+
+                    } catch (Throwable throwable) {
+                        throwable.printStackTrace();
+                        exceptions.add(throwable);
+                    }
+                }
+            });
+        }
+    }
+
+    private void verifyPeerBrokerInfo(BrokerItem brokerItem, final int max) throws Exception {
+        final BrokerService broker = brokerItem.broker;
+        final RegionBroker regionBroker = (RegionBroker) broker.getRegionBroker();
+        Wait.waitFor(new Wait.Condition() {
+            @Override
+            public boolean isSatisified() throws Exception {
+                LOG.info("verify infos " + broker.getBrokerName() + ", len: " + regionBroker.getPeerBrokerInfos().length);
+                return max == regionBroker.getPeerBrokerInfos().length;
+            }
+        });
+        LOG.info("verify infos " + broker.getBrokerName() + ", len: " + regionBroker.getPeerBrokerInfos().length);
+        List<String> missing = new ArrayList<String>();
+        for (int i = 0; i < max; i++) {
+            missing.add("B" + i);
+        }
+        if (max != regionBroker.getPeerBrokerInfos().length) {
+            for (BrokerInfo info : regionBroker.getPeerBrokerInfos()) {
+                LOG.info(info.getBrokerName());
+                missing.remove(info.getBrokerName());
+            }
+            LOG.info("Broker infos off.." + missing);
+        }
+        assertEquals(broker.getBrokerName(), max, regionBroker.getPeerBrokerInfos().length);
+    }
+
+    private void verifyPeerBrokerInfos(final int max) throws Exception {
+        Collection<BrokerItem> brokerList = brokers.values();
+        for (Iterator<BrokerItem> i = brokerList.iterator(); i.hasNext(); ) {
+            verifyPeerBrokerInfo(i.next(), max);
+        }
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+    }
+
+    class ConsumerState {
+        AtomicInteger accumulator;
+        String brokerName;
+        QueueReceiver receiver;
+        ActiveMQDestination destination;
+        Vector<Integer> expected = new Vector<Integer>();
+    }
+}