You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/10/19 16:28:03 UTC
svn commit: r1400116 - in /activemq/trunk/activemq-amqp/src:
main/java/org/apache/activemq/transport/amqp/
main/java/org/apache/activemq/transport/amqp/transform/
main/resources/META-INF/services/org/apache/activemq/transport/
test/java/org/apache/acti...
Author: chirino
Date: Fri Oct 19 14:28:02 2012
New Revision: 1400116
URL: http://svn.apache.org/viewvc?rev=1400116&view=rev
Log:
More AMQP impl changes. More tests pass, initial cut of transaction support is in.
Added:
activemq/trunk/activemq-amqp/src/test/resources/keystore
Modified:
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java
activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ssl
activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/ActiveMQAdmin.java
activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsTest.java
Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1400116&r1=1400115&r2=1400116&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java Fri Oct 19 14:28:02 2012
@@ -24,7 +24,18 @@ import org.apache.activemq.util.IdGenera
import org.apache.activemq.util.LongSequenceGenerator;
import org.apache.qpid.proton.engine.*;
import org.apache.qpid.proton.engine.impl.ConnectionImpl;
+import org.apache.qpid.proton.engine.impl.ProtocolTracer;
import org.apache.qpid.proton.engine.impl.TransportImpl;
+import org.apache.qpid.proton.framing.TransportFrame;
+import org.apache.qpid.proton.type.Binary;
+import org.apache.qpid.proton.type.messaging.*;
+import org.apache.qpid.proton.type.messaging.Modified;
+import org.apache.qpid.proton.type.messaging.Rejected;
+import org.apache.qpid.proton.type.messaging.Released;
+import org.apache.qpid.proton.type.transaction.*;
+import org.apache.qpid.proton.type.transport.DeliveryState;
+import org.apache.qpid.proton.type.transport.SenderSettleMode;
+import org.apache.qpid.proton.type.transport.Source;
import org.fusesource.hawtbuf.Buffer;
import org.fusesource.hawtbuf.ByteArrayOutputStream;
import org.slf4j.Logger;
@@ -43,8 +54,9 @@ class AmqpProtocolConverter {
public static final EnumSet<EndpointState> INITIALIZED_SET = EnumSet.complementOf(UNINITIALIZED_SET);
public static final EnumSet<EndpointState> ACTIVE_STATE = EnumSet.of(EndpointState.ACTIVE);
public static final EnumSet<EndpointState> CLOSED_STATE = EnumSet.of(EndpointState.CLOSED);
+ public static final EnumSet<EndpointState> ALL_STATES = EnumSet.of(EndpointState.CLOSED, EndpointState.ACTIVE, EndpointState.UNINITIALIZED);
private static final Logger LOG = LoggerFactory.getLogger(AmqpProtocolConverter.class);
-
+ static final public byte[] EMPTY_BYTE_ARRAY = new byte[]{};
private final AmqpTransport amqpTransport;
public AmqpProtocolConverter(AmqpTransport amqpTransport, BrokerContext brokerContext) {
@@ -78,6 +90,18 @@ class AmqpProtocolConverter {
{
this.protonTransport.bind(this.protonConnection);
+ this.protonTransport.setProtocolTracer(new ProtocolTracer() {
+ @Override
+ public void receivedFrame(TransportFrame transportFrame) {
+ System.out.println(String.format("RECV: %05d | %s", transportFrame.getChannel(), transportFrame.getBody()));
+ }
+
+ @Override
+ public void sentFrame(TransportFrame transportFrame) {
+ System.out.println(String.format("SENT: %05d | %s", transportFrame.getChannel(), transportFrame.getBody()));
+ }
+ });
+
}
void pumpProtonToSocket() {
@@ -88,7 +112,8 @@ class AmqpProtocolConverter {
while (!done) {
int count = protonTransport.output(data, 0, size);
if (count > 0) {
- final Buffer buffer = new Buffer(data, 0, count);
+ final Buffer buffer;
+ buffer = new Buffer(data, 0, count);
// System.out.println("writing: " + buffer.toString().substring(5).replaceAll("(..)", "$1 "));
amqpTransport.sendToAmqp(buffer);
} else {
@@ -107,7 +132,7 @@ class AmqpProtocolConverter {
long nextConsumerId = 0;
public AmqpSessionContext(ConnectionId connectionId, long id) {
- sessionId = new SessionId(connectionId, -1);
+ sessionId = new SessionId(connectionId, id);
}
}
@@ -163,6 +188,13 @@ class AmqpProtocolConverter {
link = link.next(ACTIVE_STATE, CLOSED_STATE);
}
+ link = protonConnection.linkHead(ACTIVE_STATE, ALL_STATES);
+ while (link != null) {
+ ((AmqpDeliveryListener)link.getContext()).drainCheck();
+ link = link.next(ACTIVE_STATE, CLOSED_STATE);
+ }
+
+
session = protonConnection.sessionHead(ACTIVE_STATE, CLOSED_STATE);
while (session != null) {
//TODO - close links?
@@ -170,8 +202,7 @@ class AmqpProtocolConverter {
session = session.next(ACTIVE_STATE, CLOSED_STATE);
}
if (protonConnection.getLocalState() == EndpointState.ACTIVE && protonConnection.getRemoteState() == EndpointState.CLOSED) {
-// listener.onConnectionClose(protonConnection);
- protonConnection.close();
+ doClose();
}
} catch (Throwable e) {
@@ -181,6 +212,35 @@ class AmqpProtocolConverter {
pumpProtonToSocket();
}
+ boolean closing = false;
+ boolean closedSocket = false;
+
+ private void doClose() {
+ if( !closing ) {
+ closing = true;
+ sendToActiveMQ(new RemoveInfo(connectionId), new ResponseHandler() {
+ @Override
+ public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+ protonConnection.close();
+ if( !closedSocket) {
+ pumpProtonToSocket();
+ }
+ }
+ });
+ }
+ }
+
+
+ public void onAMQPException(IOException error) {
+ closedSocket = true;
+ if( !closing) {
+ System.out.println("AMQP client disconnected");
+ error.printStackTrace();
+ } else {
+ doClose();
+ }
+ }
+
public void onActiveMQCommand(Command command) throws Exception {
if (command.isResponse()) {
Response response = (Response) command;
@@ -221,6 +281,7 @@ class AmqpProtocolConverter {
static abstract class AmqpDeliveryListener {
abstract public void onDelivery(Delivery delivery) throws Exception;
public void onClose() throws Exception {}
+ public void drainCheck() {}
}
private void onConnectionOpen() throws AmqpProtocolException {
@@ -278,13 +339,17 @@ class AmqpProtocolConverter {
private void onSessionClose(Session session) {
AmqpSessionContext sessionContext = (AmqpSessionContext)session.getContext();
- sendToActiveMQ(new RemoveInfo(sessionContext.sessionId), null);
+ if( sessionContext!=null ) {
+ System.out.println(sessionContext.sessionId);
+ sendToActiveMQ(new RemoveInfo(sessionContext.sessionId), null);
+ session.setContext(null);
+ }
session.close();
}
private void onLinkOpen(Link link) {
- link.setLocalSourceAddress(link.getRemoteSourceAddress());
- link.setLocalTargetAddress(link.getRemoteTargetAddress());
+ link.setSource(link.getRemoteSource());
+ link.setTarget(link.getRemoteTarget());
AmqpSessionContext sessionContext = (AmqpSessionContext) link.getSession().getContext();
if (link instanceof Receiver) {
@@ -296,17 +361,9 @@ class AmqpProtocolConverter {
InboundTransformer inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
- class ProducerContext extends AmqpDeliveryListener {
- private final ProducerId producerId;
- private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
- private final ActiveMQDestination destination;
- ByteArrayOutputStream current = new ByteArrayOutputStream();
-
- public ProducerContext(ProducerId producerId, ActiveMQDestination destination) {
- this.producerId = producerId;
- this.destination = destination;
- }
+ abstract class BaseProducerContext extends AmqpDeliveryListener {
+ ByteArrayOutputStream current = new ByteArrayOutputStream();
@Override
public void onDelivery(Delivery delivery) throws Exception {
@@ -336,7 +393,26 @@ class AmqpProtocolConverter {
receiver.advance();
delivery.settle();
- final Buffer buffer = current.toBuffer();
+ Buffer buffer = current.toBuffer();
+ current = null;
+ onMessage(receiver, delivery, buffer);
+ }
+
+ abstract protected void onMessage(Receiver receiver, Delivery delivery, Buffer buffer) throws Exception;
+ }
+
+ class ProducerContext extends BaseProducerContext {
+ private final ProducerId producerId;
+ private final LongSequenceGenerator messageIdGenerator = new LongSequenceGenerator();
+ private final ActiveMQDestination destination;
+
+ public ProducerContext(ProducerId producerId, ActiveMQDestination destination) {
+ this.producerId = producerId;
+ this.destination = destination;
+ }
+
+ @Override
+ protected void onMessage(Receiver receiver, Delivery delivery, Buffer buffer) throws Exception {
EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length);
final ActiveMQMessage message = (ActiveMQMessage) inboundTransformer.transform(em);
current = null;
@@ -348,6 +424,14 @@ class AmqpProtocolConverter {
if( message.getMessageId()==null ) {
message.setMessageId(new MessageId(producerId, messageIdGenerator.getNextSequenceId()));
}
+
+ DeliveryState remoteState = delivery.getRemoteState();
+ if( remoteState!=null && remoteState instanceof TransactionalState) {
+ TransactionalState s = (TransactionalState) remoteState;
+ long txid = toLong(s.getTxnId());
+ message.setTransactionId(new LocalTransactionId(connectionId, txid));
+ }
+
message.onSend();
// sendToActiveMQ(message, createResponseHandler(command));
sendToActiveMQ(message, null);
@@ -355,37 +439,166 @@ class AmqpProtocolConverter {
}
+ long nextTransactionId = 0;
+ class Transaction {
- void onReceiverOpen(final Receiver receiver, AmqpSessionContext sessionContext) {
- // Client is producing to this receiver object
+ }
+ HashMap<Long, Transaction> transactions = new HashMap<Long, Transaction>();
- ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++);
- ActiveMQDestination dest = ActiveMQDestination.createDestination(receiver.getRemoteTargetAddress(), ActiveMQDestination.QUEUE_TYPE);
- ProducerContext producerContext = new ProducerContext(producerId, dest);
-
- receiver.setContext(producerContext);
- receiver.flow(1024 * 64);
- ProducerInfo producerInfo = new ProducerInfo(producerId);
- producerInfo.setDestination(dest);
- sendToActiveMQ(producerInfo, new ResponseHandler() {
- public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
- receiver.open();
- if (response.isException()) {
- // If the connection attempt fails we close the socket.
- Throwable exception = ((ExceptionResponse) response).getException();
- receiver.close();
+ public byte[] toBytes(long value) {
+ Buffer buffer = new Buffer(8);
+ buffer.bigEndianEditor().writeLong(value);
+ return buffer.data;
+ }
+
+ private long toLong(Binary value) {
+ Buffer buffer = new Buffer(value.getArray(), value.getArrayOffset(), value.getLength());
+ return buffer.bigEndianEditor().readLong();
+ }
+
+
+ AmqpDeliveryListener coordinatorContext = new BaseProducerContext() {
+ @Override
+ protected void onMessage(Receiver receiver, final Delivery delivery, Buffer buffer) throws Exception {
+
+ org.apache.qpid.proton.message.Message msg = new org.apache.qpid.proton.message.Message();
+
+ int offset = buffer.offset;
+ int len = buffer.length;
+ while( len > 0 ) {
+ final int decoded = msg.decode(buffer.data, offset, len);
+ assert decoded > 0: "Make progress decoding the message";
+ offset += decoded;
+ len -= decoded;
+ }
+
+ Object action = ((AmqpValue)msg.getBody()).getValue();
+ System.out.println("COORDINATOR received: "+action+", ["+buffer+"]");
+ if( action instanceof Declare ) {
+ Declare declare = (Declare) action;
+ if( declare.getGlobalId()!=null ) {
+ throw new Exception("don't know how to handle a declare /w a set GlobalId");
}
- pumpProtonToSocket();
+
+ long txid = nextTransactionId++;
+ TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), TransactionInfo.BEGIN);
+ sendToActiveMQ(txinfo, null);
+ System.out.println("started transaction "+txid);
+
+ Declared declared = new Declared();
+ declared.setTxnId(new Binary(toBytes(txid)));
+ delivery.disposition(declared);
+ delivery.settle();
+
+ } else if( action instanceof Discharge) {
+ Discharge discharge = (Discharge) action;
+ long txid = toLong(discharge.getTxnId());
+
+ byte operation;
+ if( discharge.getFail() ) {
+ System.out.println("rollback transaction "+txid);
+ operation = TransactionInfo.ROLLBACK ;
+ } else {
+ System.out.println("commit transaction "+txid);
+ operation = TransactionInfo.COMMIT_ONE_PHASE;
+ }
+ TransactionInfo txinfo = new TransactionInfo(connectionId, new LocalTransactionId(connectionId, txid), operation);
+ sendToActiveMQ(txinfo, new ResponseHandler() {
+ @Override
+ public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+ if( response.isException() ) {
+ ExceptionResponse er = (ExceptionResponse)response;
+ Rejected rejected = new Rejected();
+ ArrayList errors = new ArrayList();
+ errors.add(er.getException().getMessage());
+ rejected.setError(errors);
+ delivery.disposition(rejected);
+ }
+ delivery.settle();
+ pumpProtonToSocket();
+ }
+ });
+ receiver.advance();
+
+ } else {
+ throw new Exception("Expected coordinator message type: "+action.getClass());
}
- });
+ }
+
+ };
+
+
+ void onReceiverOpen(final Receiver receiver, AmqpSessionContext sessionContext) {
+ // Client is producing to this receiver object
+ org.apache.qpid.proton.type.transport.Target remoteTarget = receiver.getRemoteTarget();
+ if( remoteTarget instanceof Coordinator ) {
+ pumpProtonToSocket();
+ receiver.setContext(coordinatorContext);
+ receiver.flow(1024 * 64);
+ receiver.open();
+ pumpProtonToSocket();
+ } else {
+ ProducerId producerId = new ProducerId(sessionContext.sessionId, sessionContext.nextProducerId++);
+ ActiveMQDestination dest = createDestination(remoteTarget);
+ ProducerContext producerContext = new ProducerContext(producerId, dest);
+
+ receiver.setContext(producerContext);
+ receiver.flow(1024 * 64);
+ ProducerInfo producerInfo = new ProducerInfo(producerId);
+ producerInfo.setDestination(dest);
+ sendToActiveMQ(producerInfo, new ResponseHandler() {
+ public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+ receiver.open();
+ if (response.isException()) {
+ // If the connection attempt fails we close the socket.
+ Throwable exception = ((ExceptionResponse) response).getException();
+ receiver.close();
+ }
+ pumpProtonToSocket();
+ }
+ });
+ }
+
+
+ }
+
+ private ActiveMQDestination createDestination(Object terminus) {
+ if( terminus == null ) {
+ return null;
+ } else if( terminus instanceof org.apache.qpid.proton.type.messaging.Source) {
+ org.apache.qpid.proton.type.messaging.Source source = (org.apache.qpid.proton.type.messaging.Source)terminus;
+ return ActiveMQDestination.createDestination(source.getAddress(), ActiveMQDestination.QUEUE_TYPE);
+ } else if( terminus instanceof org.apache.qpid.proton.type.messaging.Target) {
+ org.apache.qpid.proton.type.messaging.Target target = (org.apache.qpid.proton.type.messaging.Target)terminus;
+ return ActiveMQDestination.createDestination(target.getAddress(), ActiveMQDestination.QUEUE_TYPE);
+ } else if( terminus instanceof Coordinator ) {
+ Coordinator target = (Coordinator)terminus;
+ return null;
+ } else {
+ throw new RuntimeException("Unexpected terminus type: "+terminus);
+ }
+ }
+
+ private Source createSource(ActiveMQDestination dest) {
+ org.apache.qpid.proton.type.messaging.Source rc = new org.apache.qpid.proton.type.messaging.Source();
+ rc.setAddress(inboundTransformer.getVendor().toAddress(dest));
+ return rc;
}
OutboundTransformer outboundTransformer = new AutoOutboundTransformer(ActiveMQJMSVendor.INSTANCE);
+
class ConsumerContext extends AmqpDeliveryListener {
private final ConsumerId consumerId;
private final Sender sender;
+ private boolean presettle;
+
+ public ConsumerContext(ConsumerId consumerId, Sender sender) {
+ this.consumerId = consumerId;
+ this.sender = sender;
+ this.presettle = sender.getRemoteSenderSettleMode() == SenderSettleMode.SETTLED;
+ }
long nextTagId = 0;
HashSet<byte[]> tagCache = new HashSet<byte[]>();
@@ -406,11 +619,13 @@ class AmqpProtocolConverter {
return rc;
}
- public ConsumerContext(ConsumerId consumerId, Sender sender) {
- this.consumerId = consumerId;
- this.sender = sender;
+ void checkinTag(byte[] data) {
+ if( tagCache.size() < 1024 ) {
+ tagCache.add(data);
+ }
}
+
@Override
public void onClose() throws Exception {
sendToActiveMQ(new RemoveInfo(consumerId), null);
@@ -428,7 +643,7 @@ class AmqpProtocolConverter {
Buffer currentBuffer;
Delivery currentDelivery;
- public void pumpOutbound() {
+ public void pumpOutbound() throws Exception {
while(true) {
while( currentBuffer !=null ) {
@@ -436,8 +651,11 @@ class AmqpProtocolConverter {
if( sent > 0 ) {
currentBuffer.moveHead(sent);
if( currentBuffer.length == 0 ) {
- currentDelivery.settle();
- sender.advance();
+ if( presettle ) {
+ settle(currentDelivery, MessageAck.INDIVIDUAL_ACK_TYPE);
+ } else {
+ sender.advance();
+ }
currentBuffer = null;
currentDelivery = null;
}
@@ -453,12 +671,17 @@ class AmqpProtocolConverter {
final MessageDispatch md = outbound.removeFirst();
try {
final ActiveMQMessage jms = (ActiveMQMessage) md.getMessage();
+ jms.setRedeliveryCounter(md.getRedeliveryCounter());
final EncodedMessage amqp = outboundTransformer.transform(jms);
if( amqp!=null && amqp.getLength() > 0 ) {
currentBuffer = new Buffer(amqp.getArray(), amqp.getArrayOffset(), amqp.getLength());
- final byte[] tag = nextTag();
- currentDelivery = sender.delivery(tag, 0, tag.length);
+ if( presettle ) {
+ currentDelivery = sender.delivery(EMPTY_BYTE_ARRAY, 0, 0);
+ } else {
+ final byte[] tag = nextTag();
+ currentDelivery = sender.delivery(tag, 0, tag.length);
+ }
currentDelivery.setContext(md);
} else {
@@ -471,12 +694,81 @@ class AmqpProtocolConverter {
}
}
- @Override
- public void onDelivery(Delivery delivery) throws JMSException {
- if( delivery.remotelySettled() ) {
+ private void settle(final Delivery delivery, int ackType) throws Exception {
+ byte[] tag = delivery.getTag();
+ if( tag !=null && tag.length>0 ) {
+ checkinTag(tag);
+ }
+
+ if( ackType == -1) {
+ // we are going to settle, but redeliver.. we we won't yet ack to ActiveMQ
+ delivery.settle();
+ onMessageDispatch((MessageDispatch) delivery.getContext());
+ } else {
MessageDispatch md = (MessageDispatch) delivery.getContext();
- pumpOutbound();
+ MessageAck ack = new MessageAck();
+ ack.setConsumerId(consumerId);
+ ack.setFirstMessageId(md.getMessage().getMessageId());
+ ack.setLastMessageId(md.getMessage().getMessageId());
+ ack.setMessageCount(1);
+ ack.setAckType((byte)ackType);
+
+ DeliveryState remoteState = delivery.getRemoteState();
+ if( remoteState!=null && remoteState instanceof TransactionalState) {
+ TransactionalState s = (TransactionalState) remoteState;
+ long txid = toLong(s.getTxnId());
+ ack.setTransactionId(new LocalTransactionId(connectionId, txid));
+ }
+
+ sendToActiveMQ(ack, new ResponseHandler() {
+ @Override
+ public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+ delivery.settle();
+ pumpProtonToSocket();
+ }
+ });
+ }
+ }
+
+ @Override
+ public void drainCheck() {
+ if( outbound.isEmpty() ) {
+ sender.drained();
+ }
+ }
+
+ @Override
+ public void onDelivery(Delivery delivery) throws Exception {
+ MessageDispatch md = (MessageDispatch) delivery.getContext();
+ final DeliveryState state = delivery.getRemoteState();
+ if( state instanceof Accepted ) {
+ if( !delivery.remotelySettled() ) {
+ delivery.disposition(new Accepted());
+ }
+ settle(delivery, MessageAck.INDIVIDUAL_ACK_TYPE);
+ } else if( state instanceof Rejected) {
+ // re-deliver /w incremented delivery counter.
+ md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
+ settle(delivery, -1);
+ } else if( state instanceof Released) {
+ // re-deliver && don't increment the counter.
+ settle(delivery, -1);
+ } else if( state instanceof Modified) {
+ Modified modified = (Modified) state;
+ if ( modified.getDeliveryFailed() ) {
+ // increment delivery counter..
+ md.setRedeliveryCounter(md.getRedeliveryCounter() + 1);
+ }
+ byte ackType = -1;
+ Boolean undeliverableHere = modified.getUndeliverableHere();
+ if( undeliverableHere !=null && undeliverableHere ) {
+ // receiver does not want the message..
+ // perhaps we should DLQ it?
+ ackType = MessageAck.POSION_ACK_TYPE;
+ }
+ settle(delivery, ackType);
}
+ pumpOutbound();
}
}
@@ -485,14 +777,16 @@ class AmqpProtocolConverter {
void onSenderOpen(final Sender sender, AmqpSessionContext sessionContext) {
+ // sender.get
ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++);
ConsumerContext consumerContext = new ConsumerContext(id, sender);
subscriptionsByConsumerId.put(id, consumerContext);
ActiveMQDestination dest;
- if( sender.getRemoteSourceAddress() != null ) {
- dest = ActiveMQDestination.createDestination(sender.getRemoteSourceAddress(), ActiveMQDestination.QUEUE_TYPE);
+ final Source remoteSource = sender.getRemoteSource();
+ if( remoteSource != null ) {
+ dest = createDestination(remoteSource);
} else {
// lets create a temp dest.
// if (topic) {
@@ -507,7 +801,7 @@ class AmqpProtocolConverter {
info.setDestination(dest);
sendToActiveMQ(info, null);
tempDestinations.put(sender, dest);
- sender.setLocalSourceAddress(inboundTransformer.getVendor().toAddress(dest));
+ sender.setSource(createSource(dest));
}
Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java?rev=1400116&r1=1400115&r2=1400116&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java Fri Oct 19 14:28:02 2012
@@ -70,6 +70,20 @@ public class AmqpTransportFilter extends
}
}
+ @Override
+ public void onException(IOException error) {
+ try {
+ protocolConverter.lock.lock();
+ try {
+ protocolConverter.onAMQPException(error);
+ } finally {
+ protocolConverter.lock.unlock();
+ }
+ } finally {
+ super.onException(error);
+ }
+ }
+
public void onCommand(Object command) {
try {
if (trace) {
@@ -140,6 +154,8 @@ public class AmqpTransportFilter extends
return this.wireFormat;
}
+
+
public void handleException(IOException e) {
super.onException(e);
}
Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java?rev=1400116&r1=1400115&r2=1400116&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java Fri Oct 19 14:28:02 2012
@@ -16,10 +16,15 @@
*/
package org.apache.activemq.transport.amqp.transform;
+import org.apache.qpid.proton.codec.CompositeWritableBuffer;
+import org.apache.qpid.proton.codec.WritableBuffer;
+import org.apache.qpid.proton.type.UnsignedInteger;
+
import javax.jms.BytesMessage;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageFormatException;
+import java.nio.ByteBuffer;
/**
* @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -54,8 +59,43 @@ public class AMQPNativeOutboundTransform
return null;
}
byte data[] = new byte[(int) msg.getBodyLength()];
+ int dataSize = data.length;
msg.readBytes(data);
- return new EncodedMessage(messageFormat, data, 0, data.length);
+ msg.reset();
+
+ try {
+ int count = msg.getIntProperty("JMSXDeliveryCount");
+ if( count > 1 ) {
+
+ // decode...
+ org.apache.qpid.proton.message.Message amqp = new org.apache.qpid.proton.message.Message();
+ int offset = 0;
+ int len = data.length;
+ while( len > 0 ) {
+ final int decoded = amqp.decode(data, offset, len);
+ assert decoded > 0: "Make progress decoding the message";
+ offset += decoded;
+ len -= decoded;
+ }
+
+ // Update the DeliveryCount header...
+ amqp.getHeader().setDeliveryCount(new UnsignedInteger(count));
+
+ // Re-encode...
+ ByteBuffer buffer = ByteBuffer.wrap(new byte[1024*4]);
+ final DroppingWritableBuffer overflow = new DroppingWritableBuffer();
+ int c = amqp.encode(new CompositeWritableBuffer(new WritableBuffer.ByteBufferWrapper(buffer), overflow));
+ if( overflow.position() > 0 ) {
+ buffer = ByteBuffer.wrap(new byte[1024*4+overflow.position()]);
+ c = amqp.encode(new WritableBuffer.ByteBufferWrapper(buffer));
+ }
+ data = buffer.array();
+ dataSize = c;
+ }
+ } catch (JMSException e) {
+ }
+
+ return new EncodedMessage(messageFormat, data, 0, dataSize);
}
}
Modified: activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ssl
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp%2Bssl?rev=1400116&r1=1400115&r2=1400116&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ssl (original)
+++ activemq/trunk/activemq-amqp/src/main/resources/META-INF/services/org/apache/activemq/transport/amqp+ssl Fri Oct 19 14:28:02 2012
@@ -14,4 +14,4 @@
## See the License for the specific language governing permissions and
## limitations under the License.
## ---------------------------------------------------------------------------
-class=org.apache.activemq.transport.amqp.AmqpSslTransportFactory
+class=org.apache.activemq.transport.amqp.AMQPSslTransportFactory
Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java?rev=1400116&r1=1400115&r2=1400116&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/AmqpTestSupport.java Fri Oct 19 14:28:02 2012
@@ -19,12 +19,15 @@ package org.apache.activemq.transport.am
import junit.framework.TestCase;
import org.apache.activemq.AutoFailTestSupport;
import org.apache.activemq.broker.BrokerService;
+import org.apache.activemq.broker.SslContext;
import org.apache.activemq.broker.TransportConnector;
+import org.apache.activemq.spring.SpringSslContext;
import org.junit.After;
import org.junit.Before;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.File;
import java.util.Vector;
import static org.fusesource.hawtbuf.UTF8Buffer.utf8;
@@ -40,21 +43,51 @@ public class AmqpTestSupport {
protected int numberOfMessages;
AutoFailTestSupport autoFailTestSupport = new AutoFailTestSupport() {};
protected int port;
+ protected int sslPort;
+
+
+ public static void main(String[] args) throws Exception {
+ final AmqpTestSupport s = new AmqpTestSupport();
+ s.sslPort = 5671;
+ s.port = 5672;
+ s.startBroker();
+ while(true) {
+ Thread.sleep(100000);
+ }
+ }
@Before
- public void startBroker() throws Exception {
+ public void setUp() throws Exception {
autoFailTestSupport.startAutoFailThread();
exceptions.clear();
+ startBroker();
+ }
+
+ public void startBroker() throws Exception {
brokerService = new BrokerService();
brokerService.setPersistent(false);
brokerService.setAdvisorySupport(false);
+
+ // Setup SSL context...
+ final File classesDir = new File(AmqpProtocolConverter.class.getProtectionDomain().getCodeSource().getLocation().getFile());
+ File keystore = new File(classesDir, "../../src/test/resources/keystore");
+ final SpringSslContext sslContext = new SpringSslContext();
+ sslContext.setKeyStore(keystore.getCanonicalPath());
+ sslContext.setKeyStorePassword("password");
+ sslContext.setTrustStore(keystore.getCanonicalPath());
+ sslContext.setTrustStorePassword("password");
+ sslContext.afterPropertiesSet();
+ brokerService.setSslContext(sslContext);
+
addAMQPConnector();
brokerService.start();
this.numberOfMessages = 2000;
}
protected void addAMQPConnector() throws Exception {
- final TransportConnector connector = brokerService.addConnector("amqp://localhost:0");
+ TransportConnector connector =brokerService.addConnector("amqp+ssl://0.0.0.0:"+sslPort);
+ sslPort = connector.getConnectUri().getPort();
+ connector = brokerService.addConnector("amqp://0.0.0.0:"+port);
port = connector.getConnectUri().getPort();
}
Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java?rev=1400116&r1=1400115&r2=1400116&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java Fri Oct 19 14:28:02 2012
@@ -16,6 +16,7 @@
*/
package org.apache.activemq.transport.amqp;
+import org.apache.activemq.transport.amqp.joram.ActiveMQAdmin;
import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
import org.junit.Test;
@@ -30,59 +31,114 @@ import static org.junit.Assert.assertEqu
public class JMSClientTest extends AmqpTestSupport {
@Test
- public void testSendReceive() throws Exception {
+ public void testTransactions() throws Exception {
+ ActiveMQAdmin.enableJMSFrameTracing();
+ QueueImpl queue = new QueueImpl("queue://txqueue");
+
+ Connection connection = createConnection();
+ {
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+ MessageProducer p = session.createProducer(queue);
+ p.send(session.createTextMessage("Hello World"));
+// session.commit();
+
+ MessageConsumer c = session.createConsumer(queue);
+ Message msg = c.receive();
+ System.out.println("first:"+msg);
+ System.out.println(msg.getJMSRedelivered());
+
+// session.rollback();
+//
+// msg = c.receive();
+// System.out.println("second:"+msg);
+// System.out.println(msg.getJMSRedelivered());
+ }
+ connection.close();
- QueueImpl queue = new QueueImpl("queue://testqueue");
- int nMsgs = 100;
- final String dataFormat = "%010240d";
+ }
- final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, null, null);
+// @Test
+// public void testSendReceive() throws Exception {
+// ActiveMQAdmin.enableJMSFrameTracing();
+// QueueImpl queue = new QueueImpl("queue://testqueue");
+// int nMsgs = 1;
+// final String dataFormat = "%01024d";
+//
+//
+// try {
+// Connection connection = createConnection();
+// {
+// Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageProducer p = session.createProducer(queue);
+// for (int i = 0; i < nMsgs; i++) {
+// System.out.println("Sending " + i);
+// p.send(session.createTextMessage(String.format(dataFormat, i)));
+// }
+// }
+// connection.close();
+//
+// System.out.println("=======================================================================================");
+// System.out.println(" failing a receive ");
+// System.out.println("=======================================================================================");
+// connection = createConnection();
+// {
+// Session session = connection.createSession(false, Session.CLIENT_ACKNOWLEDGE);
+// MessageConsumer c = session.createConsumer(queue);
+//
+// // Receive messages non-transacted
+// int i = 0;
+// while ( i < 1) {
+// TextMessage msg = (TextMessage) c.receive();
+// if( msg!=null ) {
+// String s = msg.getText();
+// assertEquals(String.format(dataFormat, i), s);
+// System.out.println("Received: " + i);
+// i++;
+// }
+// }
+// }
+// connection.close();
+//
+//
+// System.out.println("=======================================================================================");
+// System.out.println(" receiving ");
+// System.out.println("=======================================================================================");
+// connection = createConnection();
+// {
+// Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+// MessageConsumer c = session.createConsumer(queue);
+//
+// // Receive messages non-transacted
+// int i = 0;
+// while ( i < nMsgs) {
+// TextMessage msg = (TextMessage) c.receive();
+// if( msg!=null ) {
+// String s = msg.getText();
+// assertEquals(String.format(dataFormat, i), s);
+// System.out.println("Received: " + i);
+// i++;
+// }
+// }
+// }
+// connection.close();
+//
+// } catch (Exception e) {
+// e.printStackTrace();
+// }
+//
+// }
- try {
- final Connection connection = factory.createConnection();
- connection.setExceptionListener(new ExceptionListener() {
- @Override
- public void onException(JMSException exception) {
- exception.printStackTrace();
- }
- });
- connection.start();
- {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageProducer p = session.createProducer(queue);
- for (int i = 0; i < nMsgs; i++) {
- System.out.println("Sending " + i);
- p.send(session.createTextMessage(String.format(dataFormat, i)));
- }
- p.close();
- session.close();
- }
- System.out.println("=======================================================================================");
- System.out.println(" receiving ");
- System.out.println("=======================================================================================");
- {
- Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
- MessageConsumer c = session.createConsumer(queue);
-
- // Receive messages non-transacted
- int i = 0;
- while ( i < nMsgs) {
- TextMessage msg = (TextMessage) c.receive();
- if( msg!=null ) {
- String s = msg.getText();
- assertEquals(String.format(dataFormat, i), s);
- System.out.println("Received: " + i);
- i++;
- }
- }
- c.close();
- session.close();
+ private Connection createConnection() throws JMSException {
+ final ConnectionFactoryImpl factory = new ConnectionFactoryImpl("localhost", port, null, null);
+ final Connection connection = factory.createConnection();
+ connection.setExceptionListener(new ExceptionListener() {
+ @Override
+ public void onException(JMSException exception) {
+ exception.printStackTrace();
}
- connection.close();
- } catch (Exception e) {
- e.printStackTrace();
- }
-
+ });
+ connection.start();
+ return connection;
}
}
Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/ActiveMQAdmin.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/ActiveMQAdmin.java?rev=1400116&r1=1400115&r2=1400116&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/ActiveMQAdmin.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/ActiveMQAdmin.java Fri Oct 19 14:28:02 2012
@@ -29,6 +29,9 @@ import javax.naming.Context;
import javax.naming.InitialContext;
import javax.naming.NamingException;
import java.io.File;
+import java.io.FileNotFoundException;
+import java.io.FileOutputStream;
+import java.io.PrintStream;
import java.net.URI;
import java.util.Hashtable;
import java.util.logging.*;
@@ -53,17 +56,18 @@ public class ActiveMQAdmin implements Ad
}
}
- private void enableJMSFrameTracing() {
+ static public void enableJMSFrameTracing() throws FileNotFoundException {
final SimpleFormatter formatter = new SimpleFormatter();
+ final PrintStream out = new PrintStream(new FileOutputStream(new File("/tmp/amqp-trace.txt")));
Handler handler = new Handler() {
@Override
public void publish(LogRecord r) {
- System.out.println(String.format("%s:%s", r.getLoggerName(), r.getMessage()));
+ out.println(String.format("%s:%s", r.getLoggerName(), r.getMessage()));
}
@Override
public void flush() {
- System.out.flush();
+ out.flush();
}
@Override
@@ -71,7 +75,7 @@ public class ActiveMQAdmin implements Ad
}
};
- Logger log = Logger.getLogger("RAW");
+ Logger log = Logger.getLogger("FRM");
log.addHandler(handler);
log.setLevel(Level.FINEST);
}
Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsTest.java?rev=1400116&r1=1400115&r2=1400116&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsTest.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsTest.java Fri Oct 19 14:28:02 2012
@@ -47,6 +47,8 @@ public class JoramJmsTest extends TestCa
TestSuite suite = new TestSuite();
// Passing tests
+ suite.addTestSuite(ConnectionTest.class);
+ suite.addTestSuite(SessionTest.class);
suite.addTestSuite(JMSXPropertyTest.class);
suite.addTestSuite(MessageBodyTest.class);
suite.addTestSuite(MessageDefaultTest.class);
@@ -54,13 +56,7 @@ public class JoramJmsTest extends TestCa
suite.addTestSuite(MessagePropertyTest.class);
if (false ) {
-// TODO: Fails due to JMS client impl error.
- suite.addTestSuite(UnifiedSessionTest.class);
-// TODO: Fails due to https://issues.apache.org/jira/browse/PROTON-62: ClassCastException when processing an Attach frame
- suite.addTestSuite(QueueSessionTest.class);
- suite.addTestSuite(SessionTest.class);
-// TODO: Fails due to inconsistent ObjectMessage mapping in the JMS client.
- suite.addTestSuite(MessageTypeTest.class);
+
// TODO: Fails due to temp destinations not being supported yet.
suite.addTestSuite(MessageHeaderTest.class);
suite.addTestSuite(TemporaryQueueTest.class);
@@ -68,13 +64,17 @@ public class JoramJmsTest extends TestCa
// TODO: Fails due to selectors not being implemented yet.
suite.addTestSuite(SelectorSyntaxTest.class);
suite.addTestSuite(SelectorTest.class);
+ suite.addTestSuite(QueueSessionTest.class);
+// TODO: Browsers not yet supported.
+ suite.addTestSuite(QueueBrowserTest.class);
+// TODO: Fails due to JMS client impl error.
+ suite.addTestSuite(UnifiedSessionTest.class);
+// TODO: Fails due to inconsistent ObjectMessage mapping in the JMS client.
+ suite.addTestSuite(MessageTypeTest.class);
// TODO: Fails due to: javax.jms.IllegalStateException: Cannot set client-id to "publisherConnection"; client-id must be set on connection creation
suite.addTestSuite(TopicConnectionTest.class);
suite.addTestSuite(TopicSessionTest.class);
-// TODO: figure out why the following tests fail..
-// TODO: figure out why the following tests hang..
- suite.addTestSuite(ConnectionTest.class);
- suite.addTestSuite(QueueBrowserTest.class);
+
}
return suite;
Added: activemq/trunk/activemq-amqp/src/test/resources/keystore
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/resources/keystore?rev=1400116&view=auto
==============================================================================
Files activemq/trunk/activemq-amqp/src/test/resources/keystore (added) and activemq/trunk/activemq-amqp/src/test/resources/keystore Fri Oct 19 14:28:02 2012 differ