You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2016/03/16 16:21:59 UTC
[30/61] [abbrv] activemq-artemis git commit: open wire changes
equivalent to ab16f7098fb52d2b4c40627ed110e1776525f208
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java
index ffdfc6e..d34f943 100644
--- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/BrokerService.java
@@ -16,22 +16,25 @@
*/
package org.apache.activemq.broker;
+import javax.management.MalformedObjectNameException;
+import javax.management.ObjectName;
import java.io.BufferedReader;
import java.io.File;
import java.io.IOException;
import java.io.InputStream;
import java.io.InputStreamReader;
+import java.net.ServerSocket;
import java.net.URI;
import java.net.URISyntaxException;
import java.util.ArrayList;
import java.util.HashSet;
+import java.util.Iterator;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.WeakHashMap;
-
-import javax.management.MalformedObjectNameException;
-import javax.management.ObjectName;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.activemq.ActiveMQConnectionMetaData;
import org.apache.activemq.Service;
@@ -44,6 +47,8 @@ import org.apache.activemq.broker.region.policy.PolicyMap;
import org.apache.activemq.broker.scheduler.JobSchedulerStore;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.BrokerId;
+import org.apache.activemq.network.ConnectionFilter;
+import org.apache.activemq.network.DiscoveryNetworkConnector;
import org.apache.activemq.network.NetworkConnector;
import org.apache.activemq.network.jms.JmsConnector;
import org.apache.activemq.proxy.ProxyConnector;
@@ -57,6 +62,7 @@ import org.apache.activemq.usage.SystemUsage;
import org.apache.activemq.util.IOExceptionHandler;
import org.apache.activemq.util.IOHelper;
import org.apache.activemq.util.ServiceStopper;
+import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -68,10 +74,12 @@ import org.slf4j.LoggerFactory;
public class BrokerService implements Service {
public static final String DEFAULT_PORT = "61616";
+ public static final AtomicInteger RANDOM_PORT_BASE = new AtomicInteger(51616);
public static final String DEFAULT_BROKER_NAME = "localhost";
public static final String BROKER_VERSION;
public static final int DEFAULT_MAX_FILE_LENGTH = 1024 * 1024 * 32;
public static final long DEFAULT_START_TIMEOUT = 600000L;
+ public static boolean disableWrapper = false;
public String SERVER_SIDE_KEYSTORE;
public String KEYSTORE_PASSWORD;
@@ -99,6 +107,11 @@ public class BrokerService implements Service {
private PolicyMap destinationPolicy;
private SystemUsage systemUsage;
+ private boolean isClustered = true;
+ private final List<NetworkConnector> networkConnectors = new CopyOnWriteArrayList<NetworkConnector>();
+
+ private TemporaryFolder tmpfolder;
+
public static WeakHashMap<Broker, Exception> map = new WeakHashMap<>();
static {
@@ -131,6 +144,10 @@ public class BrokerService implements Service {
@Override
public void start() throws Exception {
+ File targetTmp = new File("./target/tmp");
+ targetTmp.mkdirs();
+ tmpfolder = new TemporaryFolder(targetTmp);
+ tmpfolder.create();
Exception e = new Exception();
e.fillInStackTrace();
startBroker(startAsync);
@@ -188,10 +205,10 @@ public class BrokerService implements Service {
LOG.info("Apache ActiveMQ Artemis{} ({}, {}) is shutting down", new Object[]{getBrokerVersion(), getBrokerName(), brokerId});
if (broker != null) {
- System.out.println("______________________stopping broker: " + broker.getClass().getName());
broker.stop();
broker = null;
}
+ tmpfolder.delete();
LOG.info("Apache ActiveMQ Artemis {} ({}, {}) is shutdown", new Object[]{getBrokerVersion(), getBrokerName(), brokerId});
}
@@ -200,7 +217,7 @@ public class BrokerService implements Service {
public Broker getBroker() throws Exception {
if (broker == null) {
- broker = createBroker();
+ broker = createBroker(tmpfolder.getRoot());
}
return broker;
}
@@ -220,13 +237,14 @@ public class BrokerService implements Service {
this.brokerName = str.trim();
}
- protected Broker createBroker() throws Exception {
- broker = createBrokerWrapper();
+ protected Broker createBroker(File temporaryFile) throws Exception {
+ new Exception("file=" + temporaryFile.getAbsolutePath()).printStackTrace();
+ broker = createBrokerWrapper(temporaryFile);
return broker;
}
- private Broker createBrokerWrapper() {
- return new ArtemisBrokerWrapper(this);
+ private Broker createBrokerWrapper(File temporaryFile) {
+ return new ArtemisBrokerWrapper(this, temporaryFile);
}
public void makeSureDestinationExists(ActiveMQDestination activemqDestination) throws Exception {
@@ -382,10 +400,6 @@ public class BrokerService implements Service {
public void setKeepDurableSubsActive(boolean keepDurableSubsActive) {
}
- public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception {
- return null;
- }
-
public TransportConnector getConnectorByName(String connectorName) {
return null;
}
@@ -407,8 +421,17 @@ public class BrokerService implements Service {
public void setSchedulerDirectoryFile(File schedulerDirectory) {
}
+ public NetworkConnector addNetworkConnector(String discoveryAddress) throws Exception {
+ return addNetworkConnector(new URI(discoveryAddress));
+ }
+
+ public NetworkConnector addNetworkConnector(URI discoveryAddress) throws Exception {
+ NetworkConnector connector = new DiscoveryNetworkConnector(discoveryAddress);
+ return addNetworkConnector(connector);
+ }
+
public List<NetworkConnector> getNetworkConnectors() {
- return new ArrayList<>();
+ return this.networkConnectors;
}
public void setSchedulerSupport(boolean schedulerSupport) {
@@ -471,6 +494,30 @@ public class BrokerService implements Service {
}
public NetworkConnector addNetworkConnector(NetworkConnector connector) throws Exception {
+ connector.setBrokerService(this);
+
+ System.out.println("------------------------ this broker uri: " + this.getConnectURI());
+ connector.setLocalUri(this.getConnectURI());
+ // Set a connection filter so that the connector does not establish loop
+ // back connections.
+ connector.setConnectionFilter(new ConnectionFilter() {
+ @Override
+ public boolean connectTo(URI location) {
+ List<TransportConnector> transportConnectors = getTransportConnectors();
+ for (Iterator<TransportConnector> iter = transportConnectors.iterator(); iter.hasNext();) {
+ try {
+ TransportConnector tc = iter.next();
+ if (location.equals(tc.getConnectUri())) {
+ return false;
+ }
+ } catch (Throwable e) {
+ }
+ }
+ return true;
+ }
+ });
+
+ networkConnectors.add(connector);
return connector;
}
@@ -486,19 +533,63 @@ public class BrokerService implements Service {
public TransportConnector addConnector(URI bindAddress) throws Exception {
Integer port = bindAddress.getPort();
+ String host = bindAddress.getHost();
FakeTransportConnector connector = null;
- if (port != 0) {
- connector = new FakeTransportConnector(bindAddress);
- this.transportConnectors.add(connector);
- this.extraConnectors.add(port);
+
+ host = (host == null || host.length() == 0) ? "localhost" : host;
+ if ("0.0.0.0".equals(host)) {
+ host = "localhost";
}
- else {
- connector = new FakeTransportConnector(new URI(this.getDefaultUri()));
- this.transportConnectors.add(connector);
+
+ if (port == 0) {
+ //In actual impl in amq5, after connector has been added the socket
+ //is bound already. This means in case of 0 port uri, the random
+ //port is available after this call. With artemis wrapper however
+ //the real binding happens during broker start. To work around this
+ //we use manually calculated port for that.
+ port = getPseudoRandomPort();
+
}
+
+ System.out.println("Now host is: " + host);
+ bindAddress = new URI(bindAddress.getScheme(), bindAddress.getUserInfo(),
+ host, port, bindAddress.getPath(), bindAddress.getQuery(), bindAddress.getFragment());
+
+ connector = new FakeTransportConnector(bindAddress);
+ this.transportConnectors.add(connector);
+ this.extraConnectors.add(port);
+
return connector;
}
+ private int getPseudoRandomPort() {
+ int port = RANDOM_PORT_BASE.getAndIncrement();
+ while (!checkPort(port)) {
+ port = RANDOM_PORT_BASE.getAndIncrement();
+ }
+ return port;
+ }
+
+ private static boolean checkPort(final int port) {
+ ServerSocket ssocket = null;
+ try {
+ ssocket = new ServerSocket(port);
+ }
+ catch (Exception e) {
+ return false;
+ }
+ finally {
+ if (ssocket != null) {
+ try {
+ ssocket.close();
+ }
+ catch (IOException e) {
+ }
+ }
+ }
+ return true;
+ }
+
public void setCacheTempDestinations(boolean cacheTempDestinations) {
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java
index 5c052a6..fb3c242 100644
--- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerBase.java
@@ -17,7 +17,6 @@
package org.apache.activemq.broker.artemiswrapper;
import java.io.File;
-import java.io.IOException;
import java.net.URI;
import java.util.HashMap;
import java.util.Map;
@@ -65,7 +64,6 @@ import org.apache.activemq.command.TransactionId;
import org.apache.activemq.store.PListStore;
import org.apache.activemq.thread.Scheduler;
import org.apache.activemq.usage.Usage;
-import org.junit.rules.TemporaryFolder;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -83,20 +81,19 @@ public abstract class ArtemisBrokerBase implements Broker {
protected volatile boolean stopped;
protected BrokerId brokerId = new BrokerId("Artemis Broker");
protected BrokerService bservice;
- protected TemporaryFolder temporaryFolder = new TemporaryFolder();
- protected String testDir;
+
+ protected final File temporaryFolder;
+ protected final String testDir;
protected boolean realStore = false;
protected ActiveMQServer server;
protected boolean enableSecurity = false;
- public ArtemisBrokerBase() {
- try {
- this.temporaryFolder.create();
- }
- catch (IOException e) {
- }
+ public ArtemisBrokerBase(File temporaryFolder) {
+ this.temporaryFolder = temporaryFolder;
+ this.testDir = temporaryFolder.getAbsolutePath();
+
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
index 61d6250..3ad6072 100644
--- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/ArtemisBrokerWrapper.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.broker.artemiswrapper;
+import java.io.File;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Iterator;
@@ -46,20 +47,16 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
protected final Map<String, SimpleString> testQueues = new HashMap<>();
protected JMSServerManagerImpl jmsServer;
- public ArtemisBrokerWrapper(BrokerService brokerService) {
+ public ArtemisBrokerWrapper(BrokerService brokerService, File temporaryFolder) {
+ super(temporaryFolder);
this.bservice = brokerService;
}
@Override
public void start() throws Exception {
- testDir = temporaryFolder.getRoot().getAbsolutePath();
clearDataRecreateServerDirs();
server = createServer(realStore, true);
server.getConfiguration().getAcceptorConfigurations().clear();
- HashMap<String, Object> params = new HashMap<>();
- params.put(TransportConstants.PORT_PROP_NAME, "61616");
- params.put(TransportConstants.PROTOCOLS_PROP_NAME, "OPENWIRE,CORE");
- TransportConfiguration transportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
Configuration serverConfig = server.getConfiguration();
@@ -82,9 +79,11 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
commonSettings.setDeadLetterAddress(dla);
commonSettings.setAutoCreateJmsQueues(true);
- serverConfig.getAcceptorConfigurations().add(transportConfiguration);
+ HashMap<String, Object> params = new HashMap<String, Object>();
+ if (bservice.extraConnectors.size() == 0) {
+ serverConfig.addAcceptorConfiguration("home", "tcp://localhost:61616?protocols=OPENWIRE,CORE");
+ }
if (this.bservice.enableSsl()) {
- params = new HashMap<>();
params.put(TransportConstants.SSL_ENABLED_PROP_NAME, true);
params.put(TransportConstants.PORT_PROP_NAME, 61611);
params.put(TransportConstants.PROTOCOLS_PROP_NAME, "OPENWIRE");
@@ -102,14 +101,7 @@ public class ArtemisBrokerWrapper extends ArtemisBrokerBase {
}
for (Integer port : bservice.extraConnectors) {
- if (port.intValue() != 61616) {
- //extra port
- params = new HashMap<>();
- params.put(TransportConstants.PORT_PROP_NAME, port.intValue());
- params.put(TransportConstants.PROTOCOLS_PROP_NAME, "OPENWIRE");
- TransportConfiguration extraTransportConfiguration = new TransportConfiguration(NETTY_ACCEPTOR_FACTORY, params);
- serverConfig.getAcceptorConfigurations().add(extraTransportConfiguration);
- }
+ serverConfig.addAcceptorConfiguration("homePort" + port, "tcp://localhost:" + port + "?protocols=OPENWIRE,CORE");
}
serverConfig.setSecurityEnabled(enableSecurity);
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java
new file mode 100644
index 0000000..be9cf06
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/broker/artemiswrapper/OpenwireArtemisBaseTest.java
@@ -0,0 +1,266 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.broker.artemiswrapper;
+
+import java.io.File;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.activemq.artemis.api.core.management.ObjectNameBuilder;
+import org.apache.activemq.artemis.api.jms.management.JMSQueueControl;
+import org.apache.activemq.artemis.api.jms.management.JMSServerControl;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.server.JournalType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
+import org.apache.activemq.broker.BrokerService;
+import org.junit.Assert;
+import org.junit.Rule;
+import org.junit.rules.TemporaryFolder;
+import org.junit.rules.TestName;
+
+import javax.management.MBeanServer;
+import javax.management.MBeanServerInvocationHandler;
+import javax.management.ObjectName;
+
+public class OpenwireArtemisBaseTest {
+
+ @Rule
+ public TemporaryFolder temporaryFolder;
+ @Rule
+ public TestName name = new TestName();
+
+ public OpenwireArtemisBaseTest() {
+ File tmpRoot = new File("./target/tmp");
+ tmpRoot.mkdirs();
+ temporaryFolder = new TemporaryFolder(tmpRoot);
+ //The wrapper stuff will automatically create a default
+ //server on a normal connection factory, which will
+ //cause problems with clustering tests, which starts
+ //all servers explicitly. Setting this to true
+ //can prevent the auto-creation from happening.
+ BrokerService.disableWrapper = true;
+ }
+
+
+ public String getTmp() {
+ return getTmpFile().getAbsolutePath();
+ }
+
+ public File getTmpFile() {
+ return temporaryFolder.getRoot();
+ }
+
+ protected String getJournalDir(int serverID, boolean backup) {
+ return getTmp() + "/journal_" + serverID + "_" + backup;
+ }
+
+ protected String getBindingsDir(int serverID, boolean backup) {
+ return getTmp() + "/binding_" + serverID + "_" + backup;
+ }
+
+ protected String getPageDir(int serverID, boolean backup) {
+ return getTmp() + "/paging_" + serverID + "_" + backup;
+ }
+
+ protected String getLargeMessagesDir(int serverID, boolean backup) {
+ return getTmp() + "/paging_" + serverID + "_" + backup;
+ }
+
+ public String CLUSTER_PASSWORD = "OPENWIRECLUSTER";
+
+ protected Configuration createConfig(final int serverID) throws Exception {
+ return createConfig("localhost", serverID);
+ }
+
+ protected Configuration createConfig(final String hostAddress, final int serverID, final int port) throws Exception {
+ ConfigurationImpl configuration = new ConfigurationImpl().setJMXManagementEnabled(false).
+ setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(1000 * 1024).setJournalType(JournalType.NIO).
+ setJournalDirectory(getJournalDir(serverID, false)).
+ setBindingsDirectory(getBindingsDir(serverID, false)).
+ setPagingDirectory(getPageDir(serverID, false)).
+ setLargeMessagesDirectory(getLargeMessagesDir(serverID, false)).
+ setJournalCompactMinFiles(0).
+ setJournalCompactPercentage(0).
+ setClusterPassword(CLUSTER_PASSWORD);
+
+ configuration.addAddressesSetting("#", new AddressSettings().setAutoCreateJmsQueues(true).setAutoDeleteJmsQueues(true));
+
+ configuration.addAcceptorConfiguration("netty", newURIwithPort(hostAddress, port));
+ configuration.addConnectorConfiguration("netty-connector", newURIwithPort(hostAddress, port));
+
+ return configuration;
+ }
+
+ protected Configuration createConfig(final String hostAddress, final int serverID) throws Exception {
+ ConfigurationImpl configuration = new ConfigurationImpl().setJMXManagementEnabled(false).
+ setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(1000 * 1024).setJournalType(JournalType.NIO).
+ setJournalDirectory(getJournalDir(serverID, false)).
+ setBindingsDirectory(getBindingsDir(serverID, false)).
+ setPagingDirectory(getPageDir(serverID, false)).
+ setLargeMessagesDirectory(getLargeMessagesDir(serverID, false)).
+ setJournalCompactMinFiles(0).
+ setJournalCompactPercentage(0).
+ setClusterPassword(CLUSTER_PASSWORD);
+
+ configuration.addAddressesSetting("#", new AddressSettings().setAutoCreateJmsQueues(true).setAutoDeleteJmsQueues(true));
+
+ configuration.addAcceptorConfiguration("netty", newURI(hostAddress, serverID));
+ configuration.addConnectorConfiguration("netty-connector", newURI(hostAddress, serverID));
+
+ return configuration;
+ }
+
+ //extraAcceptor takes form: "?name=value&name1=value ..."
+ protected Configuration createConfig(final int serverID, String extraAcceptorParams) throws Exception {
+ ConfigurationImpl configuration = new ConfigurationImpl().setJMXManagementEnabled(false).
+ setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(100 * 1024).setJournalType(JournalType.NIO).
+ setJournalDirectory(getJournalDir(serverID, false)).
+ setBindingsDirectory(getBindingsDir(serverID, false)).
+ setPagingDirectory(getPageDir(serverID, false)).
+ setLargeMessagesDirectory(getLargeMessagesDir(serverID, false)).
+ setJournalCompactMinFiles(0).
+ setJournalCompactPercentage(0).
+ setClusterPassword(CLUSTER_PASSWORD);
+
+ configuration.addAddressesSetting("#", new AddressSettings().setAutoCreateJmsQueues(true).setAutoDeleteJmsQueues(true));
+
+ String fullAcceptorUri = newURI(serverID) + extraAcceptorParams;
+ configuration.addAcceptorConfiguration("netty", fullAcceptorUri);
+
+ configuration.addConnectorConfiguration("netty-connector", newURI(serverID));
+ return configuration;
+ }
+
+ public void deployClusterConfiguration(Configuration config, Integer ... targetIDs) throws Exception {
+ StringBuffer stringBuffer = new StringBuffer();
+ String separator = "";
+ for (int x : targetIDs) {
+ stringBuffer.append(separator + newURI(x));
+ separator = ",";
+ }
+
+ String ccURI = "static://(" + stringBuffer.toString() + ")?connectorName=netty-connector;retryInterval=500;messageLoadBalancingType=STRICT;maxHops=1";
+
+ config.addClusterConfiguration("clusterCC", ccURI);
+ }
+
+ protected static String newURI(int serverID) {
+ return newURI("localhost", serverID);
+ }
+
+ protected static String newURI(String localhostAddress, int serverID) {
+ return "tcp://" + localhostAddress + ":" + (61616 + serverID);
+ }
+
+ protected static String newURIwithPort(String localhostAddress, int port) {
+ return "tcp://" + localhostAddress + ":" + port;
+ }
+
+ public static JMSServerControl createJMSServerControl(final MBeanServer mbeanServer) throws Exception {
+ return (JMSServerControl) createProxy(ObjectNameBuilder.DEFAULT.getJMSServerObjectName(), JMSServerControl.class, mbeanServer);
+ }
+
+ public static JMSQueueControl createJMSQueueControl(final String name,
+ final MBeanServer mbeanServer) throws Exception {
+ return (JMSQueueControl) createProxy(ObjectNameBuilder.DEFAULT.getJMSQueueObjectName(name), JMSQueueControl.class, mbeanServer);
+ }
+
+ private static Object createProxy(final ObjectName objectName,
+ final Class mbeanInterface,
+ final MBeanServer mbeanServer) {
+ return MBeanServerInvocationHandler.newProxyInstance(mbeanServer, objectName, mbeanInterface, false);
+ }
+
+ protected void shutDownClusterServers(EmbeddedJMS[] servers) throws Exception {
+ for (int i = 0; i < servers.length; i++) {
+ try {
+ servers[i].stop();
+ }
+ catch (Throwable t) {
+ t.printStackTrace();
+ }
+ }
+ }
+
+ protected void shutDownNonClusterServers(EmbeddedJMS[] servers) throws Exception {
+ shutDownClusterServers(servers);
+ }
+
+ protected void setUpNonClusterServers(EmbeddedJMS[] servers) throws Exception {
+
+ Configuration[] serverCfgs = new Configuration[servers.length];
+ for (int i = 0; i < servers.length; i++) {
+ serverCfgs[i] = createConfig(i);
+ }
+
+ for (int i = 0; i < servers.length; i++) {
+ servers[i] = new EmbeddedJMS().setConfiguration(serverCfgs[i]).setJmsConfiguration(new JMSConfigurationImpl());
+ }
+
+ for (int i = 0; i < servers.length; i++) {
+ servers[i].start();
+ }
+ }
+
+ protected void setUpClusterServers(EmbeddedJMS[] servers) throws Exception {
+
+ Configuration[] serverCfgs = new Configuration[servers.length];
+ for (int i = 0; i < servers.length; i++) {
+ serverCfgs[i] = createConfig(i);
+ }
+
+ for (int i = 0; i < servers.length; i++) {
+ deployClusterConfiguration(serverCfgs[i], getTargets(servers.length, i));
+ }
+
+ for (int i = 0; i < servers.length; i++) {
+ servers[i] = new EmbeddedJMS().setConfiguration(serverCfgs[i]).setJmsConfiguration(new JMSConfigurationImpl());
+ }
+
+ for (int i = 0; i < servers.length; i++) {
+ servers[i].start();
+ }
+
+ for (int i = 0; i < servers.length; i++) {
+ Assert.assertTrue(servers[i].waitClusterForming(100, TimeUnit.MILLISECONDS, 20, servers.length));
+ }
+ }
+
+ private Integer[] getTargets(int total, int self)
+ {
+ int lenTargets = total - self;
+ List<Integer> targets = new ArrayList<>();
+ for (int i = 0; i < lenTargets; i++) {
+ if (i != self) {
+ targets.add(i);
+ }
+ }
+ return targets.toArray(new Integer[0]);
+ }
+
+ public EmbeddedJMS createBroker() throws Exception {
+ Configuration config0 = createConfig(0);
+ EmbeddedJMS newbroker = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+ return newbroker;
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
index 34babf8..0843d3a 100644
--- a/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
+++ b/tests/activemq5-unit-tests/src/main/java/org/apache/activemq/transport/tcp/TcpTransportFactory.java
@@ -20,6 +20,7 @@ import java.io.IOException;
import java.net.URI;
import java.net.URISyntaxException;
import java.net.UnknownHostException;
+import java.util.Collections;
import java.util.HashMap;
import java.util.Map;
@@ -29,6 +30,7 @@ import javax.net.SocketFactory;
import org.apache.activemq.TransportLoggerSupport;
import org.apache.activemq.artemiswrapper.ArtemisBrokerHelper;
import org.apache.activemq.broker.BrokerRegistry;
+import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.transport.*;
import org.apache.activemq.util.IOExceptionSupport;
@@ -54,11 +56,10 @@ public class TcpTransportFactory extends TransportFactory {
//here check broker, if no broker, we start one
Map<String, String> params = URISupport.parseParameters(location);
String brokerId = params.remove("invmBrokerId");
- params.clear();
- location = URISupport.createRemainingURI(location, params);
- if (brokerService == null) {
+ URI location1 = URISupport.createRemainingURI(location, Collections.EMPTY_MAP);
+ if (brokerService == null && !BrokerService.disableWrapper) {
- ArtemisBrokerHelper.startArtemisBroker(location);
+ ArtemisBrokerHelper.startArtemisBroker(location1);
brokerService = location.toString();
if (brokerId != null) {
@@ -66,7 +67,8 @@ public class TcpTransportFactory extends TransportFactory {
System.out.println("bound: " + brokerId);
}
}
- return super.doConnect(location);
+ URI location2 = URISupport.createRemainingURI(location, params);
+ return super.doConnect(location2);
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java
deleted file mode 100644
index fd06de9..0000000
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ActiveMQInputStreamTest.java
+++ /dev/null
@@ -1,148 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import java.io.InputStream;
-import java.io.OutputStream;
-
-import javax.jms.Queue;
-import javax.jms.Session;
-
-import junit.framework.TestCase;
-
-import org.apache.activemq.broker.BrokerService;
-import org.apache.activemq.command.ActiveMQDestination;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-@Deprecated
-public class ActiveMQInputStreamTest extends TestCase {
-
- private static final Logger LOG = LoggerFactory.getLogger(ActiveMQInputStreamTest.class);
-
- private static final String BROKER_URL = "tcp://localhost:0";
- private static final String DESTINATION = "destination";
- private static final int STREAM_LENGTH = 64 * 1024 + 0; // change 0 to 1 to make it not crash
-
- private BrokerService broker;
- private String connectionUri;
-
- @Override
- public void setUp() throws Exception {
- broker = new BrokerService();
- broker.setUseJmx(false);
- broker.setPersistent(false);
- broker.setDestinations(new ActiveMQDestination[]{ActiveMQDestination.createDestination(DESTINATION, ActiveMQDestination.QUEUE_TYPE),});
- broker.addConnector(BROKER_URL);
- broker.start();
- broker.waitUntilStarted();
-
- //some internal api we don't implement
- connectionUri = broker.getDefaultUri();
- }
-
- @Override
- public void tearDown() throws Exception {
- broker.stop();
- broker.waitUntilStopped();
- }
-
- public void testInputStreamSetSyncSendOption() throws Exception {
-
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
- ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue destination = session.createQueue(DESTINATION + "?producer.alwaysSyncSend=true");
-
- OutputStream out = null;
- try {
- out = connection.createOutputStream(destination);
-
- assertTrue(((ActiveMQOutputStream) out).isAlwaysSyncSend());
-
- LOG.debug("writing...");
- for (int i = 0; i < STREAM_LENGTH; ++i) {
- out.write(0);
- }
- LOG.debug("wrote " + STREAM_LENGTH + " bytes");
- }
- finally {
- if (out != null) {
- out.close();
- }
- }
-
- InputStream in = null;
- try {
- in = connection.createInputStream(destination);
- LOG.debug("reading...");
- int count = 0;
- while (-1 != in.read()) {
- ++count;
- }
- LOG.debug("read " + count + " bytes");
- }
- finally {
- if (in != null) {
- in.close();
- }
- }
-
- connection.close();
- }
-
- public void testInputStreamMatchesDefaultChuckSize() throws Exception {
-
- ActiveMQConnectionFactory connectionFactory = new ActiveMQConnectionFactory(connectionUri);
- ActiveMQConnection connection = (ActiveMQConnection) connectionFactory.createConnection();
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- Queue destination = session.createQueue(DESTINATION);
-
- OutputStream out = null;
- try {
- out = connection.createOutputStream(destination);
- LOG.debug("writing...");
- for (int i = 0; i < STREAM_LENGTH; ++i) {
- out.write(0);
- }
- LOG.debug("wrote " + STREAM_LENGTH + " bytes");
- }
- finally {
- if (out != null) {
- out.close();
- }
- }
-
- InputStream in = null;
- try {
- in = connection.createInputStream(destination);
- LOG.debug("reading...");
- int count = 0;
- while (-1 != in.read()) {
- ++count;
- }
- LOG.debug("read " + count + " bytes");
- }
- finally {
- if (in != null) {
- in.close();
- }
- }
-
- connection.close();
- }
-}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/AutoFailTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/AutoFailTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/AutoFailTestSupport.java
new file mode 100644
index 0000000..f47620f
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/AutoFailTestSupport.java
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import java.util.Map;
+import java.util.Map.Entry;
+import java.util.concurrent.atomic.AtomicBoolean;
+
+import junit.framework.TestCase;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Enforces a test case to run for only an allotted time to prevent them from
+ * hanging and breaking the whole testing.
+ *
+ *
+ */
+
+public abstract class AutoFailTestSupport extends TestCase {
+ public static final int EXIT_SUCCESS = 0;
+ public static final int EXIT_ERROR = 1;
+ private static final Logger LOG = LoggerFactory.getLogger(AutoFailTestSupport.class);
+
+ private long maxTestTime = 5 * 60 * 1000; // 5 mins by default
+ private Thread autoFailThread;
+
+ private boolean verbose = true;
+ private boolean useAutoFail; // Disable auto fail by default
+ private AtomicBoolean isTestSuccess;
+
+ protected void setUp() throws Exception {
+ // Runs the auto fail thread before performing any setup
+ if (isAutoFail()) {
+ startAutoFailThread();
+ }
+ super.setUp();
+ }
+
+ protected void tearDown() throws Exception {
+ super.tearDown();
+
+ // Stops the auto fail thread only after performing any clean up
+ stopAutoFailThread();
+ }
+
+ /**
+ * Manually start the auto fail thread. To start it automatically, just set
+ * the auto fail to true before calling any setup methods. As a rule, this
+ * method is used only when you are not sure, if the setUp and tearDown
+ * method is propagated correctly.
+ */
+ public void startAutoFailThread() {
+ setAutoFail(true);
+ isTestSuccess = new AtomicBoolean(false);
+ autoFailThread = new Thread(new Runnable() {
+ public void run() {
+ try {
+ // Wait for test to finish succesfully
+ Thread.sleep(getMaxTestTime());
+ } catch (InterruptedException e) {
+ // This usually means the test was successful
+ } finally {
+ // Check if the test was able to tear down succesfully,
+ // which usually means, it has finished its run.
+ if (!isTestSuccess.get()) {
+ LOG.error("Test case has exceeded the maximum allotted time to run of: " + getMaxTestTime() + " ms.");
+ dumpAllThreads(getName());
+ if (System.getProperty("org.apache.activemq.AutoFailTestSupport.disableSystemExit") == null) {
+ System.exit(EXIT_ERROR);
+ } else {
+ LOG.error("No system.exit as it kills surefire - forkedProcessTimeoutInSeconds (surefire.timeout) will kick in eventually see pom.xml surefire plugin config");
+ }
+ }
+ }
+ }
+ }, "AutoFailThread");
+
+ if (verbose) {
+ LOG.info("Starting auto fail thread...");
+ }
+
+ LOG.info("Starting auto fail thread...");
+ autoFailThread.start();
+ }
+
+ /**
+ * Manually stops the auto fail thread. As a rule, this method is used only
+ * when you are not sure, if the setUp and tearDown method is propagated
+ * correctly.
+ */
+ public void stopAutoFailThread() {
+ if (isAutoFail() && autoFailThread != null && autoFailThread.isAlive()) {
+ isTestSuccess.set(true);
+
+ if (verbose) {
+ LOG.info("Stopping auto fail thread...");
+ }
+
+ LOG.info("Stopping auto fail thread...");
+ autoFailThread.interrupt();
+ }
+ }
+
+ /**
+ * Sets the auto fail value. As a rule, this should be used only before any
+ * setup methods is called to automatically enable the auto fail thread in
+ * the setup method of the test case.
+ *
+ * @param val
+ */
+ public void setAutoFail(boolean val) {
+ this.useAutoFail = val;
+ }
+
+ public boolean isAutoFail() {
+ return this.useAutoFail;
+ }
+
+ /**
+ * The assigned value will only be reflected when the auto fail thread has
+ * started its run. Value is in milliseconds.
+ *
+ * @param val
+ */
+ public void setMaxTestTime(long val) {
+ this.maxTestTime = val;
+ }
+
+ public long getMaxTestTime() {
+ return this.maxTestTime;
+ }
+
+
+ public static void dumpAllThreads(String prefix) {
+ Map<Thread, StackTraceElement[]> stacks = Thread.getAllStackTraces();
+ for (Entry<Thread, StackTraceElement[]> stackEntry : stacks.entrySet()) {
+ System.err.println(prefix + " " + stackEntry.getKey());
+ for(StackTraceElement element : stackEntry.getValue()) {
+ System.err.println(" " + element);
+ }
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java
index 5e5b993..b8397e2 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/ConnectionCleanupTest.java
@@ -52,22 +52,50 @@ public class ConnectionCleanupTest extends TestCase {
try {
connection.setClientID("test");
- // fail("Should have received JMSException");
+ fail("Should have received JMSException");
}
catch (JMSException e) {
}
- connection.cleanup();
+ connection.doCleanup(true);
connection.setClientID("test");
connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
try {
connection.setClientID("test");
- // fail("Should have received JMSException");
+ fail("Should have received JMSException");
}
catch (JMSException e) {
}
}
+ public void testChangeClientIDDenied() throws JMSException {
+
+ connection.setClientID("test");
+ connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ try {
+ connection.setClientID("test");
+ fail("Should have received JMSException");
+ } catch (JMSException e) {
+ }
+
+ connection.cleanup();
+
+ try {
+ connection.setClientID("test");
+ fail("Should have received JMSException");
+ } catch (JMSException e) {
+ }
+
+ connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ try {
+ connection.setClientID("test");
+ fail("Should have received JMSException");
+ } catch (JMSException e) {
+ }
+ }
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java
index fa58ebe..b8dea70 100644
--- a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/EmbeddedBrokerTestSupport.java
@@ -16,15 +16,23 @@
*/
package org.apache.activemq;
+import org.apache.activemq.artemis.core.config.Configuration;
+import org.apache.activemq.artemis.core.config.impl.ConfigurationImpl;
+import org.apache.activemq.artemis.core.server.JournalType;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
+import org.apache.activemq.artemis.jms.server.config.impl.JMSConfigurationImpl;
+import org.apache.activemq.artemis.jms.server.embedded.EmbeddedJMS;
import org.apache.activemq.broker.BrokerService;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
+import org.junit.rules.TemporaryFolder;
import org.springframework.jms.core.JmsTemplate;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.Destination;
+import java.io.File;
/**
* A useful base class which creates and closes an embedded broker
@@ -32,17 +40,26 @@ import javax.jms.Destination;
public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport {
protected BrokerService broker;
- // protected String bindAddress = "tcp://localhost:61616";
- protected String bindAddress = "vm://localhost";
+ protected EmbeddedJMS artemisBroker;
+ protected String bindAddress = "tcp://localhost:61616";
protected ConnectionFactory connectionFactory;
protected boolean useTopic;
protected ActiveMQDestination destination;
protected JmsTemplate template;
- @Override
+ public TemporaryFolder temporaryFolder;
+
+ public String CLUSTER_PASSWORD = "OPENWIRECLUSTER";
+
protected void setUp() throws Exception {
- if (broker == null) {
- broker = createBroker();
+ BrokerService.disableWrapper = true;
+ File tmpRoot = new File("./target/tmp");
+ tmpRoot.mkdirs();
+ temporaryFolder = new TemporaryFolder(tmpRoot);
+ temporaryFolder.create();
+
+ if (artemisBroker == null) {
+ artemisBroker = createArtemisBroker();
}
startBroker();
@@ -58,13 +75,42 @@ public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport {
@Override
protected void tearDown() throws Exception {
- if (broker != null) {
+ if (artemisBroker != null) {
try {
- broker.stop();
+ artemisBroker.stop();
}
catch (Exception e) {
}
}
+ temporaryFolder.delete();
+ }
+
+ public String getTmp() {
+ return getTmpFile().getAbsolutePath();
+ }
+
+ public File getTmpFile() {
+ return temporaryFolder.getRoot();
+ }
+
+ protected String getJournalDir(int serverID, boolean backup) {
+ return getTmp() + "/journal_" + serverID + "_" + backup;
+ }
+
+ protected String getBindingsDir(int serverID, boolean backup) {
+ return getTmp() + "/binding_" + serverID + "_" + backup;
+ }
+
+ protected String getPageDir(int serverID, boolean backup) {
+ return getTmp() + "/paging_" + serverID + "_" + backup;
+ }
+
+ protected String getLargeMessagesDir(int serverID, boolean backup) {
+ return getTmp() + "/paging_" + serverID + "_" + backup;
+ }
+
+ protected static String newURI(String localhostAddress, int serverID) {
+ return "tcp://" + localhostAddress + ":" + (61616 + serverID);
}
/**
@@ -114,20 +160,44 @@ public abstract class EmbeddedBrokerTestSupport extends CombinationTestSupport {
return new ActiveMQConnectionFactory(bindAddress);
}
- /**
- * Factory method to create a new broker
- *
- * @throws Exception
- */
+
+ public EmbeddedJMS createArtemisBroker() throws Exception {
+ Configuration config0 = createConfig("localhost", 0);
+ EmbeddedJMS newbroker = new EmbeddedJMS().setConfiguration(config0).setJmsConfiguration(new JMSConfigurationImpl());
+ return newbroker;
+ }
+
+ protected Configuration createConfig(final String hostAddress, final int serverID) throws Exception {
+ ConfigurationImpl configuration = new ConfigurationImpl().setJMXManagementEnabled(false).
+ setSecurityEnabled(false).setJournalMinFiles(2).setJournalFileSize(1000 * 1024).setJournalType(JournalType.NIO).
+ setJournalDirectory(getJournalDir(serverID, false)).
+ setBindingsDirectory(getBindingsDir(serverID, false)).
+ setPagingDirectory(getPageDir(serverID, false)).
+ setLargeMessagesDirectory(getLargeMessagesDir(serverID, false)).
+ setJournalCompactMinFiles(0).
+ setJournalCompactPercentage(0).
+ setClusterPassword(CLUSTER_PASSWORD);
+
+ configuration.addAddressesSetting("#", new AddressSettings().setAutoCreateJmsQueues(true).setAutoDeleteJmsQueues(true));
+
+ configuration.addAcceptorConfiguration("netty", newURI(hostAddress, serverID));
+ configuration.addConnectorConfiguration("netty-connector", newURI(hostAddress, serverID));
+
+ return configuration;
+ }
+
+ //we keep this because some other tests uses it.
+ //we'll delete this when those tests are dealt with.
protected BrokerService createBroker() throws Exception {
BrokerService answer = new BrokerService();
answer.setPersistent(isPersistent());
+ answer.getManagementContext().setCreateConnector(false);
answer.addConnector(bindAddress);
return answer;
}
protected void startBroker() throws Exception {
- broker.start();
+ artemisBroker.start();
}
/**
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java
new file mode 100755
index 0000000..b7c2e94
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsQueueTransactionTest.java
@@ -0,0 +1,234 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.Destination;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.util.ArrayList;
+import java.util.Enumeration;
+
+import org.apache.activemq.test.JmsResourceProvider;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public class JmsQueueTransactionTest extends JmsTransactionTestSupport {
+ private static final Logger LOG = LoggerFactory.getLogger(JmsQueueTransactionTest.class);
+
+ /**
+ * @see org.apache.activemq.JmsTransactionTestSupport#getJmsResourceProvider()
+ */
+ protected JmsResourceProvider getJmsResourceProvider() {
+ JmsResourceProvider p = new JmsResourceProvider();
+ p.setTopic(false);
+ return p;
+ }
+
+ /**
+ * Tests if the the connection gets reset, the messages will still be
+ * received.
+ *
+ * @throws Exception
+ */
+ public void testReceiveTwoThenCloseConnection() throws Exception {
+ Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
+
+ // lets consume any outstanding messages from previous test runs
+ beginTx();
+ while (consumer.receive(1000) != null) {
+ }
+ commitTx();
+
+ beginTx();
+ producer.send(outbound[0]);
+ producer.send(outbound[1]);
+ commitTx();
+
+ LOG.info("Sent 0: " + outbound[0]);
+ LOG.info("Sent 1: " + outbound[1]);
+
+ ArrayList<Message> messages = new ArrayList<Message>();
+ beginTx();
+ Message message = consumer.receive(2000);
+ assertEquals(outbound[0], message);
+
+ message = consumer.receive(2000);
+ assertNotNull(message);
+ assertEquals(outbound[1], message);
+
+ // Close and reopen connection.
+ reconnect();
+
+ // Consume again.. the previous message should
+ // get redelivered.
+ beginTx();
+ message = consumer.receive(2000);
+ assertNotNull("Should have re-received the first message again!", message);
+ messages.add(message);
+ assertEquals(outbound[0], message);
+
+ message = consumer.receive(5000);
+ assertNotNull("Should have re-received the second message again!", message);
+ messages.add(message);
+ assertEquals(outbound[1], message);
+ commitTx();
+
+ Message inbound[] = new Message[messages.size()];
+ messages.toArray(inbound);
+
+ assertTextMessagesEqual("Rollback did not work", outbound, inbound);
+ }
+
+ /**
+ * Tests sending and receiving messages with two sessions(one for producing
+ * and another for consuming).
+ *
+ * @throws Exception
+ */
+ public void testSendReceiveInSeperateSessionTest() throws Exception {
+ session.close();
+ int batchCount = 10;
+
+ for (int i = 0; i < batchCount; i++) {
+ // Session that sends messages
+ {
+ Session session = resourceProvider.createSession(connection);
+ this.session = session;
+ MessageProducer producer = resourceProvider.createProducer(session, destination);
+ // consumer = resourceProvider.createConsumer(session,
+ // destination);
+ beginTx();
+ producer.send(session.createTextMessage("Test Message: " + i));
+ commitTx();
+ session.close();
+ }
+
+ // Session that consumes messages
+ {
+ Session session = resourceProvider.createSession(connection);
+ this.session = session;
+ MessageConsumer consumer = resourceProvider.createConsumer(session, destination);
+
+ beginTx();
+ TextMessage message = (TextMessage)consumer.receive(1000 * 5);
+ assertNotNull("Received only " + i + " messages in batch ", message);
+ assertEquals("Test Message: " + i, message.getText());
+
+ commitTx();
+ session.close();
+ }
+ }
+ }
+
+ /**
+ * Tests the queue browser. Browses the messages then the consumer tries to
+ * receive them. The messages should still be in the queue even when it was
+ * browsed.
+ *
+ * @throws Exception
+ */
+ public void testReceiveBrowseReceive() throws Exception {
+ Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message"), session.createTextMessage("Third Message")};
+
+ // lets consume any outstanding messages from previous test runs
+ beginTx();
+ while (consumer.receive(1000) != null) {
+ }
+ commitTx();
+
+ beginTx();
+ producer.send(outbound[0]);
+ producer.send(outbound[1]);
+ producer.send(outbound[2]);
+ commitTx();
+
+ // Get the first.
+ beginTx();
+ assertEquals(outbound[0], consumer.receive(1000));
+ consumer.close();
+ commitTx();
+
+ beginTx();
+ QueueBrowser browser = session.createBrowser((Queue)destination);
+ Enumeration enumeration = browser.getEnumeration();
+
+ // browse the second
+ assertTrue("should have received the second message", enumeration.hasMoreElements());
+ assertEquals(outbound[1], (Message)enumeration.nextElement());
+
+ // browse the third.
+ assertTrue("Should have received the third message", enumeration.hasMoreElements());
+ assertEquals(outbound[2], (Message)enumeration.nextElement());
+
+ LOG.info("Check for more...");
+ // There should be no more.
+ boolean tooMany = false;
+ while (enumeration.hasMoreElements()) {
+ LOG.info("Got extra message: " + ((TextMessage)enumeration.nextElement()).getText());
+ tooMany = true;
+ }
+ assertFalse(tooMany);
+ LOG.info("close browser...");
+ browser.close();
+
+ LOG.info("reopen and consume...");
+ // Re-open the consumer.
+ consumer = resourceProvider.createConsumer(session, destination);
+ // Receive the second.
+ assertEquals(outbound[1], consumer.receive(1000));
+ // Receive the third.
+ assertEquals(outbound[2], consumer.receive(1000));
+ consumer.close();
+
+ commitTx();
+ }
+
+ public void testCloseConsumer() throws Exception {
+ Destination dest = session.createQueue(getSubject() + "?consumer.prefetchSize=0");
+ producer = session.createProducer(dest);
+ beginTx();
+ producer.send(session.createTextMessage("message 1"));
+ producer.send(session.createTextMessage("message 2"));
+ commitTx();
+
+ beginTx();
+ consumer = session.createConsumer(dest);
+ Message message1 = consumer.receive(1000);
+ String text1 = ((TextMessage)message1).getText();
+ assertNotNull(message1);
+ assertEquals("message 1", text1);
+
+ consumer.close();
+
+ consumer = session.createConsumer(dest);
+
+ Message message2 = consumer.receive(1000);
+ String text2 = ((TextMessage)message2).getText();
+ assertNotNull(message2);
+ assertEquals("message 2", text2);
+ commitTx();
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/6c023bf2/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java
----------------------------------------------------------------------
diff --git a/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java
new file mode 100755
index 0000000..dfcf302
--- /dev/null
+++ b/tests/activemq5-unit-tests/src/test/java/org/apache/activemq/JmsTransactionTestSupport.java
@@ -0,0 +1,721 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.activemq;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.ArrayList;
+import java.util.List;
+
+import org.apache.activemq.broker.BrokerFactory;
+import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.test.JmsResourceProvider;
+import org.apache.activemq.test.TestSupport;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ *
+ */
+public abstract class JmsTransactionTestSupport extends TestSupport implements MessageListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JmsTransactionTestSupport.class);
+ private static final int MESSAGE_COUNT = 5;
+ private static final String MESSAGE_TEXT = "message";
+
+ protected ConnectionFactory connectionFactory;
+ protected Connection connection;
+ protected Session session;
+ protected MessageConsumer consumer;
+ protected MessageProducer producer;
+ protected JmsResourceProvider resourceProvider;
+ protected Destination destination;
+ protected int batchCount = 10;
+ protected int batchSize = 20;
+ protected BrokerService broker;
+
+ // for message listener test
+ private final List<Message> unackMessages = new ArrayList<Message>(MESSAGE_COUNT);
+ private final List<Message> ackMessages = new ArrayList<Message>(MESSAGE_COUNT);
+ private boolean resendPhase;
+
+ public JmsTransactionTestSupport() {
+ super();
+ }
+
+ public JmsTransactionTestSupport(String name) {
+ super(name);
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see junit.framework.TestCase#setUp()
+ */
+ @Override
+ protected void setUp() throws Exception {
+ broker = createBroker();
+ broker.start();
+ broker.waitUntilStarted();
+
+ resourceProvider = getJmsResourceProvider();
+ topic = resourceProvider.isTopic();
+ // We will be using transacted sessions.
+ setSessionTransacted();
+ connectionFactory = newConnectionFactory();
+ reconnect();
+ }
+
+ protected void setSessionTransacted() {
+ resourceProvider.setTransacted(true);
+ }
+
+ protected ConnectionFactory newConnectionFactory() throws Exception {
+ return resourceProvider.createConnectionFactory();
+ }
+
+ protected void beginTx() throws Exception {
+ //no-op for local tx
+ }
+
+ protected void commitTx() throws Exception {
+ session.commit();
+ }
+
+ protected void rollbackTx() throws Exception {
+ session.rollback();
+ }
+
+ /**
+ */
+ protected BrokerService createBroker() throws Exception, URISyntaxException {
+ return BrokerFactory.createBroker(new URI("broker://()/localhost?persistent=false"));
+ }
+
+ /*
+ * (non-Javadoc)
+ *
+ * @see junit.framework.TestCase#tearDown()
+ */
+ @Override
+ protected void tearDown() throws Exception {
+ LOG.info("Closing down connection");
+
+ try {
+ session.close();
+ session = null;
+ connection.close();
+ connection = null;
+ } catch (Exception e) {
+ LOG.info("Caught exception while closing resources.");
+ }
+
+ try {
+ broker.stop();
+ broker.waitUntilStopped();
+ broker = null;
+ } catch (Exception e) {
+ LOG.info("Caught exception while shutting down the Broker", e);
+ }
+
+ LOG.info("Connection closed.");
+ }
+
+ protected abstract JmsResourceProvider getJmsResourceProvider();
+
+ /**
+ * Sends a batch of messages and validates that the messages are received.
+ *
+ * @throws Exception
+ */
+ public void testSendReceiveTransactedBatches() throws Exception {
+
+ TextMessage message = session.createTextMessage("Batch Message");
+ for (int j = 0; j < batchCount; j++) {
+ LOG.info("Producing bacth " + j + " of " + batchSize + " messages");
+
+ beginTx();
+ for (int i = 0; i < batchSize; i++) {
+ producer.send(message);
+ }
+ messageSent();
+ commitTx();
+ LOG.info("Consuming bacth " + j + " of " + batchSize + " messages");
+
+ beginTx();
+ for (int i = 0; i < batchSize; i++) {
+ message = (TextMessage)consumer.receive(1000 * 5);
+ assertNotNull("Received only " + i + " messages in batch " + j, message);
+ assertEquals("Batch Message", message.getText());
+ }
+
+ commitTx();
+ }
+ }
+
+ protected void messageSent() throws Exception {
+ }
+
+ /**
+ * Sends a batch of messages and validates that the rollbacked message was
+ * not consumed.
+ *
+ * @throws Exception
+ */
+ public void testSendRollback() throws Exception {
+ Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
+
+ // sends a message
+ beginTx();
+ producer.send(outbound[0]);
+ commitTx();
+
+ // sends a message that gets rollbacked
+ beginTx();
+ producer.send(session.createTextMessage("I'm going to get rolled back."));
+ rollbackTx();
+
+ // sends a message
+ beginTx();
+ producer.send(outbound[1]);
+ commitTx();
+
+ // receives the first message
+ beginTx();
+ ArrayList<Message> messages = new ArrayList<Message>();
+ LOG.info("About to consume message 1");
+ Message message = consumer.receive(1000);
+ messages.add(message);
+ LOG.info("Received: " + message);
+
+ // receives the second message
+ LOG.info("About to consume message 2");
+ message = consumer.receive(4000);
+ messages.add(message);
+ LOG.info("Received: " + message);
+
+ // validates that the rollbacked was not consumed
+ commitTx();
+ Message inbound[] = new Message[messages.size()];
+ messages.toArray(inbound);
+ assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
+ }
+
+ /**
+ * spec section 3.6 acking a message with automation acks has no effect.
+ * @throws Exception
+ */
+ public void testAckMessageInTx() throws Exception {
+ Message[] outbound = new Message[] {session.createTextMessage("First Message")};
+
+ // sends a message
+ beginTx();
+ producer.send(outbound[0]);
+ outbound[0].acknowledge();
+ commitTx();
+ outbound[0].acknowledge();
+
+ // receives the first message
+ beginTx();
+ ArrayList<Message> messages = new ArrayList<Message>();
+ LOG.info("About to consume message 1");
+ Message message = consumer.receive(1000);
+ messages.add(message);
+ LOG.info("Received: " + message);
+
+ // validates that the rollbacked was not consumed
+ commitTx();
+ Message inbound[] = new Message[messages.size()];
+ messages.toArray(inbound);
+ assertTextMessagesEqual("Message not delivered.", outbound, inbound);
+ }
+
+ /**
+ * Sends a batch of messages and validates that the message sent before
+ * session close is not consumed.
+ *
+ * This test only works with local transactions, not xa.
+ * @throws Exception
+ */
+ public void testSendSessionClose() throws Exception {
+ Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
+
+ // sends a message
+ beginTx();
+ producer.send(outbound[0]);
+ commitTx();
+
+ // sends a message that gets rollbacked
+ beginTx();
+ producer.send(session.createTextMessage("I'm going to get rolled back."));
+ consumer.close();
+
+ reconnectSession();
+
+ // sends a message
+ producer.send(outbound[1]);
+ commitTx();
+
+ // receives the first message
+ ArrayList<Message> messages = new ArrayList<Message>();
+ LOG.info("About to consume message 1");
+ beginTx();
+ Message message = consumer.receive(1000);
+ messages.add(message);
+ LOG.info("Received: " + message);
+
+ // receives the second message
+ LOG.info("About to consume message 2");
+ message = consumer.receive(4000);
+ messages.add(message);
+ LOG.info("Received: " + message);
+
+ // validates that the rollbacked was not consumed
+ commitTx();
+ Message inbound[] = new Message[messages.size()];
+ messages.toArray(inbound);
+ assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
+ }
+
+ /**
+ * Sends a batch of messages and validates that the message sent before
+ * session close is not consumed.
+ *
+ * @throws Exception
+ */
+ public void testSendSessionAndConnectionClose() throws Exception {
+ Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
+
+ // sends a message
+ beginTx();
+ producer.send(outbound[0]);
+ commitTx();
+
+ // sends a message that gets rollbacked
+ beginTx();
+ producer.send(session.createTextMessage("I'm going to get rolled back."));
+ consumer.close();
+ session.close();
+
+ reconnect();
+
+ // sends a message
+ beginTx();
+ producer.send(outbound[1]);
+ commitTx();
+
+ // receives the first message
+ ArrayList<Message> messages = new ArrayList<Message>();
+ LOG.info("About to consume message 1");
+ beginTx();
+ Message message = consumer.receive(1000);
+ messages.add(message);
+ LOG.info("Received: " + message);
+
+ // receives the second message
+ LOG.info("About to consume message 2");
+ message = consumer.receive(4000);
+ messages.add(message);
+ LOG.info("Received: " + message);
+
+ // validates that the rollbacked was not consumed
+ commitTx();
+ Message inbound[] = new Message[messages.size()];
+ messages.toArray(inbound);
+ assertTextMessagesEqual("Rollback did not work.", outbound, inbound);
+ }
+
+ /**
+ * Sends a batch of messages and validates that the rollbacked message was
+ * redelivered.
+ *
+ * @throws Exception
+ */
+ public void testReceiveRollback() throws Exception {
+ Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
+
+ // lets consume any outstanding messages from prev test runs
+ beginTx();
+ while (consumer.receive(1000) != null) {
+ }
+ commitTx();
+
+ // sent both messages
+ beginTx();
+ producer.send(outbound[0]);
+ producer.send(outbound[1]);
+ commitTx();
+
+ LOG.info("Sent 0: " + outbound[0]);
+ LOG.info("Sent 1: " + outbound[1]);
+
+ ArrayList<Message> messages = new ArrayList<Message>();
+ beginTx();
+ Message message = consumer.receive(1000);
+ messages.add(message);
+ assertEquals(outbound[0], message);
+ commitTx();
+
+ // rollback so we can get that last message again.
+ beginTx();
+ message = consumer.receive(1000);
+ assertNotNull(message);
+ assertEquals(outbound[1], message);
+ rollbackTx();
+
+ // Consume again.. the prev message should
+ // get redelivered.
+ beginTx();
+ message = consumer.receive(5000);
+ assertNotNull("Should have re-received the message again!", message);
+ messages.add(message);
+ commitTx();
+
+ Message inbound[] = new Message[messages.size()];
+ messages.toArray(inbound);
+ assertTextMessagesEqual("Rollback did not work", outbound, inbound);
+ }
+
+ /**
+ * Sends a batch of messages and validates that the rollbacked message was
+ * redelivered.
+ *
+ * @throws Exception
+ */
+ public void testReceiveTwoThenRollback() throws Exception {
+ Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
+
+ // lets consume any outstanding messages from prev test runs
+ beginTx();
+ while (consumer.receive(1000) != null) {
+ }
+ commitTx();
+
+ //
+ beginTx();
+ producer.send(outbound[0]);
+ producer.send(outbound[1]);
+ commitTx();
+
+ LOG.info("Sent 0: " + outbound[0]);
+ LOG.info("Sent 1: " + outbound[1]);
+
+ ArrayList<Message> messages = new ArrayList<Message>();
+ beginTx();
+ Message message = consumer.receive(1000);
+ assertEquals(outbound[0], message);
+
+ message = consumer.receive(1000);
+ assertNotNull(message);
+ assertEquals(outbound[1], message);
+ rollbackTx();
+
+ // Consume again.. the prev message should
+ // get redelivered.
+ beginTx();
+ message = consumer.receive(5000);
+ assertNotNull("Should have re-received the first message again!", message);
+ messages.add(message);
+ assertEquals(outbound[0], message);
+ message = consumer.receive(5000);
+ assertNotNull("Should have re-received the second message again!", message);
+ messages.add(message);
+ assertEquals(outbound[1], message);
+
+ assertNull(consumer.receiveNoWait());
+ commitTx();
+
+ Message inbound[] = new Message[messages.size()];
+ messages.toArray(inbound);
+ assertTextMessagesEqual("Rollback did not work", outbound, inbound);
+ }
+
+ /**
+ * Sends a batch of messages and validates that the rollbacked message was
+ * not consumed.
+ *
+ * @throws Exception
+ */
+ public void testSendReceiveWithPrefetchOne() throws Exception {
+ setPrefetchToOne();
+ Message[] outbound = new Message[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message"), session.createTextMessage("Third Message"),
+ session.createTextMessage("Fourth Message")};
+
+ beginTx();
+ for (int i = 0; i < outbound.length; i++) {
+ // sends a message
+ producer.send(outbound[i]);
+ }
+ commitTx();
+
+ // receives the first message
+ beginTx();
+ for (int i = 0; i < outbound.length; i++) {
+ LOG.info("About to consume message 1");
+ Message message = consumer.receive(1000);
+ assertNotNull(message);
+ LOG.info("Received: " + message);
+ }
+
+ // validates that the rollbacked was not consumed
+ commitTx();
+ }
+
+ /**
+ * Perform the test that validates if the rollbacked message was redelivered
+ * multiple times.
+ *
+ * @throws Exception
+ */
+ public void testReceiveTwoThenRollbackManyTimes() throws Exception {
+ for (int i = 0; i < 5; i++) {
+ testReceiveTwoThenRollback();
+ }
+ }
+
+ /**
+ * Sends a batch of messages and validates that the rollbacked message was
+ * not consumed. This test differs by setting the message prefetch to one.
+ *
+ * @throws Exception
+ */
+ public void testSendRollbackWithPrefetchOfOne() throws Exception {
+ setPrefetchToOne();
+ testSendRollback();
+ }
+
+ /**
+ * Sends a batch of messages and and validates that the rollbacked message
+ * was redelivered. This test differs by setting the message prefetch to
+ * one.
+ *
+ * @throws Exception
+ */
+ public void testReceiveRollbackWithPrefetchOfOne() throws Exception {
+ setPrefetchToOne();
+ testReceiveRollback();
+ }
+
+ /**
+ * Tests if the messages can still be received if the consumer is closed
+ * (session is not closed).
+ *
+ * @throws Exception see http://jira.codehaus.org/browse/AMQ-143
+ */
+ public void testCloseConsumerBeforeCommit() throws Exception {
+ TextMessage[] outbound = new TextMessage[] {session.createTextMessage("First Message"), session.createTextMessage("Second Message")};
+
+ // lets consume any outstanding messages from prev test runs
+ beginTx();
+ while (consumer.receiveNoWait() != null) {
+ }
+
+ commitTx();
+
+ // sends the messages
+ beginTx();
+ producer.send(outbound[0]);
+ producer.send(outbound[1]);
+ commitTx();
+ LOG.info("Sent 0: " + outbound[0]);
+ LOG.info("Sent 1: " + outbound[1]);
+
+ beginTx();
+ TextMessage message = (TextMessage)consumer.receive(1000);
+ assertEquals(outbound[0].getText(), message.getText());
+ // Close the consumer before the commit. This should not cause the
+ // received message
+ // to rollback.
+ consumer.close();
+ commitTx();
+
+ // Create a new consumer
+ consumer = resourceProvider.createConsumer(session, destination);
+ LOG.info("Created consumer: " + consumer);
+
+ beginTx();
+ message = (TextMessage)consumer.receive(1000);
+ assertEquals(outbound[1].getText(), message.getText());
+ commitTx();
+ }
+
+ public void testChangeMutableObjectInObjectMessageThenRollback() throws Exception {
+ ArrayList<String> list = new ArrayList<String>();
+ list.add("First");
+ Message outbound = session.createObjectMessage(list);
+ outbound.setStringProperty("foo", "abc");
+
+ beginTx();
+ producer.send(outbound);
+ commitTx();
+
+ LOG.info("About to consume message 1");
+ beginTx();
+ Message message = consumer.receive(5000);
+
+ List<String> body = assertReceivedObjectMessageWithListBody(message);
+
+ // now lets try mutate it
+ try {
+ message.setStringProperty("foo", "def");
+ fail("Cannot change properties of the object!");
+ } catch (JMSException e) {
+ LOG.info("Caught expected exception: " + e, e);
+ }
+ body.clear();
+ body.add("This should never be seen!");
+ rollbackTx();
+
+ beginTx();
+ message = consumer.receive(5000);
+ List<String> secondBody = assertReceivedObjectMessageWithListBody(message);
+ assertNotSame("Second call should return a different body", secondBody, body);
+ commitTx();
+ }
+
+ @SuppressWarnings("unchecked")
+ protected List<String> assertReceivedObjectMessageWithListBody(Message message) throws JMSException {
+ assertNotNull("Should have received a message!", message);
+ assertEquals("foo header", "abc", message.getStringProperty("foo"));
+
+ assertTrue("Should be an object message but was: " + message, message instanceof ObjectMessage);
+ ObjectMessage objectMessage = (ObjectMessage)message;
+ List<String> body = (List<String>)objectMessage.getObject();
+ LOG.info("Received body: " + body);
+
+ assertEquals("Size of list should be 1", 1, body.size());
+ assertEquals("element 0 of list", "First", body.get(0));
+ return body;
+ }
+
+ /**
+ * Recreates the connection.
+ *
+ * @throws javax.jms.JMSException
+ */
+ protected void reconnect() throws Exception {
+
+ if (connection != null) {
+ // Close the prev connection.
+ connection.close();
+ }
+ session = null;
+ connection = resourceProvider.createConnection(connectionFactory);
+ reconnectSession();
+ connection.start();
+ }
+
+ /**
+ * Recreates the connection.
+ *
+ * @throws javax.jms.JMSException
+ */
+ protected void reconnectSession() throws JMSException {
+ if (session != null) {
+ session.close();
+ }
+
+ session = resourceProvider.createSession(connection);
+ destination = resourceProvider.createDestination(session, getSubject());
+ producer = resourceProvider.createProducer(session, destination);
+ consumer = resourceProvider.createConsumer(session, destination);
+ }
+
+ /**
+ * Sets the prefeftch policy to one.
+ */
+ protected void setPrefetchToOne() {
+ ActiveMQPrefetchPolicy prefetchPolicy = getPrefetchPolicy();
+ prefetchPolicy.setQueuePrefetch(1);
+ prefetchPolicy.setTopicPrefetch(1);
+ prefetchPolicy.setDurableTopicPrefetch(1);
+ prefetchPolicy.setOptimizeDurableTopicPrefetch(1);
+ }
+
+ protected ActiveMQPrefetchPolicy getPrefetchPolicy() {
+ return ((ActiveMQConnection)connection).getPrefetchPolicy();
+ }
+
+ //This test won't work with xa tx so no beginTx() has been added.
+ public void testMessageListener() throws Exception {
+ // send messages
+ for (int i = 0; i < MESSAGE_COUNT; i++) {
+ producer.send(session.createTextMessage(MESSAGE_TEXT + i));
+ }
+ commitTx();
+ consumer.setMessageListener(this);
+ // wait receive
+ waitReceiveUnack();
+ assertEquals(unackMessages.size(), MESSAGE_COUNT);
+ // resend phase
+ waitReceiveAck();
+ assertEquals(ackMessages.size(), MESSAGE_COUNT);
+ // should no longer re-receive
+ consumer.setMessageListener(null);
+ assertNull(consumer.receive(500));
+ reconnect();
+ }
+
+ @Override
+ public void onMessage(Message message) {
+ if (!resendPhase) {
+ unackMessages.add(message);
+ if (unackMessages.size() == MESSAGE_COUNT) {
+ try {
+ rollbackTx();
+ resendPhase = true;
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ } else {
+ ackMessages.add(message);
+ if (ackMessages.size() == MESSAGE_COUNT) {
+ try {
+ commitTx();
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+ }
+ }
+
+ private void waitReceiveUnack() throws Exception {
+ for (int i = 0; i < 100 && !resendPhase; i++) {
+ Thread.sleep(100);
+ }
+ assertTrue(resendPhase);
+ }
+
+ private void waitReceiveAck() throws Exception {
+ for (int i = 0; i < 100 && ackMessages.size() < MESSAGE_COUNT; i++) {
+ Thread.sleep(100);
+ }
+ assertFalse(ackMessages.size() < MESSAGE_COUNT);
+ }
+}