You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/10/04 23:00:36 UTC

svn commit: r1394266 - in /activemq/trunk/activemq-amqp/src: main/java/org/apache/activemq/transport/amqp/ main/java/org/apache/activemq/transport/amqp/transform/ test/java/org/apache/activemq/transport/amqp/

Author: chirino
Date: Thu Oct  4 21:00:36 2012
New Revision: 1394266

URL: http://svn.apache.org/viewvc?rev=1394266&view=rev
Log:
AMQP: Sending and receiving a large number of messages was broken.

Modified:
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java
    activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1394266&r1=1394265&r2=1394266&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java Thu Oct  4 21:00:36 2012
@@ -305,11 +305,18 @@ class AmqpProtocolConverter {
 
         @Override
         public void onDelivery(Delivery delivery) throws Exception {
-            if( current ==null ) {
+            Receiver receiver = ((Receiver)delivery.getLink());
+            if( !delivery.isReadable() ) {
+                System.out.println("it was not readable!");
+//                delivery.settle();
+//                receiver.advance();
+                return;
+            }
+
+            if( current==null ) {
                 current = new ByteArrayOutputStream();
             }
 
-            Receiver receiver = ((Receiver)delivery.getLink());
             int count;
             byte data[] = new byte[1024*4];
             while( (count = receiver.recv(data, 0, data.length)) > 0 ) {
@@ -321,6 +328,9 @@ class AmqpProtocolConverter {
                 return;
             }
 
+            receiver.advance();
+            delivery.settle();
+
             final Buffer buffer = current.toBuffer();
             EncodedMessage em = new EncodedMessage(delivery.getMessageFormat(), buffer.data, buffer.offset, buffer.length);
             final ActiveMQMessage message = (ActiveMQMessage) inboundTransformer.transform(em);
@@ -410,18 +420,21 @@ class AmqpProtocolConverter {
             pumpProtonToSocket();
         }
 
-        Buffer current;
+        Buffer currentBuffer;
+        Delivery currentDelivery;
 
         public void pumpOutbound() {
             while(true) {
 
-                while( current!=null ) {
-                    int sent = sender.send(current.data, current.offset, current.length);
+                while( currentBuffer !=null ) {
+                    int sent = sender.send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
                     if( sent > 0 ) {
-                        current.moveHead(sent);
-                        if( current.length == 0 ) {
+                        currentBuffer.moveHead(sent);
+                        if( currentBuffer.length == 0 ) {
+                            currentDelivery.settle();
                             sender.advance();
-                            current = null;
+                            currentBuffer = null;
+                            currentDelivery = null;
                         }
                     } else {
                         return;
@@ -438,10 +451,10 @@ class AmqpProtocolConverter {
                     final EncodedMessage amqp = outboundTransformer.transform(jms);
                     if( amqp!=null && amqp.getLength() > 0 ) {
 
-                        current = new Buffer(amqp.getArray(), amqp.getArrayOffset(), amqp.getLength());
+                        currentBuffer = new Buffer(amqp.getArray(), amqp.getArrayOffset(), amqp.getLength());
                         final byte[] tag = nextTag();
-                        final Delivery delivery = sender.delivery(tag, 0, tag.length);
-                        delivery.setContext(md);
+                        currentDelivery = sender.delivery(tag, 0, tag.length);
+                        currentDelivery.setContext(md);
 
                     } else {
                         // TODO: message could not be generated what now?

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java?rev=1394266&r1=1394265&r2=1394266&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java Thu Oct  4 21:00:36 2012
@@ -59,7 +59,9 @@ public class AmqpTransportFilter extends
     public void oneway(Object o) throws IOException {
         try {
             final Command command = (Command) o;
-            protocolConverter.onActiveMQCommand(command);
+            synchronized (protocolConverter) {
+                protocolConverter.onActiveMQCommand(command);
+            }
         } catch (Exception e) {
             throw IOExceptionSupport.create(e);
         }
@@ -70,8 +72,9 @@ public class AmqpTransportFilter extends
             if (trace) {
                 TRACE.trace("Received: \n" + command);
             }
-
-            protocolConverter.onAMQPData((Buffer) command);
+            synchronized (protocolConverter) {
+                protocolConverter.onAMQPData((Buffer) command);
+            }
         } catch (IOException e) {
             handleException(e);
         } catch (JMSException e) {

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java?rev=1394266&r1=1394265&r2=1394266&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java Thu Oct  4 21:00:36 2012
@@ -52,7 +52,9 @@ public class JMSMappingInboundTransforme
 
         Message rc;
         final Section body = amqp.getBody();
-        if( body instanceof Data ) {
+        if( body == null ) {
+            rc = vendor.createMessage();
+        } else if( body instanceof Data ) {
             Binary d = ((Data) body).getValue();
             BytesMessage m = vendor.createBytesMessage();
             m.writeBytes(d.getArray(), d.getArrayOffset(), d.getLength());
@@ -91,7 +93,7 @@ public class JMSMappingInboundTransforme
 //                    jms = m;
             }
         } else {
-            throw new RuntimeException("Unexpected body type.");
+            throw new RuntimeException("Unexpected body type: "+body.getClass());
         }
         rc.setJMSDeliveryMode(defaultDeliveryMode);
         rc.setJMSPriority(defaultPriority);

Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java?rev=1394266&r1=1394265&r2=1394266&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java Thu Oct  4 21:00:36 2012
@@ -34,7 +34,9 @@ public class SwiftMQClientTest extends A
     public void testSendReceive() throws Exception {
 
         String queue = "testqueue";
-        int nMsgs = 1;
+        int nMsgs = 100;
+        final String dataFormat = "%01024d";
+
         int qos = QoS.AT_MOST_ONCE;
         AMQPContext ctx = new AMQPContext(AMQPContext.CLIENT);
 
@@ -57,28 +59,34 @@ public class SwiftMQClientTest extends A
                 for (int i = 0; i < nMsgs; i++) {
                     AMQPMessage msg = new AMQPMessage();
                     System.out.println("Sending " + i);
-                    msg.setAmqpValue(new AmqpValue(new AMQPString(String.format("%010d", i))));
+                    msg.setAmqpValue(new AmqpValue(new AMQPString(String.format(dataFormat, i))));
                     p.send(msg);
                 }
                 p.close();
                 session.close();
             }
-
+            System.out.println("=======================================================================================");
+            System.out.println(" receiving ");
+            System.out.println("=======================================================================================");
             {
                 Session session = connection.createSession(10, 10);
                 Consumer c = session.createConsumer(queue, 100, qos, true, null);
 
                 // Receive messages non-transacted
-                for (int i = 0; i < nMsgs; i++) {
+                int i = 0;
+                while ( i < nMsgs) {
                     AMQPMessage msg = c.receive();
-                    final AMQPType value = msg.getAmqpValue().getValue();
-                    if (value instanceof AMQPString) {
-                        String s = ((AMQPString) value).getValue();
-                        assertEquals(String.format("%010d", i), s);
-                        System.out.println("Received: " + i);
+                    if( msg!=null ) {
+                        final AMQPType value = msg.getAmqpValue().getValue();
+                        if (value instanceof AMQPString) {
+                            String s = ((AMQPString) value).getValue();
+                            assertEquals(String.format(dataFormat, i), s);
+                            System.out.println("Received: " + i);
+                        }
+                        if (!msg.isSettled())
+                            msg.accept();
+                        i++;
                     }
-                    if (!msg.isSettled())
-                        msg.accept();
                 }
                 c.close();
                 session.close();