You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/09/12 20:12:08 UTC

[7/7] qpid-jms git commit: QPIDJMS-207 Adds support for the JMS 2.0 Delayed Delivery feature

QPIDJMS-207 Adds support for the JMS 2.0 Delayed Delivery feature

Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/6e442f4c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/6e442f4c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/6e442f4c

Branch: refs/heads/master
Commit: 6e442f4c6aa1401a14031c6f2f05d7edbd58037c
Parents: 0c39522
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Sep 12 15:20:25 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Sep 12 15:20:25 2016 -0400

----------------------------------------------------------------------
 .../message/JmsMessagePropertyIntercepter.java  |  32 +++++
 .../qpid/jms/message/JmsMessageSupport.java     |   1 +
 .../provider/amqp/AmqpConnectionProperties.java |  23 ++++
 .../jms/provider/amqp/AmqpFixedProducer.java    |  13 +-
 .../qpid/jms/provider/amqp/AmqpSupport.java     |   1 +
 .../amqp/message/AmqpJmsMessageFacade.java      |  16 ++-
 .../amqp/message/AmqpMessageSupport.java        |   5 +
 .../integration/ProducerIntegrationTest.java    |  82 +++++++++++++
 .../JmsMessagePropertyIntercepterTest.java      | 110 +++++++++++++++++
 .../amqp/message/AmqpJmsMessageFacadeTest.java  |  10 ++
 .../transports/netty/NettySimpleAmqpServer.java | 123 ++++++++++++++++++-
 11 files changed, 407 insertions(+), 9 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6e442f4c/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java
index bb2eb0b..65c8c9a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java
@@ -24,6 +24,7 @@ import static org.apache.qpid.jms.message.JmsMessageSupport.JMSX_GROUPSEQ;
 import static org.apache.qpid.jms.message.JmsMessageSupport.JMSX_USERID;
 import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_AMQP_ACK_TYPE;
 import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_CORRELATIONID;
+import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_DELIVERYTIME;
 import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_DELIVERY_MODE;
 import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_DESTINATION;
 import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_EXPIRATION;
@@ -135,6 +136,7 @@ public class JmsMessagePropertyIntercepter {
         STANDARD_HEADERS.add(JMS_TYPE);
         STANDARD_HEADERS.add(JMS_EXPIRATION);
         STANDARD_HEADERS.add(JMS_PRIORITY);
+        STANDARD_HEADERS.add(JMS_DELIVERYTIME);
 
         VENDOR_PROPERTIES.add(JMS_AMQP_ACK_TYPE);
 
@@ -638,6 +640,36 @@ public class JmsMessagePropertyIntercepter {
                 return true;
             }
         });
+        PROPERTY_INTERCEPTERS.put(JMS_DELIVERYTIME, new PropertyIntercepter() {
+            @Override
+            public Object getProperty(JmsMessage message) throws JMSException {
+                return Long.valueOf(message.getFacade().getDeliveryTime());
+            }
+
+            @Override
+            public void setProperty(JmsMessage message, Object value) throws JMSException {
+                Long rc = (Long) TypeConversionSupport.convert(value, Long.class);
+                if (rc == null) {
+                    throw new JMSException("Property JMSDeliveryTime cannot be set from a " + value.getClass().getName() + ".");
+                }
+                message.getFacade().setDeliveryTime(rc.longValue());
+            }
+
+            @Override
+            public boolean propertyExists(JmsMessage message) {
+                return message.getFacade().getDeliveryTime() > 0;
+            }
+
+            @Override
+            public void clearProperty(JmsMessage message) {
+                message.getFacade().setDeliveryTime(0);
+            }
+
+            @Override
+            public boolean isAlwaysWritable() {
+                return false;
+            }
+        });
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6e442f4c/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java
index c3ce451..657542c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java
@@ -31,6 +31,7 @@ public class JmsMessageSupport {
     public static final String JMS_CORRELATIONID = "JMSCorrelationID";
     public static final String JMS_EXPIRATION = "JMSExpiration";
     public static final String JMS_REDELIVERED = "JMSRedelivered";
+    public static final String JMS_DELIVERYTIME = "JMSDeliveryTime";
 
     public static final String JMSX_GROUPID = "JMSXGroupID";
     public static final String JMSX_GROUPSEQ = "JMSXGroupSeq";

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6e442f4c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
index 79a0d95..c090853 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
@@ -18,6 +18,7 @@ package org.apache.qpid.jms.provider.amqp;
 
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.ANONYMOUS_RELAY;
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.CONNECTION_OPEN_FAILED;
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DELAYED_DELIVERY;
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.QUEUE_PREFIX;
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.TOPIC_PREFIX;
 
@@ -41,6 +42,7 @@ public class AmqpConnectionProperties {
 
     private final JmsConnectionInfo connectionInfo;
 
+    private boolean delayedDeliverySupported = false;
     private boolean anonymousRelaySupported = false;
     private boolean connectionOpenFailed = false;
 
@@ -78,6 +80,10 @@ public class AmqpConnectionProperties {
         if (list.contains(ANONYMOUS_RELAY)) {
             anonymousRelaySupported = true;
         }
+
+        if (list.contains(DELAYED_DELIVERY)) {
+            delayedDeliverySupported = true;
+        }
     }
 
     protected void processProperties(Map<Symbol, Object> properties) {
@@ -104,6 +110,23 @@ public class AmqpConnectionProperties {
     }
 
     /**
+     * @return true if the connection supports sending messages with delivery delays.
+     */
+    public boolean isDelayedDeliverySupported() {
+        return delayedDeliverySupported;
+    }
+
+    /**
+     * Sets if the connection supports sending messages with assigned delivery delays.
+     *
+     * @param deliveryDelaySupported
+     *      true if the delivery delay feature is supported.
+     */
+    public void setDeliveryDelaySupported(boolean deliveryDelaySupported) {
+        this.delayedDeliverySupported = deliveryDelaySupported;
+    }
+
+    /**
      * @return true if the connection supports sending to an anonymous relay.
      */
     public boolean isAnonymousRelaySupported() {

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6e442f4c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index 9233ce1..df39b78 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -68,12 +68,16 @@ public class AmqpFixedProducer extends AmqpProducer {
 
     private AsyncResult sendCompletionWatcher;
 
+    private final AmqpConnection connection;
+
     public AmqpFixedProducer(AmqpSession session, JmsProducerInfo info) {
-        super(session, info);
+        this(session, info, null);
     }
 
     public AmqpFixedProducer(AmqpSession session, JmsProducerInfo info, Sender sender) {
         super(session, info, sender);
+
+        connection = session.getConnection();
     }
 
     @Override
@@ -93,7 +97,12 @@ public class AmqpFixedProducer extends AmqpProducer {
             request.onFailure(new IllegalStateException("The MessageProducer is closed"));
         }
 
-        if (getEndpoint().getCredit() <= 0) {
+        if (!connection.getProperties().isDelayedDeliverySupported() &&
+            envelope.getMessage().getJMSDeliveryTime() != 0) {
+
+            // Don't allow sends with delay if the remote did not advertise support for them
+            request.onFailure(new JMSException("Remote does not support delayed message delivery"));
+        } else if (getEndpoint().getCredit() <= 0) {
             LOG.trace("Holding Message send until credit is available.");
 
             InFlightSend send = new InFlightSend(envelope, request);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6e442f4c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
index 10ae94f..9738d68 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
@@ -43,6 +43,7 @@ public class AmqpSupport {
     // Symbols used for connection capabilities
     public static final Symbol SOLE_CONNECTION_CAPABILITY = Symbol.valueOf("sole-connection-for-container");
     public static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
+    public static final Symbol DELAYED_DELIVERY = Symbol.valueOf("DELAYED_DELIVERY");
 
     // Symbols used to announce connection error information
     public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6e442f4c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
index 82f63e0..89094a1 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
@@ -17,6 +17,7 @@
 package org.apache.qpid.jms.provider.amqp.message;
 
 import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_AMQP_TTL;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_DELIVERY_TIME;
 import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MESSAGE;
 import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MSG_TYPE;
 
@@ -555,14 +556,21 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
 
     @Override
     public long getDeliveryTime() {
-        // TODO Auto-generated method stub
-        return 0;
+        Object deliveryTime = getMessageAnnotation(JMS_DELIVERY_TIME);
+        if (deliveryTime != null) {
+            return (long) deliveryTime;
+        }
+
+        return 0l;
     }
 
     @Override
     public void setDeliveryTime(long deliveryTime) {
-        // TODO Auto-generated method stub
-
+        if (deliveryTime != 0) {
+            setMessageAnnotation(JMS_DELIVERY_TIME, deliveryTime);
+        } else {
+            removeMessageAnnotation(JMS_DELIVERY_TIME);
+        }
     }
 
     /**

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6e442f4c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
index 271481b..40987e1 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
@@ -37,6 +37,11 @@ public final class AmqpMessageSupport {
     public static final String JMS_MSG_TYPE = "x-opt-jms-msg-type";
 
     /**
+     * Attribute used to mark the Application defined delivery time assigned to the message
+     */
+    public static final String JMS_DELIVERY_TIME = "x-opt-delivery-time";
+
+    /**
      * Value mapping for JMS_MSG_TYPE which indicates the message is a generic JMS Message
      * which has no body.
      */

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6e442f4c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index 0dee795..0e6b445 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -18,11 +18,13 @@
  */
 package org.apache.qpid.jms.integration;
 
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DELAYED_DELIVERY;
 import static org.hamcrest.Matchers.both;
 import static org.hamcrest.Matchers.equalTo;
 import static org.hamcrest.Matchers.greaterThanOrEqualTo;
 import static org.hamcrest.Matchers.isA;
 import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.notNullValue;
 import static org.hamcrest.Matchers.nullValue;
 import static org.junit.Assert.assertEquals;
 import static org.junit.Assert.assertNotNull;
@@ -86,6 +88,7 @@ import org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSect
 import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
 import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
 import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.UnsignedByte;
 import org.apache.qpid.proton.amqp.UnsignedInteger;
 import org.hamcrest.Matcher;
@@ -1834,6 +1837,85 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
     }
 
     @Test(timeout = 20000)
+    public void testSendFailsWhenDelayedDeliveryIsNotSupported() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+            // DO NOT add capability to indicate server support for DELAYED_DELIVERY
+
+            Connection connection = testFixture.establishConnecton(testPeer);
+
+            connection.start();
+
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            String topicName = "myTopic";
+            Topic dest = session.createTopic(topicName);
+
+            MessageProducer producer = session.createProducer(dest);
+            producer.setDeliveryDelay(5000);
+
+            // Producer should fail to send when message has delivery delay since remote
+            // did not report that it supports that option.
+            Message message = session.createMessage();
+            try {
+                producer.send(message);
+                fail("Send should fail");
+            } catch (JMSException jmsEx) {
+                LOG.debug("Caught expected error from failed send.");
+            }
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
+    public void testSendWorksWhenDelayedDeliveryIsSupported() throws Exception {
+        try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+            String topicName = "myTopic";
+
+            // DO add capability to indicate server support for DELAYED_DELIVERY
+
+            Connection connection = testFixture.establishConnecton(testPeer, new Symbol[]{ DELAYED_DELIVERY });
+
+            connection.start();
+
+            testPeer.expectBegin();
+            testPeer.expectSenderAttach();
+
+            MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+            MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+            Symbol annotationKey = AmqpMessageSupport.getSymbol(AmqpMessageSupport.JMS_DELIVERY_TIME);
+            msgAnnotationsMatcher.withEntry(annotationKey, notNullValue());
+
+            TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+            messageMatcher.setHeadersMatcher(headersMatcher);
+            messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+
+            testPeer.expectTransfer(messageMatcher);
+
+            Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+            Topic dest = session.createTopic(topicName);
+
+            MessageProducer producer = session.createProducer(dest);
+            producer.setDeliveryDelay(5000);
+            producer.send(session.createMessage());
+
+            testPeer.expectClose();
+            connection.close();
+
+            testPeer.waitForAllHandlersToComplete(1000);
+        }
+    }
+
+    @Test(timeout = 20000)
     public void testAsyncCompletionAfterSendMessageGetDispoation() throws Exception {
         try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
             Connection connection = testFixture.establishConnecton(testPeer);

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6e442f4c/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepterTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepterTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepterTest.java
index 75a1a78..055602d 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepterTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepterTest.java
@@ -22,6 +22,7 @@ import static org.apache.qpid.jms.message.JmsMessageSupport.JMSX_GROUPSEQ;
 import static org.apache.qpid.jms.message.JmsMessageSupport.JMSX_USERID;
 import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_AMQP_ACK_TYPE;
 import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_CORRELATIONID;
+import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_DELIVERYTIME;
 import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_DELIVERY_MODE;
 import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_DESTINATION;
 import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_EXPIRATION;
@@ -1778,4 +1779,113 @@ public class JmsMessagePropertyIntercepterTest {
         JmsMessagePropertyIntercepter.clearProperties(message, true);
         assertFalse(JmsMessagePropertyIntercepter.propertyExists(message, JMS_AMQP_ACK_TYPE));
     }
+
+    //---------- JMSDeliveryTime ---------------------------------------------//
+
+    @Test
+    public void testJMSDeliveryTimeInGetAllPropertyNames() throws JMSException {
+        JmsMessageFacade facade = Mockito.mock(JmsMessageFacade.class);
+        JmsMessage message = Mockito.mock(JmsMapMessage.class);
+        Mockito.when(message.getFacade()).thenReturn(facade);
+        assertTrue(JmsMessagePropertyIntercepter.getAllPropertyNames(message).contains(JMS_DELIVERYTIME));
+    }
+
+    @Test
+    public void testGetJMSDeliveryWhenNotSet() throws JMSException {
+        JmsMessageFacade facade = Mockito.mock(JmsMessageFacade.class);
+        JmsMessage message = Mockito.mock(JmsMapMessage.class);
+        Mockito.when(message.getFacade()).thenReturn(facade);
+        Mockito.when(facade.getDeliveryTime()).thenReturn(0L);
+        assertEquals(Long.valueOf(0L), JmsMessagePropertyIntercepter.getProperty(message, JMS_DELIVERYTIME));
+        Mockito.verify(facade).getDeliveryTime();
+    }
+
+    @Test
+    public void testGetJMSDeliveryTimeWhenSet() throws JMSException {
+        JmsMessageFacade facade = Mockito.mock(JmsMessageFacade.class);
+        JmsMessage message = Mockito.mock(JmsMapMessage.class);
+        Mockito.when(message.getFacade()).thenReturn(facade);
+        Mockito.when(facade.getDeliveryTime()).thenReturn(900L);
+        assertEquals(900L, JmsMessagePropertyIntercepter.getProperty(message, JMS_DELIVERYTIME));
+    }
+
+    @Test
+    public void testSetJMSDeliveryTime() throws JMSException {
+        JmsMessageFacade facade = Mockito.mock(JmsMessageFacade.class);
+        JmsMessage message = Mockito.mock(JmsMapMessage.class);
+        Mockito.when(message.getFacade()).thenReturn(facade);
+        JmsMessagePropertyIntercepter.setProperty(message, JMS_DELIVERYTIME, 65536L);
+        Mockito.verify(facade).setDeliveryTime(65536L);
+    }
+
+    @Test
+    public void testJMSDeliveryTimeInGetPropertyNamesWhenSet() throws JMSException {
+        doJMSDeliveryTimeInGetPropertyNamesWhenSetTestImpl(false);
+    }
+
+    @Test
+    public void testJMSDeliveryTimeNotInGetPropertyNamesWhenSetAndExcludingStandardJMSHeaders() throws JMSException {
+        doJMSDeliveryTimeInGetPropertyNamesWhenSetTestImpl(true);
+    }
+
+    private void doJMSDeliveryTimeInGetPropertyNamesWhenSetTestImpl(boolean excludeStandardJmsHeaders) throws JMSException {
+        JmsMessageFacade facade = Mockito.mock(JmsMessageFacade.class);
+        JmsMessage message = Mockito.mock(JmsMapMessage.class);
+        Mockito.when(message.getFacade()).thenReturn(facade);
+        Mockito.when(facade.getDeliveryTime()).thenReturn(900L);
+        if (excludeStandardJmsHeaders) {
+            assertFalse(JmsMessagePropertyIntercepter.getPropertyNames(message, true).contains(JMS_DELIVERYTIME));
+        } else {
+            assertTrue(JmsMessagePropertyIntercepter.getPropertyNames(message, false).contains(JMS_DELIVERYTIME));
+        }
+    }
+
+    @Test
+    public void testJMSDeliveryTimeNotInGetPropertyNamesWhenNotSet() throws JMSException {
+        JmsMessageFacade facade = Mockito.mock(JmsMessageFacade.class);
+        JmsMessage message = Mockito.mock(JmsMapMessage.class);
+        Mockito.when(message.getFacade()).thenReturn(facade);
+        assertFalse(JmsMessagePropertyIntercepter.getPropertyNames(message, false).contains(JMS_DELIVERYTIME));
+    }
+
+    @Test
+    public void testJMSDeliveryTimePropertExistsWhenSet() throws JMSException {
+        JmsMessageFacade facade = Mockito.mock(JmsMessageFacade.class);
+        JmsMessage message = Mockito.mock(JmsMapMessage.class);
+        Mockito.when(message.getFacade()).thenReturn(facade);
+        Mockito.when(facade.getDeliveryTime()).thenReturn(900L);
+        assertTrue(JmsMessagePropertyIntercepter.propertyExists(message, JMS_DELIVERYTIME));
+    }
+
+    @Test
+    public void testJMSDeliveryTimePropertExistsWhenNotSet() throws JMSException {
+        JmsMessageFacade facade = Mockito.mock(JmsMessageFacade.class);
+        JmsMessage message = Mockito.mock(JmsMapMessage.class);
+        Mockito.when(message.getFacade()).thenReturn(facade);
+        Mockito.when(facade.getDeliveryTime()).thenReturn(0L);
+        assertFalse(JmsMessagePropertyIntercepter.propertyExists(message, JMS_DELIVERYTIME));
+    }
+
+    @Test
+    public void testSetJMSDeliveryTimeConversionChecks() throws JMSException {
+        JmsMessageFacade facade = Mockito.mock(JmsMessageFacade.class);
+        JmsMessage message = Mockito.mock(JmsMapMessage.class);
+        Mockito.when(message.getFacade()).thenReturn(facade);
+        try {
+            JmsMessagePropertyIntercepter.setProperty(message, JMS_DELIVERYTIME, new byte[1]);
+            fail("Should have thrown an exception for this call");
+        } catch (JMSException e) {
+        }
+    }
+
+    @Test
+    public void testJMSDeliveryTimeClearedWhenRequested() throws JMSException {
+        JmsMessageFacade facade = Mockito.mock(JmsMessageFacade.class);
+        JmsMessage message = Mockito.mock(JmsMapMessage.class);
+        Mockito.when(message.getFacade()).thenReturn(facade);
+        JmsMessagePropertyIntercepter.clearProperties(message, true);
+        Mockito.verify(facade, Mockito.never()).setDeliveryTime(0);
+        JmsMessagePropertyIntercepter.clearProperties(message, false);
+        Mockito.verify(facade).setDeliveryTime(0);
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6e442f4c/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
index e430f87..bd50a67 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
@@ -1518,6 +1518,16 @@ public class AmqpJmsMessageFacadeTest extends AmqpJmsMessageTypesTestCase  {
     }
 
     @Test
+    public void testNewMessageDoesNotHaveUnderlyingMessageAnnotationsSectionWithDeliveryTime() {
+        AmqpJmsMessageFacade amqpMessageFacade = createNewMessageFacade();;
+
+        Message underlying = amqpMessageFacade.getAmqpMessage();
+        assertNotNull(underlying.getMessageAnnotations());
+        Symbol annotationKey = AmqpMessageSupport.getSymbol(AmqpMessageSupport.JMS_DELIVERY_TIME);
+        assertNull(underlying.getMessageAnnotations().getValue().get(annotationKey));
+    }
+
+    @Test
     public void testMessageAnnotationExistsUsingReceivedMessageWithoutMessageAnnotationsSection() {
         String symbolKeyName = "myTestSymbolName";
 

http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6e442f4c/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySimpleAmqpServer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySimpleAmqpServer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySimpleAmqpServer.java
index 7c23d86..1525144 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySimpleAmqpServer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySimpleAmqpServer.java
@@ -19,6 +19,7 @@ package org.apache.qpid.jms.transports.netty;
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.ANONYMOUS_RELAY;
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.CONNECTION_OPEN_FAILED;
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.CONTAINER_ID;
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DELAYED_DELIVERY;
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.INVALID_FIELD;
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.PLATFORM;
 import static org.apache.qpid.jms.provider.amqp.AmqpSupport.PRODUCT;
@@ -32,8 +33,13 @@ import java.util.Iterator;
 import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.util.IdGenerator;
 import org.apache.qpid.proton.Proton;
 import org.apache.qpid.proton.amqp.Symbol;
 import org.apache.qpid.proton.amqp.transport.AmqpError;
@@ -42,7 +48,9 @@ import org.apache.qpid.proton.engine.Collector;
 import org.apache.qpid.proton.engine.Connection;
 import org.apache.qpid.proton.engine.EndpointState;
 import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Receiver;
 import org.apache.qpid.proton.engine.Sasl;
+import org.apache.qpid.proton.engine.Sender;
 import org.apache.qpid.proton.engine.Session;
 import org.apache.qpid.proton.engine.Transport;
 import org.apache.qpid.proton.engine.impl.CollectorImpl;
@@ -60,15 +68,19 @@ import io.netty.channel.SimpleChannelInboundHandler;
  * Simple Netty based server that can handle a small subset of AMQP events
  * using Proton-J as the protocol engine.
  */
+@SuppressWarnings( "unused" )
 public class NettySimpleAmqpServer extends NettyServer {
 
     private static final Logger LOG = LoggerFactory.getLogger(NettySimpleAmqpServer.class);
 
+    private static final AtomicInteger SERVER_SEQUENCE = new AtomicInteger();
+
     private static final int CHANNEL_MAX = 32767;
     private static final int HEADER_SIZE = 8;
     private static final int SASL_PROTOCOL = 3;
 
     private final Map<String, List<Connection>> connections = new HashMap<String, List<Connection>>();
+    private final ScheduledExecutorService serializer;
 
     private boolean allowNonSaslConnections;
     private ConnectionIntercepter connectionIntercepter;
@@ -83,6 +95,18 @@ public class NettySimpleAmqpServer extends NettyServer {
 
     public NettySimpleAmqpServer(TransportOptions options, boolean needClientAuth, boolean webSocketServer) {
         super(options, needClientAuth, webSocketServer);
+
+        this.serializer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+
+            @Override
+            public Thread newThread(Runnable runner) {
+                Thread serial = new Thread(runner);
+                serial.setDaemon(true);
+                serial.setName(NettySimpleAmqpServer.this.getClass().getSimpleName() + ":(" +
+                               SERVER_SEQUENCE.incrementAndGet() + "):Worker");
+                return serial;
+            }
+        });
     }
 
     @Override
@@ -111,11 +135,15 @@ public class NettySimpleAmqpServer extends NettyServer {
 
     private final class ProtonConnection extends SimpleChannelInboundHandler<ByteBuf>  {
 
+        private final IdGenerator sessionIdGenerator = new IdGenerator();
+
         private final Transport protonTransport = Proton.transport();
         private final Connection protonConnection = Proton.connection();
         private final Collector eventCollector = new CollectorImpl();
         private SaslAuthenticator authenticator;
 
+        private final Map<String, ProtonSession> sessions = new HashMap<String, ProtonSession>();
+
         private boolean exclusiveContainerId;
         private boolean headerRead;
         private final ByteBuf headerBuf = Unpooled.buffer(HEADER_SIZE, HEADER_SIZE);
@@ -223,6 +251,21 @@ public class NettySimpleAmqpServer extends NettyServer {
                     case SESSION_REMOTE_CLOSE:
                         processSessionClose(event.getSession());
                         break;
+                    case LINK_REMOTE_OPEN:
+                        //processLinkOpen(event.getLink());
+                        break;
+                    case LINK_REMOTE_DETACH:
+                        //processLinkDetach(event.getLink());
+                        break;
+                    case LINK_REMOTE_CLOSE:
+                        //processLinkClose(event.getLink());
+                        break;
+                    case LINK_FLOW:
+                        //processLinkFlow(event.getLink());
+                        break;
+                    case DELIVERY:
+                        //processDelivery(event.getDelivery());
+                        break;
                     default:
                         break;
                 }
@@ -263,11 +306,17 @@ public class NettySimpleAmqpServer extends NettyServer {
         }
 
         private void processSessionClose(Session session) {
+            ProtonSession protonSession = (ProtonSession) session.getContext(); // the context is set by the ProtonSession constructor
+
+            sessions.remove(protonSession.getId()); // drop the tracking entry that processSessionOpen added
+
             session.close();
             session.free();
         }
 
         private void processSessionOpen(Session session) {
+            ProtonSession protonSession = new ProtonSession(sessionIdGenerator.generateId(), session); // the constructor registers the wrapper as the session's context
+            sessions.put(protonSession.getId(), protonSession); // tracked under the generated id until processSessionClose removes it
             session.open();
         }
 
@@ -387,7 +436,7 @@ public class NettySimpleAmqpServer extends NettyServer {
     }
 
     private Symbol[] getConnectionCapabilitiesOffered() {
-        return new Symbol[]{ ANONYMOUS_RELAY };
+        return new Symbol[]{ ANONYMOUS_RELAY, DELAYED_DELIVERY }; // DELAYED_DELIVERY is the capability this change adds to the offered set
     }
 
     private Map<Symbol, Object> getConnetionProperties() {
@@ -453,7 +502,74 @@ public class NettySimpleAmqpServer extends NettyServer {
 
     }
 
-    //----- Internal Type Implementations ------------------------------------//
+    //----- Session Manager --------------------------------------------------//
+
+    private class ProtonSession { // wrapper pairing a Proton Session with a generated id
+
+        private final String sessionId;
+        private final Session session;
+
+        private Map<String, ProtonSender> senders = new HashMap<String, ProtonSender>(); // declared but not read or written in this change; presumably keyed by sender id - TODO confirm
+        private Map<String, ProtonReceiver> receivers = new HashMap<String, ProtonReceiver>(); // likewise unused in this change; presumably keyed by receiver id - TODO confirm
+
+        public ProtonSession(String sessionId, Session session) {
+            this.sessionId = sessionId;
+            this.session = session;
+            this.session.setContext(this); // lets handlers recover this wrapper later via session.getContext()
+        }
+
+        public Session getSession() {
+            return session;
+        }
+
+        public String getId() {
+            return sessionId;
+        }
+    }
+
+    //----- Sender Manager ---------------------------------------------------//
+
+    private class ProtonSender { // holds a Proton Sender together with an id; no instantiation is visible in this change
+
+        private final String senderId;
+        private final Sender sender;
+
+        public ProtonSender(String senderId, Sender sender) { // stores both arguments as-is; no validation is performed
+            this.senderId = senderId;
+            this.sender = sender;
+        }
+
+        public String getId() {
+            return senderId;
+        }
+
+        public Sender getSender() {
+            return sender;
+        }
+    }
+
+    //----- Receiver Manager -------------------------------------------------//
+
+    private class ProtonReceiver { // holds a Proton Receiver together with an id; no instantiation is visible in this change
+
+        private final String receiverId;
+        private final Receiver receiver;
+
+        public ProtonReceiver(String receiverId, Receiver receiver) { // stores both arguments as-is; no validation is performed
+            this.receiverId = receiverId;
+            this.receiver = receiver;
+        }
+
+        public String getId() {
+            return receiverId;
+        }
+
+        public Receiver getReceiver() {
+            return receiver;
+        }
+    }
+
+    //----- SASL Authentication Manager --------------------------------------//
 
     private class SaslAuthenticator {
 
@@ -496,7 +612,8 @@ public class NettySimpleAmqpServer extends NettyServer {
         }
     }
 
-    @SuppressWarnings("unused")
+    //----- Simple AMQP Header Wrapper ---------------------------------------//
+
     private class AmqpHeader {
 
         private final byte[] PREFIX = new byte[] { 'A', 'M', 'Q', 'P' };


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org