You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by jb...@apache.org on 2015/03/06 23:30:45 UTC
[12/15] activemq-6 git commit: Refactored the testsuite a bit
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/concurrent-tests/src/test/java/org/apache/activemq/tests/concurrent/stomp/ConcurrentStompTest.java
----------------------------------------------------------------------
diff --git a/tests/concurrent-tests/src/test/java/org/apache/activemq/tests/concurrent/stomp/ConcurrentStompTest.java b/tests/concurrent-tests/src/test/java/org/apache/activemq/tests/concurrent/stomp/ConcurrentStompTest.java
deleted file mode 100644
index c101c9c..0000000
--- a/tests/concurrent-tests/src/test/java/org/apache/activemq/tests/concurrent/stomp/ConcurrentStompTest.java
+++ /dev/null
@@ -1,160 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq.tests.concurrent.stomp;
-
-import java.io.ByteArrayOutputStream;
-import java.io.IOException;
-import java.io.InputStream;
-import java.io.OutputStream;
-import java.net.Socket;
-import java.nio.charset.StandardCharsets;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.activemq.core.protocol.stomp.Stomp;
-import org.apache.activemq.tests.integration.stomp.StompTestBase;
-import org.junit.Assert;
-import org.junit.Test;
-
-public class ConcurrentStompTest extends StompTestBase
-{
- private Socket stompSocket_2;
-
- private ByteArrayOutputStream inputBuffer_2;
-
- /**
- * Send messages on 1 socket and receives them concurrently on another socket.
- */
- @Test
- public void testSendManyMessages() throws Exception
- {
- try
- {
- String connect = "CONNECT\n" + "login: brianm\n" + "passcode: wombats\n\n" + Stomp.NULL;
-
- sendFrame(connect);
- String connected = receiveFrame(10000);
- Assert.assertTrue(connected.startsWith("CONNECTED"));
-
- stompSocket_2 = createSocket();
- inputBuffer_2 = new ByteArrayOutputStream();
-
- sendFrame(stompSocket_2, connect);
- connected = receiveFrame(stompSocket_2, inputBuffer_2, 10000);
- Assert.assertTrue(connected.startsWith("CONNECTED"));
-
- final int count = 1000;
- final CountDownLatch latch = new CountDownLatch(count);
-
- String subscribe =
- "SUBSCRIBE\n" +
- "destination:" + getQueuePrefix() + getQueueName() + "\n" +
- "ack:auto\n\n" +
- Stomp.NULL;
- sendFrame(stompSocket_2, subscribe);
- Thread.sleep(2000);
-
- new Thread()
- {
- @Override
- public void run()
- {
- int i = 0;
- while (true)
- {
- try
- {
- String frame = receiveFrame(stompSocket_2, inputBuffer_2, 10000);
- Assert.assertTrue(frame.startsWith("MESSAGE"));
- Assert.assertTrue(frame.indexOf("destination:") > 0);
- System.out.println("<<< " + i++);
- latch.countDown();
- }
- catch (Exception e)
- {
- break;
- }
- }
- }
- }.start();
-
- String send = "SEND\n" + "destination:" + getQueuePrefix() + getQueueName() + "\n";
- for (int i = 1; i <= count; i++)
- {
- // Thread.sleep(1);
- System.out.println(">>> " + i);
- sendFrame(send + "count:" + i + "\n\n" + Stomp.NULL);
- }
-
- assertTrue(latch.await(60, TimeUnit.SECONDS));
-
- }
- finally
- {
- stompSocket_2.close();
- inputBuffer_2.close();
- }
-
-
- }
-
- // Implementation methods
- // -------------------------------------------------------------------------
- public void sendFrame(Socket socket, String data) throws Exception
- {
- byte[] bytes = data.getBytes(StandardCharsets.UTF_8);
- OutputStream outputStream = socket.getOutputStream();
- for (byte b : bytes)
- {
- outputStream.write(b);
- }
- outputStream.flush();
- }
-
- public String receiveFrame(Socket socket, ByteArrayOutputStream input, long timeOut) throws Exception
- {
- socket.setSoTimeout((int) timeOut);
- InputStream is = socket.getInputStream();
- int c = 0;
- for (;;)
- {
- c = is.read();
- if (c < 0)
- {
- throw new IOException("socket closed.");
- }
- else if (c == 0)
- {
- c = is.read();
- if (c != '\n')
- {
- byte[] ba = input.toByteArray();
- System.out.println(new String(ba, StandardCharsets.UTF_8));
- }
- Assert.assertEquals("Expecting stomp frame to terminate with \0\n", c, '\n');
- byte[] ba = input.toByteArray();
- input.reset();
- return new String(ba, StandardCharsets.UTF_8);
- }
- else
- {
- input.write(c);
- }
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/extra-tests/pom.xml
----------------------------------------------------------------------
diff --git a/tests/extra-tests/pom.xml b/tests/extra-tests/pom.xml
new file mode 100644
index 0000000..3329bf3
--- /dev/null
+++ b/tests/extra-tests/pom.xml
@@ -0,0 +1,246 @@
+<!--
+ Licensed to the Apache Software Foundation (ASF) under one or more
+ contributor license agreements. See the NOTICE file distributed with
+ this work for additional information regarding copyright ownership.
+ The ASF licenses this file to You under the Apache License, Version 2.0
+ (the "License"); you may not use this file except in compliance with
+ the License. You may obtain a copy of the License at
+
+ http://www.apache.org/licenses/LICENSE-2.0
+
+ Unless required by applicable law or agreed to in writing, software
+ distributed under the License is distributed on an "AS IS" BASIS,
+ WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ See the License for the specific language governing permissions and
+ limitations under the License.
+-->
+
+<!-- This test folder contains tests that are not part of the regular testsuite
+ because they use optional libraries such as LGPL or private ones.
+ They are optional and will validate extra functionality available through Service Integration
+ Example: Transaction Manager -->
+<project xmlns="http://maven.apache.org/POM/4.0.0" xmlns:xsi="http://www.w3.org/2001/XMLSchema-instance"
+ xsi:schemaLocation="http://maven.apache.org/POM/4.0.0 http://maven.apache.org/maven-v4_0_0.xsd">
+ <modelVersion>4.0.0</modelVersion>
+ <parent>
+ <groupId>org.apache.activemq.tests</groupId>
+ <artifactId>activemq-tests-pom</artifactId>
+ <version>6.0.1-SNAPSHOT</version>
+ </parent>
+
+ <artifactId>extra-tests</artifactId>
+ <packaging>jar</packaging>
+ <name>ActiveMQ6 Extra Tests</name>
+
+ <properties>
+ <tools.jar>${java.home}/../lib/tools.jar</tools.jar>
+ <byteman.version>2.2.0</byteman.version>
+ <activemq.basedir>${project.basedir}/../..</activemq.basedir>
+ </properties>
+
+ <dependencies>
+ <dependency>
+ <groupId>io.netty</groupId>
+ <artifactId>netty-all</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.byteman</groupId>
+ <artifactId>byteman</artifactId>
+ <version>${byteman.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.byteman</groupId>
+ <artifactId>byteman-submit</artifactId>
+ <scope>test</scope>
+ <version>${byteman.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.byteman</groupId>
+ <artifactId>byteman-install</artifactId>
+ <scope>test</scope>
+ <version>${byteman.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.byteman</groupId>
+ <artifactId>byteman-bmunit</artifactId>
+ <scope>test</scope>
+ <version>${byteman.version}</version>
+ <exclusions>
+ <exclusion>
+ <groupId>org.testng</groupId>
+ <artifactId>testng</artifactId>
+ </exclusion>
+ </exclusions>
+ </dependency>
+ <dependency>
+ <groupId>com.sun</groupId>
+ <artifactId>tools</artifactId>
+ <version>1.7</version>
+ <scope>system</scope>
+ <systemPath>${tools.jar}</systemPath>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-core-client</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-server</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq.tests</groupId>
+ <artifactId>integration-tests</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq.tests</groupId>
+ <artifactId>unit-tests</artifactId>
+ <version>${project.version}</version>
+ <scope>test</scope>
+ <type>test-jar</type>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-jms-client</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-jms-server</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-ra</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.activemq</groupId>
+ <artifactId>activemq-bootstrap</artifactId>
+ <version>${project.version}</version>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-j2ee-connector_1.5_spec</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>commons-logging</groupId>
+ <artifactId>commons-logging</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-ejb_3.0_spec</artifactId>
+ </dependency>
+ <!--this specifically for the JMS Bridge -->
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-jta_1.1_spec</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.geronimo.components</groupId>
+ <artifactId>geronimo-jaspi</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-jms_2.0_spec</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.logging</groupId>
+ <artifactId>jboss-logging-processor</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.logging</groupId>
+ <artifactId>jboss-logging</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss.logmanager</groupId>
+ <artifactId>jboss-logmanager</artifactId>
+ </dependency>
+ <dependency>
+ <groupId>junit</groupId>
+ <artifactId>junit</artifactId>
+ </dependency>
+
+
+
+ <!-- Needed for JMS Bridge Tests -->
+ <dependency>
+ <groupId>org.jboss.jbossts.jts</groupId>
+ <artifactId>jbossjts-jacorb</artifactId>
+ <version>4.17.13.Final</version>
+ </dependency>
+ <dependency>
+ <groupId>org.jboss</groupId>
+ <artifactId>jboss-transaction-spi</artifactId>
+ <version>7.1.0.Final</version>
+ </dependency>
+
+ </dependencies>
+
+ <build>
+ <plugins>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-jar-plugin</artifactId>
+ <executions>
+ <execution>
+ <phase>test</phase>
+ <goals>
+ <goal>test-jar</goal>
+ </goals>
+ </execution>
+ </executions>
+ </plugin>
+ <plugin>
+ <groupId>org.apache.maven.plugins</groupId>
+ <artifactId>maven-surefire-plugin</artifactId>
+ <configuration>
+ <skipTests>${skipExtraTests}</skipTests>
+ <!-- ensure we don't inherit a byteman jar form any env settings -->
+ <environmentVariables>
+ <BYTEMAN_HOME></BYTEMAN_HOME>
+ </environmentVariables>
+ <systemProperties>
+ <!--
+ <property>
+ <name>org.jboss.byteman.home</name>
+ <value></value>
+ </property>
+ <property>
+ <name>org.jboss.byteman.verbose</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>org.jboss.byteman.contrib.bmunit.verbose</name>
+ <value>true</value>
+ </property>
+ <property>
+ <name>org.jboss.byteman.dump.generated.classes</name>
+ <value></value>
+ </property>
+ -->
+ </systemProperties>
+ <!-- make sure maven puts the byteman jar in the classpath rather than in a manifest jar -->
+ <useManifestOnlyJar>false</useManifestOnlyJar>
+ <forkMode>once</forkMode>
+ <!--
+ <debugForkedProcess>true</debugForkedProcess>
+ -->
+ <parallel>false</parallel>
+ <!--<argLine>${activemq-surefire-argline} -Dorg.jboss.byteman.verbose -Dorg.jboss.byteman.contrib.bmunit.verbose</argLine>-->
+ <!-- '-noverify' is needed here to fix VerifyErrors on ScaleDownFailoverTest and ScaleDownFailureTest (and their subclasses). I got the tip from https://issues.jboss.org/browse/BYTEMAN-248. -->
+ <argLine>${activemq-surefire-argline} -noverify</argLine>
+ </configuration>
+ </plugin>
+ </plugins>
+ </build>
+
+</project>
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/ExtrasTestLogger.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/ExtrasTestLogger.java b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/ExtrasTestLogger.java
new file mode 100644
index 0000000..c97d01a
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/ExtrasTestLogger.java
@@ -0,0 +1,34 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.extras;
+
+import org.jboss.logging.BasicLogger;
+import org.jboss.logging.Logger;
+import org.jboss.logging.annotations.MessageLogger;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * 4/18/12
+ */
+@MessageLogger(projectCode = "HQTEST")
+public interface ExtrasTestLogger extends BasicLogger
+{
+ /**
+ * The integration test logger.
+ */
+ ExtrasTestLogger LOGGER = Logger.getMessageLogger(ExtrasTestLogger.class, ExtrasTestLogger.class.getPackage().getName());
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ActiveMQMessageHandlerTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ActiveMQMessageHandlerTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ActiveMQMessageHandlerTest.java
new file mode 100644
index 0000000..0914295
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ActiveMQMessageHandlerTest.java
@@ -0,0 +1,428 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.extras.byteman;
+
+import com.arjuna.ats.arjuna.coordinator.TransactionReaper;
+import com.arjuna.ats.arjuna.coordinator.TxControl;
+import com.arjuna.ats.internal.jta.transaction.arjunacore.TransactionManagerImple;
+import org.apache.activemq.api.core.SimpleString;
+import org.apache.activemq.api.core.client.ClientConsumer;
+import org.apache.activemq.api.core.client.ClientMessage;
+import org.apache.activemq.api.core.client.ClientProducer;
+import org.apache.activemq.api.core.client.ClientSession;
+import org.apache.activemq.api.core.client.ClientSessionFactory;
+import org.apache.activemq.core.postoffice.Binding;
+import org.apache.activemq.core.server.Queue;
+import org.apache.activemq.ra.ActiveMQResourceAdapter;
+import org.apache.activemq.ra.inflow.ActiveMQActivationSpec;
+import org.apache.activemq.tests.integration.ra.ActiveMQRATestBase;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+import javax.jms.Message;
+import javax.resource.ResourceException;
+import javax.transaction.SystemException;
+import javax.transaction.Transaction;
+import javax.transaction.TransactionManager;
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+import java.lang.reflect.Method;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * Created May 20, 2010
+ */
+@RunWith(BMUnitRunner.class)
+public class ActiveMQMessageHandlerTest extends ActiveMQRATestBase
+{
+
+ protected boolean usePersistence()
+ {
+ return true;
+ }
+
+
+ @Override
+ public boolean useSecurity()
+ {
+ return false;
+ }
+
+ @Test
+ @BMRules
+ (
+ rules =
+ {
+ @BMRule
+ (
+ name = "interrupt",
+ targetClass = "org.apache.activemq.core.protocol.core.impl.ActiveMQSessionContext",
+ targetMethod = "xaEnd",
+ targetLocation = "ENTRY",
+ action = "org.apache.activemq.tests.extras.byteman.ActiveMQMessageHandlerTest.interrupt();"
+ )
+ }
+ )
+ public void testSimpleMessageReceivedOnQueue() throws Exception
+ {
+ ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter();
+ resourceAdapter = qResourceAdapter;
+
+ MyBootstrapContext ctx = new MyBootstrapContext();
+
+ qResourceAdapter.setConnectorClassName(NETTY_CONNECTOR_FACTORY);
+ qResourceAdapter.start(ctx);
+
+ ActiveMQActivationSpec spec = new ActiveMQActivationSpec();
+ spec.setMaxSession(1);
+ spec.setCallTimeout(1000L);
+ spec.setResourceAdapter(qResourceAdapter);
+ spec.setUseJNDI(false);
+ spec.setDestinationType("javax.jms.Queue");
+ spec.setDestination(MDBQUEUE);
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ XADummyEndpoint endpoint = new XADummyEndpoint(latch, false);
+
+ DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, true);
+
+ qResourceAdapter.endpointActivation(endpointFactory, spec);
+
+ ClientSession session = locator.createSessionFactory().createSession();
+
+ ClientProducer clientProducer = session.createProducer(MDBQUEUEPREFIXED);
+
+ ClientMessage message = session.createMessage(true);
+
+ message.getBodyBuffer().writeString("teststring");
+
+ clientProducer.send(message);
+
+ session.close();
+
+ latch.await(5, TimeUnit.SECONDS);
+
+ assertNotNull(endpoint.lastMessage);
+ assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(), "teststring");
+
+ qResourceAdapter.endpointDeactivation(endpointFactory, spec);
+
+ qResourceAdapter.stop();
+
+ Binding binding = server.getPostOffice().getBinding(SimpleString.toSimpleString(MDBQUEUEPREFIXED));
+ assertEquals(1, getMessageCount(((Queue) binding.getBindable())));
+
+ server.stop();
+ server.start();
+
+ ClientSessionFactory factory = locator.createSessionFactory();
+ session = factory.createSession(true, true);
+
+ session.start();
+ ClientConsumer consumer = session.createConsumer(MDBQUEUEPREFIXED);
+ assertNotNull(consumer.receive(5000));
+ session.close();
+ }
+
+ @Test
+ @BMRules
+ (
+ rules =
+ {
+ @BMRule
+ (
+ name = "interrupt",
+ targetClass = "org.apache.activemq.core.protocol.core.impl.ActiveMQSessionContext",
+ targetMethod = "xaEnd",
+ targetLocation = "ENTRY",
+ action = "org.apache.activemq.tests.extras.byteman.ActiveMQMessageHandlerTest.interrupt();"
+ )
+ }
+ )
+ public void testSimpleMessageReceivedOnQueueTwoPhase() throws Exception
+ {
+ ActiveMQResourceAdapter qResourceAdapter = newResourceAdapter();
+ resourceAdapter = qResourceAdapter;
+
+ MyBootstrapContext ctx = new MyBootstrapContext();
+
+ qResourceAdapter.setConnectorClassName(NETTY_CONNECTOR_FACTORY);
+ qResourceAdapter.start(ctx);
+
+ ActiveMQActivationSpec spec = new ActiveMQActivationSpec();
+ spec.setMaxSession(1);
+ spec.setCallTimeout(1000L);
+ spec.setResourceAdapter(qResourceAdapter);
+ spec.setUseJNDI(false);
+ spec.setDestinationType("javax.jms.Queue");
+ spec.setDestination(MDBQUEUE);
+
+ CountDownLatch latch = new CountDownLatch(1);
+
+ XADummyEndpoint endpoint = new XADummyEndpoint(latch, true);
+
+ DummyMessageEndpointFactory endpointFactory = new DummyMessageEndpointFactory(endpoint, true);
+
+ qResourceAdapter.endpointActivation(endpointFactory, spec);
+
+ ClientSession session = locator.createSessionFactory().createSession();
+
+ ClientProducer clientProducer = session.createProducer(MDBQUEUEPREFIXED);
+
+ ClientMessage message = session.createMessage(true);
+
+ message.getBodyBuffer().writeString("teststring");
+
+ clientProducer.send(message);
+
+ session.close();
+
+ latch.await(5, TimeUnit.SECONDS);
+
+ assertNotNull(endpoint.lastMessage);
+ assertEquals(endpoint.lastMessage.getCoreMessage().getBodyBuffer().readString(), "teststring");
+
+ qResourceAdapter.endpointDeactivation(endpointFactory, spec);
+
+ qResourceAdapter.stop();
+
+ Binding binding = server.getPostOffice().getBinding(SimpleString.toSimpleString(MDBQUEUEPREFIXED));
+ assertEquals(1, getMessageCount(((Queue) binding.getBindable())));
+
+
+ server.stop();
+ server.start();
+
+ ClientSessionFactory factory = locator.createSessionFactory();
+ session = factory.createSession(true, true);
+
+ session.start();
+ ClientConsumer consumer = session.createConsumer(MDBQUEUEPREFIXED);
+ assertNotNull(consumer.receive(5000));
+ session.close();
+ }
+
+ static volatile ActiveMQResourceAdapter resourceAdapter;
+ static boolean resourceAdapterStopped = false;
+ public static void interrupt() throws InterruptedException
+ {
+ //Thread.currentThread().interrupt();
+ if (!resourceAdapterStopped)
+ {
+ resourceAdapter.stop();
+ resourceAdapterStopped = true;
+ throw new InterruptedException("foo");
+ }
+ //Thread.currentThread().interrupt();
+ }
+
+ Transaction currentTX;
+
+ public class XADummyEndpoint extends DummyMessageEndpoint
+ {
+ final boolean twoPhase;
+ ClientSession session;
+ int afterDeliveryCounts = 0;
+
+ public XADummyEndpoint(CountDownLatch latch, boolean twoPhase) throws SystemException
+ {
+ super(latch);
+ this.twoPhase = twoPhase;
+ try
+ {
+ session = locator.createSessionFactory().createSession(true, false, false);
+ }
+ catch (Throwable e)
+ {
+ throw new RuntimeException(e);
+ }
+ }
+
+ @Override
+ public void beforeDelivery(Method method) throws NoSuchMethodException, ResourceException
+ {
+ super.beforeDelivery(method);
+ try
+ {
+ DummyTMLocator.tm.begin();
+ currentTX = DummyTMLocator.tm.getTransaction();
+ currentTX.enlistResource(xaResource);
+ if (twoPhase)
+ {
+ currentTX.enlistResource(new DummyXAResource());
+ }
+ }
+ catch (Throwable e)
+ {
+ throw new RuntimeException(e.getMessage(), e);
+ }
+ }
+
+ public void onMessage(Message message)
+ {
+ super.onMessage(message);
+// try
+// {
+// lastMessage = (ActiveMQMessage) message;
+// currentTX.enlistResource(session);
+// ClientProducer prod = session.createProducer()
+// }
+// catch (Exception e)
+// {
+// e.printStackTrace();
+// }
+
+
+ }
+
+
+
+ @Override
+ public void afterDelivery() throws ResourceException
+ {
+ afterDeliveryCounts++;
+ try
+ {
+ currentTX.commit();
+ }
+ catch (Throwable e)
+ {
+ //its unsure as to whether the EJB/JCA layer will handle this or throw it to us,
+ // either way we don't do anything else so its fine just to throw.
+ // NB this will only happen with 2 phase commit
+ throw new RuntimeException(e);
+ }
+ super.afterDelivery();
+ }
+ }
+
+ @Before
+ public void setUp() throws Exception
+ {
+ resourceAdapter = null;
+ resourceAdapterStopped = false;
+ super.setUp();
+ DummyTMLocator.startTM();
+ }
+
+
+ @After
+ public void tearDown() throws Exception
+ {
+ DummyTMLocator.stopTM();
+ super.tearDown();
+ }
+
+ public static class DummyTMLocator
+ {
+ public static TransactionManagerImple tm;
+ public static void stopTM()
+ {
+ try
+ {
+ TransactionReaper.terminate(true);
+ TxControl.disable(true);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ tm = null;
+ }
+ public static void startTM()
+ {
+ tm = new TransactionManagerImple();
+ TxControl.enable();
+ }
+ public TransactionManager getTM()
+ {
+ return tm;
+ }
+ }
+
+ static class DummyXAResource implements XAResource
+ {
+ @Override
+ public void commit(Xid xid, boolean b) throws XAException
+ {
+
+ }
+
+ @Override
+ public void end(Xid xid, int i) throws XAException
+ {
+
+ }
+
+ @Override
+ public void forget(Xid xid) throws XAException
+ {
+
+ }
+
+ @Override
+ public int getTransactionTimeout() throws XAException
+ {
+ return 0;
+ }
+
+ @Override
+ public boolean isSameRM(XAResource xaResource) throws XAException
+ {
+ return false;
+ }
+
+ @Override
+ public int prepare(Xid xid) throws XAException
+ {
+ return 0;
+ }
+
+ @Override
+ public Xid[] recover(int i) throws XAException
+ {
+ return new Xid[0];
+ }
+
+ @Override
+ public void rollback(Xid xid) throws XAException
+ {
+
+ }
+
+ @Override
+ public boolean setTransactionTimeout(int i) throws XAException
+ {
+ return false;
+ }
+
+ @Override
+ public void start(Xid xid, int i) throws XAException
+ {
+
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/BMFailoverTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/BMFailoverTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/BMFailoverTest.java
new file mode 100644
index 0000000..fb91994
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/BMFailoverTest.java
@@ -0,0 +1,451 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.extras.byteman;
+
+import javax.transaction.xa.XAException;
+import javax.transaction.xa.XAResource;
+import javax.transaction.xa.Xid;
+
+import org.apache.activemq.api.core.ActiveMQTransactionOutcomeUnknownException;
+import org.apache.activemq.api.core.ActiveMQTransactionRolledBackException;
+import org.apache.activemq.api.core.ActiveMQUnBlockedException;
+import org.apache.activemq.api.core.SimpleString;
+import org.apache.activemq.api.core.TransportConfiguration;
+import org.apache.activemq.api.core.client.ClientConsumer;
+import org.apache.activemq.api.core.client.ClientMessage;
+import org.apache.activemq.api.core.client.ClientProducer;
+import org.apache.activemq.api.core.client.ClientSession;
+import org.apache.activemq.api.core.client.ClientSessionFactory;
+import org.apache.activemq.api.core.client.ServerLocator;
+import org.apache.activemq.core.client.ActiveMQClientMessageBundle;
+import org.apache.activemq.core.client.impl.ClientMessageImpl;
+import org.apache.activemq.core.client.impl.ClientSessionFactoryInternal;
+import org.apache.activemq.core.client.impl.ClientSessionInternal;
+import org.apache.activemq.core.postoffice.Binding;
+import org.apache.activemq.core.server.Queue;
+import org.apache.activemq.core.transaction.impl.XidImpl;
+import org.apache.activemq.tests.integration.cluster.failover.FailoverTestBase;
+import org.apache.activemq.tests.integration.cluster.util.TestableServer;
+import org.apache.activemq.tests.util.RandomUtil;
+import org.apache.activemq.utils.UUIDGenerator;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.After;
+import org.junit.Assert;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ * 4/18/13
+ */
+@RunWith(BMUnitRunner.class)
+public class BMFailoverTest extends FailoverTestBase
+{
+ private ServerLocator locator;
+ private ClientSessionFactoryInternal sf;
+ private ClientSessionFactoryInternal sf2;
+ public static TestableServer serverToStop;
+
+ @Before
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ stopped = false;
+ locator = getServerLocator();
+ }
+
+ @After
+ @Override
+ public void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ private static boolean stopped = false;
+ public static void stopAndThrow() throws ActiveMQUnBlockedException
+ {
+ if (!stopped)
+ {
+ try
+ {
+ serverToStop.getServer().stop(true);
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ try
+ {
+ Thread.sleep(2000);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ stopped = true;
+ throw ActiveMQClientMessageBundle.BUNDLE.unblockingACall(null);
+ }
+ }
+ @Test
+ @BMRules
+ (
+ rules =
+ {
+ @BMRule
+ (
+ name = "trace ActiveMQSessionContext xaEnd",
+ targetClass = "org.apache.activemq.core.protocol.core.impl.ActiveMQSessionContext",
+ targetMethod = "xaEnd",
+ targetLocation = "AT EXIT",
+ action = "org.apache.activemq.tests.extras.byteman.BMFailoverTest.stopAndThrow()"
+ )
+ }
+ )
+ //https://bugzilla.redhat.com/show_bug.cgi?id=1152410
+ public void testFailOnEndAndRetry() throws Exception
+ {
+ serverToStop = liveServer;
+
+ createSessionFactory();
+
+ ClientSession session = createSession(sf, true, false, false);
+
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(FailoverTestBase.ADDRESS);
+
+ for (int i = 0; i < 100; i++)
+ {
+ producer.send(createMessage(session, i, true));
+ }
+
+ ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+
+ Xid xid = RandomUtil.randomXid();
+
+ session.start(xid, XAResource.TMNOFLAGS);
+ session.start();
+ // Receive MSGs but don't ack!
+ for (int i = 0; i < 100; i++)
+ {
+ ClientMessage message = consumer.receive(1000);
+
+ Assert.assertNotNull(message);
+
+ assertMessageBody(i, message);
+
+ Assert.assertEquals(i, message.getIntProperty("counter").intValue());
+ }
+ try
+ {
+ //top level prepare
+ session.end(xid, XAResource.TMSUCCESS);
+ }
+ catch (XAException e)
+ {
+ try
+ {
+ //top level abort
+ session.end(xid, XAResource.TMFAIL);
+ }
+ catch (XAException e1)
+ {
+ try
+ {
+ //rollback
+ session.rollback(xid);
+ }
+ catch (XAException e2)
+ {
+ }
+ }
+ }
+ xid = RandomUtil.randomXid();
+ session.start(xid, XAResource.TMNOFLAGS);
+
+ for (int i = 0; i < 50; i++)
+ {
+ ClientMessage message = consumer.receive(1000);
+
+ Assert.assertNotNull(message);
+
+ assertMessageBody(i, message);
+
+ Assert.assertEquals(i, message.getIntProperty("counter").intValue());
+ }
+ session.end(xid, XAResource.TMSUCCESS);
+ session.commit(xid, true);
+ }
+
+ @Test
+ @BMRules
+ (
+ rules =
+ {
+ @BMRule
+ (
+ name = "trace clientsessionimpl commit",
+ targetClass = "org.apache.activemq.core.client.impl.ClientSessionImpl",
+ targetMethod = "start(javax.transaction.xa.Xid, int)",
+ targetLocation = "AT EXIT",
+ action = "org.apache.activemq.tests.extras.byteman.BMFailoverTest.serverToStop.getServer().stop(true)"
+ )
+ }
+ )
+ public void testFailoverOnCommit2() throws Exception
+ {
+ serverToStop = liveServer;
+ locator = getServerLocator();
+ locator.setFailoverOnInitialConnection(true);
+ SimpleString inQueue = new SimpleString("inQueue");
+ SimpleString outQueue = new SimpleString("outQueue");
+ createSessionFactory();
+ createSessionFactory2();
+
+ // closeable will take care of closing it
+ try (ClientSession session = sf.createSession(false, true, true);
+ ClientProducer sendInitialProducer = session.createProducer();)
+ {
+ session.createQueue(inQueue, inQueue, null, true);
+ session.createQueue(outQueue, outQueue, null, true);
+ sendInitialProducer.send(inQueue, createMessage(session, 0, true));
+ }
+
+ ClientSession xaSessionRec = addClientSession(sf.createSession(true, false, false));
+
+ ClientConsumer consumer = addClientConsumer(xaSessionRec.createConsumer(inQueue));
+
+ byte[] globalTransactionId = UUIDGenerator.getInstance().generateStringUUID().getBytes();
+ Xid xidRec = new XidImpl("xa2".getBytes(), 1, globalTransactionId);
+
+ xaSessionRec.start();
+
+ xaSessionRec.getXAResource().start(xidRec, XAResource.TMNOFLAGS);
+
+ //failover is now occurring, receive, ack and end will be called whilst this is happening.
+
+ ClientMessageImpl m = (ClientMessageImpl) consumer.receive(5000);
+
+ assertNotNull(m);
+
+ System.out.println("********************" + m.getIntProperty("counter"));
+ //the mdb would ack the message before calling onMessage()
+ m.acknowledge();
+
+ try
+ {
+ //this may fail but thats ok, it depends on the race and when failover actually happens
+ xaSessionRec.end(xidRec, XAResource.TMSUCCESS);
+ }
+ catch (XAException ignore)
+ {
+ }
+
+ //we always reset the client on the RA
+ ((ClientSessionInternal) xaSessionRec).resetIfNeeded();
+
+ // closeable will take care of closing it
+ try (ClientSession session = sf.createSession(false, true, true);
+ ClientProducer sendInitialProducer = session.createProducer();)
+ {
+ sendInitialProducer.send(inQueue, createMessage(session, 0, true));
+ }
+
+ //now receive and send a message successfully
+
+ globalTransactionId = UUIDGenerator.getInstance().generateStringUUID().getBytes();
+ xidRec = new XidImpl("xa4".getBytes(), 1, globalTransactionId);
+ xaSessionRec.getXAResource().start(xidRec, XAResource.TMNOFLAGS);
+
+ Binding binding = backupServer.getServer().getPostOffice().getBinding(inQueue);
+ Queue inQ = (Queue) binding.getBindable();
+
+ m = (ClientMessageImpl) consumer.receive(5000);
+
+ assertNotNull(m);
+ //the mdb would ack the message before calling onMessage()
+ m.acknowledge();
+
+ System.out.println("********************" + m.getIntProperty("counter"));
+
+ xaSessionRec.getXAResource().end(xidRec, XAResource.TMSUCCESS);
+ xaSessionRec.getXAResource().prepare(xidRec);
+ xaSessionRec.getXAResource().commit(xidRec, false);
+
+
+ //let's close the consumer so anything pending is handled
+ consumer.close();
+
+ assertEquals(1, getMessageCount(inQ));
+ }
+
+
+ @Test
+ @BMRules
+ (
+ rules =
+ {
+ @BMRule
+ (
+ name = "trace clientsessionimpl commit",
+ targetClass = "org.apache.activemq.core.client.impl.ClientSessionImpl",
+ targetMethod = "commit",
+ targetLocation = "ENTRY",
+ action = "org.apache.activemq.tests.extras.byteman.BMFailoverTest.serverToStop.getServer().stop(true)"
+ )
+ }
+ )
+ public void testFailoverOnCommit() throws Exception
+ {
+ serverToStop = liveServer;
+ locator = getServerLocator();
+ locator.setFailoverOnInitialConnection(true);
+ createSessionFactory();
+ ClientSession session = createSessionAndQueue();
+
+ ClientProducer producer = addClientProducer(session.createProducer(FailoverTestBase.ADDRESS));
+
+ sendMessages(session, producer, 10);
+ try
+ {
+ session.commit();
+ fail("should have thrown an exception");
+ }
+ catch (ActiveMQTransactionOutcomeUnknownException e)
+ {
+ //pass
+ }
+ sendMessages(session, producer, 10);
+ session.commit();
+ Queue bindable = (Queue) backupServer.getServer().getPostOffice().getBinding(FailoverTestBase.ADDRESS).getBindable();
+ assertEquals(10, getMessageCount(bindable));
+ }
+
+ @Test
+ @BMRules
+ (
+ rules =
+ {
+ @BMRule
+ (
+ name = "trace clientsessionimpl commit",
+ targetClass = "org.apache.activemq.core.client.impl.ClientSessionImpl",
+ targetMethod = "commit",
+ targetLocation = "ENTRY",
+ action = "org.apache.activemq.tests.extras.byteman.BMFailoverTest.serverToStop.getServer().stop(true)"
+ )
+ }
+ )
+ public void testFailoverOnReceiveCommit() throws Exception
+ {
+ serverToStop = liveServer;
+ locator = getServerLocator();
+ locator.setFailoverOnInitialConnection(true);
+ createSessionFactory();
+ ClientSession session = createSessionAndQueue();
+
+ ClientSession sendSession = createSession(sf, true, true);
+
+ ClientProducer producer = addClientProducer(sendSession.createProducer(FailoverTestBase.ADDRESS));
+
+ sendMessages(sendSession, producer, 10);
+
+ ClientConsumer consumer = session.createConsumer(FailoverTestBase.ADDRESS);
+ session.start();
+ for (int i = 0; i < 10; i++)
+ {
+ ClientMessage m = consumer.receive(500);
+ assertNotNull(m);
+ m.acknowledge();
+ }
+ try
+ {
+ session.commit();
+ fail("should have thrown an exception");
+ }
+ catch (ActiveMQTransactionOutcomeUnknownException e)
+ {
+ //pass
+ }
+ catch (ActiveMQTransactionRolledBackException e1)
+ {
+ //pass
+ }
+ Queue bindable = (Queue) backupServer.getServer().getPostOffice().getBinding(FailoverTestBase.ADDRESS).getBindable();
+ assertEquals(10, getMessageCount(bindable));
+
+ }
+
+ @Override
+ protected TransportConfiguration getAcceptorTransportConfiguration(final boolean live)
+ {
+ return getNettyAcceptorTransportConfiguration(live);
+ }
+
+ @Override
+ protected TransportConfiguration getConnectorTransportConfiguration(final boolean live)
+ {
+ return getNettyConnectorTransportConfiguration(live);
+ }
+
+ private ClientSession createSessionAndQueue() throws Exception
+ {
+ ClientSession session = createSession(sf, false, false);
+
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
+ return session;
+ }
+
+ private ClientSession createXASessionAndQueue() throws Exception
+ {
+ ClientSession session = addClientSession(sf.createSession(true, true, true));
+
+ session.createQueue(FailoverTestBase.ADDRESS, FailoverTestBase.ADDRESS, null, true);
+ return session;
+ }
+
+ protected ClientSession
+ createSession(ClientSessionFactory sf1, boolean autoCommitSends, boolean autoCommitAcks) throws Exception
+ {
+ return addClientSession(sf1.createSession(autoCommitSends, autoCommitAcks));
+ }
+
+ protected ClientSession
+ createSession(ClientSessionFactory sf1, boolean xa, boolean autoCommitSends, boolean autoCommitAcks) throws Exception
+ {
+ return addClientSession(sf1.createSession(xa, autoCommitSends, autoCommitAcks));
+ }
+
+
+ private void createSessionFactory() throws Exception
+ {
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setReconnectAttempts(-1);
+
+ sf = createSessionFactoryAndWaitForTopology(locator, 2);
+ }
+
+ private void createSessionFactory2() throws Exception
+ {
+ sf2 = createSessionFactoryAndWaitForTopology(locator, 2);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/BridgeServerLocatorConfigurationTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/BridgeServerLocatorConfigurationTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/BridgeServerLocatorConfigurationTest.java
new file mode 100644
index 0000000..bf97f5a
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/BridgeServerLocatorConfigurationTest.java
@@ -0,0 +1,181 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.extras.byteman;
+
+import java.util.ArrayList;
+import java.util.HashMap;
+import java.util.List;
+import java.util.Map;
+
+import org.apache.activemq.api.core.TransportConfiguration;
+import org.apache.activemq.api.core.client.ServerLocator;
+import org.apache.activemq.core.config.BridgeConfiguration;
+import org.apache.activemq.core.config.CoreQueueConfiguration;
+import org.apache.activemq.core.remoting.impl.invm.TransportConstants;
+import org.apache.activemq.core.server.ActiveMQServer;
+import org.apache.activemq.core.server.cluster.impl.BridgeImpl;
+import org.apache.activemq.tests.util.ServiceTestBase;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(BMUnitRunner.class)
+public class BridgeServerLocatorConfigurationTest extends ServiceTestBase
+{
+
+ private static final long BRIDGE_TTL = 1234L;
+ private static final String BRIDGE_NAME = "bridge1";
+
+ protected boolean isNetty()
+ {
+ return false;
+ }
+
+ private String getConnector()
+ {
+ if (isNetty())
+ {
+ return NETTY_CONNECTOR_FACTORY;
+ }
+ return INVM_CONNECTOR_FACTORY;
+ }
+
+ @Test
+ @BMRule(name = "check connection ttl",
+ targetClass = "org.apache.activemq.tests.extras.byteman.BridgeServerLocatorConfigurationTest",
+ targetMethod = "getBridgeTTL(ActiveMQServer, String)", targetLocation = "EXIT",
+ action = "$! = $0.getConfiguredBridge($1).serverLocator.getConnectionTTL();")
+ /**
+ * Checks the connection ttl by using byteman to override the methods on this class to return the value of private variables in the Bridge.
+ * @throws Exception
+ *
+ * The byteman rule on this test overwrites the {@link #getBridgeTTL} method to retrieve the bridge called {@link @BRIDGE_NAME}.
+ * It the overrides the return value to be the value of the connection ttl. Note that the unused String parameter is required to
+ * ensure that byteman populates the $1 variable, otherwise it will not bind correctly.
+ */
+ public void testConnectionTTLOnBridge() throws Exception
+ {
+ Map<String, Object> server0Params = new HashMap<String, Object>();
+ ActiveMQServer serverWithBridge = createClusteredServerWithParams(isNetty(), 0, true, server0Params);
+
+ Map<String, Object> server1Params = new HashMap<String, Object>();
+ if (isNetty())
+ {
+ server1Params.put("port", org.apache.activemq.core.remoting.impl.netty.TransportConstants.DEFAULT_PORT + 1);
+ }
+ else
+ {
+ server1Params.put(TransportConstants.SERVER_ID_PROP_NAME, 1);
+ }
+ ActiveMQServer server1 = createClusteredServerWithParams(isNetty(), 1, true, server1Params);
+ ServerLocator locator = null;
+ try
+ {
+ final String testAddress = "testAddress";
+ final String queueName0 = "queue0";
+ final String forwardAddress = "forwardAddress";
+ final String queueName1 = "queue1";
+
+ Map<String, TransportConfiguration> connectors = new HashMap<String, TransportConfiguration>();
+ TransportConfiguration server1tc = new TransportConfiguration(getConnector(), server1Params);
+ connectors.put(server1tc.getName(), server1tc);
+
+ serverWithBridge.getConfiguration().setConnectorConfigurations(connectors);
+
+ ArrayList<String> staticConnectors = new ArrayList<String>();
+ staticConnectors.add(server1tc.getName());
+
+ BridgeConfiguration bridgeConfiguration = new BridgeConfiguration()
+ .setName(BRIDGE_NAME)
+ .setQueueName(queueName0)
+ .setForwardingAddress(forwardAddress)
+ .setConnectionTTL(BRIDGE_TTL)
+ .setRetryInterval(1000)
+ .setReconnectAttempts(0)
+ .setReconnectAttemptsOnSameNode(0)
+ .setConfirmationWindowSize(1024)
+ .setStaticConnectors(staticConnectors);
+
+ List<BridgeConfiguration> bridgeConfigs = new ArrayList<BridgeConfiguration>();
+ bridgeConfigs.add(bridgeConfiguration);
+ serverWithBridge.getConfiguration().setBridgeConfigurations(bridgeConfigs);
+
+ CoreQueueConfiguration queueConfig0 = new CoreQueueConfiguration()
+ .setAddress(testAddress)
+ .setName(queueName0);
+ List<CoreQueueConfiguration> queueConfigs0 = new ArrayList<CoreQueueConfiguration>();
+ queueConfigs0.add(queueConfig0);
+ serverWithBridge.getConfiguration().setQueueConfigurations(queueConfigs0);
+
+ CoreQueueConfiguration queueConfig1 = new CoreQueueConfiguration()
+ .setAddress(forwardAddress)
+ .setName(queueName1);
+ List<CoreQueueConfiguration> queueConfigs1 = new ArrayList<CoreQueueConfiguration>();
+ queueConfigs1.add(queueConfig1);
+ server1.getConfiguration().setQueueConfigurations(queueConfigs1);
+
+ server1.start();
+ waitForServer(server1);
+
+ serverWithBridge.start();
+ waitForServer(serverWithBridge);
+
+ long bridgeTTL = getBridgeTTL(serverWithBridge, BRIDGE_NAME);
+
+ assertEquals(BRIDGE_TTL, bridgeTTL);
+ }
+ finally
+ {
+ if (locator != null)
+ {
+ locator.close();
+ }
+
+ serverWithBridge.stop();
+
+ server1.stop();
+ }
+ }
+
+ /**
+ * Method for byteman to wrap around and do its magic with to return the ttl from private members
+ * rather than -1
+ * @param bridgeServer
+ * @param bridgeName
+ * @return
+ */
+ private long getBridgeTTL(ActiveMQServer bridgeServer, String bridgeName)
+ {
+ return -1L;
+ }
+
+ /**
+ * Byteman seems to need this method so that it gets back the concrete type not the interface
+ * @param bridgeServer
+ * @return
+ */
+ private BridgeImpl getConfiguredBridge(ActiveMQServer bridgeServer)
+ {
+ return getConfiguredBridge(bridgeServer, BRIDGE_NAME);
+ }
+
+ private BridgeImpl getConfiguredBridge(ActiveMQServer bridgeServer, String bridgeName)
+ {
+ return (BridgeImpl)bridgeServer.getClusterManager().getBridges().get(bridgeName);
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ClosingConnectionTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ClosingConnectionTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ClosingConnectionTest.java
new file mode 100644
index 0000000..cd3ee24
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ClosingConnectionTest.java
@@ -0,0 +1,192 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.extras.byteman;
+
+import javax.management.MBeanServer;
+import javax.management.MBeanServerFactory;
+
+import org.apache.activemq.api.core.SimpleString;
+import org.apache.activemq.api.core.client.ClientMessage;
+import org.apache.activemq.api.core.client.ClientProducer;
+import org.apache.activemq.api.core.client.ClientSession;
+import org.apache.activemq.api.core.client.ClientSessionFactory;
+import org.apache.activemq.api.core.client.ServerLocator;
+import org.apache.activemq.api.core.management.ActiveMQServerControl;
+import org.apache.activemq.core.server.ActiveMQServer;
+import org.apache.activemq.core.server.JournalType;
+import org.apache.activemq.core.settings.impl.AddressSettings;
+import org.apache.activemq.tests.integration.IntegrationTestLogger;
+import org.apache.activemq.tests.integration.management.ManagementControlHelper;
+import org.apache.activemq.tests.util.ServiceTestBase;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+@RunWith(BMUnitRunner.class)
+public class ClosingConnectionTest extends ServiceTestBase
+{
+ public static final SimpleString ADDRESS = new SimpleString("SimpleAddress");
+
+ private ServerLocator locator;
+
+ private ActiveMQServer server;
+
+ private static MBeanServer mBeanServer;
+
+ private static boolean readyToKill = false;
+
+ protected boolean isNetty()
+ {
+ return true;
+ }
+
+
+ @Override
+ @Before
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ mBeanServer = MBeanServerFactory.createMBeanServer();
+ server = newActiveMQServer();
+ server.getConfiguration().setJournalType(JournalType.NIO);
+ server.getConfiguration().setJMXManagementEnabled(true);
+ server.start();
+ waitForServer(server);
+ locator = createFactory(isNetty());
+ readyToKill = false;
+ }
+
+ public static void killConnection() throws InterruptedException
+ {
+ if (readyToKill)
+ {
+ // We have to kill the connection in a new thread otherwise Netty won't interrupt the current thread
+ Thread closeConnectionThread = new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+ try
+ {
+ ActiveMQServerControl serverControl = ManagementControlHelper.createActiveMQServerControl(mBeanServer);
+ serverControl.closeConnectionsForUser("guest");
+ readyToKill = false;
+ }
+ catch (Exception e)
+ {
+ e.printStackTrace();
+ }
+ }
+ });
+
+ closeConnectionThread.start();
+
+ try
+ {
+ /* We want to simulate a long-running remoting thread here. If closing the connection in the closeConnectionThread
+ * interrupts this thread then it will cause sleep() to throw and InterruptedException. Therefore we catch
+ * the InterruptedException and re-interrupt the current thread so the interrupt will be passed properly
+ * back to the caller. It's a bit of a hack, but I couldn't find any other way to simulate it.
+ */
+ Thread.sleep(1500);
+ }
+ catch (InterruptedException e)
+ {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /*
+ * Test for https://bugzilla.redhat.com/show_bug.cgi?id=1193085
+ * */
+ @Test
+ @BMRules
+ (
+ rules =
+ {
+ @BMRule
+ (
+ name = "rule to kill connection",
+ targetClass = "org.apache.activemq.core.journal.impl.NIOSequentialFile",
+ targetMethod = "open(int, boolean)",
+ targetLocation = "AT INVOKE java.nio.channels.FileChannel.size()",
+ action = "org.apache.activemq.tests.extras.byteman.ClosingConnectionTest.killConnection();"
+
+ )
+ }
+ )
+ public void testKillConnection() throws Exception
+ {
+ locator.setBlockOnNonDurableSend(true);
+ locator.setBlockOnDurableSend(true);
+ locator.setBlockOnAcknowledge(true);
+
+ ClientSessionFactory sf = createSessionFactory(locator);
+ ClientSession session = sf.createSession("guest", null, false, true, true, false, 0);
+
+ session.createQueue(ADDRESS, ADDRESS, null, true);
+
+ ClientProducer producer = session.createProducer(ADDRESS);
+
+ ClientMessage message = session.createMessage(true);
+ message.getBodyBuffer().writeBytes(new byte[1024]);
+
+ for (int i = 0; i < 200; i++)
+ {
+ producer.send(message);
+ }
+
+ assertTrue(server.locateQueue(ADDRESS).getPageSubscription().getPagingStore().isPaging());
+
+ readyToKill = true;
+ try
+ {
+ for (int i = 0; i < 8; i++)
+ {
+ producer.send(message);
+ }
+ fail("Sending message here should result in failure.");
+ }
+ catch (Exception e)
+ {
+ IntegrationTestLogger.LOGGER.info("Caught exception: " + e.getMessage());
+ }
+
+ Thread.sleep(1000);
+
+ assertTrue(server.isStarted());
+
+ session.close();
+ }
+
+ private ActiveMQServer newActiveMQServer() throws Exception
+ {
+ ActiveMQServer server = createServer(true, createDefaultConfig(isNetty()), mBeanServer);
+
+ AddressSettings defaultSetting = new AddressSettings();
+ defaultSetting.setPageSizeBytes(10 * 1024);
+ defaultSetting.setMaxSizeBytes(20 * 1024);
+
+ server.getAddressSettingsRepository().addMatch("#", defaultSetting);
+
+ return server;
+ }
+}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ClusteredGroupingTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ClusteredGroupingTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ClusteredGroupingTest.java
new file mode 100644
index 0000000..bc9f853
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/ClusteredGroupingTest.java
@@ -0,0 +1,473 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.extras.byteman;
+
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.api.core.ActiveMQNonExistentQueueException;
+import org.apache.activemq.api.core.Message;
+import org.apache.activemq.api.core.SimpleString;
+import org.apache.activemq.api.core.management.ManagementHelper;
+import org.apache.activemq.api.core.management.CoreNotificationType;
+import org.apache.activemq.core.server.ActiveMQServer;
+import org.apache.activemq.core.server.group.impl.GroupingHandlerConfiguration;
+import org.apache.activemq.core.server.group.impl.Response;
+import org.apache.activemq.core.server.management.Notification;
+import org.apache.activemq.tests.integration.cluster.distribution.ClusterTestBase;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+/**
+ * @author <a href="mailto:andy.taylor@jboss.org">Andy Taylor</a>
+ */
+@RunWith(BMUnitRunner.class)
+public class ClusteredGroupingTest extends ClusterTestBase
+{
+ @Test
+ @BMRules
+ (
+ rules =
+ {
+ @BMRule
+ (
+ name = "blow-up",
+ targetClass = "org.apache.activemq.core.server.group.impl.LocalGroupingHandler",
+ targetMethod = "removeGrouping",
+ targetLocation = "ENTRY",
+ action = "org.apache.activemq.tests.extras.byteman.ClusteredGroupingTest.pause($1);"
+ ),
+ @BMRule
+ (
+ name = "blow-up2",
+ targetClass = "org.apache.activemq.core.server.group.impl.GroupHandlingAbstract",
+ targetMethod = "forceRemove",
+ targetLocation = "ENTRY",
+ action = "org.apache.activemq.tests.extras.byteman.ClusteredGroupingTest.restart2();"
+ )
+ }
+ )
+ public void test2serversLocalGoesDown() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1, 0, 500, isNetty(), 0, 1);
+
+ setupClusterConnection("cluster1", "queues", false, 1, 0, 500, isNetty(), 1, 0);
+
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+
+ startServers(0, 1);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, true);
+ createQueue(1, "queues.testaddress", "queue0", null, true);
+
+ addConsumer(0, 1, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+ waitForBindings(1, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 1, 1, false);
+ waitForBindings(1, "queues.testaddress", 1, 0, false);
+
+ sendWithProperty(0, "queues.testaddress", 1, true, Message.HDR_GROUP_ID, new SimpleString("id1"));
+
+ latch = new CountDownLatch(1);
+ latch2 = new CountDownLatch(1);
+
+ crashAndWaitForFailure(getServer(1));
+
+ assertTrue(latch2.await(20000, TimeUnit.MILLISECONDS));
+
+ try
+ {
+ try
+ {
+ sendWithProperty(0, "queues.testaddress", 1, true, Message.HDR_GROUP_ID, new SimpleString("id1"));
+ }
+ catch (ActiveMQNonExistentQueueException e)
+ {
+ fail("did not handle removal of queue");
+ }
+ }
+ finally
+ {
+ latch.countDown();
+ }
+ }
+
+ @Test
+ @BMRules
+ (
+ rules =
+ {
+ @BMRule
+ (
+ name = "blow-up",
+ targetClass = "org.apache.activemq.core.server.group.impl.RemoteGroupingHandler",
+ targetMethod = "onNotification",
+ targetLocation = "ENTRY",
+ action = "org.apache.activemq.tests.extras.byteman.ClusteredGroupingTest.pause2($1);"
+ ),
+ @BMRule(name = "blow-up2",
+ targetClass = "org.apache.activemq.core.server.group.impl.RemoteGroupingHandler",
+ targetMethod = "remove",
+ targetLocation = "ENTRY",
+ action = "org.apache.activemq.tests.extras.byteman.ClusteredGroupingTest.restart2();")
+ }
+ )
+ public void test3serversLocalGoesDown() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1, 0, 500, isNetty(), 0, 1, 2);
+
+ setupClusterConnection("cluster1", "queues", false, 1, 0, 500, isNetty(), 1, 0, 2);
+
+ setupClusterConnection("cluster2", "queues", false, 1, 0, 500, isNetty(), 2, 0, 1);
+
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+ startServers(0, 1, 2);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, true);
+ createQueue(1, "queues.testaddress", "queue0", null, true);
+ createQueue(2, "queues.testaddress", "queue0", null, true);
+
+ addConsumer(0, 2, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+ waitForBindings(1, "queues.testaddress", 1, 0, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 1, false);
+ waitForBindings(1, "queues.testaddress", 2, 1, false);
+ waitForBindings(2, "queues.testaddress", 2, 0, false);
+
+ sendWithProperty(1, "queues.testaddress", 1, true, Message.HDR_GROUP_ID, new SimpleString("id1"));
+
+ latch = new CountDownLatch(1);
+ latch2 = new CountDownLatch(1);
+
+ main = Thread.currentThread();
+ crashAndWaitForFailure(getServer(2));
+
+ assertTrue(latch2.await(20000, TimeUnit.MILLISECONDS));
+
+ try
+ {
+ try
+ {
+ sendWithProperty(1, "queues.testaddress", 1, true, Message.HDR_GROUP_ID, new SimpleString("id1"));
+ }
+ catch (ActiveMQNonExistentQueueException e)
+ {
+ fail("did not handle removal of queue");
+ }
+ }
+ finally
+ {
+ latch.countDown();
+ }
+
+ assertHandlersAreSame(getServer(0), getServer(1));
+ }
+
+ @Test
+ @BMRules
+ (
+ rules =
+ {
+ @BMRule
+ (
+ name = "blow-up",
+ targetClass = "org.apache.activemq.core.server.group.impl.LocalGroupingHandler",
+ targetMethod = "onNotification",
+ targetLocation = "ENTRY",
+ action = "org.apache.activemq.tests.extras.byteman.ClusteredGroupingTest.pause2($1);"
+ ),
+ @BMRule(name = "blow-up2",
+ targetClass = "org.apache.activemq.core.server.group.impl.LocalGroupingHandler",
+ targetMethod = "remove",
+ targetLocation = "ENTRY",
+ action = "org.apache.activemq.tests.extras.byteman.ClusteredGroupingTest.restart2();")
+ }
+ )
+ public void testLocal3serversLocalGoesDown() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1, 0, 500, isNetty(), 0, 1, 2);
+
+ setupClusterConnection("cluster1", "queues", false, 1, 0, 500, isNetty(), 1, 0, 2);
+
+ setupClusterConnection("cluster2", "queues", false, 1, 0, 500, isNetty(), 2, 0, 1);
+
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+
+ startServers(0, 1, 2);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, true);
+ createQueue(1, "queues.testaddress", "queue0", null, true);
+ createQueue(2, "queues.testaddress", "queue0", null, true);
+
+ addConsumer(0, 2, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+ waitForBindings(1, "queues.testaddress", 1, 0, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+
+ waitForBindings(0, "queues.testaddress", 2, 1, false);
+ waitForBindings(1, "queues.testaddress", 2, 1, false);
+ waitForBindings(2, "queues.testaddress", 2, 0, false);
+
+ sendWithProperty(0, "queues.testaddress", 1, true, Message.HDR_GROUP_ID, new SimpleString("id1"));
+
+ latch = new CountDownLatch(1);
+ latch2 = new CountDownLatch(1);
+
+ main = Thread.currentThread();
+ crashAndWaitForFailure(getServer(2));
+
+ assertTrue(latch2.await(20000, TimeUnit.MILLISECONDS));
+
+ try
+ {
+ try
+ {
+ sendWithProperty(0, "queues.testaddress", 1, true, Message.HDR_GROUP_ID, new SimpleString("id1"));
+ }
+ catch (ActiveMQNonExistentQueueException e)
+ {
+ fail("did not handle removal of queue");
+ }
+ }
+ finally
+ {
+ latch.countDown();
+ }
+
+ assertHandlersAreSame(getServer(0), getServer(1));
+ }
+
+ @Test
+ @BMRules
+ (
+ rules =
+ {
+ @BMRule
+ (
+ name = "blow-up",
+ targetClass = "org.apache.activemq.core.server.group.impl.LocalGroupingHandler",
+ targetMethod = "onNotification",
+ targetLocation = "ENTRY",
+ action = "org.apache.activemq.tests.extras.byteman.ClusteredGroupingTest.pause2($1);"
+ ),
+ @BMRule(name = "blow-up2",
+ targetClass = "org.apache.activemq.core.server.group.impl.LocalGroupingHandler",
+ targetMethod = "remove",
+ targetLocation = "ENTRY",
+ action = "org.apache.activemq.tests.extras.byteman.ClusteredGroupingTest.restart2();")
+ }
+ )
+ public void testLocal4serversLocalGoesDown() throws Exception
+ {
+ setupServer(0, isFileStorage(), isNetty());
+ setupServer(1, isFileStorage(), isNetty());
+ setupServer(2, isFileStorage(), isNetty());
+ setupServer(3, isFileStorage(), isNetty());
+
+ setupClusterConnection("cluster0", "queues", false, 1, 0, 500, isNetty(), 0, 1, 2, 3);
+
+ setupClusterConnection("cluster1", "queues", false, 1, 0, 500, isNetty(), 1, 0, 2, 3);
+
+ setupClusterConnection("cluster2", "queues", false, 1, 0, 500, isNetty(), 2, 0, 1, 3);
+
+ setupClusterConnection("cluster3", "queues", false, 1, 0, 500, isNetty(), 3, 1, 2, 3);
+
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.LOCAL, 0);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 1);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 2);
+ setUpGroupHandler(GroupingHandlerConfiguration.TYPE.REMOTE, 3);
+
+ startServers(0, 1, 2, 3);
+
+ setupSessionFactory(0, isNetty());
+ setupSessionFactory(1, isNetty());
+ setupSessionFactory(2, isNetty());
+ setupSessionFactory(3, isNetty());
+
+ createQueue(0, "queues.testaddress", "queue0", null, true);
+ createQueue(1, "queues.testaddress", "queue0", null, true);
+ createQueue(2, "queues.testaddress", "queue0", null, true);
+ createQueue(3, "queues.testaddress", "queue0", null, true);
+
+ addConsumer(0, 2, "queue0", null);
+
+ waitForBindings(0, "queues.testaddress", 1, 0, true);
+ waitForBindings(1, "queues.testaddress", 1, 0, true);
+ waitForBindings(2, "queues.testaddress", 1, 1, true);
+ waitForBindings(3, "queues.testaddress", 1, 0, true);
+
+ waitForBindings(0, "queues.testaddress", 3, 1, false);
+ waitForBindings(1, "queues.testaddress", 3, 1, false);
+ waitForBindings(2, "queues.testaddress", 3, 0, false);
+ waitForBindings(3, "queues.testaddress", 3, 1, false);
+
+ sendWithProperty(0, "queues.testaddress", 1, true, Message.HDR_GROUP_ID, new SimpleString("id1"));
+
+ latch = new CountDownLatch(1);
+ latch2 = new CountDownLatch(1);
+
+ main = Thread.currentThread();
+ crashAndWaitForFailure(getServer(2));
+
+ assertTrue(latch2.await(20000, TimeUnit.MILLISECONDS));
+
+ try
+ {
+ try
+ {
+ sendWithProperty(0, "queues.testaddress", 1, true, Message.HDR_GROUP_ID, new SimpleString("id1"));
+ }
+ catch (ActiveMQNonExistentQueueException e)
+ {
+ fail("did not handle removal of queue");
+ }
+ }
+ finally
+ {
+ latch.countDown();
+ }
+ //now restart server
+ getServer(2).start();
+ waitForBindings(2, "queues.testaddress", 1, 0, true);
+ waitForBindings(2, "queues.testaddress", 3, 0, false);
+ sendWithProperty(3, "queues.testaddress", 1, true, Message.HDR_GROUP_ID, new SimpleString("id1"));
+ Thread.sleep(2000);
+ assertHandlersAreSame(getServer(0), getServer(1), getServer(2), getServer(3));
+ }
+
+ private void assertHandlersAreSame(ActiveMQServer server, ActiveMQServer... qServers)
+ {
+ SimpleString id = server.getGroupingHandler().getProposal(new SimpleString("id1.queue0"), false).getClusterName();
+ for (ActiveMQServer qServer : qServers)
+ {
+ Response proposal = qServer.getGroupingHandler().getProposal(new SimpleString("id1.queue0"), false);
+ if (proposal != null)
+ {
+ assertEquals(qServer.getIdentity() + " is incorrect", id, proposal.getChosenClusterName());
+ }
+ }
+ }
+
+ static CountDownLatch latch;
+ static CountDownLatch latch2;
+ static Thread main;
+
+ public static void pause(SimpleString clusterName)
+ {
+ if (clusterName.toString().startsWith("queue0"))
+ {
+ try
+ {
+ latch2.countDown();
+ latch.await();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+
+ public static void pause2(Notification notification)
+ {
+ if (notification.getType() == CoreNotificationType.BINDING_REMOVED)
+ {
+ SimpleString clusterName = notification.getProperties()
+ .getSimpleStringProperty(ManagementHelper.HDR_CLUSTER_NAME);
+ boolean inMain = main == Thread.currentThread();
+ if (clusterName.toString().startsWith("queue0") && !inMain)
+ {
+ try
+ {
+ latch2.countDown();
+ latch.await();
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ public static void restart2()
+ {
+ latch.countDown();
+ }
+
+
+ @Override
+ @Before
+ public void setUp() throws Exception
+ {
+ super.setUp();
+ }
+
+ @Override
+ @After
+ public void tearDown() throws Exception
+ {
+ closeAllConsumers();
+ closeAllSessionFactories();
+ closeAllServerLocatorsFactories();
+ super.tearDown();
+ }
+
+ public boolean isNetty()
+ {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-6/blob/3661829e/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/GroupingTest.java
----------------------------------------------------------------------
diff --git a/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/GroupingTest.java b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/GroupingTest.java
new file mode 100644
index 0000000..34f58f7
--- /dev/null
+++ b/tests/extra-tests/src/test/java/org/apache/activemq/tests/extras/byteman/GroupingTest.java
@@ -0,0 +1,202 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq.tests.extras.byteman;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.api.core.ActiveMQNotConnectedException;
+import org.apache.activemq.jms.client.ActiveMQConnectionFactory;
+import org.apache.activemq.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.tests.util.JMSTestBase;
+import org.jboss.byteman.contrib.bmunit.BMRule;
+import org.jboss.byteman.contrib.bmunit.BMRules;
+import org.jboss.byteman.contrib.bmunit.BMUnitRunner;
+import org.junit.After;
+import org.junit.Before;
+import org.junit.Test;
+import org.junit.runner.RunWith;
+
+/**
+ * GroupingTest
+ *
+ * @author Andy Taylor
+ */
+@RunWith(BMUnitRunner.class)
+public class GroupingTest extends JMSTestBase
+{
+ private Queue queue;
+ static boolean pause = false;
+
+ @Before
+ @Override
+ public void setUp() throws Exception
+ {
+ super.setUp();
+
+ queue = createQueue("TestQueue");
+ }
+
+ @After
+ @Override
+ public void tearDown() throws Exception
+ {
+ super.tearDown();
+ }
+
+ protected ConnectionFactory getCF() throws Exception
+ {
+ return cf;
+ }
+
+
+ @Test
+ @BMRules
+ (
+ rules =
+ {
+ @BMRule
+ (
+ name = "trace clientsessionimpl commit",
+ targetClass = "org.apache.activemq.core.server.impl.ServerSessionImpl",
+ targetMethod = "rollback",
+ targetLocation = "EXIT",
+ action = "org.apache.activemq.tests.extras.byteman.GroupingTest.pause();"
+ )
+ }
+ )
+ public void testGroupingRollbackOnClose() throws Exception
+ {
+ Connection sendConnection = null;
+ Connection connection = null;
+ Connection connection2 = null;
+ try
+ {
+ ActiveMQConnectionFactory fact = (ActiveMQConnectionFactory) getCF();
+ fact.setReconnectAttempts(0);
+ //fact.setConsumerWindowSize(1000);
+ //fact.setTransactionBatchSize(0);
+ connection = fact.createConnection();
+ RemotingConnection rc = server.getRemotingService().getConnections().iterator().next();
+ connection2 = fact.createConnection();
+ sendConnection = fact.createConnection();
+
+ final Session sendSession = sendConnection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ Session session = connection.createSession(true, Session.SESSION_TRANSACTED);
+ Session session2 = connection2.createSession(true, Session.SESSION_TRANSACTED);
+
+ final MessageProducer producer = sendSession.createProducer(queue);
+
+ MessageConsumer consumer1 = session.createConsumer(queue);
+ MessageConsumer consumer2 = session2.createConsumer(queue);
+
+ connection.start();
+ connection2.start();
+
+ final String jmsxgroupID = null;
+
+ Thread t = new Thread(new Runnable()
+ {
+ @Override
+ public void run()
+ {
+
+ try
+ {
+ for (int j = 0; j < 10000; j++)
+ {
+ TextMessage message = sendSession.createTextMessage();
+
+ message.setText("Message" + j);
+
+ message.setStringProperty("JMSXGroupID", "foo");
+
+ producer.send(message);
+ }
+ }
+ catch (JMSException e)
+ {
+ e.printStackTrace();
+ }
+ }
+ });
+ t.start();
+
+ //consume 5 msgs from 1st first consumer
+ for (int j = 0; j < 5; j++)
+ {
+ TextMessage tm = (TextMessage) consumer1.receive(10000);
+
+ assertNotNull(tm);
+
+ assertEquals("Message" + j, tm.getText());
+
+ assertEquals(tm.getStringProperty("JMSXGroupID"), "foo");
+ }
+
+ pause = true;
+ rc.fail(new ActiveMQNotConnectedException());
+ pause = false;
+
+ for (int j = 0; j < 10000; j++)
+ {
+ TextMessage tm = (TextMessage) consumer2.receive(5000);
+
+ assertNotNull(tm);
+
+ assertEquals("Message" + j, tm.getText());
+
+ assertEquals(tm.getStringProperty("JMSXGroupID"), "foo");
+ }
+ }
+ finally
+ {
+ if (sendConnection != null)
+ {
+ sendConnection.close();
+ }
+ if (connection2 != null)
+ {
+ connection2.close();
+ }
+ }
+ }
+
+
+ public static void pause()
+ {
+ if (pause)
+ {
+ try
+ {
+ System.out.println("pausing after rollback");
+ Thread.sleep(500);
+ }
+ catch (InterruptedException e)
+ {
+ e.printStackTrace(); //To change body of catch statement use File | Settings | File Templates.
+ }
+ System.out.println("finished pausing after rollback");
+ }
+ }
+}