You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@activemq.apache.org by ch...@apache.org on 2012/11/29 00:16:44 UTC

svn commit: r1414990 - in /activemq/trunk: ./ activemq-amqp/ activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/ activemq-amqp/src/test/java/org/apache/activemq/trans...

Author: chirino
Date: Wed Nov 28 23:16:42 2012
New Revision: 1414990

URL: http://svn.apache.org/viewvc?rev=1414990&view=rev
Log:
Moved the JMS mapping logic into a proton module.

Removed:
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeInboundTransformer.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPNativeOutboundTransformer.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AMQPRawInboundTransformer.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/AutoOutboundTransformer.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/DroppingWritableBuffer.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/EncodedMessage.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/InboundTransformer.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingInboundTransformer.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSMappingOutboundTransformer.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/JMSVendor.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/transform/OutboundTransformer.java
Modified:
    activemq/trunk/activemq-amqp/pom.xml
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
    activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
    activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
    activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsTest.java
    activemq/trunk/assembly/src/main/descriptors/common-bin.xml
    activemq/trunk/pom.xml

Modified: activemq/trunk/activemq-amqp/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/pom.xml?rev=1414990&r1=1414989&r2=1414990&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/pom.xml (original)
+++ activemq/trunk/activemq-amqp/pom.xml Wed Nov 28 23:16:42 2012
@@ -44,7 +44,7 @@
 
     <dependency>
       <groupId>org.apache.qpid</groupId>
-      <artifactId>proton</artifactId>
+      <artifactId>proton-jms</artifactId>
       <version>${qpid-proton-version}</version>
     </dependency>
 

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java?rev=1414990&r1=1414989&r2=1414990&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/ActiveMQJMSVendor.java Wed Nov 28 23:16:42 2012
@@ -1,7 +1,7 @@
 package org.apache.activemq.transport.amqp;
 
 import org.apache.activemq.command.*;
-import org.apache.activemq.transport.amqp.transform.JMSVendor;
+import org.apache.qpid.proton.jms.JMSVendor;
 
 import javax.jms.*;
 import javax.jms.Message;

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java?rev=1414990&r1=1414989&r2=1414990&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpProtocolConverter.java Wed Nov 28 23:16:42 2012
@@ -18,16 +18,18 @@ package org.apache.activemq.transport.am
 
 import org.apache.activemq.broker.BrokerContext;
 import org.apache.activemq.command.*;
-import org.apache.activemq.transport.amqp.transform.*;
+import org.apache.activemq.selector.SelectorParser;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.util.IdGenerator;
 import org.apache.activemq.util.LongSequenceGenerator;
 import org.apache.qpid.proton.engine.*;
 import org.apache.qpid.proton.engine.impl.*;
 import org.apache.qpid.proton.framing.TransportFrame;
+import org.apache.qpid.proton.jms.*;
 import org.apache.qpid.proton.type.Binary;
 import org.apache.qpid.proton.type.DescribedType;
 import org.apache.qpid.proton.type.Symbol;
+import org.apache.qpid.proton.type.UnsignedInteger;
 import org.apache.qpid.proton.type.messaging.*;
 import org.apache.qpid.proton.type.messaging.Modified;
 import org.apache.qpid.proton.type.messaging.Rejected;
@@ -60,6 +62,8 @@ class AmqpProtocolConverter {
     private static final Symbol COPY = Symbol.getSymbol("copy");
     private static final Symbol JMS_SELECTOR = Symbol.valueOf("jms-selector");
     private static final Symbol NO_LOCAL = Symbol.valueOf("no-local");
+    private static final UnsignedInteger DURABLE = new UnsignedInteger(2);
+    private static final Symbol DURABLE_SUBSCRIPTION_ENDED = Symbol.getSymbol("DURABLE_SUBSCRIPTION_ENDED");
 
     public AmqpProtocolConverter(AmqpTransport amqpTransport, BrokerContext brokerContext) {
         this.amqpTransport = amqpTransport;
@@ -86,7 +90,7 @@ class AmqpProtocolConverter {
 //    private String clientId;
 //    private final String QOS_PROPERTY_NAME = "QoSPropertyName";
     int prefetch = 100;
-    boolean trace = false;
+    boolean trace = true;
 
     TransportImpl protonTransport = new TransportImpl();
     ConnectionImpl protonConnection = new ConnectionImpl();
@@ -345,8 +349,6 @@ class AmqpProtocolConverter {
         String clientId = protonConnection.getRemoteContainer();
         if (clientId != null && !clientId.isEmpty()) {
             connectionInfo.setClientId(clientId);
-        } else {
-            connectionInfo.setClientId("" + connectionInfo.getConnectionId().toString());
         }
 
 
@@ -679,6 +681,7 @@ class AmqpProtocolConverter {
         private final ConsumerId consumerId;
         private final Sender sender;
         private boolean presettle;
+        private boolean closed;
 
         public ConsumerContext(ConsumerId consumerId, Sender sender) {
             this.consumerId = consumerId;
@@ -714,23 +717,28 @@ class AmqpProtocolConverter {
 
         @Override
         public void onClose() throws Exception {
-            sendToActiveMQ(new RemoveInfo(consumerId), null);
+            if( !closed ) {
+                closed = true;
+                sendToActiveMQ(new RemoveInfo(consumerId), null);
+            }
         }
 
         LinkedList<MessageDispatch> outbound = new LinkedList<MessageDispatch>();
 
         // called when the connection receives a JMS message from ActiveMQ
         public void onMessageDispatch(MessageDispatch md) throws Exception {
-            outbound.addLast(md);
-            pumpOutbound();
-            pumpProtonToSocket();
+            if( !closed ) {
+                outbound.addLast(md);
+                pumpOutbound();
+                pumpProtonToSocket();
+            }
         }
 
         Buffer currentBuffer;
         Delivery currentDelivery;
 
         public void pumpOutbound() throws Exception {
-            while(true) {
+            while(!closed) {
 
                 while( currentBuffer !=null ) {
                     int sent = sender.send(currentBuffer.data, currentBuffer.offset, currentBuffer.length);
@@ -876,41 +884,94 @@ class AmqpProtocolConverter {
     private final ConcurrentHashMap<ConsumerId, ConsumerContext> subscriptionsByConsumerId = new ConcurrentHashMap<ConsumerId, ConsumerContext>();
 
     void onSenderOpen(final Sender sender, AmqpSessionContext sessionContext) {
-        // sender.get
-        ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++);
+        org.apache.qpid.proton.type.messaging.Source source = (org.apache.qpid.proton.type.messaging.Source)sender.getRemoteSource();
+
+        final ConsumerId id = new ConsumerId(sessionContext.sessionId, sessionContext.nextConsumerId++);
         ConsumerContext consumerContext = new ConsumerContext(id, sender);
+        sender.setContext(consumerContext);
 
-        subscriptionsByConsumerId.put(id, consumerContext);
+        String selector = null;
+        if( source!=null ) {
+            Map filter = source.getFilter();
+            if (filter != null) {
+                DescribedType value = (DescribedType)filter.get(JMS_SELECTOR);
+                if( value!=null ) {
+                    selector = value.getDescribed().toString();
+                    // Validate the Selector.
+                    try {
+                        SelectorParser.parse(selector);
+                    } catch (InvalidSelectorException e) {
+                        sender.setSource(null);
+                        ((LinkImpl)sender).setLocalError(new EndpointError("amqp:invalid-field", e.getMessage()));
+                        sender.close();
+                        consumerContext.closed = true;
+                        return;
+                    }
+                }
+            }
+        }
 
         ActiveMQDestination dest;
-        org.apache.qpid.proton.type.messaging.Source source = (org.apache.qpid.proton.type.messaging.Source)sender.getRemoteSource();
-        if( source != null && !source.getDynamic() ) {
-            dest = createDestination(source);
-        } else {
+        if( source == null ) {
+
+            source = new org.apache.qpid.proton.type.messaging.Source();
+            source.setAddress("");
+            source.setCapabilities(DURABLE_SUBSCRIPTION_ENDED);
+            sender.setSource(source);
+
+            // Looks like durable sub removal.
+            RemoveSubscriptionInfo rsi = new RemoveSubscriptionInfo();
+            rsi.setConnectionId(connectionId);
+            rsi.setSubscriptionName(sender.getName());
+            rsi.setClientId(connectionInfo.getClientId());
+
+            consumerContext.closed=true;
+            sendToActiveMQ(rsi, new ResponseHandler() {
+                public void onResponse(AmqpProtocolConverter converter, Response response) throws IOException {
+                    if (response.isException()) {
+                        sender.setSource(null);
+                        Throwable exception = ((ExceptionResponse) response).getException();
+                        String name = exception.getClass().getName();
+                        ((LinkImpl)sender).setLocalError(new EndpointError(name, exception.getMessage()));
+                    }
+                    sender.open();
+                    pumpProtonToSocket();
+                }
+            });
+            return;
+        } else if( contains(source.getCapabilities(), DURABLE_SUBSCRIPTION_ENDED) ) {
+            consumerContext.closed=true;
+            sender.close();
+            pumpProtonToSocket();
+            return;
+        } else if( source.getDynamic() ) {
             // lets create a temp dest.
             dest = createTempQueue();
             source = new org.apache.qpid.proton.type.messaging.Source();
             source.setAddress(dest.getQualifiedName());
             source.setDynamic(true);
             sender.setSource(source);
+        } else {
+            dest = createDestination(source);
         }
 
-        sender.setContext(consumerContext);
+        subscriptionsByConsumerId.put(id, consumerContext);
         ConsumerInfo consumerInfo = new ConsumerInfo(id);
+        consumerInfo.setSelector(selector);
+        consumerInfo.setNoRangeAcks(true);
         consumerInfo.setDestination(dest);
         consumerInfo.setPrefetchSize(100);
         consumerInfo.setDispatchAsync(true);
         if( source.getDistributionMode() == COPY && dest.isQueue() ) {
             consumerInfo.setBrowser(true);
         }
+        if( DURABLE.equals(source.getDurable()) && dest.isTopic() ) {
+            consumerInfo.setSubscriptionName(sender.getName());
+        }
 
         Map filter = source.getFilter();
         if (filter != null) {
-            DescribedType value = (DescribedType)filter.get(JMS_SELECTOR);
-            if( value!=null ) {
-                consumerInfo.setSelector(value.getDescribed().toString());
-            }
-            value = (DescribedType)filter.get(NO_LOCAL);
+            DescribedType value = (DescribedType)filter.get(NO_LOCAL);
             if( value!=null ) {
                 consumerInfo.setNoLocal(true);
             }
@@ -926,6 +987,7 @@ class AmqpProtocolConverter {
                         name = "amqp:invalid-field";
                     }
                     ((LinkImpl)sender).setLocalError(new EndpointError(name, exception.getMessage()));
+                    subscriptionsByConsumerId.remove(id);
                     sender.close();
                 } else {
                     sender.open();
@@ -936,6 +998,17 @@ class AmqpProtocolConverter {
 
     }
 
+    static private boolean contains(Symbol[] haystack, Symbol needle) {
+        if( haystack!=null ) {
+            for (Symbol capability : haystack) {
+                if( capability == needle) {
+                    return true;
+                }
+            }
+        }
+        return false;
+    }
+
     private ActiveMQDestination createTempQueue() {
         ActiveMQDestination rc;
         rc = new ActiveMQTempQueue(connectionId, nextTempDestinationId++);

Modified: activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java?rev=1414990&r1=1414989&r2=1414990&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java (original)
+++ activemq/trunk/activemq-amqp/src/main/java/org/apache/activemq/transport/amqp/AmqpTransportFilter.java Wed Nov 28 23:16:42 2012
@@ -21,7 +21,7 @@ import org.apache.activemq.command.Comma
 import org.apache.activemq.transport.Transport;
 import org.apache.activemq.transport.TransportFilter;
 import org.apache.activemq.transport.TransportListener;
-import org.apache.activemq.transport.amqp.transform.InboundTransformer;
+import org.apache.qpid.proton.jms.InboundTransformer;
 import org.apache.activemq.transport.tcp.SslTransport;
 import org.apache.activemq.util.IOExceptionSupport;
 import org.apache.activemq.wireformat.WireFormat;

Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java?rev=1414990&r1=1414989&r2=1414990&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/JMSClientTest.java Wed Nov 28 23:16:42 2012
@@ -20,6 +20,7 @@ import org.apache.activemq.transport.amq
 import org.apache.qpid.amqp_1_0.jms.impl.ConnectionFactoryImpl;
 import org.apache.qpid.amqp_1_0.jms.impl.QueueImpl;
 import org.junit.Test;
+import org.objectweb.jtests.jms.framework.TestConfig;
 
 import javax.jms.*;
 
@@ -27,6 +28,7 @@ import java.util.Enumeration;
 
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
+import static org.junit.Assert.assertTrue;
 
 /**
  * @author <a href="http://hiramchirino.com">Hiram Chirino</a>
@@ -42,30 +44,21 @@ public class JMSClientTest extends AmqpT
         {
             Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
             MessageProducer p = session.createProducer(queue);
-            Message msg = session.createTextMessage("Hello World");
-            msg.setObjectProperty("x", 1);
-            p.send(msg);
-//            session.commit();
-/*            MessageConsumer c = session.createConsumer(queue, "x = 1");
-            Message received = c.receive(2000);
-            assertNotNull(received);
-            System.out.println("first: " + ((TextMessage)received).getText());
-            System.out.println(received.getJMSRedelivered());*/
 
+            TextMessage message = session.createTextMessage();
+            message.setText("hello");
+            p.send(message);
 
             QueueBrowser browser = session.createBrowser(queue);
             Enumeration enumeration = browser.getEnumeration();
             while (enumeration.hasMoreElements()) {
-
-                System.out.println("BROWSE " + enumeration.nextElement());
+                Message m = (Message) enumeration.nextElement();
+                assertTrue(m instanceof TextMessage);
             }
 
-
-//            session.rollback();
-//
-//            msg = c.receive();
-//            System.out.println("second:"+msg);
-//            System.out.println(msg.getJMSRedelivered());
+            MessageConsumer consumer = session.createConsumer(queue);
+            Message msg = consumer.receive(TestConfig.TIMEOUT);
+            assertTrue(message instanceof TextMessage);
         }
         connection.close();
 

Modified: activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsTest.java
URL: http://svn.apache.org/viewvc/activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsTest.java?rev=1414990&r1=1414989&r2=1414990&view=diff
==============================================================================
--- activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsTest.java (original)
+++ activemq/trunk/activemq-amqp/src/test/java/org/apache/activemq/transport/amqp/joram/JoramJmsTest.java Wed Nov 28 23:16:42 2012
@@ -52,19 +52,21 @@ public class JoramJmsTest extends TestCa
         TestSuite suite = new TestSuite();
 
         // TODO: Fix these tests..
-        if (false) {
-            // Fails due to durable subs not being implemented.
-            suite.addTestSuite(TopicSessionTest.class);
-            // Fails due to https://issues.apache.org/jira/browse/PROTON-110 and DestinationImpl vs QueueImpl mapping issues
+        if (true) {
+            // Fails due to https://issues.apache.org/jira/browse/QPID-4454
             suite.addTestSuite(MessageHeaderTest.class);
-            // Fails due to inconsistent Message mapping in the JMS client.
-            suite.addTestSuite(MessageTypeTest.class);
+            // Fails due to https://issues.apache.org/jira/browse/QPID-4455
             suite.addTestSuite(QueueBrowserTest.class);
+        }
 
+        // TODO: enable once QPID 0.21 is released
+        if(true) {
+            suite.addTestSuite(MessageTypeTest.class);
         }
 
+        suite.addTestSuite(TopicSessionTest.class);
         // TODO: enable once QPID 0.19 is released
-        if(false) {
+        if(true) {
             suite.addTestSuite(UnifiedSessionTest.class);
             suite.addTestSuite(TemporaryTopicTest.class);
             suite.addTestSuite(TopicConnectionTest.class);

Modified: activemq/trunk/assembly/src/main/descriptors/common-bin.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/assembly/src/main/descriptors/common-bin.xml?rev=1414990&r1=1414989&r2=1414990&view=diff
==============================================================================
--- activemq/trunk/assembly/src/main/descriptors/common-bin.xml (original)
+++ activemq/trunk/assembly/src/main/descriptors/common-bin.xml Wed Nov 28 23:16:42 2012
@@ -183,7 +183,8 @@
         <include>org.jasypt:jasypt</include>
         <include>org.jasypt:jasypt-spring3</include>
         <include>javax.jmdns:jmdns</include>
-        <include>org.apache.qpid:qpid-proton</include>
+        <include>org.apache.qpid:proton</include>
+        <include>org.apache.qpid:proton-jms</include>
       </includes>
     </dependencySet>
     <dependencySet>

Modified: activemq/trunk/pom.xml
URL: http://svn.apache.org/viewvc/activemq/trunk/pom.xml?rev=1414990&r1=1414989&r2=1414990&view=diff
==============================================================================
--- activemq/trunk/pom.xml (original)
+++ activemq/trunk/pom.xml Wed Nov 28 23:16:42 2012
@@ -92,7 +92,7 @@
     <org.osgi.core-version>4.2.0</org.osgi.core-version>
     <p2psockets-version>1.1.2</p2psockets-version>
     <qpid-proton-version>1.0-SNAPSHOT</qpid-proton-version>
-    <qpid-jms-version>0.18</qpid-jms-version>
+    <qpid-jms-version>0.21-SNAPSHOT</qpid-jms-version>
     <regexp-version>1.3</regexp-version>
     <rome-version>1.0</rome-version>
     <saxon-version>9.4</saxon-version>