You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/09/12 20:12:08 UTC
[7/7] qpid-jms git commit: QPIDJMS-207 Adds support for the JMS 2.0
Delayed Delivery feature
QPIDJMS-207 Adds support for the JMS 2.0 Delayed Delivery feature
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/6e442f4c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/6e442f4c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/6e442f4c
Branch: refs/heads/master
Commit: 6e442f4c6aa1401a14031c6f2f05d7edbd58037c
Parents: 0c39522
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Sep 12 15:20:25 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Sep 12 15:20:25 2016 -0400
----------------------------------------------------------------------
.../message/JmsMessagePropertyIntercepter.java | 32 +++++
.../qpid/jms/message/JmsMessageSupport.java | 1 +
.../provider/amqp/AmqpConnectionProperties.java | 23 ++++
.../jms/provider/amqp/AmqpFixedProducer.java | 13 +-
.../qpid/jms/provider/amqp/AmqpSupport.java | 1 +
.../amqp/message/AmqpJmsMessageFacade.java | 16 ++-
.../amqp/message/AmqpMessageSupport.java | 5 +
.../integration/ProducerIntegrationTest.java | 82 +++++++++++++
.../JmsMessagePropertyIntercepterTest.java | 110 +++++++++++++++++
.../amqp/message/AmqpJmsMessageFacadeTest.java | 10 ++
.../transports/netty/NettySimpleAmqpServer.java | 123 ++++++++++++++++++-
11 files changed, 407 insertions(+), 9 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6e442f4c/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java
index bb2eb0b..65c8c9a 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepter.java
@@ -24,6 +24,7 @@ import static org.apache.qpid.jms.message.JmsMessageSupport.JMSX_GROUPSEQ;
import static org.apache.qpid.jms.message.JmsMessageSupport.JMSX_USERID;
import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_AMQP_ACK_TYPE;
import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_CORRELATIONID;
+import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_DELIVERYTIME;
import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_DELIVERY_MODE;
import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_DESTINATION;
import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_EXPIRATION;
@@ -135,6 +136,7 @@ public class JmsMessagePropertyIntercepter {
STANDARD_HEADERS.add(JMS_TYPE);
STANDARD_HEADERS.add(JMS_EXPIRATION);
STANDARD_HEADERS.add(JMS_PRIORITY);
+ STANDARD_HEADERS.add(JMS_DELIVERYTIME);
VENDOR_PROPERTIES.add(JMS_AMQP_ACK_TYPE);
@@ -638,6 +640,36 @@ public class JmsMessagePropertyIntercepter {
return true;
}
});
+ PROPERTY_INTERCEPTERS.put(JMS_DELIVERYTIME, new PropertyIntercepter() {
+ @Override
+ public Object getProperty(JmsMessage message) throws JMSException {
+ return Long.valueOf(message.getFacade().getDeliveryTime());
+ }
+
+ @Override
+ public void setProperty(JmsMessage message, Object value) throws JMSException {
+ Long rc = (Long) TypeConversionSupport.convert(value, Long.class);
+ if (rc == null) {
+ throw new JMSException("Property JMSDeliveryTime cannot be set from a " + value.getClass().getName() + ".");
+ }
+ message.getFacade().setDeliveryTime(rc.longValue());
+ }
+
+ @Override
+ public boolean propertyExists(JmsMessage message) {
+ return message.getFacade().getDeliveryTime() > 0;
+ }
+
+ @Override
+ public void clearProperty(JmsMessage message) {
+ message.getFacade().setDeliveryTime(0);
+ }
+
+ @Override
+ public boolean isAlwaysWritable() {
+ return false;
+ }
+ });
}
/**
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6e442f4c/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java
index c3ce451..657542c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessageSupport.java
@@ -31,6 +31,7 @@ public class JmsMessageSupport {
public static final String JMS_CORRELATIONID = "JMSCorrelationID";
public static final String JMS_EXPIRATION = "JMSExpiration";
public static final String JMS_REDELIVERED = "JMSRedelivered";
+ public static final String JMS_DELIVERYTIME = "JMSDeliveryTime";
public static final String JMSX_GROUPID = "JMSXGroupID";
public static final String JMSX_GROUPSEQ = "JMSXGroupSeq";
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6e442f4c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
index 79a0d95..c090853 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpConnectionProperties.java
@@ -18,6 +18,7 @@ package org.apache.qpid.jms.provider.amqp;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.ANONYMOUS_RELAY;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.CONNECTION_OPEN_FAILED;
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DELAYED_DELIVERY;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.QUEUE_PREFIX;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.TOPIC_PREFIX;
@@ -41,6 +42,7 @@ public class AmqpConnectionProperties {
private final JmsConnectionInfo connectionInfo;
+ private boolean delayedDeliverySupported = false;
private boolean anonymousRelaySupported = false;
private boolean connectionOpenFailed = false;
@@ -78,6 +80,10 @@ public class AmqpConnectionProperties {
if (list.contains(ANONYMOUS_RELAY)) {
anonymousRelaySupported = true;
}
+
+ if (list.contains(DELAYED_DELIVERY)) {
+ delayedDeliverySupported = true;
+ }
}
protected void processProperties(Map<Symbol, Object> properties) {
@@ -104,6 +110,23 @@ public class AmqpConnectionProperties {
}
/**
+ * @return true if the connection supports sending message with delivery delays.
+ */
+ public boolean isDelayedDeliverySupported() {
+ return delayedDeliverySupported;
+ }
+
+ /**
+ * Sets if the connection supports sending message with assigned delivery delays.
+ *
+ * @param deliveryDelaySupported
+ * true if the delivery delay features is supported.
+ */
+ public void setDeliveryDelaySupported(boolean deliveryDelaySupported) {
+ this.delayedDeliverySupported = deliveryDelaySupported;
+ }
+
+ /**
* @return true if the connection supports sending to an anonymous relay.
*/
public boolean isAnonymousRelaySupported() {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6e442f4c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
index 9233ce1..df39b78 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpFixedProducer.java
@@ -68,12 +68,16 @@ public class AmqpFixedProducer extends AmqpProducer {
private AsyncResult sendCompletionWatcher;
+ private final AmqpConnection connection;
+
public AmqpFixedProducer(AmqpSession session, JmsProducerInfo info) {
- super(session, info);
+ this(session, info, null);
}
public AmqpFixedProducer(AmqpSession session, JmsProducerInfo info, Sender sender) {
super(session, info, sender);
+
+ connection = session.getConnection();
}
@Override
@@ -93,7 +97,12 @@ public class AmqpFixedProducer extends AmqpProducer {
request.onFailure(new IllegalStateException("The MessageProducer is closed"));
}
- if (getEndpoint().getCredit() <= 0) {
+ if (!connection.getProperties().isDelayedDeliverySupported() &&
+ envelope.getMessage().getJMSDeliveryTime() != 0) {
+
+ // Don't allow sends with delay if the remote said it can't handle them
+ request.onFailure(new JMSException("Remote does not support delayed message delivery"));
+ } else if (getEndpoint().getCredit() <= 0) {
LOG.trace("Holding Message send until credit is available.");
InFlightSend send = new InFlightSend(envelope, request);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6e442f4c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
index 10ae94f..9738d68 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/AmqpSupport.java
@@ -43,6 +43,7 @@ public class AmqpSupport {
// Symbols used for connection capabilities
public static final Symbol SOLE_CONNECTION_CAPABILITY = Symbol.valueOf("sole-connection-for-container");
public static final Symbol ANONYMOUS_RELAY = Symbol.valueOf("ANONYMOUS-RELAY");
+ public static final Symbol DELAYED_DELIVERY = Symbol.valueOf("DELAYED_DELIVERY");
// Symbols used to announce connection error information
public static final Symbol CONNECTION_OPEN_FAILED = Symbol.valueOf("amqp:connection-establishment-failed");
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6e442f4c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
index 82f63e0..89094a1 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacade.java
@@ -17,6 +17,7 @@
package org.apache.qpid.jms.provider.amqp.message;
import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_AMQP_TTL;
+import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_DELIVERY_TIME;
import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MESSAGE;
import static org.apache.qpid.jms.provider.amqp.message.AmqpMessageSupport.JMS_MSG_TYPE;
@@ -555,14 +556,21 @@ public class AmqpJmsMessageFacade implements JmsMessageFacade {
@Override
public long getDeliveryTime() {
- // TODO Auto-generated method stub
- return 0;
+ Object deliveryTime = getMessageAnnotation(JMS_DELIVERY_TIME);
+ if (deliveryTime != null) {
+ return (long) deliveryTime;
+ }
+
+ return 0l;
}
@Override
public void setDeliveryTime(long deliveryTime) {
- // TODO Auto-generated method stub
-
+ if (deliveryTime != 0) {
+ setMessageAnnotation(JMS_DELIVERY_TIME, deliveryTime);
+ } else {
+ removeMessageAnnotation(JMS_DELIVERY_TIME);
+ }
}
/**
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6e442f4c/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
index 271481b..40987e1 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/provider/amqp/message/AmqpMessageSupport.java
@@ -37,6 +37,11 @@ public final class AmqpMessageSupport {
public static final String JMS_MSG_TYPE = "x-opt-jms-msg-type";
/**
+ * Attribute used to mark the Application defined delivery time assigned to the message
+ */
+ public static final String JMS_DELIVERY_TIME = "x-opt-delivery-time";
+
+ /**
* Value mapping for JMS_MSG_TYPE which indicates the message is a generic JMS Message
* which has no body.
*/
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6e442f4c/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
index 0dee795..0e6b445 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/integration/ProducerIntegrationTest.java
@@ -18,11 +18,13 @@
*/
package org.apache.qpid.jms.integration;
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DELAYED_DELIVERY;
import static org.hamcrest.Matchers.both;
import static org.hamcrest.Matchers.equalTo;
import static org.hamcrest.Matchers.greaterThanOrEqualTo;
import static org.hamcrest.Matchers.isA;
import static org.hamcrest.Matchers.lessThanOrEqualTo;
+import static org.hamcrest.Matchers.notNullValue;
import static org.hamcrest.Matchers.nullValue;
import static org.junit.Assert.assertEquals;
import static org.junit.Assert.assertNotNull;
@@ -86,6 +88,7 @@ import org.apache.qpid.jms.test.testpeer.matchers.sections.MessagePropertiesSect
import org.apache.qpid.jms.test.testpeer.matchers.sections.TransferPayloadCompositeMatcher;
import org.apache.qpid.jms.test.testpeer.matchers.types.EncodedAmqpValueMatcher;
import org.apache.qpid.proton.amqp.Binary;
+import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.UnsignedByte;
import org.apache.qpid.proton.amqp.UnsignedInteger;
import org.hamcrest.Matcher;
@@ -1834,6 +1837,85 @@ public class ProducerIntegrationTest extends QpidJmsTestCase {
}
@Test(timeout = 20000)
+ public void testSendFailsWhenDelayedDeliveryIsNotSupported() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+ // DO NOT add capability to indicate server support for DELAYED-DELIVERY
+
+ Connection connection = testFixture.establishConnecton(testPeer);
+
+ connection.start();
+
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ String topicName = "myTopic";
+ Topic dest = session.createTopic(topicName);
+
+ MessageProducer producer = session.createProducer(dest);
+ producer.setDeliveryDelay(5000);
+
+ // Producer should fail to send when message has delivery delay since remote
+ // did not report that it supports that option.
+ Message message = session.createMessage();
+ try {
+ producer.send(message);
+ fail("Send should fail");
+ } catch (JMSException jmsEx) {
+ LOG.debug("Caught expected error from failed send.");
+ }
+
+ testPeer.expectClose();
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
+ public void testSendWorksWhenDelayedDeliveryIsSupported() throws Exception {
+ try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
+
+ String topicName = "myTopic";
+
+ // DO add capability to indicate server support for DELAYED-DELIVERY
+
+ Connection connection = testFixture.establishConnecton(testPeer, new Symbol[]{ DELAYED_DELIVERY });
+
+ connection.start();
+
+ testPeer.expectBegin();
+ testPeer.expectSenderAttach();
+
+ MessageHeaderSectionMatcher headersMatcher = new MessageHeaderSectionMatcher(true).withDurable(equalTo(true));
+ MessageAnnotationsSectionMatcher msgAnnotationsMatcher = new MessageAnnotationsSectionMatcher(true);
+ Symbol annotationKey = AmqpMessageSupport.getSymbol(AmqpMessageSupport.JMS_DELIVERY_TIME);
+ msgAnnotationsMatcher.withEntry(annotationKey, notNullValue());
+
+ TransferPayloadCompositeMatcher messageMatcher = new TransferPayloadCompositeMatcher();
+ messageMatcher.setHeadersMatcher(headersMatcher);
+ messageMatcher.setMessageAnnotationsMatcher(msgAnnotationsMatcher);
+
+ testPeer.expectTransfer(messageMatcher);
+
+ Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
+
+ Topic dest = session.createTopic(topicName);
+
+ MessageProducer producer = session.createProducer(dest);
+ producer.setDeliveryDelay(5000);
+ producer.send(session.createMessage());
+
+ testPeer.expectClose();
+ connection.close();
+
+ testPeer.waitForAllHandlersToComplete(1000);
+ }
+ }
+
+ @Test(timeout = 20000)
public void testAsyncCompletionAfterSendMessageGetDispoation() throws Exception {
try (TestAmqpPeer testPeer = new TestAmqpPeer();) {
Connection connection = testFixture.establishConnecton(testPeer);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6e442f4c/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepterTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepterTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepterTest.java
index 75a1a78..055602d 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepterTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/message/JmsMessagePropertyIntercepterTest.java
@@ -22,6 +22,7 @@ import static org.apache.qpid.jms.message.JmsMessageSupport.JMSX_GROUPSEQ;
import static org.apache.qpid.jms.message.JmsMessageSupport.JMSX_USERID;
import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_AMQP_ACK_TYPE;
import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_CORRELATIONID;
+import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_DELIVERYTIME;
import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_DELIVERY_MODE;
import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_DESTINATION;
import static org.apache.qpid.jms.message.JmsMessageSupport.JMS_EXPIRATION;
@@ -1778,4 +1779,113 @@ public class JmsMessagePropertyIntercepterTest {
JmsMessagePropertyIntercepter.clearProperties(message, true);
assertFalse(JmsMessagePropertyIntercepter.propertyExists(message, JMS_AMQP_ACK_TYPE));
}
+
+ //---------- JMSDeliveryTime ---------------------------------------------//
+
+ @Test
+ public void testJMSDeliveryTimeInGetAllPropertyNames() throws JMSException {
+ JmsMessageFacade facade = Mockito.mock(JmsMessageFacade.class);
+ JmsMessage message = Mockito.mock(JmsMapMessage.class);
+ Mockito.when(message.getFacade()).thenReturn(facade);
+ assertTrue(JmsMessagePropertyIntercepter.getAllPropertyNames(message).contains(JMS_DELIVERYTIME));
+ }
+
+ @Test
+ public void testGetJMSDeliveryWhenNotSet() throws JMSException {
+ JmsMessageFacade facade = Mockito.mock(JmsMessageFacade.class);
+ JmsMessage message = Mockito.mock(JmsMapMessage.class);
+ Mockito.when(message.getFacade()).thenReturn(facade);
+ Mockito.when(facade.getDeliveryTime()).thenReturn(0L);
+ assertEquals(Long.valueOf(0L), JmsMessagePropertyIntercepter.getProperty(message, JMS_DELIVERYTIME));
+ Mockito.verify(facade).getDeliveryTime();
+ }
+
+ @Test
+ public void testGetJMSDeliveryTimeWhenSet() throws JMSException {
+ JmsMessageFacade facade = Mockito.mock(JmsMessageFacade.class);
+ JmsMessage message = Mockito.mock(JmsMapMessage.class);
+ Mockito.when(message.getFacade()).thenReturn(facade);
+ Mockito.when(facade.getDeliveryTime()).thenReturn(900L);
+ assertEquals(900L, JmsMessagePropertyIntercepter.getProperty(message, JMS_DELIVERYTIME));
+ }
+
+ @Test
+ public void testSetJMSDeliveryTime() throws JMSException {
+ JmsMessageFacade facade = Mockito.mock(JmsMessageFacade.class);
+ JmsMessage message = Mockito.mock(JmsMapMessage.class);
+ Mockito.when(message.getFacade()).thenReturn(facade);
+ JmsMessagePropertyIntercepter.setProperty(message, JMS_DELIVERYTIME, 65536L);
+ Mockito.verify(facade).setDeliveryTime(65536L);
+ }
+
+ @Test
+ public void testJMSDeliveryTimeInGetPropertyNamesWhenSet() throws JMSException {
+ doJMSDeliveryTimeInGetPropertyNamesWhenSetTestImpl(false);
+ }
+
+ @Test
+ public void testJMSDeliveryTimeNotInGetPropertyNamesWhenSetAndExcludingStandardJMSHeaders() throws JMSException {
+ doJMSDeliveryTimeInGetPropertyNamesWhenSetTestImpl(true);
+ }
+
+ private void doJMSDeliveryTimeInGetPropertyNamesWhenSetTestImpl(boolean excludeStandardJmsHeaders) throws JMSException {
+ JmsMessageFacade facade = Mockito.mock(JmsMessageFacade.class);
+ JmsMessage message = Mockito.mock(JmsMapMessage.class);
+ Mockito.when(message.getFacade()).thenReturn(facade);
+ Mockito.when(facade.getDeliveryTime()).thenReturn(900L);
+ if (excludeStandardJmsHeaders) {
+ assertFalse(JmsMessagePropertyIntercepter.getPropertyNames(message, true).contains(JMS_DELIVERYTIME));
+ } else {
+ assertTrue(JmsMessagePropertyIntercepter.getPropertyNames(message, false).contains(JMS_DELIVERYTIME));
+ }
+ }
+
+ @Test
+ public void testJMSDeliveryTimeNotInGetPropertyNamesWhenNotSet() throws JMSException {
+ JmsMessageFacade facade = Mockito.mock(JmsMessageFacade.class);
+ JmsMessage message = Mockito.mock(JmsMapMessage.class);
+ Mockito.when(message.getFacade()).thenReturn(facade);
+ assertFalse(JmsMessagePropertyIntercepter.getPropertyNames(message, false).contains(JMS_DELIVERYTIME));
+ }
+
+ @Test
+ public void testJMSDeliveryTimePropertExistsWhenSet() throws JMSException {
+ JmsMessageFacade facade = Mockito.mock(JmsMessageFacade.class);
+ JmsMessage message = Mockito.mock(JmsMapMessage.class);
+ Mockito.when(message.getFacade()).thenReturn(facade);
+ Mockito.when(facade.getDeliveryTime()).thenReturn(900L);
+ assertTrue(JmsMessagePropertyIntercepter.propertyExists(message, JMS_DELIVERYTIME));
+ }
+
+ @Test
+ public void testJMSDeliveryTimePropertExistsWhenNotSet() throws JMSException {
+ JmsMessageFacade facade = Mockito.mock(JmsMessageFacade.class);
+ JmsMessage message = Mockito.mock(JmsMapMessage.class);
+ Mockito.when(message.getFacade()).thenReturn(facade);
+ Mockito.when(facade.getDeliveryTime()).thenReturn(0L);
+ assertFalse(JmsMessagePropertyIntercepter.propertyExists(message, JMS_DELIVERYTIME));
+ }
+
+ @Test
+ public void testSetJMSDeliveryTimeConversionChecks() throws JMSException {
+ JmsMessageFacade facade = Mockito.mock(JmsMessageFacade.class);
+ JmsMessage message = Mockito.mock(JmsMapMessage.class);
+ Mockito.when(message.getFacade()).thenReturn(facade);
+ try {
+ JmsMessagePropertyIntercepter.setProperty(message, JMS_DELIVERYTIME, new byte[1]);
+ fail("Should have thrown an exception for this call");
+ } catch (JMSException e) {
+ }
+ }
+
+ @Test
+ public void testJMSDeliveryTimeClearedWhenRequested() throws JMSException {
+ JmsMessageFacade facade = Mockito.mock(JmsMessageFacade.class);
+ JmsMessage message = Mockito.mock(JmsMapMessage.class);
+ Mockito.when(message.getFacade()).thenReturn(facade);
+ JmsMessagePropertyIntercepter.clearProperties(message, true);
+ Mockito.verify(facade, Mockito.never()).setDeliveryTime(0);
+ JmsMessagePropertyIntercepter.clearProperties(message, false);
+ Mockito.verify(facade).setDeliveryTime(0);
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6e442f4c/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
index e430f87..bd50a67 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/provider/amqp/message/AmqpJmsMessageFacadeTest.java
@@ -1518,6 +1518,16 @@ public class AmqpJmsMessageFacadeTest extends AmqpJmsMessageTypesTestCase {
}
@Test
+ public void testNewMessageDoesNotHaveUnderlyingMessageAnnotationsSectionWithDeliveryTime() {
+ AmqpJmsMessageFacade amqpMessageFacade = createNewMessageFacade();;
+
+ Message underlying = amqpMessageFacade.getAmqpMessage();
+ assertNotNull(underlying.getMessageAnnotations());
+ Symbol annotationKey = AmqpMessageSupport.getSymbol(AmqpMessageSupport.JMS_DELIVERY_TIME);
+ assertNull(underlying.getMessageAnnotations().getValue().get(annotationKey));
+ }
+
+ @Test
public void testMessageAnnotationExistsUsingReceivedMessageWithoutMessageAnnotationsSection() {
String symbolKeyName = "myTestSymbolName";
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/6e442f4c/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySimpleAmqpServer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySimpleAmqpServer.java b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySimpleAmqpServer.java
index 7c23d86..1525144 100644
--- a/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySimpleAmqpServer.java
+++ b/qpid-jms-client/src/test/java/org/apache/qpid/jms/transports/netty/NettySimpleAmqpServer.java
@@ -19,6 +19,7 @@ package org.apache.qpid.jms.transports.netty;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.ANONYMOUS_RELAY;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.CONNECTION_OPEN_FAILED;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.CONTAINER_ID;
+import static org.apache.qpid.jms.provider.amqp.AmqpSupport.DELAYED_DELIVERY;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.INVALID_FIELD;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.PLATFORM;
import static org.apache.qpid.jms.provider.amqp.AmqpSupport.PRODUCT;
@@ -32,8 +33,13 @@ import java.util.Iterator;
import java.util.LinkedList;
import java.util.List;
import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicInteger;
import org.apache.qpid.jms.transports.TransportOptions;
+import org.apache.qpid.jms.util.IdGenerator;
import org.apache.qpid.proton.Proton;
import org.apache.qpid.proton.amqp.Symbol;
import org.apache.qpid.proton.amqp.transport.AmqpError;
@@ -42,7 +48,9 @@ import org.apache.qpid.proton.engine.Collector;
import org.apache.qpid.proton.engine.Connection;
import org.apache.qpid.proton.engine.EndpointState;
import org.apache.qpid.proton.engine.Event;
+import org.apache.qpid.proton.engine.Receiver;
import org.apache.qpid.proton.engine.Sasl;
+import org.apache.qpid.proton.engine.Sender;
import org.apache.qpid.proton.engine.Session;
import org.apache.qpid.proton.engine.Transport;
import org.apache.qpid.proton.engine.impl.CollectorImpl;
@@ -60,15 +68,19 @@ import io.netty.channel.SimpleChannelInboundHandler;
* Simple Netty based server that can handle a small subset of AMQP events
* using Proton-J as the protocol engine.
*/
+@SuppressWarnings( "unused" )
public class NettySimpleAmqpServer extends NettyServer {
private static final Logger LOG = LoggerFactory.getLogger(NettySimpleAmqpServer.class);
+ private static final AtomicInteger SERVER_SEQUENCE = new AtomicInteger();
+
private static final int CHANNEL_MAX = 32767;
private static final int HEADER_SIZE = 8;
private static final int SASL_PROTOCOL = 3;
private final Map<String, List<Connection>> connections = new HashMap<String, List<Connection>>();
+ private final ScheduledExecutorService serializer;
private boolean allowNonSaslConnections;
private ConnectionIntercepter connectionIntercepter;
@@ -83,6 +95,18 @@ public class NettySimpleAmqpServer extends NettyServer {
public NettySimpleAmqpServer(TransportOptions options, boolean needClientAuth, boolean webSocketServer) {
super(options, needClientAuth, webSocketServer);
+
+ this.serializer = Executors.newSingleThreadScheduledExecutor(new ThreadFactory() {
+
+ @Override
+ public Thread newThread(Runnable runner) {
+ Thread serial = new Thread(runner);
+ serial.setDaemon(true);
+ serial.setName(NettySimpleAmqpServer.this.getClass().getSimpleName() + ":(" +
+ SERVER_SEQUENCE.incrementAndGet() + "):Worker");
+ return serial;
+ }
+ });
}
@Override
@@ -111,11 +135,15 @@ public class NettySimpleAmqpServer extends NettyServer {
private final class ProtonConnection extends SimpleChannelInboundHandler<ByteBuf> {
+ private final IdGenerator sessionIdGenerator = new IdGenerator();
+
private final Transport protonTransport = Proton.transport();
private final Connection protonConnection = Proton.connection();
private final Collector eventCollector = new CollectorImpl();
private SaslAuthenticator authenticator;
+ private final Map<String, ProtonSession> sessions = new HashMap<String, ProtonSession>();
+
private boolean exclusiveContainerId;
private boolean headerRead;
private final ByteBuf headerBuf = Unpooled.buffer(HEADER_SIZE, HEADER_SIZE);
@@ -223,6 +251,21 @@ public class NettySimpleAmqpServer extends NettyServer {
case SESSION_REMOTE_CLOSE:
processSessionClose(event.getSession());
break;
+ case LINK_REMOTE_OPEN:
+ //processLinkOpen(event.getLink());
+ break;
+ case LINK_REMOTE_DETACH:
+ //processLinkDetach(event.getLink());
+ break;
+ case LINK_REMOTE_CLOSE:
+ //processLinkClose(event.getLink());
+ break;
+ case LINK_FLOW:
+ //processLinkFlow(event.getLink());
+ break;
+ case DELIVERY:
+ //processDelivery(event.getDelivery());
+ break;
default:
break;
}
@@ -263,11 +306,17 @@ public class NettySimpleAmqpServer extends NettyServer {
}
private void processSessionClose(Session session) {
+ ProtonSession protonSession = (ProtonSession) session.getContext();
+
+ sessions.remove(protonSession.getId());
+
session.close();
session.free();
}
private void processSessionOpen(Session session) {
+ ProtonSession protonSession = new ProtonSession(sessionIdGenerator.generateId(), session);
+ sessions.put(protonSession.getId(), protonSession);
session.open();
}
@@ -387,7 +436,7 @@ public class NettySimpleAmqpServer extends NettyServer {
}
private Symbol[] getConnectionCapabilitiesOffered() {
- return new Symbol[]{ ANONYMOUS_RELAY };
+ return new Symbol[]{ ANONYMOUS_RELAY, DELAYED_DELIVERY };
}
private Map<Symbol, Object> getConnetionProperties() {
@@ -453,7 +502,74 @@ public class NettySimpleAmqpServer extends NettyServer {
}
- //----- Internal Type Implementations ------------------------------------//
+ //----- Session Manager --------------------------------------------------//
+
+ private class ProtonSession {
+
+ private final String sessionId;
+ private final Session session;
+
+ private Map<String, ProtonSender> senders = new HashMap<String, ProtonSender>();
+ private Map<String, ProtonReceiver> receivers = new HashMap<String, ProtonReceiver>();
+
+ public ProtonSession(String sessionId, Session session) {
+ this.sessionId = sessionId;
+ this.session = session;
+ this.session.setContext(this);
+ }
+
+ public Session getSession() {
+ return session;
+ }
+
+ public String getId() {
+ return sessionId;
+ }
+ }
+
+ //----- Sender Manager ---------------------------------------------------//
+
+ private class ProtonSender {
+
+ private final String senderId;
+ private final Sender sender;
+
+ public ProtonSender(String senderId, Sender sender) {
+ this.senderId = senderId;
+ this.sender = sender;
+ }
+
+ public String getId() {
+ return senderId;
+ }
+
+ public Sender getSender() {
+ return sender;
+ }
+ }
+
+ //----- Receiver Manager ---------------------------------------------------//
+
+ private class ProtonReceiver {
+
+ private final String receiverId;
+ private final Receiver receiver;
+
+ public ProtonReceiver(String receiverId, Receiver receiver) {
+ this.receiverId = receiverId;
+ this.receiver = receiver;
+ }
+
+ public String getId() {
+ return receiverId;
+ }
+
+ public Receiver getReceiver() {
+ return receiver;
+ }
+ }
+
+ //----- SASL Authentication Manager --------------------------------------//
private class SaslAuthenticator {
@@ -496,7 +612,8 @@ public class NettySimpleAmqpServer extends NettyServer {
}
}
- @SuppressWarnings("unused")
+ //----- Simple AMQP Header Wrapper ---------------------------------------//
+
private class AmqpHeader {
private final byte[] PREFIX = new byte[] { 'A', 'M', 'Q', 'P' };
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org