You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ra...@apache.org on 2008/08/08 20:48:07 UTC

svn commit: r684047 [2/2] - in /activemq/trunk/activemq-core/src: main/java/org/apache/activemq/group/ test/java/org/apache/activemq/group/

Modified: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java?rev=684047&r1=684046&r2=684047&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapTest.java Fri Aug  8 11:48:06 2008
@@ -34,12 +34,12 @@
 
     /**
      * Test method for
-     * {@link org.apache.activemq.group.GroupMap#addMemberChangedListener(org.apache.activemq.group.MemberChangedListener)}.
+     * {@link org.apache.activemq.group.Group#addMemberChangedListener(org.apache.activemq.group.MemberChangedListener)}.
      * @throws Exception 
      */
     public void testAddMemberChangedListener() throws Exception {
         final AtomicInteger counter = new AtomicInteger();
-        GroupMap map1 = new GroupMap(connection1,"map1");
+        Group map1 = new Group(connection1,"map1");
         map1.addMemberChangedListener(new MemberChangedListener(){
 
             public void memberStarted(Member member) {
@@ -65,7 +65,7 @@
             }
         }
         assertEquals(1, counter.get());
-        GroupMap map2 = new GroupMap(connection2,"map2");
+        Group map2 = new Group(connection2,"map2");
         map2.start();
         synchronized(counter) {
             if (counter.get()<2) {
@@ -76,7 +76,7 @@
         map2.stop();
         synchronized(counter) {
             if (counter.get()>=2) {
-                counter.wait(GroupMap.DEFAULT_HEART_BEAT_INTERVAL*3);
+                counter.wait(Group.DEFAULT_HEART_BEAT_INTERVAL*3);
             }
         }
         assertEquals(1, counter.get());
@@ -85,14 +85,14 @@
 
     /**
      * Test method for
-     * {@link org.apache.activemq.group.GroupMap#addMapChangedListener(org.apache.activemq.group.MapChangedListener)}.
+     * {@link org.apache.activemq.group.Group#addMapChangedListener(org.apache.activemq.group.MapChangedListener)}.
      * @throws Exception 
      */
     public void testAddMapChangedListener() throws Exception {
         final AtomicBoolean called1 = new AtomicBoolean();
         final AtomicBoolean called2 = new AtomicBoolean();
         
-        GroupMap map1 = new GroupMap(connection1,"map1");
+        Group map1 = new Group(connection1,"map1");
         
         map1.addMapChangedListener(new DefaultMapChangedListener() {
             public void mapInsert(Member owner,Object Key, Object Value) {
@@ -104,7 +104,7 @@
         });
         map1.start();
         
-        GroupMap map2 = new GroupMap(connection2,"map2");
+        Group map2 = new Group(connection2,"map2");
         
         map2.addMapChangedListener(new DefaultMapChangedListener() {
             public void mapInsert(Member owner,Object Key, Object Value) {
@@ -133,12 +133,32 @@
         map1.stop();
         map2.stop();
     }
+   
+    public void testGetImplicitWriteLock() throws Exception {
+        Group map1 = new Group(connection1, "map1");
+        final AtomicBoolean called = new AtomicBoolean();
+        map1.start();
+        Group map2 = new Group(connection2, "map2");
+        map2.setAlwaysLock(true);
+        map2.setMinimumGroupSize(2);
+        map2.start();
+        map2.put("test", "foo");
+        try {
+            map1.put("test", "bah");
+            fail("Should have thrown an exception!");
+        } catch (GroupMapUpdateException e) {
+        }
+        map1.stop();
+        map2.stop();
+    }
     
-    public void testGetWriteLock() throws Exception {
-        GroupMap map1 = new GroupMap(connection1, "map1");
+    public void testExpireImplicitWriteLock() throws Exception {
+        Group map1 = new Group(connection1, "map1");
         final AtomicBoolean called = new AtomicBoolean();
         map1.start();
-        GroupMap map2 = new GroupMap(connection2, "map2");
+        Group map2 = new Group(connection2, "map2");
+        map2.setAlwaysLock(true);
+        map2.setLockTimeToLive(1000);
         map2.setMinimumGroupSize(2);
         map2.start();
         map2.put("test", "foo");
@@ -147,18 +167,68 @@
             fail("Should have thrown an exception!");
         } catch (GroupMapUpdateException e) {
         }
+        Thread.sleep(2000);
+        map1.put("test", "bah");
         map1.stop();
         map2.stop();
     }
-
+    
+    public void testExpireImplicitLockOnExit() throws Exception {
+        Group map1 = new Group(connection1, "map1");
+        final AtomicBoolean called = new AtomicBoolean();
+        map1.start();
+        Group map2 = new Group(connection2, "map2");
+        map2.setAlwaysLock(true);
+        map2.setMinimumGroupSize(2);
+        map2.start();
+        map2.put("test", "foo");
+        try {
+            map1.put("test", "bah");
+            fail("Should have thrown an exception!");
+        } catch (GroupMapUpdateException e) {
+        }
+        map2.stop();
+        map1.put("test", "bah");
+        map1.stop();
+        
+    }
+    
+    public void testGetExplicitWriteLock() throws Exception {
+        Group map1 = new Group(connection1, "map1");
+        map1.setAlwaysLock(true);
+        final AtomicBoolean called = new AtomicBoolean();
+        map1.start();
+        Group map2 = new Group(connection2, "map2");
+        map2.setAlwaysLock(true);
+        map2.setMinimumGroupSize(2);
+        map2.start();
+        map2.put("test", "foo");
+        map2.lock("test");
+        try {
+            map1.put("test", "bah");
+            fail("Should have thrown an exception!");
+        } catch (GroupMapUpdateException e) {
+        }
+        map2.unlock("test");
+        map1.lock("test");
+        try {
+            map2.lock("test");
+            fail("Should have thrown an exception!");
+        } catch (GroupMapUpdateException e) {
+        }
+        map1.stop();
+        map2.stop();
+    }
+    
+    
 
     /**
-     * Test method for {@link org.apache.activemq.group.GroupMap#clear()}.
+     * Test method for {@link org.apache.activemq.group.Group#clear()}.
      * 
      * @throws Exception
      */
     public void testClear() throws Exception {
-        GroupMap map1 = new GroupMap(connection1,"map1");
+        Group map1 = new Group(connection1,"map1");
         final AtomicBoolean called = new AtomicBoolean();
         map1.addMapChangedListener(new DefaultMapChangedListener() {
             public void mapInsert(Member owner,Object Key, Object Value) {
@@ -176,7 +246,7 @@
             }
         });
         map1.start();
-        GroupMap map2 = new GroupMap(connection2,"map2");
+        Group map2 = new Group(connection2,"map2");
         map2.start();
         map2.put("test","foo");
         synchronized(called) {
@@ -202,12 +272,12 @@
      * Test a new map is populated for existing values
      */
     public void testMapUpdatedOnStart() throws Exception {
-        GroupMap map1 = new GroupMap(connection1,"map1");
+        Group map1 = new Group(connection1,"map1");
         final AtomicBoolean called = new AtomicBoolean();
         
         map1.start();
         map1.put("test", "foo");
-        GroupMap map2 = new GroupMap(connection2,"map2");
+        Group map2 = new Group(connection2,"map2");
         map2.addMapChangedListener(new DefaultMapChangedListener() {
             public void mapInsert(Member owner,Object Key, Object Value) {
                 synchronized(called) {
@@ -230,9 +300,9 @@
         map1.stop();
         map2.stop();
     }
-    
+   
     public void testContainsKey() throws Exception {
-        GroupMap map1 = new GroupMap(connection1,"map1");
+        Group map1 = new Group(connection1,"map1");
         final AtomicBoolean called = new AtomicBoolean();
         map1.addMapChangedListener(new DefaultMapChangedListener() {
             public void mapInsert(Member owner,Object Key, Object Value) {
@@ -243,7 +313,7 @@
             }
         });
         map1.start();
-        GroupMap map2 = new GroupMap(connection2,"map2");
+        Group map2 = new Group(connection2,"map2");
         map2.start();
         map2.put("test","foo");
         synchronized(called) {
@@ -261,11 +331,11 @@
 
     /**
      * Test method for
-     * {@link org.apache.activemq.group.GroupMap#containsValue(java.lang.Object)}.
+     * {@link org.apache.activemq.group.Group#containsValue(java.lang.Object)}.
      * @throws Exception 
      */
     public void testContainsValue() throws Exception {
-        GroupMap map1 = new GroupMap(connection1,"map1");
+        Group map1 = new Group(connection1,"map1");
         final AtomicBoolean called = new AtomicBoolean();
         map1.addMapChangedListener(new DefaultMapChangedListener() {
             public void mapInsert(Member owner,Object Key, Object Value) {
@@ -276,7 +346,7 @@
             }
         });
         map1.start();
-        GroupMap map2 = new GroupMap(connection2,"map2");
+        Group map2 = new Group(connection2,"map2");
         map2.start();
         map2.put("test","foo");
         synchronized(called) {
@@ -299,11 +369,11 @@
 
     /**
      * Test method for
-     * {@link org.apache.activemq.group.GroupMap#get(java.lang.Object)}.
+     * {@link org.apache.activemq.group.Group#get(java.lang.Object)}.
      * @throws Exception 
      */
     public void testGet() throws Exception {
-        GroupMap map1 = new GroupMap(connection1,"map1");
+        Group map1 = new Group(connection1,"map1");
         final AtomicBoolean called = new AtomicBoolean();
         map1.addMapChangedListener(new DefaultMapChangedListener() {
             public void mapInsert(Member owner,Object Key, Object Value) {
@@ -314,7 +384,7 @@
             }
         });
         map1.start();
-        GroupMap map2 = new GroupMap(connection2,"map2");
+        Group map2 = new Group(connection2,"map2");
         map2.start();
         map2.put("test","foo");
         synchronized(called) {
@@ -327,15 +397,29 @@
         map1.stop();
         map2.stop();
     }
+    
+    public void testPut() throws Exception {
+        Group map1 = new Group(connection1,"map1");
+        map1.start();
+        Group map2 = new Group(connection2,"map2");
+        map2.setMinimumGroupSize(2);
+        map2.start();
+        Object value = map1.put("foo", "blob");
+        assertNull(value);
+        value = map1.put("foo", "blah");
+        assertEquals(value, "blob");
+        map1.stop();
+        map2.stop();
+    }
 
     
     
     /**
      * Test method for
-     * {@link org.apache.activemq.group.GroupMap#remove(java.lang.Object)}.
+     * {@link org.apache.activemq.group.Group#remove(java.lang.Object)}.
      */
     public void testRemove() throws Exception{
-        GroupMap map1 = new GroupMap(connection1,"map1");
+        Group map1 = new Group(connection1,"map1");
         final AtomicBoolean called = new AtomicBoolean();
         map1.addMapChangedListener(new DefaultMapChangedListener() {
             public void mapInsert(Member owner,Object Key, Object Value) {
@@ -353,7 +437,7 @@
             }
         });
         map1.start();
-        GroupMap map2 = new GroupMap(connection2,"map2");
+        Group map2 = new Group(connection2,"map2");
         map2.start();
         map2.put("test","foo");
         synchronized(called) {
@@ -380,7 +464,7 @@
         final AtomicBoolean called1 = new AtomicBoolean();
         final AtomicBoolean called2 = new AtomicBoolean();
         
-        GroupMap map1 = new GroupMap(connection1,"map1");
+        Group map1 = new Group(connection1,"map1");
         map1.setTimeToLive(1000);
         map1.addMapChangedListener(new DefaultMapChangedListener() {
             public void mapRemove(Member owner, Object key, Object value,boolean expired) {        
@@ -392,7 +476,7 @@
         });
         map1.start();
         
-        GroupMap map2 = new GroupMap(connection2,"map2");
+        Group map2 = new Group(connection2,"map2");
         
         map2.addMapChangedListener(new DefaultMapChangedListener() {
             public void mapRemove(Member owner, Object key, Object value,boolean expired) {        

Copied: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMemberTest.java (from r683259, activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapMemberTest.java)
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMemberTest.java?p2=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMemberTest.java&p1=activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapMemberTest.java&r1=683259&r2=684047&rev=684047&view=diff
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMapMemberTest.java (original)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMemberTest.java Fri Aug  8 11:48:06 2008
@@ -26,48 +26,102 @@
 import org.apache.activemq.broker.BrokerService;
 
 
-public class GroupMapMemberTest extends TestCase {
+public class GroupMemberTest extends TestCase {
     protected BrokerService broker;
     protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
 
+    
+    public void testCoordinatorSelection() throws Exception{
+        Group group = new Group(null,"");
+        List<Member>list = new ArrayList<Member>();
+        final int number =10;
+        Member choosen = null;
+        for (int i =0;i< number;i++) {
+            Member m = new Member("group"+i);
+            m.setId(""+i);
+            if (number/2==i) {
+                m.setCoordinatorWeight(10);
+                choosen=m;
+            }
+            list.add(m);
+        }
+        Member c = group.selectCordinator(list);
+        assertEquals(c,choosen);
+    }
     /**
      * Test method for
-     * {@link org.apache.activemq.group.GroupMap#addMemberChangedListener(org.apache.activemq.group.MemberChangedListener)}.
+     * {@link org.apache.activemq.group.Group#addMemberChangedListener(org.apache.activemq.group.MemberChangedListener)}.
      * @throws Exception 
      */
     public void testGroup() throws Exception {
         
-        int number = 20;
+        final int number = 10;
+        List<Connection>connections = new ArrayList<Connection>();
+        List<Group>groupMaps = new ArrayList<Group>();
+        ConnectionFactory factory = createConnectionFactory();
+        for (int i =0; i < number; i++) {
+            Connection connection = factory.createConnection();
+            connection.start();
+            connections.add(connection);
+            Group map = new Group(connection,"map"+i);
+            map.setHeartBeatInterval(200);
+            if(i ==number-1) {
+                map.setMinimumGroupSize(number);
+            }
+            map.start();
+            groupMaps.add(map);
+        }
+        
+        int coordinatorNumber = 0;
+        for (Group map:groupMaps) {
+            if (map.isCoordinator()) {
+                coordinatorNumber++;
+            }
+        }
+        for(Group map:groupMaps) {
+            map.stop();
+        }
+        for (Connection connection:connections) {
+            connection.stop();
+        }
+        
+    }
+    
+public void XtestWeightedGroup() throws Exception {
+        
+        final int number = 10;
         List<Connection>connections = new ArrayList<Connection>();
-        List<GroupMap>groupMaps = new ArrayList<GroupMap>();
+        List<Group>groupMaps = new ArrayList<Group>();
+        Group last = null;
         ConnectionFactory factory = createConnectionFactory();
         for (int i =0; i < number; i++) {
             Connection connection = factory.createConnection();
             connection.start();
             connections.add(connection);
-            GroupMap map = new GroupMap(connection,"map"+i);
-            map.setHeartBeatInterval(20000);
+            Group map = new Group(connection,"map"+i);
+            map.setHeartBeatInterval(200);
             if(i ==number-1) {
                 map.setMinimumGroupSize(number);
+                map.setCoordinatorWeight(10);
+                last=map;
             }
             map.start();
             groupMaps.add(map);
         }
         
         int coordinator = 0;
-        for (GroupMap map:groupMaps) {
+        Group groupCoordinator = null;
+        for (Group map:groupMaps) {
             if (map.isCoordinator()) {
                 coordinator++;
+                groupCoordinator=map;
             }
         }
                
+        assertNotNull(groupCoordinator);
+        assertEquals(groupCoordinator, last);
         assertEquals(1,coordinator);
-        groupMaps.get(0).put("key", "value");
-        Thread.sleep(2000);
-        for (GroupMap map:groupMaps) {
-            assertTrue(map.get("key").equals("value"));
-        }
-        for(GroupMap map:groupMaps) {
+        for(Group map:groupMaps) {
             map.stop();
         }
         for (Connection connection:connections) {

Added: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMessageTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMessageTest.java?rev=684047&view=auto
==============================================================================
--- activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMessageTest.java (added)
+++ activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMessageTest.java Fri Aug  8 11:48:06 2008
@@ -0,0 +1,256 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.group;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicInteger;
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import junit.framework.TestCase;
+import org.apache.activemq.ActiveMQConnection;
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.broker.BrokerService;
+
+public class GroupMessageTest extends TestCase {
+    protected BrokerService broker;
+    protected String bindAddress = ActiveMQConnectionFactory.DEFAULT_BROKER_BIND_URL;
+
+    public void testGroupBroadcast() throws Exception {
+        final int number = 10;
+        final AtomicInteger count = new AtomicInteger();
+        List<Connection> connections = new ArrayList<Connection>();
+        List<Group> groups = new ArrayList<Group>();
+        ConnectionFactory factory = createConnectionFactory();
+        for (int i = 0; i < number; i++) {
+            Connection connection = factory.createConnection();
+            connection.start();
+            connections.add(connection);
+            Group group = new Group(connection, "group" + i);
+            group.setHeartBeatInterval(20000);
+            if (i == number - 1) {
+                group.setMinimumGroupSize(number);
+            }
+            group.start();
+            groups.add(group);
+            group.addGroupMessageListener(new GroupMessageListener() {
+                public void messageDelivered(Member sender, String replyId,
+                        Object message) {
+                    synchronized (count) {
+                        if (count.incrementAndGet() == number) {
+                            count.notifyAll();
+                        }
+                    }
+                }
+            });
+        }
+        groups.get(0).broadcastMessage("hello");
+        synchronized (count) {
+            if (count.get() < number) {
+                count.wait(5000);
+            }
+        }
+        assertEquals(number, count.get());
+        for (Group map : groups) {
+            map.stop();
+        }
+        for (Connection connection : connections) {
+            connection.stop();
+        }
+    }
+
+    public void testsendMessage() throws Exception {
+        final int number = 10;
+        final AtomicInteger count = new AtomicInteger();
+        List<Connection> connections = new ArrayList<Connection>();
+        List<Group> groups = new ArrayList<Group>();
+        ConnectionFactory factory = createConnectionFactory();
+        for (int i = 0; i < number; i++) {
+            Connection connection = factory.createConnection();
+            connection.start();
+            connections.add(connection);
+            Group group = new Group(connection, "group" + i);
+            group.setHeartBeatInterval(20000);
+            if (i == number - 1) {
+                group.setMinimumGroupSize(number);
+            }
+            group.start();
+            groups.add(group);
+            group.addGroupMessageListener(new GroupMessageListener() {
+                public void messageDelivered(Member sender, String replyId,
+                        Object message) {
+                    synchronized (count) {
+                        count.incrementAndGet();
+                        count.notifyAll();
+                    }
+                }
+            });
+        }
+        groups.get(0).sendMessage("hello");
+        synchronized (count) {
+            if (count.get() == 0) {
+                count.wait(5000);
+            }
+        }
+        // wait a while to check that only one got it
+        Thread.sleep(2000);
+        assertEquals(1, count.get());
+        for (Group map : groups) {
+            map.stop();
+        }
+        for (Connection connection : connections) {
+            connection.stop();
+        }
+    }
+
+    public void testSendToSingleMember() throws Exception {
+        ConnectionFactory factory = createConnectionFactory();
+        Connection connection1 = factory.createConnection();
+        Connection connection2 = factory.createConnection();
+        connection1.start();
+        connection2.start();
+        Group group1 = new Group(connection1, "group1");
+        final AtomicBoolean called = new AtomicBoolean();
+        group1.addGroupMessageListener(new GroupMessageListener() {
+            public void messageDelivered(Member sender, String replyId,
+                    Object message) {
+                synchronized (called) {
+                    called.set(true);
+                    called.notifyAll();
+                }
+            }
+        });
+        group1.start();
+        Group group2 = new Group(connection2, "group2");
+        group2.setMinimumGroupSize(2);
+        group2.start();
+        Member member1 = group2.getMemberByName("group1");
+        group2.sendMessage(member1, "hello");
+        synchronized (called) {
+            if (!called.get()) {
+                called.wait(5000);
+            }
+        }
+        assertTrue(called.get());
+        group1.stop();
+        group2.stop();
+        connection1.close();
+        connection2.close();
+    }
+
+    public void testSendRequestReply() throws Exception {
+        ConnectionFactory factory = createConnectionFactory();
+        Connection connection1 = factory.createConnection();
+        Connection connection2 = factory.createConnection();
+        connection1.start();
+        connection2.start();
+        final int number = 1000;
+        final AtomicInteger requestCount = new AtomicInteger();
+        final AtomicInteger replyCount = new AtomicInteger();
+        final List<String> requests = new ArrayList<String>();
+        final List<String> replies = new ArrayList<String>();
+        for (int i = 0; i < number; i++) {
+            requests.add("request" + i);
+            replies.add("reply" + i);
+        }
+        final Group group1 = new Group(connection1, "group1");
+        final AtomicBoolean finished = new AtomicBoolean();
+        group1.addGroupMessageListener(new GroupMessageListener() {
+            public void messageDelivered(Member sender, String replyId,
+                    Object message) {
+                if (!replies.isEmpty()) {
+                    String reply = replies.remove(0);
+                    try {
+                        group1.sendMessageResponse(sender, replyId, reply);
+                    } catch (JMSException e) {
+                        e.printStackTrace();
+                    }
+                } 
+            }
+        });
+        group1.start();
+        final Group group2 = new Group(connection2, "group2");
+        group2.setMinimumGroupSize(2);
+        group2.addGroupMessageListener(new GroupMessageListener() {
+            public void messageDelivered(Member sender, String replyId,
+                    Object message) {
+                if (!requests.isEmpty()) {
+                    String request = requests.remove(0);
+                    try {
+                        group2.sendMessage(sender, request);
+                    } catch (JMSException e) {
+                        e.printStackTrace();
+                    }
+                }else {
+                    synchronized (finished) {
+                        finished.set(true);
+                        finished.notifyAll();
+                    }
+                }
+            }
+        });
+        group2.start();
+        Member member1 = group2.getMemberByName("group1");
+        group2.sendMessage(member1, requests.remove(0));
+        synchronized (finished) {
+            if (!finished.get()) {
+                finished.wait(10000);
+            }
+        }
+        assertTrue(finished.get());
+        group1.stop();
+        group2.stop();
+        connection1.close();
+        connection2.close();
+    }
+
+    protected void setUp() throws Exception {
+        if (broker == null) {
+            broker = createBroker();
+        }
+        super.setUp();
+    }
+
+    protected void tearDown() throws Exception {
+        super.tearDown();
+        if (broker != null) {
+            broker.stop();
+        }
+    }
+
+    protected ActiveMQConnectionFactory createConnectionFactory()
+            throws Exception {
+        ActiveMQConnectionFactory cf = new ActiveMQConnectionFactory(
+                ActiveMQConnection.DEFAULT_BROKER_URL);
+        return cf;
+    }
+
+    protected BrokerService createBroker() throws Exception {
+        BrokerService answer = new BrokerService();
+        configureBroker(answer);
+        answer.start();
+        return answer;
+    }
+
+    protected void configureBroker(BrokerService answer) throws Exception {
+        answer.setPersistent(false);
+        answer.addConnector(bindAddress);
+        answer.setDeleteAllMessagesOnStartup(true);
+    }
+}

Propchange: activemq/trunk/activemq-core/src/test/java/org/apache/activemq/group/GroupMessageTest.java
------------------------------------------------------------------------------
    svn:eol-style = native