You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by de...@apache.org on 2012/10/24 15:32:32 UTC

svn commit: r1401674 - in /activemq/trunk/activemq-amqp/src: main/java/org/apache/activemq/transport/amqp/ main/java/org/apache/activemq/transport/amqp/transform/ test/java/org/apache/activemq/transport/amqp/

Author: dejanb
Date: Wed Oct 24 13:32:32 2012
New Revision: 1401674

URL: http://svn.apache.org/viewvc?rev=1401674&view=rev
Log:
initial amqp selector support

Modified:
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java
    activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1401674&r1=1401673&r2=1401674&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java Wed Oct 24 13:32:32 2012
@@ -22,12 +22,16 @@ import org.apache.activemq.transport.amq
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.LongSequenceGenerator;
+import org.apache.qpid.proton.codec.Decoder;
+import org.apache.qpid.proton.codec.DecoderImpl;
 import org.apache.qpid.proton.engine.*;
 import org.apache.qpid.proton.engine.impl.ConnectionImpl;
 import org.apache.qpid.proton.engine.impl.ProtocolTracer;
 import org.apache.qpid.proton.engine.impl.TransportImpl;
 import org.apache.qpid.proton.framing.TransportFrame;
 import org.apache.qpid.proton.type.Binary;
+import org.apache.qpid.proton.type.DescribedType;
+import org.apache.qpid.proton.type.Symbol;
 import org.apache.qpid.proton.type.messaging.*;
 import org.apache.qpid.proton.type.messaging.Modified;
 import org.apache.qpid.proton.type.messaging.Rejected;
@@ -359,7 +363,8 @@ class AmqpProtocolConverter {
         }
     }
 
-    InboundTransformer inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+    //InboundTransformer inboundTransformer = new AMQPNativeInboundTransformer(ActiveMQJMSVendor.INSTANCE);
+    InboundTransformer inboundTransformer = new JMSMappingInboundTransformer(ActiveMQJMSVendor.INSTANCE);
 
     abstract class BaseProducerContext extends AmqpDeliveryListener {
 
@@ -776,7 +781,6 @@ class AmqpProtocolConverter {
     private final ConcurrentHashMap<ConsumerId, ConsumerContext> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, ConsumerContext>();
 
     void onSenderOpen(final Sender sender, AmqpSessionContext sessionContext) {
-
         // sender.get
         ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++);
         ConsumerContext consumerContext = new ConsumerContext(id, sender);
@@ -810,6 +814,11 @@ class AmqpProtocolConverter {
         consumerInfo.setDestination(dest);
         consumerInfo.setPrefetchSize(100);
         consumerInfo.setDispatchAsync(true);
+        Map filter = ((org.apache.qpid.proton.type.messaging.Source)remoteSource).getFilter();
+        if (filter != null) {
+            DescribedType type = (DescribedType)filter.get(Symbol.valueOf("jms-selector"));
+            consumerInfo.setSelector(type.getDescribed().toString());
+        }
 
         sendToActiveMQ(consumerInfo, new ResponseHandler() {
             public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java?rev=1401674&r1=1401673&r2=1401674&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java Wed Oct 24 13:32:32 2012
@@ -182,7 +182,7 @@ public class JMSMappingInboundTransforme
         }
 
         final ApplicationProperties ap = amqp.getApplicationProperties();
-        if( da!=null ) {
+        if( ap !=null ) {
             for (Map.Entry entry : (Set<Map.Entry>)ap.getValue().entrySet()) {
                 String key = entry.getKey().toString();
                 setProperty(rc, key, entry.getValue());
@@ -190,7 +190,7 @@ public class JMSMappingInboundTransforme
         }
 
         final Footer fp = amqp.getFooter();
-        if( da!=null ) {
+        if( fp !=null ) {
             for (Map.Entry entry : (Set<Map.Entry>)fp.getValue().entrySet()) {
                 String key = entry.getKey().toString();
                 setProperty(rc, prefixVendor + prefixFooter + key, entry.getValue());
@@ -203,12 +203,13 @@ public class JMSMappingInboundTransforme
     }
 
     private void setProperty(Message msg, String key, Object value) throws JMSException {
+        //TODO support all types
         if( value instanceof String ) {
             msg.setStringProperty(key, (String) value);
-//        } else if( value instanceof Integer ) {
-//            msg.setIntProperty(key, ((Integer) value).intValue());
-//        } else if( value instanceof Long ) {
-//            msg.setLongProperty(key, ((Long) value).longValue());
+        } else if( value instanceof Integer ) {
+            msg.setIntProperty(key, ((Integer) value).intValue());
+        } else if( value instanceof Long ) {
+            msg.setLongProperty(key, ((Long) value).longValue());
         } else {
             throw new RuntimeException("Unexpected value type: "+value.getClass());
         }

Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java?rev=1401674&r1=1401673&r2=1401674&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java Wed Oct 24 13:32:32 2012
@@ -24,6 +24,7 @@ import org.junit.Test;
 import javax.jms.*;
 
 import static org.junit.Assert.assertEquals;
+import static org.junit.Assert.assertNotNull;
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -39,13 +40,15 @@ public class JMSClientTest extends AmqpT
         {
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             MessageProducer p = session.createProducer(queue);
-            p.send(session.createTextMessage("Hello World"));
+            Message msg = session.createTextMessage("Hello World");
+            msg.setObjectProperty("x", 1);
+            p.send(msg);
 //            session.commit();
-
-            MessageConsumer c = session.createConsumer(queue);
-            Message msg = c.receive();
-            System.out.println("first:"+msg);
-            System.out.println(msg.getJMSRedelivered());
+            MessageConsumer c = session.createConsumer(queue, "x = 1");
+            Message received = c.receive(2000);
+            assertNotNull(received);
+            System.out.println("first: " + ((TextMessage)received).getText());
+            System.out.println(received.getJMSRedelivered());
 
 //            session.rollback();
 //