You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2015/03/06 23:30:45 UTC

[12/15] activemq-6 git commit: Refactored the testsuite a bit

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/concurrent-tests/src/test/java/org/apache/activemq/tests/concurrent/stomp/ConcurrentStompTest.java
----------------------------------------------------------------------
diff --git a/tests/concurrent-tests/src/test/java/org/apache/activemq/tests/concurrent/stomp/ConcurrentStompTest.java b/tests/concurrent-tests/src/test/java/org/apache/activemq/tests/concurrent/stomp/ConcurrentStompTest.java
deleted file mode 100644
index c101c9c..0000000
--- a/tests/concurrent-tests/src/test/java/org/apache/activemq/tests/concurrent/stomp/ConcurrentStompTest.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.tests.concurrent.stomp;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.activemq.core.protocol.stomp.Stomp;
-import org.apache.activemq.tests.integration.stomp.StompTestBase;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class ConcurrentStompTest extends StompTestBase
-{
-   private Socket stompSocket_2;
-
-   private ByteArrayOutputStream inputBuffer_2;
-
-   /**
-    * Send messages on 1 socket and receives them concurrently on another socket.
-    */
-   @Test
-   public void testSendManyMessages() throws Exception
-   {
-      try
-      {
-         String connect = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-
-         sendFrame(connect);
-         String connected = receiveFrame(10000);
-         Assert.assertTrue(connected.startsWith("CONNECTED"));
-
-         stompSocket_2 = createSocket();
-         inputBuffer_2 = new ByteArrayOutputStream();
-
-         sendFrame(stompSocket_2, connect);
-         connected = receiveFrame(stompSocket_2, inputBuffer_2, 10000);
-         Assert.assertTrue(connected.startsWith("CONNECTED"));
-
-         final int count = 1000;
-         final CountDownLatch latch = new CountDownLatch(count);
-
-         String subscribe =
-            "SUBSCRIBE\n" +
-               "destination:" + getQueuePrefix() + getQueueName() + "\n" +
-               "ack:auto\n\n" +
-               Stomp.NULL;
-         sendFrame(stompSocket_2, subscribe);
-         Thread.sleep(2000);
-
-         new Thread()
-         {
-            @Override
-            public void run()
-            {
-               int i = 0;
-               while (true)
-               {
-                  try
-                  {
-                     String frame = receiveFrame(stompSocket_2, inputBuffer_2, 10000);
-                     Assert.assertTrue(frame.startsWith("MESSAGE"));
-                     Assert.assertTrue(frame.indexOf("destination:") > 0);
-                     System.out.println("<<< " + i++);
-                     latch.countDown();
-                  }
-                  catch (Exception e)
-                  {
-                     break;
-                  }
-               }
-            }
-         }.start();
-
-         String send = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n";
-         for (int i = 1; i <= count; i++)
-         {
-            // Thread.sleep(1);
-            System.out.println(">>> " + i);
-            sendFrame(send + "count:" + i + "\n\n" + Stomp.NULL);
-         }
-
-         assertTrue(latch.await(60, TimeUnit.SECONDS));
-
-      }
-      finally
-      {
-         stompSocket_2.close();
-         inputBuffer_2.close();
-      }
-
-
-   }
-
-   // Implementation methods
-   // -------------------------------------------------------------------------
-   public void sendFrame(Socket socket, String data) throws Exception
-   {
-      byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
-      OutputStream outputStream = socket.getOutputStream();
-      for (byte b : bytes)
-      {
-         outputStream.write(b);
-      }
-      outputStream.flush();
-   }
-
-   public String receiveFrame(Socket socket, ByteArrayOutputStream input, long timeOut) throws Exception
-   {
-      socket.setSoTimeout((int) timeOut);
-      InputStream is = socket.getInputStream();
-      int c = 0;
-      for (;;)
-      {
-         c = is.read();
-         if (c < 0)
-         {
-            throw new IOException("socket closed.");
-         }
-         else if (c == 0)
-         {
-            c = is.read();
-            if (c != '\n')
-            {
-               byte[] ba = input.toByteArray();
-               System.out.println(new String(ba, StandardCharsets.UTF_8));
-            }
-            Assert.assertEquals("Expecting stomp frame to terminate with \0\n", c, '\n');
-            byte[] ba = input.toByteArray();
-            input.reset();
-            return new String(ba, StandardCharsets.UTF_8);
-         }
-         else
-         {
-            input.write(c);
-         }
-      }
-   }
-
-}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/extra-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/extra-tests/pom.xml b/tests/extra-tests/pom.xml
new file mode 100644
index 0000000..3329bf3
--- /dev/null
+++ b/tests/extra-tests/pom.xml
@@ -0,0 +1,246 @@
+<!--
+  Licensed to the Apache Software Foundation (ASF) under one or more
+  contributor license agreements.  See the NOTICE file distributed with
+  this work for additional information regarding copyright ownership.
+  The ASF licenses this file to You under the Apache License, Version 2.0
+  (the "License"); you may not use this file except in compliance with
+  the License.  You may obtain a copy of the License at
+
+  http://www.apache.org/licenses/LICENSE-2.0
+
+  Unless required by applicable law or agreed to in writing, software
+  distributed under the License is distributed on an "AS IS" BASIS,
+  WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+  See the License for the specific language governing permissions and
+  limitations under the License.
+-->
+
+<!-- This test folder contains tests that are not part of the regular testsuite
+     because they use optional libraries such as LGPL or private ones.
+     They are optional and will validate extra functionality available through Service Integration
+     Example: Transaction Manager -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+         xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+   <modelVersion>4.0.0</modelVersion>
+   <parent>
+      <groupId>org.apache.activemq.tests</groupId>
+      <artifactId>activemq-tests-pom</artifactId>
+      <version>6.0.1-SNAPSHOT</version>
+   </parent>
+
+   <artifactId>extra-tests</artifactId>
+   <packaging>jar</packaging>
+   <name>ActiveMQ6 Extra Tests</name>
+
+   <properties>
+      <tools.jar>${java.home}/../lib/tools.jar</tools.jar>
+      <byteman.version>2.2.0</byteman.version>
+      <activemq.basedir>${project.basedir}/../..</activemq.basedir>
+   </properties>
+
+   <dependencies>
+      <dependency>
+         <groupId>io.netty</groupId>
+         <artifactId>netty-all</artifactId>
+      </dependency>
+      <dependency>
+         <groupId>org.jboss.byteman</groupId>
+         <artifactId>byteman</artifactId>
+         <version>${byteman.version}</version>
+      </dependency>
+      <dependency>
+         <groupId>org.jboss.byteman</groupId>
+         <artifactId>byteman-submit</artifactId>
+         <scope>test</scope>
+         <version>${byteman.version}</version>
+      </dependency>
+      <dependency>
+         <groupId>org.jboss.byteman</groupId>
+         <artifactId>byteman-install</artifactId>
+         <scope>test</scope>
+         <version>${byteman.version}</version>
+      </dependency>
+      <dependency>
+         <groupId>org.jboss.byteman</groupId>
+         <artifactId>byteman-bmunit</artifactId>
+         <scope>test</scope>
+         <version>${byteman.version}</version>
+         <exclusions>
+            <exclusion>
+               <groupId>org.testng</groupId>
+               <artifactId>testng</artifactId>
+            </exclusion>
+         </exclusions>
+      </dependency>
+      <dependency>
+         <groupId>com.sun</groupId>
+         <artifactId>tools</artifactId>
+         <version>1.7</version>
+         <scope>system</scope>
+         <systemPath>${tools.jar}</systemPath>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.activemq</groupId>
+         <artifactId>activemq-core-client</artifactId>
+         <version>${project.version}</version>
+         <scope>test</scope>
+         <type>test-jar</type>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.activemq</groupId>
+         <artifactId>activemq-server</artifactId>
+         <version>${project.version}</version>
+         <scope>test</scope>
+         <type>test-jar</type>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.activemq.tests</groupId>
+         <artifactId>integration-tests</artifactId>
+         <version>${project.version}</version>
+         <scope>test</scope>
+         <type>test-jar</type>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.activemq.tests</groupId>
+         <artifactId>unit-tests</artifactId>
+         <version>${project.version}</version>
+         <scope>test</scope>
+         <type>test-jar</type>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.activemq</groupId>
+         <artifactId>activemq-jms-client</artifactId>
+         <version>${project.version}</version>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.activemq</groupId>
+         <artifactId>activemq-jms-server</artifactId>
+         <version>${project.version}</version>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.activemq</groupId>
+         <artifactId>activemq-ra</artifactId>
+         <version>${project.version}</version>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.activemq</groupId>
+         <artifactId>activemq-bootstrap</artifactId>
+         <version>${project.version}</version>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.geronimo.specs</groupId>
+         <artifactId>geronimo-j2ee-connector_1.5_spec</artifactId>
+      </dependency>
+      <dependency>
+         <groupId>commons-logging</groupId>
+         <artifactId>commons-logging</artifactId>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.geronimo.specs</groupId>
+         <artifactId>geronimo-ejb_3.0_spec</artifactId>
+      </dependency>
+      <!--this specifically for the JMS Bridge -->
+      <dependency>
+         <groupId>org.apache.geronimo.specs</groupId>
+         <artifactId>geronimo-jta_1.1_spec</artifactId>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.geronimo.components</groupId>
+         <artifactId>geronimo-jaspi</artifactId>
+      </dependency>
+      <dependency>
+         <groupId>org.apache.geronimo.specs</groupId>
+         <artifactId>geronimo-jms_2.0_spec</artifactId>
+      </dependency>
+      <dependency>
+         <groupId>org.jboss.logging</groupId>
+         <artifactId>jboss-logging-processor</artifactId>
+      </dependency>
+      <dependency>
+         <groupId>org.jboss.logging</groupId>
+         <artifactId>jboss-logging</artifactId>
+      </dependency>
+      <dependency>
+         <groupId>org.jboss.logmanager</groupId>
+         <artifactId>jboss-logmanager</artifactId>
+      </dependency>
+      <dependency>
+         <groupId>junit</groupId>
+         <artifactId>junit</artifactId>
+      </dependency>
+
+
+
+      <!-- Needed for JMS Bridge Tests -->
+      <dependency>
+         <groupId>org.jboss.jbossts.jts</groupId>
+         <artifactId>jbossjts-jacorb</artifactId>
+         <version>4.17.13.Final</version>
+      </dependency>
+      <dependency>
+         <groupId>org.jboss</groupId>
+         <artifactId>jboss-transaction-spi</artifactId>
+         <version>7.1.0.Final</version>
+      </dependency>
+
+   </dependencies>
+
+   <build>
+      <plugins>
+         <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-jar-plugin</artifactId>
+            <executions>
+               <execution>
+                  <phase>test</phase>
+                  <goals>
+                     <goal>test-jar</goal>
+                  </goals>
+               </execution>
+            </executions>
+         </plugin>
+         <plugin>
+            <groupId>org.apache.maven.plugins</groupId>
+            <artifactId>maven-surefire-plugin</artifactId>
+            <configuration>
+               <skipTests>${skipExtraTests}</skipTests>
+               <!-- ensure we don't inherit a byteman jar form any env settings -->
+               <environmentVariables>
+                  <BYTEMAN_HOME></BYTEMAN_HOME>
+               </environmentVariables>
+               <systemProperties>
+                  <!--
+                  <property>
+                     <name>org.jboss.byteman.home</name>
+                     <value></value>
+                  </property>
+                  <property>
+                     <name>org.jboss.byteman.verbose</name>
+                     <value>true</value>
+                  </property>
+                  <property>
+                     <name>org.jboss.byteman.contrib.bmunit.verbose</name>
+                     <value>true</value>
+                  </property>
+                  <property>
+                     <name>org.jboss.byteman.dump.generated.classes</name>
+                     <value></value>
+                  </property>
+                  -->
+               </systemProperties>
+               <!-- make sure maven puts the byteman jar in the classpath rather than in a manifest jar -->
+               <useManifestOnlyJar>false</useManifestOnlyJar>
+               <forkMode>once</forkMode>
+               <!--
+               <debugForkedProcess>true</debugForkedProcess>
+               -->
+               <parallel>false</parallel>
+               <!--<argLine>${activemq-surefire-argline} -Dorg.jboss.byteman.verbose -Dorg.jboss.byteman.contrib.bmunit.verbose</argLine>-->
+               <!-- '-noverify' is needed here to fix VerifyErrors on ScaleDownFailoverTest and ScaleDownFailureTest (and their subclasses). I got the tip from https://issues.jboss.org/browse/BYTEMAN-248. -->
+               <argLine>${activemq-surefire-argline} -noverify</argLine>
+            </configuration>
+         </plugin>
+      </plugins>
+   </build>
+
+</project>

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/ExtrasTestLogger.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/ExtrasTestLogger.java b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/ExtrasTestLogger.java
new file mode 100644
index 0000000..c97d01a
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/ExtrasTestLogger.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.extras;
+
+import org.jboss.logging.BasicLogger;
+import org.jboss.logging.Logger;
+import org.jboss.logging.annotations.MessageLogger;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ *         4/18/12
+ */
+@MessageLogger(projectCode = "HQTEST")
+public interface ExtrasTestLogger extends BasicLogger
+{
+   /**
+    * The integration test logger.
+    */
+   ExtrasTestLogger LOGGER = Logger.getMessageLogger(ExtrasTestLogger.class, ExtrasTestLogger.class.getPackage().getName());
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ActiveMQMessageHandlerTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ActiveMQMessageHandlerTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ActiveMQMessageHandlerTest.java
new file mode 100644
index 0000000..0914295
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ActiveMQMessageHandlerTest.java
@@ -0,0 +1,428 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.extras.byteman;
+
+import com.arjuna.ats.arjuna.coordinator.TransactionReaper;
+import com.arjuna.ats.arjuna.coordinator.TxControl;
+import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple;
+import org.apache.activemq.api.core.SimpleString;
+import org.apache.activemq.api.core.client.ClientConsumer;
+import org.apache.activemq.api.core.client.ClientMessage;
+import org.apache.activemq.api.core.client.ClientProducer;
+import org.apache.activemq.api.core.client.ClientSession;
+import org.apache.activemq.api.core.client.ClientSessionFactory;
+import org.apache.activemq.core.postoffice.Binding;
+import org.apache.activemq.core.server.Queue;
+import org.apache.activemq.ra.ActiveMQResourceAdapter;
+import org.apache.activemq.ra.inflow.ActiveMQActivationSpec;
+import org.apache.activemq.tests.integration.ra.ActiveMQRATestBase;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import javax.jms.Message;
+import javax.resource.ResourceException;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.lang.reflect.Method;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ *         Created May 20, 2010
+ */
+@RunWith(BMUnitRunner.class)
+public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase
+{
+
+   protected boolean usePersistence()
+   {
+      return true;
+   }
+
+
+   @Override
+   public boolean useSecurity()
+   {
+      return false;
+   }
+
+   @Test
+   @BMRules
+   (
+      rules =
+            {
+               @BMRule
+                     (
+                           name = "interrupt",
+                           targetClass = "org.apache.activemq.core.protocol.core.impl.ActiveMQSessionContext",
+                           targetMethod = "xaEnd",
+                           targetLocation = "ENTRY",
+                           action = "org.apache.activemq.tests.extras.byteman.ActiveMQMessageHandlerTest.interrupt();"
+                     )
+            }
+   )
+   public void testSimpleMessageReceivedOnQueue() throws Exception
+   {
+      ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter();
+      resourceAdapter = qResourceAdapter;
+
+      MyBootstrapContext ctx = new MyBootstrapContext();
+
+      qResourceAdapter.setConnectorClassName(NETTY_CONNECTOR_FACTORY);
+      qResourceAdapter.start(ctx);
+
+      ActiveMQActivationSpec spec = new ActiveMQActivationSpec();
+      spec.setMaxSession(1);
+      spec.setCallTimeout(1000L);
+      spec.setResourceAdapter(qResourceAdapter);
+      spec.setUseJNDI(false);
+      spec.setDestinationType("javax.jms.Queue");
+      spec.setDestination(MDBQUEUE);
+
+      CountDownLatch latch = new CountDownLatch(1);
+
+      XADummyEndpoint endpoint = new XADummyEndpoint(latch, false);
+
+      DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, true);
+
+      qResourceAdapter.endpointActivation(endpointFactory, spec);
+
+      ClientSession session = locator.createSessionFactory().createSession();
+
+      ClientProducer clientProducer = session.createProducer(MDBQUEUEPREFIXED);
+
+      ClientMessage message = session.createMessage(true);
+
+      message.getBodyBuffer().writeString("teststring");
+
+      clientProducer.send(message);
+
+      session.close();
+
+      latch.await(5, TimeUnit.SECONDS);
+
+      assertNotNull(endpoint.lastMessage);
+      assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(), "teststring");
+
+      qResourceAdapter.endpointDeactivation(endpointFactory, spec);
+
+      qResourceAdapter.stop();
+
+      Binding binding = server.getPostOffice().getBinding(SimpleString.toSimpleString(MDBQUEUEPREFIXED));
+      assertEquals(1, getMessageCount(((Queue) binding.getBindable())));
+
+      server.stop();
+      server.start();
+
+      ClientSessionFactory factory = locator.createSessionFactory();
+      session = factory.createSession(true, true);
+
+      session.start();
+      ClientConsumer consumer = session.createConsumer(MDBQUEUEPREFIXED);
+      assertNotNull(consumer.receive(5000));
+      session.close();
+   }
+
+   @Test
+   @BMRules
+         (
+               rules =
+                     {
+                           @BMRule
+                                 (
+                                       name = "interrupt",
+                                       targetClass = "org.apache.activemq.core.protocol.core.impl.ActiveMQSessionContext",
+                                       targetMethod = "xaEnd",
+                                       targetLocation = "ENTRY",
+                                       action = "org.apache.activemq.tests.extras.byteman.ActiveMQMessageHandlerTest.interrupt();"
+                                 )
+                     }
+         )
+   public void testSimpleMessageReceivedOnQueueTwoPhase() throws Exception
+   {
+      ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter();
+      resourceAdapter = qResourceAdapter;
+
+      MyBootstrapContext ctx = new MyBootstrapContext();
+
+      qResourceAdapter.setConnectorClassName(NETTY_CONNECTOR_FACTORY);
+      qResourceAdapter.start(ctx);
+
+      ActiveMQActivationSpec spec = new ActiveMQActivationSpec();
+      spec.setMaxSession(1);
+      spec.setCallTimeout(1000L);
+      spec.setResourceAdapter(qResourceAdapter);
+      spec.setUseJNDI(false);
+      spec.setDestinationType("javax.jms.Queue");
+      spec.setDestination(MDBQUEUE);
+
+      CountDownLatch latch = new CountDownLatch(1);
+
+      XADummyEndpoint endpoint = new XADummyEndpoint(latch, true);
+
+      DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, true);
+
+      qResourceAdapter.endpointActivation(endpointFactory, spec);
+
+      ClientSession session = locator.createSessionFactory().createSession();
+
+      ClientProducer clientProducer = session.createProducer(MDBQUEUEPREFIXED);
+
+      ClientMessage message = session.createMessage(true);
+
+      message.getBodyBuffer().writeString("teststring");
+
+      clientProducer.send(message);
+
+      session.close();
+
+      latch.await(5, TimeUnit.SECONDS);
+
+      assertNotNull(endpoint.lastMessage);
+      assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(), "teststring");
+
+      qResourceAdapter.endpointDeactivation(endpointFactory, spec);
+
+      qResourceAdapter.stop();
+
+      Binding binding = server.getPostOffice().getBinding(SimpleString.toSimpleString(MDBQUEUEPREFIXED));
+      assertEquals(1, getMessageCount(((Queue) binding.getBindable())));
+
+
+      server.stop();
+      server.start();
+
+      ClientSessionFactory factory = locator.createSessionFactory();
+      session = factory.createSession(true, true);
+
+      session.start();
+      ClientConsumer consumer = session.createConsumer(MDBQUEUEPREFIXED);
+      assertNotNull(consumer.receive(5000));
+      session.close();
+   }
+
+   static volatile ActiveMQResourceAdapter resourceAdapter;
+   static boolean resourceAdapterStopped = false;
+   public static void interrupt() throws InterruptedException
+   {
+      //Thread.currentThread().interrupt();
+      if (!resourceAdapterStopped)
+      {
+         resourceAdapter.stop();
+         resourceAdapterStopped = true;
+         throw new InterruptedException("foo");
+      }
+      //Thread.currentThread().interrupt();
+   }
+
+   Transaction currentTX;
+
+   public class XADummyEndpoint extends DummyMessageEndpoint
+   {
+      final boolean twoPhase;
+      ClientSession session;
+      int afterDeliveryCounts = 0;
+
+      public XADummyEndpoint(CountDownLatch latch, boolean twoPhase) throws SystemException
+      {
+         super(latch);
+         this.twoPhase = twoPhase;
+         try
+         {
+            session = locator.createSessionFactory().createSession(true, false, false);
+         }
+         catch (Throwable e)
+         {
+            throw new RuntimeException(e);
+         }
+      }
+
+      @Override
+      public void beforeDelivery(Method method) throws NoSuchMethodException, ResourceException
+      {
+         super.beforeDelivery(method);
+         try
+         {
+            DummyTMLocator.tm.begin();
+            currentTX = DummyTMLocator.tm.getTransaction();
+            currentTX.enlistResource(xaResource);
+            if (twoPhase)
+            {
+               currentTX.enlistResource(new DummyXAResource());
+            }
+         }
+         catch (Throwable e)
+         {
+            throw new RuntimeException(e.getMessage(), e);
+         }
+      }
+
+      public void onMessage(Message message)
+      {
+         super.onMessage(message);
+//         try
+//         {
+//            lastMessage = (ActiveMQMessage) message;
+//            currentTX.enlistResource(session);
+//            ClientProducer prod = session.createProducer()
+//         }
+//         catch (Exception e)
+//         {
+//            e.printStackTrace();
+//         }
+
+
+      }
+
+
+
+      @Override
+      public void afterDelivery() throws ResourceException
+      {
+         afterDeliveryCounts++;
+         try
+         {
+            currentTX.commit();
+         }
+         catch (Throwable e)
+         {
+            //its unsure as to whether the EJB/JCA layer will handle this or throw it to us,
+            // either way we don't do anything else so its fine just to throw.
+            // NB this will only happen with 2 phase commit
+            throw new RuntimeException(e);
+         }
+         super.afterDelivery();
+      }
+   }
+
+   @Before
+   public void setUp() throws Exception
+   {
+      resourceAdapter = null;
+      resourceAdapterStopped = false;
+      super.setUp();
+      DummyTMLocator.startTM();
+   }
+
+
+   @After
+   public void tearDown() throws Exception
+   {
+      DummyTMLocator.stopTM();
+      super.tearDown();
+   }
+
+   public static class DummyTMLocator
+   {
+      public static TransactionManagerImple tm;
+      public static void stopTM()
+      {
+         try
+         {
+            TransactionReaper.terminate(true);
+            TxControl.disable(true);
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace();
+         }
+         tm = null;
+      }
+      public static void startTM()
+      {
+         tm = new TransactionManagerImple();
+         TxControl.enable();
+      }
+      public TransactionManager getTM()
+      {
+         return tm;
+      }
+   }
+
+   static class DummyXAResource implements XAResource
+   {
+      @Override
+      public void commit(Xid xid, boolean b) throws XAException
+      {
+
+      }
+
+      @Override
+      public void end(Xid xid, int i) throws XAException
+      {
+
+      }
+
+      @Override
+      public void forget(Xid xid) throws XAException
+      {
+
+      }
+
+      @Override
+      public int getTransactionTimeout() throws XAException
+      {
+         return 0;
+      }
+
+      @Override
+      public boolean isSameRM(XAResource xaResource) throws XAException
+      {
+         return false;
+      }
+
+      @Override
+      public int prepare(Xid xid) throws XAException
+      {
+         return 0;
+      }
+
+      @Override
+      public Xid[] recover(int i) throws XAException
+      {
+         return new Xid[0];
+      }
+
+      @Override
+      public void rollback(Xid xid) throws XAException
+      {
+
+      }
+
+      @Override
+      public boolean setTransactionTimeout(int i) throws XAException
+      {
+         return false;
+      }
+
+      @Override
+      public void start(Xid xid, int i) throws XAException
+      {
+
+      }
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/BMFailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/BMFailoverTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/BMFailoverTest.java
new file mode 100644
index 0000000..fb91994
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/BMFailoverTest.java
@@ -0,0 +1,451 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.extras.byteman;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.apache.activemq.api.core.ActiveMQTransactionOutcomeUnknownException;
+import org.apache.activemq.api.core.ActiveMQTransactionRolledBackException;
+import org.apache.activemq.api.core.ActiveMQUnBlockedException;
+import org.apache.activemq.api.core.SimpleString;
+import org.apache.activemq.api.core.TransportConfiguration;
+import org.apache.activemq.api.core.client.ClientConsumer;
+import org.apache.activemq.api.core.client.ClientMessage;
+import org.apache.activemq.api.core.client.ClientProducer;
+import org.apache.activemq.api.core.client.ClientSession;
+import org.apache.activemq.api.core.client.ClientSessionFactory;
+import org.apache.activemq.api.core.client.ServerLocator;
+import org.apache.activemq.core.client.ActiveMQClientMessageBundle;
+import org.apache.activemq.core.client.impl.ClientMessageImpl;
+import org.apache.activemq.core.client.impl.ClientSessionFactoryInternal;
+import org.apache.activemq.core.client.impl.ClientSessionInternal;
+import org.apache.activemq.core.postoffice.Binding;
+import org.apache.activemq.core.server.Queue;
+import org.apache.activemq.core.transaction.impl.XidImpl;
+import org.apache.activemq.tests.integration.cluster.failover.FailoverTestBase;
+import org.apache.activemq.tests.integration.cluster.util.TestableServer;
+import org.apache.activemq.tests.util.RandomUtil;
+import org.apache.activemq.utils.UUIDGenerator;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ *         4/18/13
+ */
+@RunWith(BMUnitRunner.class)
+public class BMFailoverTest extends FailoverTestBase
+{
+   private ServerLocator locator;
+   private ClientSessionFactoryInternal sf;
+   private ClientSessionFactoryInternal sf2;
+   public static TestableServer serverToStop;
+
+   @Before
+   @Override
+   public void setUp() throws Exception
+   {
+      super.setUp();
+      stopped = false;
+      locator = getServerLocator();
+   }
+
+   @After
+   @Override
+   public void tearDown() throws Exception
+   {
+      super.tearDown();
+   }
+
+   private static boolean stopped = false;
+   public static void stopAndThrow() throws ActiveMQUnBlockedException
+   {
+      if (!stopped)
+      {
+         try
+         {
+            serverToStop.getServer().stop(true);
+         }
+         catch (Exception e)
+         {
+            e.printStackTrace();
+         }
+         try
+         {
+            Thread.sleep(2000);
+         }
+         catch (InterruptedException e)
+         {
+            e.printStackTrace();
+         }
+         stopped = true;
+         throw ActiveMQClientMessageBundle.BUNDLE.unblockingACall(null);
+      }
+   }
+   @Test
+   @BMRules
+   (
+         rules =
+               {
+                     @BMRule
+                           (
+                                 name = "trace ActiveMQSessionContext xaEnd",
+                                 targetClass = "org.apache.activemq.core.protocol.core.impl.ActiveMQSessionContext",
+                                 targetMethod = "xaEnd",
+                                 targetLocation = "AT EXIT",
+                                 action = "org.apache.activemq.tests.extras.byteman.BMFailoverTest.stopAndThrow()"
+                           )
+               }
+   )
+   //https://bugzilla.redhat.com/show_bug.cgi?id=1152410
+   public void testFailOnEndAndRetry() throws Exception
+   {
+      serverToStop = liveServer;
+
+      createSessionFactory();
+
+      ClientSession session = createSession(sf, true, false, false);
+
+      session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
+
+      ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+      for (int i = 0; i < 100; i++)
+      {
+         producer.send(createMessage(session, i, true));
+      }
+
+      ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+
+      Xid xid = RandomUtil.randomXid();
+
+      session.start(xid, XAResource.TMNOFLAGS);
+      session.start();
+      // Receive MSGs but don't ack!
+      for (int i = 0; i < 100; i++)
+      {
+         ClientMessage message = consumer.receive(1000);
+
+         Assert.assertNotNull(message);
+
+         assertMessageBody(i, message);
+
+         Assert.assertEquals(i, message.getIntProperty("counter").intValue());
+      }
+      try
+      {
+         //top level prepare
+         session.end(xid, XAResource.TMSUCCESS);
+      }
+      catch (XAException e)
+      {
+         try
+         {
+            //top level abort
+            session.end(xid, XAResource.TMFAIL);
+         }
+         catch (XAException e1)
+         {
+            try
+            {
+               //rollback
+               session.rollback(xid);
+            }
+            catch (XAException e2)
+            {
+            }
+         }
+      }
+      xid = RandomUtil.randomXid();
+      session.start(xid, XAResource.TMNOFLAGS);
+
+      for (int i = 0; i < 50; i++)
+      {
+         ClientMessage message = consumer.receive(1000);
+
+         Assert.assertNotNull(message);
+
+         assertMessageBody(i, message);
+
+         Assert.assertEquals(i, message.getIntProperty("counter").intValue());
+      }
+      session.end(xid, XAResource.TMSUCCESS);
+      session.commit(xid, true);
+   }
+
+   @Test
+   @BMRules
+      (
+         rules =
+            {
+               @BMRule
+                  (
+                     name = "trace clientsessionimpl commit",
+                     targetClass = "org.apache.activemq.core.client.impl.ClientSessionImpl",
+                     targetMethod = "start(javax.transaction.xa.Xid, int)",
+                     targetLocation = "AT EXIT",
+                     action = "org.apache.activemq.tests.extras.byteman.BMFailoverTest.serverToStop.getServer().stop(true)"
+                  )
+            }
+      )
+   public void testFailoverOnCommit2() throws Exception
+   {
+      serverToStop = liveServer;
+      locator = getServerLocator();
+      locator.setFailoverOnInitialConnection(true);
+      SimpleString inQueue = new SimpleString("inQueue");
+      SimpleString outQueue = new SimpleString("outQueue");
+      createSessionFactory();
+      createSessionFactory2();
+
+      // closeable will take care of closing it
+      try (ClientSession session = sf.createSession(false, true, true);
+           ClientProducer sendInitialProducer = session.createProducer();)
+      {
+         session.createQueue(inQueue, inQueue, null, true);
+         session.createQueue(outQueue, outQueue, null, true);
+         sendInitialProducer.send(inQueue, createMessage(session, 0, true));
+      }
+
+      ClientSession xaSessionRec = addClientSession(sf.createSession(true, false, false));
+
+      ClientConsumer consumer = addClientConsumer(xaSessionRec.createConsumer(inQueue));
+
+      byte[] globalTransactionId = UUIDGenerator.getInstance().generateStringUUID().getBytes();
+      Xid xidRec = new XidImpl("xa2".getBytes(), 1, globalTransactionId);
+
+      xaSessionRec.start();
+
+      xaSessionRec.getXAResource().start(xidRec, XAResource.TMNOFLAGS);
+
+      //failover is now occurring, receive, ack and end will be called whilst this is happening.
+
+      ClientMessageImpl m = (ClientMessageImpl) consumer.receive(5000);
+
+      assertNotNull(m);
+
+      System.out.println("********************" + m.getIntProperty("counter"));
+      //the mdb would ack the message before calling onMessage()
+      m.acknowledge();
+
+      try
+      {
+         //this may fail but thats ok, it depends on the race and when failover actually happens
+         xaSessionRec.end(xidRec, XAResource.TMSUCCESS);
+      }
+      catch (XAException ignore)
+      {
+      }
+
+      //we always reset the client on the RA
+      ((ClientSessionInternal) xaSessionRec).resetIfNeeded();
+
+      // closeable will take care of closing it
+      try (ClientSession session = sf.createSession(false, true, true);
+           ClientProducer sendInitialProducer = session.createProducer();)
+      {
+         sendInitialProducer.send(inQueue, createMessage(session, 0, true));
+      }
+
+      //now receive and send a message successfully
+
+      globalTransactionId = UUIDGenerator.getInstance().generateStringUUID().getBytes();
+      xidRec = new XidImpl("xa4".getBytes(), 1, globalTransactionId);
+      xaSessionRec.getXAResource().start(xidRec, XAResource.TMNOFLAGS);
+
+      Binding binding = backupServer.getServer().getPostOffice().getBinding(inQueue);
+      Queue inQ = (Queue) binding.getBindable();
+
+      m = (ClientMessageImpl) consumer.receive(5000);
+
+      assertNotNull(m);
+      //the mdb would ack the message before calling onMessage()
+      m.acknowledge();
+
+      System.out.println("********************" + m.getIntProperty("counter"));
+
+      xaSessionRec.getXAResource().end(xidRec, XAResource.TMSUCCESS);
+      xaSessionRec.getXAResource().prepare(xidRec);
+      xaSessionRec.getXAResource().commit(xidRec, false);
+
+
+      //let's close the consumer so anything pending is handled
+      consumer.close();
+
+      assertEquals(1, getMessageCount(inQ));
+   }
+
+
+   @Test
+   @BMRules
+      (
+         rules =
+            {
+               @BMRule
+                  (
+                     name = "trace clientsessionimpl commit",
+                     targetClass = "org.apache.activemq.core.client.impl.ClientSessionImpl",
+                     targetMethod = "commit",
+                     targetLocation = "ENTRY",
+                     action = "org.apache.activemq.tests.extras.byteman.BMFailoverTest.serverToStop.getServer().stop(true)"
+                  )
+            }
+      )
+   public void testFailoverOnCommit() throws Exception
+   {
+      serverToStop = liveServer;
+      locator = getServerLocator();
+      locator.setFailoverOnInitialConnection(true);
+      createSessionFactory();
+      ClientSession session = createSessionAndQueue();
+
+      ClientProducer producer = addClientProducer(session.createProducer(FailoverTestBase.ADDRESS));
+
+      sendMessages(session, producer, 10);
+      try
+      {
+         session.commit();
+         fail("should have thrown an exception");
+      }
+      catch (ActiveMQTransactionOutcomeUnknownException e)
+      {
+         //pass
+      }
+      sendMessages(session, producer, 10);
+      session.commit();
+      Queue bindable = (Queue) backupServer.getServer().getPostOffice().getBinding(FailoverTestBase.ADDRESS).getBindable();
+      assertEquals(10, getMessageCount(bindable));
+   }
+
+   @Test
+   @BMRules
+      (
+         rules =
+            {
+               @BMRule
+                  (
+                     name = "trace clientsessionimpl commit",
+                     targetClass = "org.apache.activemq.core.client.impl.ClientSessionImpl",
+                     targetMethod = "commit",
+                     targetLocation = "ENTRY",
+                     action = "org.apache.activemq.tests.extras.byteman.BMFailoverTest.serverToStop.getServer().stop(true)"
+                  )
+            }
+      )
+   public void testFailoverOnReceiveCommit() throws Exception
+   {
+      serverToStop = liveServer;
+      locator = getServerLocator();
+      locator.setFailoverOnInitialConnection(true);
+      createSessionFactory();
+      ClientSession session = createSessionAndQueue();
+
+      ClientSession sendSession = createSession(sf, true, true);
+
+      ClientProducer producer = addClientProducer(sendSession.createProducer(FailoverTestBase.ADDRESS));
+
+      sendMessages(sendSession, producer, 10);
+
+      ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+      session.start();
+      for (int i = 0; i < 10; i++)
+      {
+         ClientMessage m = consumer.receive(500);
+         assertNotNull(m);
+         m.acknowledge();
+      }
+      try
+      {
+         session.commit();
+         fail("should have thrown an exception");
+      }
+      catch (ActiveMQTransactionOutcomeUnknownException e)
+      {
+         //pass
+      }
+      catch (ActiveMQTransactionRolledBackException e1)
+      {
+         //pass
+      }
+      Queue bindable = (Queue) backupServer.getServer().getPostOffice().getBinding(FailoverTestBase.ADDRESS).getBindable();
+      assertEquals(10, getMessageCount(bindable));
+
+   }
+
+   @Override
+   protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live)
+   {
+      return getNettyAcceptorTransportConfiguration(live);
+   }
+
+   @Override
+   protected TransportConfiguration getConnectorTransportConfiguration(final boolean live)
+   {
+      return getNettyConnectorTransportConfiguration(live);
+   }
+
+   private ClientSession createSessionAndQueue() throws Exception
+   {
+      ClientSession session = createSession(sf, false, false);
+
+      session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
+      return session;
+   }
+
+   private ClientSession createXASessionAndQueue() throws Exception
+   {
+      ClientSession session = addClientSession(sf.createSession(true, true, true));
+
+      session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
+      return session;
+   }
+
+   protected ClientSession
+   createSession(ClientSessionFactory sf1, boolean autoCommitSends, boolean autoCommitAcks) throws Exception
+   {
+      return addClientSession(sf1.createSession(autoCommitSends, autoCommitAcks));
+   }
+
+   protected ClientSession
+   createSession(ClientSessionFactory sf1, boolean xa, boolean autoCommitSends,   boolean autoCommitAcks) throws Exception
+   {
+      return addClientSession(sf1.createSession(xa, autoCommitSends, autoCommitAcks));
+   }
+
+
+   private void createSessionFactory() throws Exception
+   {
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setReconnectAttempts(-1);
+
+      sf = createSessionFactoryAndWaitForTopology(locator, 2);
+   }
+
+   private void createSessionFactory2() throws Exception
+   {
+      sf2 = createSessionFactoryAndWaitForTopology(locator, 2);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/BridgeServerLocatorConfigurationTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/BridgeServerLocatorConfigurationTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/BridgeServerLocatorConfigurationTest.java
new file mode 100644
index 0000000..bf97f5a
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/BridgeServerLocatorConfigurationTest.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.extras.byteman;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.activemq.api.core.TransportConfiguration;
+import org.apache.activemq.api.core.client.ServerLocator;
+import org.apache.activemq.core.config.BridgeConfiguration;
+import org.apache.activemq.core.config.CoreQueueConfiguration;
+import org.apache.activemq.core.remoting.impl.invm.TransportConstants;
+import org.apache.activemq.core.server.ActiveMQServer;
+import org.apache.activemq.core.server.cluster.impl.BridgeImpl;
+import org.apache.activemq.tests.util.ServiceTestBase;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(BMUnitRunner.class)
+public class BridgeServerLocatorConfigurationTest extends ServiceTestBase
+{
+
+   private static final long BRIDGE_TTL = 1234L;
+   private static final String BRIDGE_NAME = "bridge1";
+
+   protected boolean isNetty()
+   {
+      return false;
+   }
+
+   private String getConnector()
+   {
+      if (isNetty())
+      {
+         return NETTY_CONNECTOR_FACTORY;
+      }
+      return INVM_CONNECTOR_FACTORY;
+   }
+
+   @Test
+   @BMRule(name = "check connection ttl",
+            targetClass = "org.apache.activemq.tests.extras.byteman.BridgeServerLocatorConfigurationTest",
+            targetMethod = "getBridgeTTL(ActiveMQServer, String)", targetLocation = "EXIT",
+            action = "$! = $0.getConfiguredBridge($1).serverLocator.getConnectionTTL();")
+   /**
+    * Checks the connection ttl by using byteman to override the methods on this class to return the value of private variables in the Bridge.
+    * @throws Exception
+    *
+    * The byteman rule on this test overwrites the {@link #getBridgeTTL} method to retrieve the bridge called {@link @BRIDGE_NAME}.
+    * It the overrides the return value to be the value of the connection ttl. Note that the unused String parameter is required to
+    * ensure that byteman populates the $1 variable, otherwise it will not bind correctly.
+    */
+   public void testConnectionTTLOnBridge() throws Exception
+   {
+      Map<String, Object> server0Params = new HashMap<String, Object>();
+      ActiveMQServer serverWithBridge = createClusteredServerWithParams(isNetty(), 0, true, server0Params);
+
+      Map<String, Object> server1Params = new HashMap<String, Object>();
+      if (isNetty())
+      {
+         server1Params.put("port", org.apache.activemq.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1);
+      }
+      else
+      {
+         server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+      }
+      ActiveMQServer server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+      ServerLocator locator = null;
+      try
+      {
+         final String testAddress = "testAddress";
+         final String queueName0 = "queue0";
+         final String forwardAddress = "forwardAddress";
+         final String queueName1 = "queue1";
+
+         Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+         TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+         connectors.put(server1tc.getName(), server1tc);
+
+         serverWithBridge.getConfiguration().setConnectorConfigurations(connectors);
+
+         ArrayList<String> staticConnectors = new ArrayList<String>();
+         staticConnectors.add(server1tc.getName());
+
+         BridgeConfiguration bridgeConfiguration = new BridgeConfiguration()
+            .setName(BRIDGE_NAME)
+            .setQueueName(queueName0)
+            .setForwardingAddress(forwardAddress)
+            .setConnectionTTL(BRIDGE_TTL)
+            .setRetryInterval(1000)
+            .setReconnectAttempts(0)
+            .setReconnectAttemptsOnSameNode(0)
+            .setConfirmationWindowSize(1024)
+            .setStaticConnectors(staticConnectors);
+
+         List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+         bridgeConfigs.add(bridgeConfiguration);
+         serverWithBridge.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+         CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration()
+            .setAddress(testAddress)
+            .setName(queueName0);
+         List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
+         queueConfigs0.add(queueConfig0);
+         serverWithBridge.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+         CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration()
+            .setAddress(forwardAddress)
+            .setName(queueName1);
+         List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
+         queueConfigs1.add(queueConfig1);
+         server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+
+         server1.start();
+         waitForServer(server1);
+
+         serverWithBridge.start();
+         waitForServer(serverWithBridge);
+
+         long bridgeTTL = getBridgeTTL(serverWithBridge, BRIDGE_NAME);
+
+         assertEquals(BRIDGE_TTL, bridgeTTL);
+      }
+      finally
+      {
+         if (locator != null)
+         {
+            locator.close();
+         }
+
+         serverWithBridge.stop();
+
+         server1.stop();
+      }
+   }
+
+   /**
+    * Method for byteman to wrap around and do its magic with to return the ttl from private members
+    * rather than -1
+    * @param bridgeServer
+    * @param bridgeName
+    * @return
+    */
+   private long getBridgeTTL(ActiveMQServer bridgeServer, String bridgeName)
+   {
+      return -1L;
+   }
+
+   /**
+    * Byteman seems to need this method so that it gets back the concrete type not the interface
+    * @param bridgeServer
+    * @return
+    */
+   private BridgeImpl getConfiguredBridge(ActiveMQServer bridgeServer)
+   {
+      return getConfiguredBridge(bridgeServer, BRIDGE_NAME);
+   }
+
+   private BridgeImpl getConfiguredBridge(ActiveMQServer bridgeServer, String bridgeName)
+   {
+      return (BridgeImpl)bridgeServer.getClusterManager().getBridges().get(bridgeName);
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ClosingConnectionTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ClosingConnectionTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ClosingConnectionTest.java
new file mode 100644
index 0000000..cd3ee24
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ClosingConnectionTest.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.extras.byteman;
+
+import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
+
+import org.apache.activemq.api.core.SimpleString;
+import org.apache.activemq.api.core.client.ClientMessage;
+import org.apache.activemq.api.core.client.ClientProducer;
+import org.apache.activemq.api.core.client.ClientSession;
+import org.apache.activemq.api.core.client.ClientSessionFactory;
+import org.apache.activemq.api.core.client.ServerLocator;
+import org.apache.activemq.api.core.management.ActiveMQServerControl;
+import org.apache.activemq.core.server.ActiveMQServer;
+import org.apache.activemq.core.server.JournalType;
+import org.apache.activemq.core.settings.impl.AddressSettings;
+import org.apache.activemq.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.tests.integration.management.ManagementControlHelper;
+import org.apache.activemq.tests.util.ServiceTestBase;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(BMUnitRunner.class)
+public class ClosingConnectionTest extends ServiceTestBase
+{
+   public static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
+
+   private ServerLocator locator;
+
+   private ActiveMQServer server;
+
+   private static MBeanServer mBeanServer;
+
+   private static boolean readyToKill = false;
+
+   protected boolean isNetty()
+   {
+      return true;
+   }
+
+
+   @Override
+   @Before
+   public void setUp() throws Exception
+   {
+      super.setUp();
+      mBeanServer = MBeanServerFactory.createMBeanServer();
+      server = newActiveMQServer();
+      server.getConfiguration().setJournalType(JournalType.NIO);
+      server.getConfiguration().setJMXManagementEnabled(true);
+      server.start();
+      waitForServer(server);
+      locator = createFactory(isNetty());
+      readyToKill = false;
+   }
+
+   public static void killConnection() throws InterruptedException
+   {
+      if (readyToKill)
+      {
+         // We have to kill the connection in a new thread otherwise Netty won't interrupt the current thread
+         Thread closeConnectionThread = new Thread(new Runnable()
+         {
+            @Override
+            public void run()
+            {
+               try
+               {
+                  ActiveMQServerControl serverControl = ManagementControlHelper.createActiveMQServerControl(mBeanServer);
+                  serverControl.closeConnectionsForUser("guest");
+                  readyToKill = false;
+               }
+               catch (Exception e)
+               {
+                  e.printStackTrace();
+               }
+            }
+         });
+
+         closeConnectionThread.start();
+
+         try
+         {
+            /* We want to simulate a long-running remoting thread here. If closing the connection in the closeConnectionThread
+             * interrupts this thread then it will cause sleep() to throw and InterruptedException. Therefore we catch
+             * the InterruptedException and re-interrupt the current thread so the interrupt will be passed properly
+             * back to the caller. It's a bit of a hack, but I couldn't find any other way to simulate it.
+             */
+            Thread.sleep(1500);
+         }
+         catch (InterruptedException e)
+         {
+            Thread.currentThread().interrupt();
+         }
+      }
+   }
+
+   /*
+   * Test for https://bugzilla.redhat.com/show_bug.cgi?id=1193085
+   * */
+   @Test
+   @BMRules
+      (
+         rules =
+            {
+               @BMRule
+                  (
+                     name = "rule to kill connection",
+                     targetClass = "org.apache.activemq.core.journal.impl.NIOSequentialFile",
+                     targetMethod = "open(int, boolean)",
+                     targetLocation = "AT INVOKE java.nio.channels.FileChannel.size()",
+                     action = "org.apache.activemq.tests.extras.byteman.ClosingConnectionTest.killConnection();"
+
+                  )
+            }
+      )
+   public void testKillConnection() throws Exception
+   {
+      locator.setBlockOnNonDurableSend(true);
+      locator.setBlockOnDurableSend(true);
+      locator.setBlockOnAcknowledge(true);
+
+      ClientSessionFactory sf = createSessionFactory(locator);
+      ClientSession session = sf.createSession("guest", null, false, true, true, false, 0);
+
+      session.createQueue(ADDRESS, ADDRESS, null, true);
+
+      ClientProducer producer = session.createProducer(ADDRESS);
+
+      ClientMessage message = session.createMessage(true);
+      message.getBodyBuffer().writeBytes(new byte[1024]);
+
+      for (int i = 0; i < 200; i++)
+      {
+         producer.send(message);
+      }
+
+      assertTrue(server.locateQueue(ADDRESS).getPageSubscription().getPagingStore().isPaging());
+
+      readyToKill = true;
+      try
+      {
+         for (int i = 0; i < 8; i++)
+         {
+            producer.send(message);
+         }
+         fail("Sending message here should result in failure.");
+      }
+      catch (Exception e)
+      {
+         IntegrationTestLogger.LOGGER.info("Caught exception: " + e.getMessage());
+      }
+
+      Thread.sleep(1000);
+
+      assertTrue(server.isStarted());
+
+      session.close();
+   }
+
+   private ActiveMQServer newActiveMQServer() throws Exception
+   {
+      ActiveMQServer server = createServer(true, createDefaultConfig(isNetty()), mBeanServer);
+
+      AddressSettings defaultSetting = new AddressSettings();
+      defaultSetting.setPageSizeBytes(10 * 1024);
+      defaultSetting.setMaxSizeBytes(20 * 1024);
+
+      server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+      return server;
+   }
+}
\ No newline at end of file

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ClusteredGroupingTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ClusteredGroupingTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ClusteredGroupingTest.java
new file mode 100644
index 0000000..bc9f853
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ClusteredGroupingTest.java
@@ -0,0 +1,473 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.extras.byteman;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.api.core.ActiveMQNonExistentQueueException;
+import org.apache.activemq.api.core.Message;
+import org.apache.activemq.api.core.SimpleString;
+import org.apache.activemq.api.core.management.ManagementHelper;
+import org.apache.activemq.api.core.management.CoreNotificationType;
+import org.apache.activemq.core.server.ActiveMQServer;
+import org.apache.activemq.core.server.group.impl.GroupingHandlerConfiguration;
+import org.apache.activemq.core.server.group.impl.Response;
+import org.apache.activemq.core.server.management.Notification;
+import org.apache.activemq.tests.integration.cluster.distribution.ClusterTestBase;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+@RunWith(BMUnitRunner.class)
+public class ClusteredGroupingTest extends ClusterTestBase
+{
+   @Test
+   @BMRules
+      (
+         rules =
+            {
+               @BMRule
+                  (
+                     name = "blow-up",
+                     targetClass = "org.apache.activemq.core.server.group.impl.LocalGroupingHandler",
+                     targetMethod = "removeGrouping",
+                     targetLocation = "ENTRY",
+                     action = "org.apache.activemq.tests.extras.byteman.ClusteredGroupingTest.pause($1);"
+                  ),
+               @BMRule
+                  (
+                     name = "blow-up2",
+                     targetClass = "org.apache.activemq.core.server.group.impl.GroupHandlingAbstract",
+                     targetMethod = "forceRemove",
+                     targetLocation = "ENTRY",
+                     action = "org.apache.activemq.tests.extras.byteman.ClusteredGroupingTest.restart2();"
+                  )
+            }
+      )
+   public void test2serversLocalGoesDown() throws Exception
+   {
+      setupServer(0, isFileStorage(), isNetty());
+      setupServer(1, isFileStorage(), isNetty());
+
+      setupClusterConnection("cluster0", "queues", false, 1, 0, 500, isNetty(), 0, 1);
+
+      setupClusterConnection("cluster1", "queues", false, 1,  0, 500, isNetty(), 1, 0);
+
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+
+      startServers(0, 1);
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+
+      createQueue(0, "queues.testaddress", "queue0", null, true);
+      createQueue(1, "queues.testaddress", "queue0", null, true);
+
+      addConsumer(0, 1, "queue0", null);
+
+      waitForBindings(0, "queues.testaddress", 1, 0, true);
+      waitForBindings(1, "queues.testaddress", 1, 1, true);
+
+      waitForBindings(0, "queues.testaddress", 1, 1, false);
+      waitForBindings(1, "queues.testaddress", 1, 0, false);
+
+      sendWithProperty(0, "queues.testaddress", 1, true, Message.HDR_GROUP_ID, new SimpleString("id1"));
+
+      latch = new CountDownLatch(1);
+      latch2 = new CountDownLatch(1);
+
+      crashAndWaitForFailure(getServer(1));
+
+      assertTrue(latch2.await(20000, TimeUnit.MILLISECONDS));
+
+      try
+      {
+         try
+         {
+            sendWithProperty(0, "queues.testaddress", 1, true, Message.HDR_GROUP_ID, new SimpleString("id1"));
+         }
+         catch (ActiveMQNonExistentQueueException e)
+         {
+            fail("did not handle removal of queue");
+         }
+      }
+      finally
+      {
+         latch.countDown();
+      }
+   }
+
+   @Test
+   @BMRules
+      (
+         rules =
+            {
+               @BMRule
+                  (
+                     name = "blow-up",
+                     targetClass = "org.apache.activemq.core.server.group.impl.RemoteGroupingHandler",
+                     targetMethod = "onNotification",
+                     targetLocation = "ENTRY",
+                     action = "org.apache.activemq.tests.extras.byteman.ClusteredGroupingTest.pause2($1);"
+                  ),
+               @BMRule(name = "blow-up2",
+                       targetClass = "org.apache.activemq.core.server.group.impl.RemoteGroupingHandler",
+                       targetMethod = "remove",
+                       targetLocation = "ENTRY",
+                       action = "org.apache.activemq.tests.extras.byteman.ClusteredGroupingTest.restart2();")
+            }
+      )
+   public void test3serversLocalGoesDown() throws Exception
+   {
+      setupServer(0, isFileStorage(), isNetty());
+      setupServer(1, isFileStorage(), isNetty());
+      setupServer(2, isFileStorage(), isNetty());
+
+      setupClusterConnection("cluster0", "queues", false, 1,  0, 500, isNetty(), 0, 1, 2);
+
+      setupClusterConnection("cluster1", "queues", false, 1,  0, 500, isNetty(), 1, 0, 2);
+
+      setupClusterConnection("cluster2", "queues", false, 1,  0, 500, isNetty(), 2, 0, 1);
+
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+      startServers(0, 1, 2);
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+      setupSessionFactory(2, isNetty());
+
+      createQueue(0, "queues.testaddress", "queue0", null, true);
+      createQueue(1, "queues.testaddress", "queue0", null, true);
+      createQueue(2, "queues.testaddress", "queue0", null, true);
+
+      addConsumer(0, 2, "queue0", null);
+
+      waitForBindings(0, "queues.testaddress", 1, 0, true);
+      waitForBindings(1, "queues.testaddress", 1, 0, true);
+      waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+      waitForBindings(0, "queues.testaddress", 2, 1, false);
+      waitForBindings(1, "queues.testaddress", 2, 1, false);
+      waitForBindings(2, "queues.testaddress", 2, 0, false);
+
+      sendWithProperty(1, "queues.testaddress", 1, true, Message.HDR_GROUP_ID, new SimpleString("id1"));
+
+      latch = new CountDownLatch(1);
+      latch2 = new CountDownLatch(1);
+
+      main = Thread.currentThread();
+      crashAndWaitForFailure(getServer(2));
+
+      assertTrue(latch2.await(20000, TimeUnit.MILLISECONDS));
+
+      try
+      {
+         try
+         {
+            sendWithProperty(1, "queues.testaddress", 1, true, Message.HDR_GROUP_ID, new SimpleString("id1"));
+         }
+         catch (ActiveMQNonExistentQueueException e)
+         {
+            fail("did not handle removal of queue");
+         }
+      }
+      finally
+      {
+         latch.countDown();
+      }
+
+      assertHandlersAreSame(getServer(0), getServer(1));
+   }
+
+   @Test
+   @BMRules
+      (
+         rules =
+            {
+               @BMRule
+                  (
+                     name = "blow-up",
+                     targetClass = "org.apache.activemq.core.server.group.impl.LocalGroupingHandler",
+                     targetMethod = "onNotification",
+                     targetLocation = "ENTRY",
+                     action = "org.apache.activemq.tests.extras.byteman.ClusteredGroupingTest.pause2($1);"
+                  ),
+               @BMRule(name = "blow-up2",
+                       targetClass = "org.apache.activemq.core.server.group.impl.LocalGroupingHandler",
+                       targetMethod = "remove",
+                       targetLocation = "ENTRY",
+                       action = "org.apache.activemq.tests.extras.byteman.ClusteredGroupingTest.restart2();")
+            }
+      )
+   public void testLocal3serversLocalGoesDown() throws Exception
+   {
+      setupServer(0, isFileStorage(), isNetty());
+      setupServer(1, isFileStorage(), isNetty());
+      setupServer(2, isFileStorage(), isNetty());
+
+      setupClusterConnection("cluster0", "queues", false, 1,  0, 500, isNetty(), 0, 1, 2);
+
+      setupClusterConnection("cluster1", "queues", false, 1,  0, 500, isNetty(), 1, 0, 2);
+
+      setupClusterConnection("cluster2", "queues", false, 1,  0, 500, isNetty(), 2, 0, 1);
+
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+      startServers(0, 1, 2);
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+      setupSessionFactory(2, isNetty());
+
+      createQueue(0, "queues.testaddress", "queue0", null, true);
+      createQueue(1, "queues.testaddress", "queue0", null, true);
+      createQueue(2, "queues.testaddress", "queue0", null, true);
+
+      addConsumer(0, 2, "queue0", null);
+
+      waitForBindings(0, "queues.testaddress", 1, 0, true);
+      waitForBindings(1, "queues.testaddress", 1, 0, true);
+      waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+      waitForBindings(0, "queues.testaddress", 2, 1, false);
+      waitForBindings(1, "queues.testaddress", 2, 1, false);
+      waitForBindings(2, "queues.testaddress", 2, 0, false);
+
+      sendWithProperty(0, "queues.testaddress", 1, true, Message.HDR_GROUP_ID, new SimpleString("id1"));
+
+      latch = new CountDownLatch(1);
+      latch2 = new CountDownLatch(1);
+
+      main = Thread.currentThread();
+      crashAndWaitForFailure(getServer(2));
+
+      assertTrue(latch2.await(20000, TimeUnit.MILLISECONDS));
+
+      try
+      {
+         try
+         {
+            sendWithProperty(0, "queues.testaddress", 1, true, Message.HDR_GROUP_ID, new SimpleString("id1"));
+         }
+         catch (ActiveMQNonExistentQueueException e)
+         {
+            fail("did not handle removal of queue");
+         }
+      }
+      finally
+      {
+         latch.countDown();
+      }
+
+      assertHandlersAreSame(getServer(0), getServer(1));
+   }
+
+   @Test
+   @BMRules
+      (
+         rules =
+            {
+               @BMRule
+                  (
+                     name = "blow-up",
+                     targetClass = "org.apache.activemq.core.server.group.impl.LocalGroupingHandler",
+                     targetMethod = "onNotification",
+                     targetLocation = "ENTRY",
+                     action = "org.apache.activemq.tests.extras.byteman.ClusteredGroupingTest.pause2($1);"
+                  ),
+               @BMRule(name = "blow-up2",
+                       targetClass = "org.apache.activemq.core.server.group.impl.LocalGroupingHandler",
+                       targetMethod = "remove",
+                       targetLocation = "ENTRY",
+                       action = "org.apache.activemq.tests.extras.byteman.ClusteredGroupingTest.restart2();")
+            }
+      )
+   public void testLocal4serversLocalGoesDown() throws Exception
+   {
+      setupServer(0, isFileStorage(), isNetty());
+      setupServer(1, isFileStorage(), isNetty());
+      setupServer(2, isFileStorage(), isNetty());
+      setupServer(3, isFileStorage(), isNetty());
+
+      setupClusterConnection("cluster0", "queues", false, 1,  0, 500, isNetty(), 0, 1, 2, 3);
+
+      setupClusterConnection("cluster1", "queues", false, 1,  0, 500, isNetty(), 1, 0, 2, 3);
+
+      setupClusterConnection("cluster2", "queues", false, 1,  0, 500, isNetty(), 2, 0, 1, 3);
+
+      setupClusterConnection("cluster3", "queues", false, 1,  0, 500, isNetty(), 3, 1, 2, 3);
+
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+      setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 3);
+
+      startServers(0, 1, 2, 3);
+
+      setupSessionFactory(0, isNetty());
+      setupSessionFactory(1, isNetty());
+      setupSessionFactory(2, isNetty());
+      setupSessionFactory(3, isNetty());
+
+      createQueue(0, "queues.testaddress", "queue0", null, true);
+      createQueue(1, "queues.testaddress", "queue0", null, true);
+      createQueue(2, "queues.testaddress", "queue0", null, true);
+      createQueue(3, "queues.testaddress", "queue0", null, true);
+
+      addConsumer(0, 2, "queue0", null);
+
+      waitForBindings(0, "queues.testaddress", 1, 0, true);
+      waitForBindings(1, "queues.testaddress", 1, 0, true);
+      waitForBindings(2, "queues.testaddress", 1, 1, true);
+      waitForBindings(3, "queues.testaddress", 1, 0, true);
+
+      waitForBindings(0, "queues.testaddress", 3, 1, false);
+      waitForBindings(1, "queues.testaddress", 3, 1, false);
+      waitForBindings(2, "queues.testaddress", 3, 0, false);
+      waitForBindings(3, "queues.testaddress", 3, 1, false);
+
+      sendWithProperty(0, "queues.testaddress", 1, true, Message.HDR_GROUP_ID, new SimpleString("id1"));
+
+      latch = new CountDownLatch(1);
+      latch2 = new CountDownLatch(1);
+
+      main = Thread.currentThread();
+      crashAndWaitForFailure(getServer(2));
+
+      assertTrue(latch2.await(20000, TimeUnit.MILLISECONDS));
+
+      try
+      {
+         try
+         {
+            sendWithProperty(0, "queues.testaddress", 1, true, Message.HDR_GROUP_ID, new SimpleString("id1"));
+         }
+         catch (ActiveMQNonExistentQueueException e)
+         {
+            fail("did not handle removal of queue");
+         }
+      }
+      finally
+      {
+         latch.countDown();
+      }
+      //now restart server
+      getServer(2).start();
+      waitForBindings(2, "queues.testaddress", 1, 0, true);
+      waitForBindings(2, "queues.testaddress", 3, 0, false);
+      sendWithProperty(3, "queues.testaddress", 1, true, Message.HDR_GROUP_ID, new SimpleString("id1"));
+      Thread.sleep(2000);
+      assertHandlersAreSame(getServer(0), getServer(1), getServer(2), getServer(3));
+   }
+
+   private void assertHandlersAreSame(ActiveMQServer server, ActiveMQServer... qServers)
+   {
+      SimpleString id = server.getGroupingHandler().getProposal(new SimpleString("id1.queue0"), false).getClusterName();
+      for (ActiveMQServer qServer : qServers)
+      {
+         Response proposal = qServer.getGroupingHandler().getProposal(new SimpleString("id1.queue0"), false);
+         if (proposal != null)
+         {
+            assertEquals(qServer.getIdentity() + " is incorrect", id, proposal.getChosenClusterName());
+         }
+      }
+   }
+
+   static CountDownLatch latch;
+   static CountDownLatch latch2;
+   static Thread main;
+
+   public static void pause(SimpleString clusterName)
+   {
+      if (clusterName.toString().startsWith("queue0"))
+      {
+         try
+         {
+            latch2.countDown();
+            latch.await();
+         }
+         catch (InterruptedException e)
+         {
+            e.printStackTrace();
+         }
+      }
+   }
+
+   public static void pause2(Notification notification)
+   {
+      if (notification.getType() == CoreNotificationType.BINDING_REMOVED)
+      {
+         SimpleString clusterName = notification.getProperties()
+            .getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
+         boolean inMain = main == Thread.currentThread();
+         if (clusterName.toString().startsWith("queue0") && !inMain)
+         {
+            try
+            {
+               latch2.countDown();
+               latch.await();
+            }
+            catch (InterruptedException e)
+            {
+               e.printStackTrace();
+            }
+         }
+      }
+   }
+
+   public static void restart2()
+   {
+      latch.countDown();
+   }
+
+
+   @Override
+   @Before
+   public void setUp() throws Exception
+   {
+      super.setUp();
+   }
+
+   @Override
+   @After
+   public void tearDown() throws Exception
+   {
+      closeAllConsumers();
+      closeAllSessionFactories();
+      closeAllServerLocatorsFactories();
+      super.tearDown();
+   }
+
+   public boolean isNetty()
+   {
+      return true;
+   }
+}

http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/GroupingTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/GroupingTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/GroupingTest.java
new file mode 100644
index 0000000..34f58f7
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/GroupingTest.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.extras.byteman;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.api.core.ActiveMQNotConnectedException;
+import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.tests.util.JMSTestBase;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+/**
+ * GroupingTest
+ *
+ * @author Andy Taylor
+ */
+@RunWith(BMUnitRunner.class)
+public class GroupingTest extends JMSTestBase
+{
+   private Queue queue;
+   static boolean pause = false;
+
+   @Before
+   @Override
+   public void setUp() throws Exception
+   {
+      super.setUp();
+
+      queue = createQueue("TestQueue");
+   }
+
+   @After
+   @Override
+   public void tearDown() throws Exception
+   {
+      super.tearDown();
+   }
+
+   protected ConnectionFactory getCF() throws Exception
+   {
+      return cf;
+   }
+
+
+   @Test
+   @BMRules
+      (
+         rules =
+            {
+               @BMRule
+                  (
+                     name = "trace clientsessionimpl commit",
+                     targetClass = "org.apache.activemq.core.server.impl.ServerSessionImpl",
+                     targetMethod = "rollback",
+                     targetLocation = "EXIT",
+                     action = "org.apache.activemq.tests.extras.byteman.GroupingTest.pause();"
+                  )
+            }
+      )
+   public void testGroupingRollbackOnClose() throws Exception
+   {
+      Connection sendConnection = null;
+      Connection connection = null;
+      Connection connection2 = null;
+      try
+      {
+         ActiveMQConnectionFactory fact = (ActiveMQConnectionFactory) getCF();
+         fact.setReconnectAttempts(0);
+         //fact.setConsumerWindowSize(1000);
+         //fact.setTransactionBatchSize(0);
+         connection = fact.createConnection();
+         RemotingConnection rc = server.getRemotingService().getConnections().iterator().next();
+         connection2 = fact.createConnection();
+         sendConnection = fact.createConnection();
+
+         final Session sendSession = sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+         Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+         Session session2 = connection2.createSession(true, Session.SESSION_TRANSACTED);
+
+         final MessageProducer producer = sendSession.createProducer(queue);
+
+         MessageConsumer consumer1 = session.createConsumer(queue);
+         MessageConsumer consumer2 = session2.createConsumer(queue);
+
+         connection.start();
+         connection2.start();
+
+         final String jmsxgroupID = null;
+
+         Thread t = new Thread(new Runnable()
+         {
+            @Override
+            public void run()
+            {
+
+               try
+               {
+                  for (int j = 0; j < 10000; j++)
+                  {
+                     TextMessage message = sendSession.createTextMessage();
+
+                     message.setText("Message" + j);
+
+                     message.setStringProperty("JMSXGroupID", "foo");
+
+                     producer.send(message);
+                  }
+               }
+               catch (JMSException e)
+               {
+                  e.printStackTrace();
+               }
+            }
+         });
+         t.start();
+
+         //consume 5 msgs from 1st first consumer
+         for (int j = 0; j < 5; j++)
+         {
+            TextMessage tm = (TextMessage) consumer1.receive(10000);
+
+            assertNotNull(tm);
+
+            assertEquals("Message" + j, tm.getText());
+
+            assertEquals(tm.getStringProperty("JMSXGroupID"), "foo");
+         }
+
+         pause = true;
+         rc.fail(new ActiveMQNotConnectedException());
+         pause = false;
+
+         for (int j = 0; j < 10000; j++)
+         {
+            TextMessage tm = (TextMessage) consumer2.receive(5000);
+
+            assertNotNull(tm);
+
+            assertEquals("Message" + j, tm.getText());
+
+            assertEquals(tm.getStringProperty("JMSXGroupID"), "foo");
+         }
+      }
+      finally
+      {
+         if (sendConnection != null)
+         {
+            sendConnection.close();
+         }
+         if (connection2 != null)
+         {
+            connection2.close();
+         }
+      }
+   }
+
+
+   public static void pause()
+   {
+      if (pause)
+      {
+         try
+         {
+            System.out.println("pausing after rollback");
+            Thread.sleep(500);
+         }
+         catch (InterruptedException e)
+         {
+            e.printStackTrace();  //To change body of catch statement use File | Settings | File Templates.
+         }
+         System.out.println("finished pausing after rollback");
+      }
+   }
+}