You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/10/04 00:01:08 UTC

svn commit: r1393790 - in /activemq/trunk/activemq-amqp/src: main/java/org/apache/activemq/transport/amqp/ main/java/org/apache/activemq/transport/amqp/transform/ test/java/org/apache/activemq/transport/amqp/

Author: chirino
Date: Wed Oct  3 22:01:08 2012
New Revision: 1393790

URL: http://svn.apache.org/viewvc?rev=1393790&view=rev
Log:
AMQP impl: A simple send and receive is now working

Modified:
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java
    activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1393790&r1=1393789&r2=1393790&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java Wed Oct  3 22:01:08 2012
@@ -86,7 +86,7 @@ class AmqpProtocolConverter {
                 int count = protonTransport.output(data, 0, size);
                 if (count > 0) {
                     final Buffer buffer = new Buffer(data, 0, count);
-                    System.out.println("writing: " + buffer.toString().substring(5).replaceAll("(..)", "$1 "));
+//                    System.out.println("writing: " + buffer.toString().substring(5).replaceAll("(..)", "$1 "));
                     amqpTransport.sendToAmqp(buffer);
                 } else {
                     done = true;
@@ -116,7 +116,7 @@ class AmqpProtocolConverter {
 
 
         try {
-            System.out.println("reading: " + frame.toString().substring(5).replaceAll("(..)", "$1 "));
+//            System.out.println("reading: " + frame.toString().substring(5).replaceAll("(..)", "$1 "));
             protonTransport.input(frame.data, frame.offset, frame.length);
         } catch (Throwable e) {
             handleException(new AmqpProtocolException("Could not decode AMQP frame: " + frame, true, e));

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java?rev=1393790&r1=1393789&r2=1393790&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java Wed Oct  3 22:01:08 2012
@@ -45,7 +45,7 @@ public class AMQPNativeInboundTransforme
         }
 
         rc.setLongProperty(prefixVendor + "MESSAGE_FORMAT", messageFormat);
-        rc.setBooleanProperty(prefixVendor + "NATIVE", false);
+        rc.setBooleanProperty(prefixVendor + "NATIVE", true);
         return rc;
     }
 }

Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java?rev=1393790&r1=1393789&r2=1393790&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/SwiftMQClientTest.java Wed Oct  3 22:01:08 2012
@@ -23,6 +23,7 @@ import com.swiftmq.amqp.v100.messaging.A
 import com.swiftmq.amqp.v100.types.AMQPString;
 import com.swiftmq.amqp.v100.types.AMQPType;
 import org.junit.Test;
+import static org.junit.Assert.*;
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -50,39 +51,38 @@ public class SwiftMQClientTest extends A
             });
             connection.connect();
             {
-                String data = String.format("%010d", 0);
 
                 Session session = connection.createSession(10, 10);
                 Producer p = session.createProducer(queue, qos);
                 for (int i = 0; i < nMsgs; i++) {
                     AMQPMessage msg = new AMQPMessage();
-                    String s = "Message #" + (i + 1);
-                    System.out.println("Sending " + s);
-                    msg.setAmqpValue(new AmqpValue(new AMQPString(s + ", data: " + data)));
+                    System.out.println("Sending " + i);
+                    msg.setAmqpValue(new AmqpValue(new AMQPString(String.format("%010d", i))));
                     p.send(msg);
                 }
                 p.close();
                 session.close();
             }
 
-//            {
-//                Session session = connection.createSession(10, 10);
-//                Consumer c = session.createConsumer(queue, 100, qos, true, null);
-//
-//                // Receive messages non-transacted
-//                for (int i = 0; i < nMsgs; i++) {
-//                    AMQPMessage msg = c.receive();
-//                    final AMQPType value = msg.getAmqpValue().getValue();
-//                    if (value instanceof AMQPString) {
-//                        AMQPString s = (AMQPString) value;
-//                        System.out.println("Received: " + s.getValue());
-//                    }
-//                    if (!msg.isSettled())
-//                        msg.accept();
-//                }
-//                c.close();
-//                session.close();
-//            }
+            {
+                Session session = connection.createSession(10, 10);
+                Consumer c = session.createConsumer(queue, 100, qos, true, null);
+
+                // Receive messages non-transacted
+                for (int i = 0; i < nMsgs; i++) {
+                    AMQPMessage msg = c.receive();
+                    final AMQPType value = msg.getAmqpValue().getValue();
+                    if (value instanceof AMQPString) {
+                        String s = ((AMQPString) value).getValue();
+                        assertEquals(String.format("%010d", i), s);
+                        System.out.println("Received: " + i);
+                    }
+                    if (!msg.isSettled())
+                        msg.accept();
+                }
+                c.close();
+                session.close();
+            }
             connection.close();
         } catch (Exception e) {
             e.printStackTrace();