You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/10/04 23:00:36 UTC
svn commit: r1394266 - in /activemq/trunk/activemq-amqp/src:
main/java/org/apache/activemq/transport/amqp/
main/java/org/apache/activemq/transport/amqp/transform/
test/java/org/apache/activemq/transport/amqp/
Author: chirino
Date: Thu Oct 4 21:00:36 2012
New Revision: 1394266
URL: http://svn.apache.org/viewvc?rev=1394266&view=rev
Log:
AMQP: Sending and receiving large number of messages was broken.
Modified:
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java
activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java
Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1394266&r1=1394265&r2=1394266&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java Thu Oct 4 21:00:36 2012
@@ -305,11 +305,18 @@ class AmqpProtocolConverter {
@Override
public void onDelivery(Delivery delivery) throws Exception {
- if( current ==null ) {
+ Receiver receiver = ((Receiver)delivery.getLink());
+ if( !delivery.isReadable() ) {
+ System.out.println("it was not readable!");
+// delivery.settle();
+// receiver.advance();
+ return;
+ }
+
+ if( current==null ) {
current = new ByteArrayOutputStream();
}
- Receiver receiver = ((Receiver)delivery.getLink());
int count;
byte data[] = new byte[1024*4];
while( (count = receiver.recv(data, 0, data.length)) > 0 ) {
@@ -321,6 +328,9 @@ class AmqpProtocolConverter {
return;
}
+ receiver.advance();
+ delivery.settle();
+
final Buffer buffer = current.toBuffer();
EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length);
final ActiveMQMessage message = (ActiveMQMessage) inboundTransformer.transform(em);
@@ -410,18 +420,21 @@ class AmqpProtocolConverter {
pumpProtonToSocket();
}
- Buffer current;
+ Buffer currentBuffer;
+ Delivery currentDelivery;
public void pumpOutbound() {
while(true) {
- while( current!=null ) {
- int sent = sender.send(current.data, current.offset, current.length);
+ while( currentBuffer !=null ) {
+ int sent = sender.send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
if( sent > 0 ) {
- current.moveHead(sent);
- if( current.length == 0 ) {
+ currentBuffer.moveHead(sent);
+ if( currentBuffer.length == 0 ) {
+ currentDelivery.settle();
sender.advance();
- current = null;
+ currentBuffer = null;
+ currentDelivery = null;
}
} else {
return;
@@ -438,10 +451,10 @@ class AmqpProtocolConverter {
final EncodedMessage amqp = outboundTransformer.transform(jms);
if( amqp!=null && amqp.getLength() > 0 ) {
- current = new Buffer(amqp.getArray(), amqp.getArrayOffset(), amqp.getLength());
+ currentBuffer = new Buffer(amqp.getArray(), amqp.getArrayOffset(), amqp.getLength());
final byte[] tag = nextTag();
- final Delivery delivery = sender.delivery(tag, 0, tag.length);
- delivery.setContext(md);
+ currentDelivery = sender.delivery(tag, 0, tag.length);
+ currentDelivery.setContext(md);
} else {
// TODO: message could not be generated what now?
Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java?rev=1394266&r1=1394265&r2=1394266&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java Thu Oct 4 21:00:36 2012
@@ -59,7 +59,9 @@ public class AmqpTransportFilter extends
public void oneway(Object o) throws IOException {
try {
final Command command = (Command) o;
- protocolConverter.onActiveMQCommand(command);
+ synchronized (protocolConverter) {
+ protocolConverter.onActiveMQCommand(command);
+ }
} catch (Exception e) {
throw IOExceptionSupport.create(e);
}
@@ -70,8 +72,9 @@ public class AmqpTransportFilter extends
if (trace) {
TRACE.trace("Received: \n" + command);
}
-
- protocolConverter.onAMQPData((Buffer) command);
+ synchronized (protocolConverter) {
+ protocolConverter.onAMQPData((Buffer) command);
+ }
} catch (IOException e) {
handleException(e);
} catch (JMSException e) {
Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java?rev=1394266&r1=1394265&r2=1394266&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java Thu Oct 4 21:00:36 2012
@@ -52,7 +52,9 @@ public class JMSMappingInboundTransforme
Message rc;
final Section body = amqp.getBody();
- if( body instanceof Data ) {
+ if( body == null ) {
+ rc = vendor.createMessage();
+ } else if( body instanceof Data ) {
Binary d = ((Data) body).getValue();
BytesMessage m = vendor.createBytesMessage();
m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
@@ -91,7 +93,7 @@ public class JMSMappingInboundTransforme
// jms = m;
}
} else {
- throw new RuntimeException("Unexpected body type.");
+ throw new RuntimeException("Unexpected body type: "+body.getClass());
}
rc.setJMSDeliveryMode(defaultDeliveryMode);
rc.setJMSPriority(defaultPriority);
Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java?rev=1394266&r1=1394265&r2=1394266&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java Thu Oct 4 21:00:36 2012
@@ -34,7 +34,9 @@ public class SwiftMQClientTest extends A
public void testSendReceive() throws Exception {
String queue = "testqueue";
- int nMsgs = 1;
+ int nMsgs = 100;
+ final String dataFormat = "%01024d";
+
int qos = QoS.AT_MOST_ONCE;
AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
@@ -57,28 +59,34 @@ public class SwiftMQClientTest extends A
for (int i = 0; i < nMsgs; i++) {
AMQPMessage msg = new AMQPMessage();
System.out.println("Sending " + i);
- msg.setAmqpValue(new AmqpValue(new AMQPString(String.format("%010d", i))));
+ msg.setAmqpValue(new AmqpValue(new AMQPString(String.format(dataFormat, i))));
p.send(msg);
}
p.close();
session.close();
}
-
+ System.out.println("=======================================================================================");
+ System.out.println(" receiving ");
+ System.out.println("=======================================================================================");
{
Session session = connection.createSession(10, 10);
Consumer c = session.createConsumer(queue, 100, qos, true, null);
// Receive messages non-transacted
- for (int i = 0; i < nMsgs; i++) {
+ int i = 0;
+ while ( i < nMsgs) {
AMQPMessage msg = c.receive();
- final AMQPType value = msg.getAmqpValue().getValue();
- if (value instanceof AMQPString) {
- String s = ((AMQPString) value).getValue();
- assertEquals(String.format("%010d", i), s);
- System.out.println("Received: " + i);
+ if( msg!=null ) {
+ final AMQPType value = msg.getAmqpValue().getValue();
+ if (value instanceof AMQPString) {
+ String s = ((AMQPString) value).getValue();
+ assertEquals(String.format(dataFormat, i), s);
+ System.out.println("Received: " + i);
+ }
+ if (!msg.isSettled())
+ msg.accept();
+ i++;
}
- if (!msg.isSettled())
- msg.accept();
}
c.close();
session.close();