You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by cl...@apache.org on 2015/08/14 02:42:14 UTC
[2/4] activemq-artemis git commit: ARTEMIS-204 Improvements on
OpenWire
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
index e7582f2..ada2a70 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireConnection.java
@@ -40,9 +40,23 @@ import org.apache.activemq.artemis.api.core.ActiveMQException;
import org.apache.activemq.artemis.api.core.ActiveMQNonExistentQueueException;
import org.apache.activemq.artemis.api.core.ActiveMQSecurityException;
import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQBrokerStoppedException;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQCompositeConsumerBrokerExchange;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumerBrokerExchange;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQMessageAuthorizationPolicy;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSingleConsumerBrokerExchange;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQTransaction;
+import org.apache.activemq.artemis.core.remoting.CloseListener;
+import org.apache.activemq.artemis.core.remoting.FailureListener;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.BrokerInfo;
@@ -77,20 +91,7 @@ import org.apache.activemq.command.ShutdownInfo;
import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.WireFormatInfo;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQBrokerStoppedException;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumerBrokerExchange;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQMessageAuthorizationPolicy;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQTransaction;
-import org.apache.activemq.artemis.core.remoting.CloseListener;
-import org.apache.activemq.artemis.core.remoting.FailureListener;
-import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
import org.apache.activemq.openwire.OpenWireFormat;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
-import org.apache.activemq.artemis.spi.core.remoting.Connection;
import org.apache.activemq.state.CommandVisitor;
import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.state.ConsumerState;
@@ -100,7 +101,6 @@ import org.apache.activemq.thread.TaskRunner;
import org.apache.activemq.thread.TaskRunnerFactory;
import org.apache.activemq.transport.TransmitCallback;
import org.apache.activemq.util.ByteSequence;
-import org.apache.activemq.artemis.utils.ConcurrentHashSet;
import org.apache.activemq.wireformat.WireFormat;
/**
@@ -176,8 +176,6 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor {
private final Set<String> tempQueues = new ConcurrentHashSet<String>();
- private DataInputWrapper dataInput = new DataInputWrapper();
-
private Map<TransactionId, TransactionInfo> txMap = new ConcurrentHashMap<TransactionId, TransactionInfo>();
private volatile AMQSession advisorySession;
@@ -196,96 +194,78 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor {
@Override
public void bufferReceived(Object connectionID, ActiveMQBuffer buffer) {
try {
- dataInput.receiveData(buffer);
- }
- catch (Throwable t) {
- ActiveMQServerLogger.LOGGER.error("decoding error", t);
- return;
- }
-
- // this.setDataReceived();
- while (dataInput.readable()) {
- try {
- Object object = null;
- try {
- object = wireFormat.unmarshal(dataInput);
- dataInput.mark();
- }
- catch (NotEnoughBytesException e) {
- //meaning the dataInput hasn't enough bytes for a command.
- //in that case we just return and waiting for the next
- //call of bufferReceived()
- return;
+ Object object = wireFormat.unmarshal(buffer);
+
+ Command command = (Command) object;
+ boolean responseRequired = command.isResponseRequired();
+ int commandId = command.getCommandId();
+ // the connection handles pings, negotiations directly.
+ // and delegate all other commands to manager.
+ if (command.getClass() == KeepAliveInfo.class) {
+ KeepAliveInfo info = (KeepAliveInfo) command;
+ if (info.isResponseRequired()) {
+ info.setResponseRequired(false);
+ protocolManager.sendReply(this, info);
}
+ }
+ else if (command.getClass() == WireFormatInfo.class) {
+ // amq here starts a read/write monitor thread (detect ttl?)
+ negotiate((WireFormatInfo) command);
+ }
+ else if (command.getClass() == ConnectionInfo.class || command.getClass() == ConsumerInfo.class || command.getClass() == RemoveInfo.class ||
+ command.getClass() == SessionInfo.class || command.getClass() == ProducerInfo.class || ActiveMQMessage.class.isAssignableFrom(command.getClass()) ||
+ command.getClass() == MessageAck.class || command.getClass() == TransactionInfo.class || command.getClass() == DestinationInfo.class ||
+ command.getClass() == ShutdownInfo.class || command.getClass() == RemoveSubscriptionInfo.class) {
+ Response response = null;
- Command command = (Command) object;
- boolean responseRequired = command.isResponseRequired();
- int commandId = command.getCommandId();
- // the connection handles pings, negotiations directly.
- // and delegate all other commands to manager.
- if (command.getClass() == KeepAliveInfo.class) {
- KeepAliveInfo info = (KeepAliveInfo) command;
- if (info.isResponseRequired()) {
- info.setResponseRequired(false);
- protocolManager.sendReply(this, info);
- }
- }
- else if (command.getClass() == WireFormatInfo.class) {
- // amq here starts a read/write monitor thread (detect ttl?)
- negotiate((WireFormatInfo) command);
+ if (pendingStop) {
+ response = new ExceptionResponse(this.stopError);
}
- else if (command.getClass() == ConnectionInfo.class || command.getClass() == ConsumerInfo.class || command.getClass() == RemoveInfo.class || command.getClass() == SessionInfo.class || command.getClass() == ProducerInfo.class || ActiveMQMessage.class.isAssignableFrom(command.getClass()) || command.getClass() == MessageAck.class || command.getClass() == TransactionInfo.class || command.getClass() == DestinationInfo.class || command.getClass() == ShutdownInfo.class || command.getClass() == RemoveSubscriptionInfo.class) {
- Response response = null;
-
- if (pendingStop) {
- response = new ExceptionResponse(this.stopError);
- }
- else {
- response = ((Command) command).visit(this);
-
- if (response instanceof ExceptionResponse) {
- if (!responseRequired) {
- Throwable cause = ((ExceptionResponse) response).getException();
- serviceException(cause);
- response = null;
- }
- }
- }
-
- if (responseRequired) {
- if (response == null) {
- response = new Response();
- }
- }
+ else {
+ response = ((Command) command).visit(this);
- // The context may have been flagged so that the response is not
- // sent.
- if (context != null) {
- if (context.isDontSendReponse()) {
- context.setDontSendReponse(false);
+ if (response instanceof ExceptionResponse) {
+ if (!responseRequired) {
+ Throwable cause = ((ExceptionResponse) response).getException();
+ serviceException(cause);
response = null;
}
}
+ }
- if (response != null && !protocolManager.isStopping()) {
- response.setCorrelationId(commandId);
- dispatchSync(response);
+ if (responseRequired) {
+ if (response == null) {
+ response = new Response();
}
+ }
+ // The context may have been flagged so that the response is not
+ // sent.
+ if (context != null) {
+ if (context.isDontSendReponse()) {
+ context.setDontSendReponse(false);
+ response = null;
+ }
}
- else {
- // note!!! wait for negotiation (e.g. use a countdown latch)
- // before handling any other commands
- this.protocolManager.handleCommand(this, command);
+
+ if (response != null && !protocolManager.isStopping()) {
+ response.setCorrelationId(commandId);
+ dispatchSync(response);
}
+
}
- catch (IOException e) {
- ActiveMQServerLogger.LOGGER.error("error decoding", e);
- }
- catch (Throwable t) {
- ActiveMQServerLogger.LOGGER.error("error decoding", t);
+ else {
+ // note!!! wait for negotiation (e.g. use a countdown latch)
+ // before handling any other commands
+ this.protocolManager.handleCommand(this, command);
}
}
+ catch (IOException e) {
+ ActiveMQServerLogger.LOGGER.error("error decoding", e);
+ }
+ catch (Throwable t) {
+ ActiveMQServerLogger.LOGGER.error("error decoding", t);
+ }
}
private void negotiate(WireFormatInfo command) throws IOException {
@@ -624,6 +604,7 @@ public class OpenWireConnection implements RemotingConnection, CommandVisitor {
public void serviceExceptionAsync(final IOException e) {
if (asyncException.compareAndSet(false, true)) {
+ // Why this is not through an executor?
new Thread("Async Exception Handler") {
@Override
public void run() {
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
index 260ee02..90518ec 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireMessageConverter.java
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.artemis.core.protocol.openwire;
+import javax.jms.JMSException;
import java.io.ByteArrayOutputStream;
import java.io.DataInputStream;
import java.io.DataOutputStream;
@@ -23,13 +24,19 @@ import java.io.IOException;
import java.io.InputStream;
import java.util.Iterator;
import java.util.Map;
-import java.util.Set;
import java.util.Map.Entry;
-
-import javax.jms.JMSException;
+import java.util.Set;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.ActiveMQPropertyConversionException;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
+import org.apache.activemq.artemis.core.server.ServerMessage;
+import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
+import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
+import org.apache.activemq.artemis.utils.DataConstants;
+import org.apache.activemq.artemis.utils.TypedProperties;
+import org.apache.activemq.artemis.utils.UUIDGenerator;
import org.apache.activemq.command.ActiveMQBytesMessage;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMapMessage;
@@ -51,14 +58,6 @@ import org.apache.activemq.util.ByteSequence;
import org.apache.activemq.util.MarshallingSupport;
import org.apache.activemq.wireformat.WireFormat;
import org.fusesource.hawtbuf.UTF8Buffer;
-import org.apache.activemq.artemis.api.core.SimpleString;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
-import org.apache.activemq.artemis.core.server.ServerMessage;
-import org.apache.activemq.artemis.core.server.impl.ServerMessageImpl;
-import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
-import org.apache.activemq.artemis.utils.DataConstants;
-import org.apache.activemq.artemis.utils.TypedProperties;
-import org.apache.activemq.artemis.utils.UUIDGenerator;
public class OpenWireMessageConverter implements MessageConverter {
@@ -429,7 +428,7 @@ public class OpenWireMessageConverter implements MessageConverter {
}
amqMsg.setBrokerInTime(brokerInTime);
- ActiveMQBuffer buffer = coreMessage.getBodyBuffer();
+ ActiveMQBuffer buffer = coreMessage.getBodyBufferCopy();
if (buffer != null) {
buffer.resetReaderIndex();
byte[] bytes = null;
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
----------------------------------------------------------------------
diff --git a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
index 10d67a1..8c20c46 100644
--- a/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
+++ b/artemis-protocols/artemis-openwire-protocol/src/main/java/org/apache/activemq/artemis/core/protocol/openwire/OpenWireProtocolManager.java
@@ -31,18 +31,38 @@ import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ScheduledExecutorService;
import io.netty.channel.ChannelPipeline;
+import io.netty.handler.codec.LengthFieldBasedFrameDecoder;
import org.apache.activemq.advisory.AdvisorySupport;
import org.apache.activemq.artemis.api.core.ActiveMQBuffer;
import org.apache.activemq.artemis.api.core.BaseInterceptor;
import org.apache.activemq.artemis.api.core.Interceptor;
import org.apache.activemq.artemis.api.core.SimpleString;
import org.apache.activemq.artemis.api.core.management.CoreNotificationType;
-import org.apache.activemq.artemis.core.io.IOCallback;
import org.apache.activemq.artemis.api.core.management.ManagementHelper;
+import org.apache.activemq.artemis.core.io.IOCallback;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConsumer;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQPersistenceAdapter;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession;
+import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
+import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
+import org.apache.activemq.artemis.core.security.CheckType;
+import org.apache.activemq.artemis.core.server.ActiveMQServer;
+import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
+import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.artemis.core.server.management.ManagementService;
import org.apache.activemq.artemis.core.server.management.Notification;
import org.apache.activemq.artemis.core.server.management.NotificationListener;
+import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
+import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
+import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
+import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
+import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
+import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
+import org.apache.activemq.artemis.spi.core.remoting.Connection;
+import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
+import org.apache.activemq.artemis.utils.DataConstants;
import org.apache.activemq.command.ActiveMQDestination;
import org.apache.activemq.command.ActiveMQMessage;
import org.apache.activemq.command.ActiveMQTopic;
@@ -66,26 +86,8 @@ import org.apache.activemq.command.TransactionId;
import org.apache.activemq.command.TransactionInfo;
import org.apache.activemq.command.WireFormatInfo;
import org.apache.activemq.command.XATransactionId;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQConnectionContext;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQPersistenceAdapter;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQProducerBrokerExchange;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQServerSession;
-import org.apache.activemq.artemis.core.protocol.openwire.amq.AMQSession;
-import org.apache.activemq.artemis.core.remoting.impl.netty.NettyServerConnection;
-import org.apache.activemq.artemis.core.security.CheckType;
-import org.apache.activemq.artemis.core.server.ActiveMQServer;
-import org.apache.activemq.artemis.core.server.ActiveMQServerLogger;
-import org.apache.activemq.artemis.core.server.impl.ActiveMQServerImpl;
import org.apache.activemq.openwire.OpenWireFormat;
import org.apache.activemq.openwire.OpenWireFormatFactory;
-import org.apache.activemq.artemis.spi.core.protocol.ConnectionEntry;
-import org.apache.activemq.artemis.spi.core.protocol.MessageConverter;
-import org.apache.activemq.artemis.spi.core.protocol.ProtocolManager;
-import org.apache.activemq.artemis.spi.core.protocol.ProtocolManagerFactory;
-import org.apache.activemq.artemis.spi.core.protocol.RemotingConnection;
-import org.apache.activemq.artemis.spi.core.remoting.Acceptor;
-import org.apache.activemq.artemis.spi.core.remoting.Connection;
-import org.apache.activemq.artemis.spi.core.security.ActiveMQSecurityManager;
import org.apache.activemq.state.ConnectionState;
import org.apache.activemq.state.ProducerState;
import org.apache.activemq.state.SessionState;
@@ -183,8 +185,8 @@ public class OpenWireProtocolManager implements ProtocolManager<Interceptor>, No
@Override
public void addChannelHandlers(ChannelPipeline pipeline) {
- // TODO Auto-generated method stub
-
+ // each read will have a full packet with this
+ pipeline.addLast("packet-decipher", new LengthFieldBasedFrameDecoder(Integer.MAX_VALUE, 0, DataConstants.SIZE_INT));
}
@Override
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
----------------------------------------------------------------------
diff --git a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
index 741c32b..12ddb94 100644
--- a/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
+++ b/artemis-server/src/main/java/org/apache/activemq/artemis/core/server/impl/ServerConsumerImpl.java
@@ -275,12 +275,6 @@ public class ServerConsumerImpl implements ServerConsumer, ReadyListener {
return HandleStatus.BUSY;
}
- // TODO - https://jira.jboss.org/browse/HORNETQ-533
- // if (!writeReady.get())
- // {
- // return HandleStatus.BUSY;
- // }
-
synchronized (lock) {
// If the consumer is stopped then we don't accept the message, it
// should go back into the
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
index 83c94ee..faa947e 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/SimpleOpenWireTest.java
@@ -27,18 +27,18 @@ import javax.jms.Session;
import javax.jms.TemporaryQueue;
import javax.jms.TemporaryTopic;
import javax.jms.TextMessage;
+import java.util.concurrent.TimeUnit;
import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.apache.activemq.command.ActiveMQQueue;
import org.apache.activemq.command.ActiveMQTopic;
-import org.apache.activemq.artemis.core.settings.impl.AddressSettings;
import org.junit.Before;
import org.junit.Rule;
import org.junit.Test;
import org.junit.rules.ExpectedException;
-import java.util.concurrent.TimeUnit;
-
public class SimpleOpenWireTest extends BasicOpenWireTest {
@Rule
@@ -300,6 +300,42 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
}
}
+ @Test
+ public void testFailoverTransportReconnect() throws Exception {
+ Connection exConn = null;
+
+ try {
+ String urlString = "failover:(tcp://" + OWHOST + ":" + OWPORT + ")";
+ ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory(urlString);
+
+ Queue queue = new ActiveMQQueue(durableQueueName);
+
+ exConn = exFact.createConnection();
+ exConn.start();
+
+ Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer messageProducer = session.createProducer(queue);
+ messageProducer.send(session.createTextMessage("Test"));
+
+ MessageConsumer consumer = session.createConsumer(queue);
+ assertNotNull(consumer.receive(5000));
+
+ server.stop();
+ Thread.sleep(3000);
+
+ server.start();
+ server.waitForActivation(10, TimeUnit.SECONDS);
+
+ messageProducer.send(session.createTextMessage("Test2"));
+ assertNotNull(consumer.receive(5000));
+ }
+ finally {
+ if (exConn != null) {
+ exConn.close();
+ }
+ }
+ }
+
/**
* This is the example shipped with the distribution
*
@@ -309,41 +345,30 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
public void testOpenWireExample() throws Exception {
Connection exConn = null;
+ SimpleString durableQueue = new SimpleString("jms.queue.exampleQueue");
+ this.server.createQueue(durableQueue, durableQueue, null, true, false);
+
try {
- String urlString = "tcp://" + OWHOST + ":" + OWPORT + "?wireFormat.cacheEnabled=true";
- ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory(urlString);
+ ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
- // Step 2. Perfom a lookup on the queue
Queue queue = new ActiveMQQueue(durableQueueName);
- // Step 4.Create a JMS Connection
exConn = exFact.createConnection();
- // Step 10. Start the Connection
exConn.start();
- // Step 5. Create a JMS Session
Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- // Step 6. Create a JMS Message Producer
MessageProducer producer = session.createProducer(queue);
- // Step 7. Create a Text Message
TextMessage message = session.createTextMessage("This is a text message");
- //System.out.println("Sent message: " + message.getText());
-
- // Step 8. Send the Message
producer.send(message);
- // Step 9. Create a JMS Message Consumer
MessageConsumer messageConsumer = session.createConsumer(queue);
- // Step 11. Receive the message
TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
- System.out.println("Received message: " + messageReceived);
-
assertEquals("This is a text message", messageReceived.getText());
}
finally {
@@ -354,39 +379,88 @@ public class SimpleOpenWireTest extends BasicOpenWireTest {
}
+
+ /**
+ * This is the example shipped with the distribution
+ *
+ * @throws Exception
+ */
@Test
- public void testFailoverTransportReconnect() throws Exception {
+ public void testMultipleConsumers() throws Exception {
Connection exConn = null;
+ SimpleString durableQueue = new SimpleString("jms.queue.exampleQueue");
+ this.server.createQueue(durableQueue, durableQueue, null, true, false);
+
try {
- String urlString = "failover:(tcp://" + OWHOST + ":" + OWPORT + ")";
- ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory(urlString);
+ ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
Queue queue = new ActiveMQQueue(durableQueueName);
exConn = exFact.createConnection();
+
exConn.start();
Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer messageProducer = session.createProducer(queue);
- messageProducer.send(session.createTextMessage("Test"));
- MessageConsumer consumer = session.createConsumer(queue);
- assertNotNull(consumer.receive(5000));
+ MessageProducer producer = session.createProducer(queue);
- server.stop();
- Thread.sleep(3000);
+ TextMessage message = session.createTextMessage("This is a text message");
- server.start();
- server.waitForActivation(10, TimeUnit.SECONDS);
+ producer.send(message);
- messageProducer.send(session.createTextMessage("Test2"));
- assertNotNull(consumer.receive(5000));
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+
+ TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
+
+ assertEquals("This is a text message", messageReceived.getText());
}
finally {
if (exConn != null) {
exConn.close();
}
}
+
}
+
+ @Test
+ public void testMixedOpenWireExample() throws Exception {
+ Connection openConn = null;
+
+ SimpleString durableQueue = new SimpleString("jms.queue.exampleQueue");
+ this.server.createQueue(durableQueue, durableQueue, null, true, false);
+
+ ActiveMQConnectionFactory openCF = new ActiveMQConnectionFactory();
+
+ Queue queue = new ActiveMQQueue("exampleQueue");
+
+ openConn = openCF.createConnection();
+
+ openConn.start();
+
+ Session openSession = openConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = openSession.createProducer(queue);
+
+ TextMessage message = openSession.createTextMessage("This is a text message");
+
+ producer.send(message);
+
+ org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory artemisCF = new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory();
+
+ Connection artemisConn = artemisCF.createConnection();
+ Session artemisSession = artemisConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ artemisConn.start();
+ MessageConsumer messageConsumer = artemisSession.createConsumer(artemisSession.createQueue("exampleQueue"));
+
+ TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
+
+ assertEquals("This is a text message", messageReceived.getText());
+
+ openConn.close();
+ artemisConn.close();
+
+ }
+
+
}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VerySimpleOenwireTest.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VerySimpleOenwireTest.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VerySimpleOenwireTest.java
new file mode 100644
index 0000000..8d315d3
--- /dev/null
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/openwire/VerySimpleOenwireTest.java
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+
+package org.apache.activemq.artemis.tests.integration.openwire;
+
+import javax.jms.Connection;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.Session;
+import javax.jms.TextMessage;
+
+import org.apache.activemq.ActiveMQConnectionFactory;
+import org.apache.activemq.artemis.api.core.SimpleString;
+import org.apache.activemq.command.ActiveMQQueue;
+import org.junit.Test;
+
+/** This is useful to debug connection ordering. There's only one connection being made from these tests */
+public class VerySimpleOenwireTest extends OpenWireTestBase {
+
+ /**
+ * This is the example shipped with the distribution
+ *
+ * @throws Exception
+ */
+ @Test
+ public void testOpenWireExample() throws Exception {
+ Connection exConn = null;
+
+ SimpleString durableQueue = new SimpleString("jms.queue.exampleQueue");
+ this.server.createQueue(durableQueue, durableQueue, null, true, false);
+
+ try {
+ ActiveMQConnectionFactory exFact = new ActiveMQConnectionFactory();
+
+ Queue queue = new ActiveMQQueue("exampleQueue");
+
+ exConn = exFact.createConnection();
+
+ exConn.start();
+
+ Session session = exConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = session.createProducer(queue);
+
+ TextMessage message = session.createTextMessage("This is a text message");
+
+ producer.send(message);
+
+ MessageConsumer messageConsumer = session.createConsumer(queue);
+
+ TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
+
+ assertEquals("This is a text message", messageReceived.getText());
+ }
+ finally {
+ if (exConn != null) {
+ exConn.close();
+ }
+ }
+
+ }
+
+ @Test
+ public void testMixedOpenWireExample() throws Exception {
+ Connection openConn = null;
+
+ SimpleString durableQueue = new SimpleString("jms.queue.exampleQueue");
+ this.server.createQueue(durableQueue, durableQueue, null, true, false);
+
+ ActiveMQConnectionFactory openCF = new ActiveMQConnectionFactory();
+
+ Queue queue = new ActiveMQQueue("exampleQueue");
+
+ openConn = openCF.createConnection();
+
+ openConn.start();
+
+ Session openSession = openConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ MessageProducer producer = openSession.createProducer(queue);
+
+ TextMessage message = openSession.createTextMessage("This is a text message");
+
+ producer.send(message);
+
+ org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory artemisCF = new org.apache.activemq.artemis.jms.client.ActiveMQConnectionFactory();
+
+ Connection artemisConn = artemisCF.createConnection();
+ Session artemisSession = artemisConn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ artemisConn.start();
+ MessageConsumer messageConsumer = artemisSession.createConsumer(artemisSession.createQueue("exampleQueue"));
+
+ TextMessage messageReceived = (TextMessage) messageConsumer.receive(5000);
+
+ assertEquals("This is a text message", messageReceived.getText());
+
+ openConn.close();
+ artemisConn.close();
+
+ }
+
+
+}
http://git-wip-us.apache.org/repos/asf/activemq-artemis/blob/1dae9974/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/ActiveMQFrameDecoder2Test.java
----------------------------------------------------------------------
diff --git a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/ActiveMQFrameDecoder2Test.java b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/ActiveMQFrameDecoder2Test.java
index a770183..321fda7 100644
--- a/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/ActiveMQFrameDecoder2Test.java
+++ b/tests/integration-tests/src/test/java/org/apache/activemq/artemis/tests/integration/transports/netty/ActiveMQFrameDecoder2Test.java
@@ -16,6 +16,10 @@
*/
package org.apache.activemq.artemis.tests.integration.transports.netty;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Random;
+
import io.netty.buffer.ByteBuf;
import io.netty.buffer.Unpooled;
import io.netty.channel.embedded.EmbeddedChannel;
@@ -24,10 +28,6 @@ import org.apache.activemq.artemis.tests.util.ActiveMQTestBase;
import org.junit.Assert;
import org.junit.Test;
-import java.util.ArrayList;
-import java.util.List;
-import java.util.Random;
-
public class ActiveMQFrameDecoder2Test extends ActiveMQTestBase {
private static final int MSG_CNT = 10000;