You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ta...@apache.org on 2013/08/01 00:05:28 UTC
svn commit: r1509046 - in /activemq/trunk/activemq-amqp/src:
main/java/org/apache/activemq/transport/amqp/
test/java/org/apache/activemq/transport/amqp/
Author: tabish
Date: Wed Jul 31 22:05:27 2013
New Revision: 1509046
URL: http://svn.apache.org/r1509046
Log:
fix and tests for: https://issues.apache.org/jira/browse/AMQ-4651
Modified:
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1509046&r1=1509045&r2=1509046&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java Wed Jul 31 22:05:27 2013
@@ -102,6 +102,7 @@ import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
class AmqpProtocolConverter {
+
static final Logger TRACE_FRAMES = AmqpTransportFilter.TRACE_FRAMES;
public static final EnumSet<EndpointState> UNINITIALIZED_SET = EnumSet.of(EndpointState.UNINITIALIZED);
public static final EnumSet<EndpointState> INITIALIZED_SET = EnumSet.complementOf(UNINITIALIZED_SET);
@@ -151,7 +152,6 @@ class AmqpProtocolConverter {
}
}
-
void pumpProtonToSocket() {
try {
int size = 1024 * 64;
@@ -179,6 +179,8 @@ class AmqpProtocolConverter {
long nextProducerId = 0;
long nextConsumerId = 0;
+ final LinkedList<ConsumerContext> consumers = new LinkedList<ConsumerContext>();
+
public AmqpSessionContext(ConnectionId connectionId, long id) {
sessionId = new SessionId(connectionId, id);
}
@@ -368,7 +370,9 @@ class AmqpProtocolConverter {
} else if (command.isBrokerInfo()) {
// ignore
} else {
- LOG.debug("Do not know how to process ActiveMQ Command " + command);
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("Do not know how to process ActiveMQ Command " + command);
+ }
}
}
@@ -379,13 +383,16 @@ class AmqpProtocolConverter {
private long nextTempDestinationId = 0;
static abstract class AmqpDeliveryListener {
+
abstract public void onDelivery(Delivery delivery) throws Exception;
- public void onClose() throws Exception {
- }
+ public void onClose() throws Exception {}
- public void drainCheck() {
- }
+ public void drainCheck() {}
+
+ abstract void doCommit() throws Exception;
+
+ abstract void doRollback() throws Exception;
}
private void onConnectionOpen() throws AmqpProtocolException {
@@ -505,6 +512,12 @@ class AmqpProtocolConverter {
onMessage(receiver, delivery, buffer);
}
+ @Override
+ void doCommit() throws Exception {}
+
+ @Override
+ void doRollback() throws Exception {}
+
abstract protected void onMessage(Receiver receiver, Delivery delivery, Buffer buffer) throws Exception;
}
@@ -573,7 +586,7 @@ class AmqpProtocolConverter {
}
}
- long nextTransactionId = 0;
+ long nextTransactionId = 1;
class Transaction {
}
@@ -592,6 +605,7 @@ class AmqpProtocolConverter {
}
AmqpDeliveryListener coordinatorContext = new BaseProducerContext() {
+
@Override
protected void onMessage(Receiver receiver, final Delivery delivery, Buffer buffer) throws Exception {
@@ -605,7 +619,7 @@ class AmqpProtocolConverter {
len -= decoded;
}
- Object action = ((AmqpValue) msg.getBody()).getValue();
+ final Object action = ((AmqpValue) msg.getBody()).getValue();
LOG.debug("COORDINATOR received: " + action + ", [" + buffer + "]");
if (action instanceof Declare) {
Declare declare = (Declare) action;
@@ -628,7 +642,7 @@ class AmqpProtocolConverter {
Discharge discharge = (Discharge) action;
long txid = toLong(discharge.getTxnId());
- byte operation;
+ final byte operation;
if (discharge.getFail()) {
if (LOG.isTraceEnabled()) {
LOG.trace("rollback transaction " + txid);
@@ -640,6 +654,16 @@ class AmqpProtocolConverter {
}
operation = TransactionInfo.COMMIT_ONE_PHASE;
}
+
+ AmqpSessionContext context = (AmqpSessionContext) receiver.getSession().getContext();
+ for (ConsumerContext consumer : context.consumers) {
+ if (operation == TransactionInfo.ROLLBACK) {
+ consumer.doRollback();
+ } else {
+ consumer.doCommit();
+ }
+ }
+
TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), operation);
sendToActiveMQ(txinfo, new ResponseHandler() {
@Override
@@ -650,10 +674,20 @@ class AmqpProtocolConverter {
rejected.setError(createErrorCondition("failed", er.getException().getMessage()));
delivery.disposition(rejected);
}
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("TX: {} settling {}", operation, action);
+ }
delivery.settle();
pumpProtonToSocket();
}
});
+
+ for (ConsumerContext consumer : context.consumers) {
+ if (operation == TransactionInfo.ROLLBACK) {
+ consumer.pumpOutbound();
+ }
+ }
+
} else {
throw new Exception("Expected coordinator message type: " + action.getClass());
}
@@ -744,6 +778,7 @@ class AmqpProtocolConverter {
public ConsumerInfo info;
private boolean endOfBrowse = false;
+ protected LinkedList<MessageDispatch> dispatchedInTx = new LinkedList<MessageDispatch>();
public ConsumerContext(ConsumerId consumerId, Sender sender) {
this.consumerId = consumerId;
@@ -789,7 +824,10 @@ class AmqpProtocolConverter {
// called when the connection receives a JMS message from ActiveMQ
public void onMessageDispatch(MessageDispatch md) throws Exception {
if (!closed) {
- outbound.addLast(md);
+ // Lock to prevent stepping on TX redelivery
+ synchronized (outbound) {
+ outbound.addLast(md);
+ }
pumpOutbound();
pumpProtonToSocket();
}
@@ -853,7 +891,7 @@ class AmqpProtocolConverter {
}
}
- private void settle(final Delivery delivery, int ackType) throws Exception {
+ private void settle(final Delivery delivery, final int ackType) throws Exception {
byte[] tag = delivery.getTag();
if (tag != null && tag.length > 0) {
checkinTag(tag);
@@ -877,11 +915,16 @@ class AmqpProtocolConverter {
if (remoteState != null && remoteState instanceof TransactionalState) {
TransactionalState s = (TransactionalState) remoteState;
long txid = toLong(s.getTxnId());
- ack.setTransactionId(new LocalTransactionId(connectionId, txid));
+ LocalTransactionId localTxId = new LocalTransactionId(connectionId, txid);
+ ack.setTransactionId(localTxId);
+
+ // Store the message sent in this TX we might need to re-send on rollback
+ md.getMessage().setTransactionId(localTxId);
+ dispatchedInTx.addFirst(md);
}
if (LOG.isTraceEnabled()) {
- LOG.trace("Sending Ack for MessageId:{} to ActiveMQ", ack.getLastMessageId());
+ LOG.trace("Sending Ack to ActiveMQ: {}", ack);
}
sendToActiveMQ(ack, new ResponseHandler() {
@@ -917,46 +960,129 @@ class AmqpProtocolConverter {
@Override
public void onDelivery(Delivery delivery) throws Exception {
MessageDispatch md = (MessageDispatch) delivery.getContext();
- final DeliveryState state = delivery.getRemoteState();
- if (state instanceof Accepted) {
- if (!delivery.remotelySettled()) {
- delivery.disposition(new Accepted());
- }
- settle(delivery, MessageAck.INDIVIDUAL_ACK_TYPE);
- } else if (state instanceof Rejected) {
- // re-deliver /w incremented delivery counter.
- md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
- settle(delivery, -1);
- } else if (state instanceof Released) {
- // re-deliver && don't increment the counter.
- settle(delivery, -1);
- } else if (state instanceof Modified) {
- Modified modified = (Modified) state;
- if (modified.getDeliveryFailed()) {
- // increment delivery counter..
- md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
+ DeliveryState state = delivery.getRemoteState();
+
+ if (state instanceof TransactionalState) {
+ TransactionalState txState = (TransactionalState) state;
+ if (txState.getOutcome() instanceof DeliveryState) {
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("onDelivery: TX delivery state = {}", state);
+ }
+
+ state = (DeliveryState) txState.getOutcome();
+
+ if (state instanceof Accepted) {
+ if (!delivery.remotelySettled()) {
+ delivery.disposition(new Accepted());
+ }
+ settle(delivery, MessageAck.DELIVERED_ACK_TYPE);
+ }
}
- byte ackType = -1;
- Boolean undeliverableHere = modified.getUndeliverableHere();
- if (undeliverableHere != null && undeliverableHere) {
- // receiver does not want the message..
- // perhaps we should DLQ it?
- ackType = MessageAck.POSION_ACK_TYPE;
+ } else {
+ if (state instanceof Accepted) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("onDelivery: accepted state = {}", state);
+ }
+
+ if (!delivery.remotelySettled()) {
+ delivery.disposition(new Accepted());
+ }
+ settle(delivery, MessageAck.INDIVIDUAL_ACK_TYPE);
+ } else if (state instanceof Rejected) {
+ // re-deliver /w incremented delivery counter.
+ md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("onDelivery: Rejected state = {}, delivery count now {}", state, md.getRedeliveryCounter());
+ }
+ settle(delivery, -1);
+ } else if (state instanceof Released) {
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("onDelivery: Released state = {}", state);
+ }
+ // re-deliver && don't increment the counter.
+ settle(delivery, -1);
+ } else if (state instanceof Modified) {
+ Modified modified = (Modified) state;
+ if (modified.getDeliveryFailed()) {
+ // increment delivery counter..
+ md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
+ }
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("onDelivery: Modified state = {}, delivery count now {}", state, md.getRedeliveryCounter());
+ }
+ byte ackType = -1;
+ Boolean undeliverableHere = modified.getUndeliverableHere();
+ if (undeliverableHere != null && undeliverableHere) {
+ // receiver does not want the message..
+ // perhaps we should DLQ it?
+ ackType = MessageAck.POSION_ACK_TYPE;
+ }
+ settle(delivery, ackType);
}
- settle(delivery, ackType);
}
pumpOutbound();
}
+
+ @Override
+ void doCommit() throws Exception {
+ if (!dispatchedInTx.isEmpty()) {
+
+ MessageDispatch md = dispatchedInTx.getFirst();
+ MessageAck pendingTxAck = new MessageAck(md, MessageAck.STANDARD_ACK_TYPE, dispatchedInTx.size());
+ pendingTxAck.setTransactionId(md.getMessage().getTransactionId());
+ pendingTxAck.setFirstMessageId(dispatchedInTx.getLast().getMessage().getMessageId());
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Sending commit Ack to ActiveMQ: {}", pendingTxAck);
+ }
+
+ dispatchedInTx.clear();
+
+ sendToActiveMQ(pendingTxAck, new ResponseHandler() {
+ @Override
+ public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+ if (response.isException()) {
+ if (response.isException()) {
+ Throwable exception = ((ExceptionResponse) response).getException();
+ exception.printStackTrace();
+ sender.close();
+ }
+ }
+ pumpProtonToSocket();
+ }
+ });
+ }
+ }
+
+ @Override
+ void doRollback() throws Exception {
+ synchronized (outbound) {
+
+ if (LOG.isTraceEnabled()) {
+ LOG.trace("Rolling back {} messages for redelivery. ", dispatchedInTx.size());
+ }
+
+ for (MessageDispatch md : dispatchedInTx) {
+ md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
+ md.getMessage().setTransactionId(null);
+ outbound.addFirst(md);
+ }
+
+ dispatchedInTx.clear();
+ }
+ }
}
private final ConcurrentHashMap<ConsumerId, ConsumerContext> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, ConsumerContext>();
- void onSenderOpen(final Sender sender, AmqpSessionContext sessionContext) {
+ @SuppressWarnings("rawtypes")
+ void onSenderOpen(final Sender sender, final AmqpSessionContext sessionContext) {
org.apache.qpid.proton.amqp.messaging.Source source = (org.apache.qpid.proton.amqp.messaging.Source) sender.getRemoteSource();
try {
final ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++);
- ConsumerContext consumerContext = new ConsumerContext(id, sender);
+ final ConsumerContext consumerContext = new ConsumerContext(id, sender);
sender.setContext(consumerContext);
String selector = null;
@@ -1062,6 +1188,7 @@ class AmqpProtocolConverter {
subscriptionsByConsumerId.remove(id);
sender.close();
} else {
+ sessionContext.consumers.add(consumerContext);
sender.open();
}
pumpProtonToSocket();
Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java?rev=1509046&r1=1509045&r2=1509046&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java Wed Jul 31 22:05:27 2013
@@ -114,7 +114,7 @@ public class AmqpTestSupport {
p.send(message);
}
- p.close();
+ session.close();
}
protected BrokerViewMBean getProxyToBroker() throws MalformedObjectNameException, JMSException {
Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java?rev=1509046&r1=1509045&r2=1509046&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java Wed Jul 31 22:05:27 2013
@@ -37,7 +37,9 @@ import org.apache.activemq.broker.jmx.Qu
import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
+import org.junit.Rule;
import org.junit.Test;
+import org.junit.rules.TestName;
import org.objectweb.jtests.jms.framework.TestConfig;
/**
@@ -45,11 +47,13 @@ import org.objectweb.jtests.jms.framewor
*/
public class JMSClientTest extends AmqpTestSupport {
+ @Rule public TestName name = new TestName();
+
@SuppressWarnings("rawtypes")
@Test
public void testProducerConsume() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
- QueueImpl queue = new QueueImpl("queue://txqueue");
+ QueueImpl queue = new QueueImpl("queue://" + name);
Connection connection = createConnection();
{
@@ -78,32 +82,29 @@ public class JMSClientTest extends AmqpT
@Test
public void testTransactedConsumer() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
- QueueImpl queue = new QueueImpl("queue://txqueue");
- final int msgCount = 10;
+ QueueImpl queue = new QueueImpl("queue://" + name);
+ final int msgCount = 1;
Connection connection = createConnection();
sendMessages(connection, queue, msgCount);
- QueueViewMBean queueView = getProxyToQueue("txqueue");
+ QueueViewMBean queueView = getProxyToQueue(name.toString());
LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
assertEquals(msgCount, queueView.getQueueSize());
- // Consumer all in TX and commit.
- {
- Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer consumer = session.createConsumer(queue);
+ Session session = connection.createSession(true, Session.AUTO_ACKNOWLEDGE);
+ MessageConsumer consumer = session.createConsumer(queue);
- for (int i = 0; i < msgCount; ++i) {
- Message msg = consumer.receive(TestConfig.TIMEOUT);
- assertNotNull(msg);
- assertTrue(msg instanceof TextMessage);
- }
+ Message msg = consumer.receive(TestConfig.TIMEOUT);
+ assertNotNull(msg);
+ assertTrue(msg instanceof TextMessage);
- consumer.close();
- session.commit();
- }
+ LOG.info("Queue size before session commit is: {}", queueView.getQueueSize());
+ assertEquals(msgCount, queueView.getQueueSize());
+
+ session.commit();
- LOG.info("Queue size after consumer commit is: {}", queueView.getQueueSize());
+ LOG.info("Queue size after session commit is: {}", queueView.getQueueSize());
assertEquals(0, queueView.getQueueSize());
connection.close();
@@ -113,13 +114,13 @@ public class JMSClientTest extends AmqpT
public void testRollbackRececeivedMessage() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
- QueueImpl queue = new QueueImpl("queue://txqueue");
+ QueueImpl queue = new QueueImpl("queue://" + name);
final int msgCount = 1;
Connection connection = createConnection();
sendMessages(connection, queue, msgCount);
- QueueViewMBean queueView = getProxyToQueue("txqueue");
+ QueueViewMBean queueView = getProxyToQueue(name.toString());
LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
assertEquals(msgCount, queueView.getQueueSize());
@@ -128,6 +129,7 @@ public class JMSClientTest extends AmqpT
// Receive and roll back, first receive should not show redelivered.
Message msg = consumer.receive(TestConfig.TIMEOUT);
+ LOG.info("Test received msg: {}", msg);
assertNotNull(msg);
assertTrue(msg instanceof TextMessage);
assertEquals(false, msg.getJMSRedelivered());
@@ -147,19 +149,22 @@ public class JMSClientTest extends AmqpT
LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
assertEquals(0, queueView.getQueueSize());
+
+ session.close();
+ connection.close();
}
@Test
public void testTXConsumerAndLargeNumberOfMessages() throws Exception {
ActiveMQAdmin.enableJMSFrameTracing();
- QueueImpl queue = new QueueImpl("queue://txqueue");
+ QueueImpl queue = new QueueImpl("queue://" + name);
final int msgCount = 500;
Connection connection = createConnection();
sendMessages(connection, queue, msgCount);
- QueueViewMBean queueView = getProxyToQueue("txqueue");
+ QueueViewMBean queueView = getProxyToQueue(name.toString());
LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
assertEquals(msgCount, queueView.getQueueSize());
@@ -177,10 +182,13 @@ public class JMSClientTest extends AmqpT
assertTrue(msg instanceof TextMessage);
}
- consumer.close();
session.commit();
+ consumer.close();
+ session.close();
}
+ connection.close();
+
LOG.info("Queue size after produce is: {}", queueView.getQueueSize());
assertEquals(0, queueView.getQueueSize());
}
@@ -189,7 +197,7 @@ public class JMSClientTest extends AmqpT
@Test
public void testSelectors() throws Exception{
ActiveMQAdmin.enableJMSFrameTracing();
- QueueImpl queue = new QueueImpl("queue://txqueue");
+ QueueImpl queue = new QueueImpl("queue://" + name);
Connection connection = createConnection();
{