You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2016/09/12 20:12:07 UTC
[6/7] qpid-jms git commit: QPIDJMS-207 Adds dependency on JMS 2.0 API
and initial implementation.
QPIDJMS-207 Adds dependency on JMS 2.0 API and initial implementation.
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms/commit/0c39522c
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms/tree/0c39522c
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms/diff/0c39522c
Branch: refs/heads/master
Commit: 0c39522cde1c845d8b57978dddfa931ee440e9e3
Parents: 3a03663
Author: Timothy Bish <ta...@gmail.com>
Authored: Mon Sep 12 14:39:42 2016 -0400
Committer: Timothy Bish <ta...@gmail.com>
Committed: Mon Sep 12 14:39:42 2016 -0400
----------------------------------------------------------------------
pom.xml | 7 +-
qpid-jms-client/pom.xml | 6 +-
.../apache/qpid/jms/JmsCompletionListener.java | 47 --
.../java/org/apache/qpid/jms/JmsConnection.java | 24 +
.../apache/qpid/jms/JmsConnectionFactory.java | 34 +-
.../java/org/apache/qpid/jms/JmsConsumer.java | 135 +++++
.../java/org/apache/qpid/jms/JmsContext.java | 510 +++++++++++++++++
.../org/apache/qpid/jms/JmsMessageConsumer.java | 53 ++
.../org/apache/qpid/jms/JmsMessageProducer.java | 106 ++--
.../java/org/apache/qpid/jms/JmsProducer.java | 454 +++++++++++++++
.../java/org/apache/qpid/jms/JmsSession.java | 80 ++-
.../jms/exceptions/JmsExceptionSupport.java | 66 +++
.../qpid/jms/message/JmsBytesMessage.java | 15 +
.../apache/qpid/jms/message/JmsMapMessage.java | 24 +
.../org/apache/qpid/jms/message/JmsMessage.java | 111 ++--
.../message/JmsMessagePropertyIntercepter.java | 73 +--
.../jms/message/JmsMessagePropertySupport.java | 124 ++++
.../qpid/jms/message/JmsObjectMessage.java | 19 +
.../qpid/jms/message/JmsStreamMessage.java | 5 +
.../apache/qpid/jms/message/JmsTextMessage.java | 11 +
.../message/facade/JmsBytesMessageFacade.java | 6 +
.../jms/message/facade/JmsMessageFacade.java | 26 +
.../amqp/message/AmqpJmsBytesMessageFacade.java | 24 +-
.../amqp/message/AmqpJmsMapMessageFacade.java | 5 +
.../amqp/message/AmqpJmsMessageFacade.java | 17 +
.../message/AmqpJmsObjectMessageFacade.java | 5 +
.../message/AmqpJmsStreamMessageFacade.java | 5 +
.../amqp/message/AmqpJmsTextMessageFacade.java | 9 +
.../amqp/message/AmqpObjectTypeDelegate.java | 3 +
.../message/AmqpSerializedObjectDelegate.java | 9 +
.../amqp/message/AmqpTypedObjectDelegate.java | 9 +
.../org/apache/qpid/jms/JmsConnectionTest.java | 10 +-
.../qpid/jms/JmsConnectionTestSupport.java | 9 +
.../integration/ConnectionIntegrationTest.java | 50 ++
.../jms/integration/IntegrationTestFixture.java | 73 ++-
.../integration/JMSConsumerIntegrationTest.java | 559 +++++++++++++++++++
.../integration/JMSContextIntegrationTest.java | 198 +++++++
.../integration/JMSProducerIntegrationTest.java | 200 +++++++
.../integration/MapMessageIntegrationTest.java | 43 +-
.../jms/integration/MessageIntegrationTest.java | 40 ++
.../ObjectMessageIntegrationTest.java | 116 +++-
.../PresettledProducerIntegrationTest.java | 4 +-
.../integration/ProducerIntegrationTest.java | 4 +-
.../jms/integration/SessionIntegrationTest.java | 31 +-
.../StreamMessageIntegrationTest.java | 63 ++-
.../integration/TextMessageIntegrationTest.java | 83 ++-
.../apache/qpid/jms/message/JmsMessageTest.java | 335 +++++++++++
.../facade/test/JmsTestBytesMessageFacade.java | 25 +-
.../facade/test/JmsTestMapMessageFacade.java | 5 +
.../facade/test/JmsTestMessageFacade.java | 16 +
.../facade/test/JmsTestObjectMessageFacade.java | 5 +
.../facade/test/JmsTestStreamMessageFacade.java | 5 +
.../facade/test/JmsTestTextMessageFacade.java | 5 +
.../jms/message/foreign/ForeignJmsMessage.java | 20 +
.../jms/producer/JmsMessageProducerTest.java | 4 +-
.../qpid/jms/producer/JmsProducerTest.java | 182 ++++++
.../failover/FailoverIntegrationTest.java | 4 +-
.../qpid/jms/JmsMessageIntegrityTest.java | 28 +
58 files changed, 3825 insertions(+), 314 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/pom.xml
----------------------------------------------------------------------
diff --git a/pom.xml b/pom.xml
index bd7a642..fbb9396 100644
--- a/pom.xml
+++ b/pom.xml
@@ -46,7 +46,8 @@
<proton-version>0.14.0</proton-version>
<netty-version>4.0.41.Final</netty-version>
<slf4j-version>1.7.21</slf4j-version>
- <geronimo-jms-1-1-spec-version>1.1.1</geronimo-jms-1-1-spec-version>
+ <!-- <geronimo-jms-1-1-spec-version>1.1.1</geronimo-jms-1-1-spec-version> -->
+ <geronimo.jms.2.spec.version>1.0-alpha-2</geronimo.jms.2.spec.version>
<!-- Test Dependency Versions for this Project -->
<activemq-version>5.14.0</activemq-version>
<junit-version>4.12</junit-version>
@@ -113,8 +114,8 @@
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
- <artifactId>geronimo-jms_1.1_spec</artifactId>
- <version>${geronimo-jms-1-1-spec-version}</version>
+ <artifactId>geronimo-jms_2.0_spec</artifactId>
+ <version>${geronimo.jms.2.spec.version}</version>
</dependency>
<dependency>
<groupId>io.netty</groupId>
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/pom.xml
----------------------------------------------------------------------
diff --git a/qpid-jms-client/pom.xml b/qpid-jms-client/pom.xml
index a5d1dd6..24b647e 100644
--- a/qpid-jms-client/pom.xml
+++ b/qpid-jms-client/pom.xml
@@ -38,8 +38,12 @@
</dependency>
<dependency>
<groupId>org.apache.geronimo.specs</groupId>
- <artifactId>geronimo-jms_1.1_spec</artifactId>
+ <artifactId>geronimo-jms_2.0_spec</artifactId>
</dependency>
+ <!-- <dependency>
+ <groupId>org.apache.geronimo.specs</groupId>
+ <artifactId>geronimo-jms_1.1_spec</artifactId>
+ </dependency> -->
<dependency>
<groupId>org.apache.qpid</groupId>
<artifactId>proton-j</artifactId>
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsCompletionListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsCompletionListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsCompletionListener.java
deleted file mode 100644
index 7a6c4d6..0000000
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsCompletionListener.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.qpid.jms;
-
-import javax.jms.Message;
-
-/**
- * Interface used to implement listeners for asynchronous {@link javax.jms.Message}
- * sends which will be notified on successful completion of a send or be notified of an
- * error that was encountered while attempting to send a {@link javax.jms.Message}.
- */
-public interface JmsCompletionListener {
-
- /**
- * Called when an asynchronous send operation completes successfully.
- *
- * @param message
- * the {@link javax.jms.Message} that was successfully sent.
- */
- void onCompletion(Message message);
-
- /**
- * Called when an asynchronous send operation fails to complete, the state
- * of the send is unknown at this point.
- *
- * @param message
- * the {@link javax.jms.Message} that was to be sent.
- * @param exception
- * the {@link java.lang.Exception} that describes the send error.
- */
- void onException(Message message, Exception exception);
-
-}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
index a04d1b3..79fe8d8 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -257,6 +257,16 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
}
@Override
+ public Session createSession() throws JMSException {
+ return createSession(false, Session.AUTO_ACKNOWLEDGE);
+ }
+
+ /**
+ * Creates a Session using only the given acknowledge mode; the session is
+ * transacted exactly when the mode is {@link Session#SESSION_TRANSACTED}.
+ *
+ * @param acknowledgeMode
+ * the session mode to create the Session with.
+ *
+ * @return the newly created Session.
+ *
+ * @throws JMSException if the two argument createSession call fails.
+ */
+ @Override
+ public Session createSession(int acknowledgeMode) throws JMSException {
+ // The comparison already yields the boolean "transacted" flag.
+ return createSession(acknowledgeMode == Session.SESSION_TRANSACTED, acknowledgeMode);
+ }
+
+ @Override
public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
checkClosedOrFailed();
connect();
@@ -347,6 +357,20 @@ public class JmsConnection implements AutoCloseable, Connection, TopicConnection
}
@Override
+ public ConnectionConsumer createSharedConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+ checkClosedOrFailed();
+ connect();
+ throw new JMSException("Not supported");
+ }
+
+ @Override
+ public ConnectionConsumer createSharedDurableConnectionConsumer(Topic topic, String subscriptionName, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+ checkClosedOrFailed();
+ connect();
+ throw new JMSException("Not supported");
+ }
+
+ @Override
public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
checkClosedOrFailed();
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
index 217749c..4105842 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
@@ -25,6 +25,7 @@ import java.util.Map;
import javax.jms.Connection;
import javax.jms.ConnectionFactory;
import javax.jms.ExceptionListener;
+import javax.jms.JMSContext;
import javax.jms.JMSException;
import javax.jms.QueueConnection;
import javax.jms.QueueConnectionFactory;
@@ -234,6 +235,35 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact
}
}
+ //----- JMSContext Creation methods --------------------------------------//
+
+ @Override
+ public JMSContext createContext() {
+ return createContext(getUsername(), getPassword(), JMSContext.AUTO_ACKNOWLEDGE);
+ }
+
+ @Override
+ public JMSContext createContext(int sessionMode) {
+ return createContext(getUsername(), getPassword(), sessionMode);
+ }
+
+ @Override
+ public JMSContext createContext(String username, String password) {
+ return createContext(username, password, JMSContext.AUTO_ACKNOWLEDGE);
+ }
+
+ @Override
+ public JMSContext createContext(String username, String password, int sessionMode) {
+ try {
+ JmsConnection connection = (JmsConnection) createConnection(username, password);
+ return new JmsContext(connection, sessionMode);
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ //----- Internal Support Methods -----------------------------------------//
+
protected Provider createProvider(URI remoteURI) throws Exception {
if (remoteURI == null) {
remoteURI = new URI(getDefaultRemoteAddress());
@@ -279,9 +309,7 @@ public class JmsConnectionFactory extends JNDIStorable implements ConnectionFact
this.connectionIdGenerator = connectionIdGenerator;
}
- //////////////////////////////////////////////////////////////////////////
- // Property getters and setters
- //////////////////////////////////////////////////////////////////////////
+ //----- Property Access Methods ------------------------------------------//
/**
* @return the remoteURI
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConsumer.java
new file mode 100644
index 0000000..19691e7
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConsumer.java
@@ -0,0 +1,135 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.JMSConsumer;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.MessageListener;
+
+import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
+
+/**
+ * JMSConsumer implementation that delegates every operation to a wrapped
+ * {@link JmsMessageConsumer} and rethrows any checked {@link JMSException} as the
+ * runtime exception produced by {@link JmsExceptionSupport#createRuntimeException}.
+ *
+ * The owning session is stored but not read anywhere in this class, which is why
+ * the "unused" warning is suppressed.
+ */
+@SuppressWarnings("unused")
+public class JmsConsumer implements JMSConsumer, AutoCloseable {
+
+ private final JmsSession session;
+ private final JmsMessageConsumer consumer;
+
+ /**
+ * @param session
+ * the session associated with the wrapped consumer (stored only).
+ * @param consumer
+ * the consumer that all calls are delegated to.
+ */
+ public JmsConsumer(JmsSession session, JmsMessageConsumer consumer) {
+ this.session = session;
+ this.consumer = consumer;
+ }
+
+ /**
+ * Closes the wrapped consumer, converting any JMSException to its runtime form.
+ */
+ @Override
+ public void close() {
+ try {
+ consumer.close();
+ } catch (JMSException e) {
+ throw JmsExceptionSupport.createRuntimeException(e);
+ }
+ }
+
+ //----- MessageConsumer Property Methods ---------------------------------//
+
+ /**
+ * @return the MessageListener reported by the wrapped consumer.
+ */
+ @Override
+ public MessageListener getMessageListener() {
+ try {
+ return consumer.getMessageListener();
+ } catch (JMSException e) {
+ throw JmsExceptionSupport.createRuntimeException(e);
+ }
+ }
+
+ /**
+ * @return the message selector reported by the wrapped consumer.
+ */
+ @Override
+ public String getMessageSelector() {
+ try {
+ return consumer.getMessageSelector();
+ } catch (JMSException e) {
+ throw JmsExceptionSupport.createRuntimeException(e);
+ }
+ }
+
+ /**
+ * Assigns the given MessageListener to the wrapped consumer.
+ *
+ * @param listener
+ * the listener to pass through to the wrapped consumer.
+ */
+ @Override
+ public void setMessageListener(MessageListener listener) {
+ try {
+ consumer.setMessageListener(listener);
+ } catch (JMSException e) {
+ throw JmsExceptionSupport.createRuntimeException(e);
+ }
+ }
+
+ //----- Receive Methods --------------------------------------------------//
+
+ /**
+ * @return the result of the wrapped consumer's receive() call.
+ */
+ @Override
+ public Message receive() {
+ try {
+ return consumer.receive();
+ } catch (JMSException e) {
+ throw JmsExceptionSupport.createRuntimeException(e);
+ }
+ }
+
+ /**
+ * @param timeout
+ * the timeout value handed unchanged to the wrapped consumer.
+ *
+ * @return the result of the wrapped consumer's receive(timeout) call.
+ */
+ @Override
+ public Message receive(long timeout) {
+ try {
+ return consumer.receive(timeout);
+ } catch (JMSException e) {
+ throw JmsExceptionSupport.createRuntimeException(e);
+ }
+ }
+
+ /**
+ * @return the result of the wrapped consumer's receiveNoWait() call.
+ */
+ @Override
+ public Message receiveNoWait() {
+ try {
+ return consumer.receiveNoWait();
+ } catch (JMSException e) {
+ throw JmsExceptionSupport.createRuntimeException(e);
+ }
+ }
+
+ /**
+ * Receives the body of the next message as the requested type, passing a
+ * negative timeout which JmsMessageConsumer.receiveBody documents as
+ * "block until a message is received".
+ *
+ * @param desired
+ * the type the message body should be returned as.
+ *
+ * @return the body returned by the wrapped consumer.
+ */
+ @Override
+ public <T> T receiveBody(Class<T> desired) {
+ try {
+ return consumer.receiveBody(desired, -1);
+ } catch (JMSException e) {
+ throw JmsExceptionSupport.createRuntimeException(e);
+ }
+ }
+
+ /**
+ * Receives the body of the next message as the requested type, waiting up to
+ * the given timeout. A timeout of zero is translated to -1 before delegating
+ * because the wrapped consumer treats zero as "return immediately".
+ *
+ * @param desired
+ * the type the message body should be returned as.
+ * @param timeout
+ * the time to wait; zero is mapped to an unbounded wait.
+ *
+ * @return the body returned by the wrapped consumer.
+ */
+ @Override
+ public <T> T receiveBody(Class<T> desired, long timeout) {
+ try {
+ // Configure for infinite wait when timeout is zero (JMS Spec)
+ if (timeout == 0) {
+ timeout = -1;
+ }
+
+ return consumer.receiveBody(desired, timeout);
+ } catch (JMSException e) {
+ throw JmsExceptionSupport.createRuntimeException(e);
+ }
+ }
+
+ /**
+ * Receives the body of the next message as the requested type without
+ * waiting, by passing a zero timeout to the wrapped consumer.
+ *
+ * @param desired
+ * the type the message body should be returned as.
+ *
+ * @return the body returned by the wrapped consumer.
+ */
+ @Override
+ public <T> T receiveBodyNoWait(Class<T> desired) {
+ try {
+ return consumer.receiveBody(desired, 0);
+ } catch (JMSException e) {
+ throw JmsExceptionSupport.createRuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsContext.java
new file mode 100644
index 0000000..008da24
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsContext.java
@@ -0,0 +1,510 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import java.io.Serializable;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.BytesMessage;
+import javax.jms.ConnectionMetaData;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateRuntimeException;
+import javax.jms.JMSConsumer;
+import javax.jms.JMSContext;
+import javax.jms.JMSException;
+import javax.jms.JMSProducer;
+import javax.jms.JMSRuntimeException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+
+import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
+import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
+
+public class JmsContext implements JMSContext, AutoCloseable {
+
+ private final JmsConnection connection;
+ private final AtomicLong connectionRefCount;
+ private final int sessionMode;
+
+ // Both fields are lazily created with a double-checked lock in getSession() and
+ // createProducer(); they must be volatile so the unsynchronized first null check
+ // can never observe a partially published instance.
+ private volatile JmsSession session;
+ private volatile JmsMessageProducer sharedProducer;
+ private boolean autoStart = true;
+
+ public JmsContext(JmsConnection connection, int sessionMode) {
+ this(connection, sessionMode, new AtomicLong(1));
+ }
+
+ private JmsContext(JmsConnection connection, int sessionMode, AtomicLong connectionRefCount) {
+ this.connection = connection;
+ this.sessionMode = sessionMode;
+ this.connectionRefCount = connectionRefCount;
+ }
+
+ @Override
+ public void start() {
+ try {
+ connection.start();
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public void stop() {
+ try {
+ connection.stop();
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public void close() {
+ JMSRuntimeException failure = null;
+
+ synchronized (this) {
+ try {
+ if (session != null) {
+ session.close();
+ }
+ } catch (JMSException jmse) {
+ failure = JmsExceptionSupport.createRuntimeException(jmse);
+ }
+
+ if (connectionRefCount.decrementAndGet() == 0) {
+ try {
+ connection.close();
+ } catch (JMSException jmse) {
+ failure = JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+ }
+
+ if (failure != null) {
+ throw failure;
+ }
+ }
+
+ //----- Session state management -----------------------------------------//
+
+ @Override
+ public void acknowledge() {
+ if (getSessionMode() == Session.CLIENT_ACKNOWLEDGE) {
+ try {
+ getSession().acknowledge(ACK_TYPE.ACCEPTED);
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+ }
+
+ @Override
+ public void commit() {
+ try {
+ getSession().commit();
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public void rollback() {
+ try {
+ getSession().rollback();
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public void recover() {
+ try {
+ getSession().recover();
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public void unsubscribe(String name) {
+ try {
+ getSession().unsubscribe(name);
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ //----- Message Factory methods ------------------------------------------//
+
+ @Override
+ public BytesMessage createBytesMessage() {
+ try {
+ return getSession().createBytesMessage();
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public MapMessage createMapMessage() {
+ try {
+ return getSession().createMapMessage();
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public Message createMessage() {
+ try {
+ return getSession().createMessage();
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public ObjectMessage createObjectMessage() {
+ try {
+ return getSession().createObjectMessage();
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public ObjectMessage createObjectMessage(Serializable object) {
+ try {
+ return getSession().createObjectMessage(object);
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public StreamMessage createStreamMessage() {
+ try {
+ return getSession().createStreamMessage();
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public TextMessage createTextMessage() {
+ try {
+ return getSession().createTextMessage();
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public TextMessage createTextMessage(String text) {
+ try {
+ return getSession().createTextMessage(text);
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ //----- Destination Creation ---------------------------------------------//
+
+ @Override
+ public Queue createQueue(String queueName) {
+ try {
+ return getSession().createQueue(queueName);
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public Topic createTopic(String topicName) {
+ try {
+ return getSession().createTopic(topicName);
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public TemporaryQueue createTemporaryQueue() {
+ try {
+ return getSession().createTemporaryQueue();
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public TemporaryTopic createTemporaryTopic() {
+ try {
+ return getSession().createTemporaryTopic();
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ //----- JMSContext factory methods --------------------------------------//
+
+ @Override
+ public JMSContext createContext(int sessionMode) {
+ synchronized (this) {
+ if (connectionRefCount.get() == 0) {
+ throw new IllegalStateRuntimeException("The Connection is closed");
+ }
+
+ connectionRefCount.incrementAndGet();
+
+ return new JmsContext(connection, sessionMode, connectionRefCount);
+ }
+ }
+
+ //----- JMSProducer factory methods --------------------------------------//
+
+ @Override
+ public JMSProducer createProducer() {
+ try {
+ if (sharedProducer == null) {
+ synchronized (this) {
+ if (sharedProducer == null) {
+ sharedProducer = (JmsMessageProducer) getSession().createProducer(null);
+ }
+ }
+ }
+
+ return new JmsProducer(getSession(), sharedProducer);
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ //----- JMSConsumer factory methods --------------------------------------//
+
+ @Override
+ public JMSConsumer createConsumer(Destination destination) {
+ try {
+ return startIfNeeded(new JmsConsumer(getSession(), (JmsMessageConsumer) getSession().createConsumer(destination)));
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public JMSConsumer createConsumer(Destination destination, String selector) {
+ try {
+ return startIfNeeded(new JmsConsumer(getSession(), (JmsMessageConsumer) getSession().createConsumer(destination, selector)));
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public JMSConsumer createConsumer(Destination destination, String selector, boolean noLocal) {
+ try {
+ return startIfNeeded(new JmsConsumer(getSession(), (JmsMessageConsumer) getSession().createConsumer(destination, selector, noLocal)));
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public JMSConsumer createDurableConsumer(Topic topic, String name) {
+ try {
+ return startIfNeeded(new JmsConsumer(getSession(), (JmsMessageConsumer) getSession().createDurableConsumer(topic, name)));
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public JMSConsumer createDurableConsumer(Topic topic, String name, String selector, boolean noLocal) {
+ try {
+ return startIfNeeded(new JmsConsumer(getSession(), (JmsMessageConsumer) getSession().createDurableConsumer(topic, name, selector, noLocal)));
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public JMSConsumer createSharedConsumer(Topic topic, String name) {
+ try {
+ return startIfNeeded(new JmsConsumer(getSession(), (JmsMessageConsumer) getSession().createSharedConsumer(topic, name)));
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public JMSConsumer createSharedConsumer(Topic topic, String name, String selector) {
+ try {
+ return startIfNeeded(new JmsConsumer(getSession(), (JmsMessageConsumer) getSession().createSharedConsumer(topic, name, selector)));
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public JMSConsumer createSharedDurableConsumer(Topic topic, String name) {
+ try {
+ return startIfNeeded(new JmsConsumer(getSession(), (JmsMessageConsumer) getSession().createSharedDurableConsumer(topic, name)));
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public JMSConsumer createSharedDurableConsumer(Topic topic, String name, String selector) {
+ try {
+ return startIfNeeded(new JmsConsumer(getSession(), (JmsMessageConsumer) getSession().createSharedDurableConsumer(topic, name, selector)));
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ //----- QueueBrowser Factory Methods -------------------------------------//
+
+ @Override
+ public QueueBrowser createBrowser(Queue queue) {
+ try {
+ return startIfNeeded(getSession().createBrowser(queue));
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public QueueBrowser createBrowser(Queue queue, String selector) {
+ try {
+ return startIfNeeded(getSession().createBrowser(queue, selector));
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ //----- Get or Set Context and Session values ----------------------------//
+
+ @Override
+ public boolean getAutoStart() {
+ return autoStart;
+ }
+
+ @Override
+ public void setAutoStart(boolean autoStart) {
+ this.autoStart = autoStart;
+ }
+
+ @Override
+ public String getClientID() {
+ try {
+ return connection.getClientID();
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public void setClientID(String clientID) {
+ try {
+ connection.setClientID(clientID);
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public ExceptionListener getExceptionListener() {
+ try {
+ return connection.getExceptionListener();
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public void setExceptionListener(ExceptionListener listener) {
+ try {
+ connection.setExceptionListener(listener);
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public ConnectionMetaData getMetaData() {
+ try {
+ return connection.getMetaData();
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public int getSessionMode() {
+ return sessionMode;
+ }
+
+ @Override
+ public boolean getTransacted() {
+ return sessionMode == JMSContext.SESSION_TRANSACTED;
+ }
+
+ //----- Internal implementation methods ----------------------------------//
+
+ private JmsSession getSession() {
+ if (session == null) {
+ synchronized (this) {
+ if (session == null) {
+ try {
+ session = (JmsSession) connection.createSession(getSessionMode());
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+ }
+ }
+
+ return session;
+ }
+
+ private QueueBrowser startIfNeeded(QueueBrowser browser) throws JMSException {
+ if (getAutoStart()) {
+ connection.start();
+ }
+
+ return browser;
+ }
+
+ private JmsConsumer startIfNeeded(JmsConsumer consumer) throws JMSException {
+ if (getAutoStart()) {
+ connection.start();
+ }
+
+ return consumer;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
index 18fd764..ea93019 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageConsumer.java
@@ -26,6 +26,7 @@ import javax.jms.IllegalStateException;
import javax.jms.JMSException;
import javax.jms.Message;
import javax.jms.MessageConsumer;
+import javax.jms.MessageFormatException;
import javax.jms.MessageListener;
import javax.jms.Session;
@@ -213,6 +214,58 @@ public class JmsMessageConsumer implements AutoCloseable, MessageConsumer, JmsMe
}
/**
+ * Reads the next available message for this consumer and returns the body of that message
+ * if the type requested matches that of the message. The amount of time this method blocks
+ * is based on the timeout value.
+ *
+ * {@literal timeout < 0} then it blocks until a message is received.
+ * {@literal timeout = 0} then it returns the body immediately or null if none available.
+ * {@literal timeout > 0} then it blocks up to timeout amount of time.
+ *
+ * @param desired
+ * The type to assign the body of the message to for return.
+ * @param timeout
+ * The time to wait for an incoming message before this method returns null.
+ *
+ * @return the assigned body of the next available message or null if the consumer is closed
+ * or the specified timeout elapses.
+ *
+ * @throws MessageFormatException if the message body cannot be assigned to the requested type.
+ * @throws JMSException if an error occurs while receiving the next message.
+ */
+ public <T> T receiveBody(Class<T> desired, long timeout) throws JMSException {
+ checkClosed();
+ checkMessageListener();
+
+ T messageBody = null;
+ JmsInboundMessageDispatch envelope = null;
+
+ try {
+ // A null envelope means nothing was dequeued; the method then returns null.
+ envelope = dequeue(timeout, connection.isReceiveLocalOnly());
+ if (envelope != null) {
+ messageBody = envelope.getMessage().getBody(desired);
+ }
+ } catch (MessageFormatException mfe) {
+ // Should behave as if receiveBody never happened in these modes.
+ if (acknowledgementMode == Session.AUTO_ACKNOWLEDGE ||
+ acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE) {
+
+ // Hand the message back to this consumer flagged enqueue-first, and clear
+ // the local reference so the finally block below does not acknowledge it.
+ envelope.setEnqueueFirst(true);
+ onInboundMessage(envelope);
+ envelope = null;
+ }
+
+ throw mfe;
+ } finally {
+ // Runs on success and also on a MessageFormatException in the other
+ // acknowledgement modes, where the envelope reference was left in place.
+ if (envelope != null) {
+ ackFromReceive(envelope);
+ }
+ }
+
+ return messageBody;
+ }
+
+ /**
* Used to get an enqueued message from the unconsumedMessages list. The
* amount of time this method blocks is based on the timeout value.
*
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
index 65812b7..6e9c96d 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageProducer.java
@@ -20,6 +20,7 @@ import java.util.concurrent.atomic.AtomicBoolean;
import java.util.concurrent.atomic.AtomicLong;
import java.util.concurrent.atomic.AtomicReference;
+import javax.jms.CompletionListener;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
@@ -43,6 +44,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
protected final JmsConnection connection;
protected JmsProducerInfo producerInfo;
protected final boolean anonymousProducer;
+ protected long deliveryDelay = Message.DEFAULT_DELIVERY_DELAY;
protected int deliveryMode = DeliveryMode.PERSISTENT;
protected int priority = Message.DEFAULT_PRIORITY;
protected long timeToLive = Message.DEFAULT_TIME_TO_LIVE;
@@ -110,44 +112,50 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
}
@Override
+ public long getDeliveryDelay() throws JMSException {
+ checkClosed();
+ // Return the delay configured via setDeliveryDelay; this previously returned
+ // deliveryMode, which reported the delivery mode constant as the delay.
+ return deliveryDelay;
+ }
+
+ @Override
public int getDeliveryMode() throws JMSException {
checkClosed();
- return this.deliveryMode;
+ return deliveryMode;
}
@Override
public Destination getDestination() throws JMSException {
checkClosed();
- return this.producerInfo.getDestination();
+ return producerInfo.getDestination();
}
@Override
public boolean getDisableMessageID() throws JMSException {
checkClosed();
- return this.disableMessageId;
+ return disableMessageId;
}
@Override
public boolean getDisableMessageTimestamp() throws JMSException {
checkClosed();
- return this.disableTimestamp;
+ return disableTimestamp;
}
@Override
public int getPriority() throws JMSException {
checkClosed();
- return this.priority;
+ return priority;
}
@Override
public long getTimeToLive() throws JMSException {
checkClosed();
- return this.timeToLive;
+ return timeToLive;
}
@Override
public void send(Message message) throws JMSException {
- send(message, this.deliveryMode, this.priority, this.timeToLive);
+ send(message, deliveryMode, priority, timeToLive);
}
@Override
@@ -163,7 +171,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
@Override
public void send(Destination destination, Message message) throws JMSException {
- send(destination, message, this.deliveryMode, this.priority, this.timeToLive);
+ send(destination, message, deliveryMode, priority, timeToLive);
}
@Override
@@ -177,37 +185,13 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
sendMessage(destination, message, deliveryMode, priority, timeToLive, null);
}
- /**
- * Sends the message asynchronously and notifies the assigned listener on success or failure
- *
- * @param message
- * the {@link javax.jms.Message} to send.
- * @param listener
- * the {@link JmsCompletionListener} to notify on send success or failure.
- *
- * @throws JMSException if an error occurs while attempting to send the Message.
- */
- public void send(Message message, JmsCompletionListener listener) throws JMSException {
- send(message, this.deliveryMode, this.priority, this.timeToLive, listener);
+ @Override
+ public void send(Message message, CompletionListener listener) throws JMSException {
+ send(message, deliveryMode, priority, timeToLive, listener);
}
- /**
- * Sends the message asynchronously and notifies the assigned listener on success or failure
- *
- * @param message
- * the {@link javax.jms.Message} to send.
- * @param deliveryMode
- * the delivery mode to assign to the outbound Message.
- * @param priority
- * the priority to assign to the outbound Message.
- * @param timeToLive
- * the time to live value to assign to the outbound Message.
- * @param listener
- * the {@link JmsCompletionListener} to notify on send success or failure.
- *
- * @throws JMSException if an error occurs while attempting to send the Message.
- */
- public void send(Message message, int deliveryMode, int priority, long timeToLive, JmsCompletionListener listener) throws JMSException {
+ @Override
+ public void send(Message message, int deliveryMode, int priority, long timeToLive, CompletionListener listener) throws JMSException {
checkClosed();
if (anonymousProducer) {
@@ -221,41 +205,13 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
sendMessage(producerInfo.getDestination(), message, deliveryMode, priority, timeToLive, listener);
}
- /**
- * Sends the message asynchronously and notifies the assigned listener on success or failure
- *
- * @param destination
- * the Destination to send the given Message to.
- * @param message
- * the {@link javax.jms.Message} to send.
- * @param listener
- * the {@link JmsCompletionListener} to notify on send success or failure.
- *
- * @throws JMSException if an error occurs while attempting to send the Message.
- */
- public void send(Destination destination, Message message, JmsCompletionListener listener) throws JMSException {
+ @Override
+ public void send(Destination destination, Message message, CompletionListener listener) throws JMSException {
send(destination, message, this.deliveryMode, this.priority, this.timeToLive, listener);
}
- /**
- * Sends the message asynchronously and notifies the assigned listener on success or failure
- *
- * @param destination
- * the Destination to send the given Message to.
- * @param message
- * the {@link javax.jms.Message} to send.
- * @param deliveryMode
- * the delivery mode to assign to the outbound Message.
- * @param priority
- * the priority to assign to the outbound Message.
- * @param timeToLive
- * the time to live value to assign to the outbound Message.
- * @param listener
- * the {@link JmsCompletionListener} to notify on send success or failure.
- *
- * @throws JMSException if an error occurs while attempting to send the Message.
- */
- public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, JmsCompletionListener listener) throws JMSException {
+ @Override
+ public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, CompletionListener listener) throws JMSException {
checkClosed();
if (!anonymousProducer) {
@@ -269,12 +225,18 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
sendMessage(destination, message, deliveryMode, priority, timeToLive, listener);
}
- private void sendMessage(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, JmsCompletionListener listener) throws JMSException {
+ private void sendMessage(Destination destination, Message message, int deliveryMode, int priority, long timeToLive, CompletionListener listener) throws JMSException {
if (destination == null) {
throw new InvalidDestinationException("Don't understand null destinations");
}
- this.session.send(this, destination, message, deliveryMode, priority, timeToLive, disableMessageId, disableTimestamp, listener);
+ this.session.send(this, destination, message, deliveryMode, priority, timeToLive, disableMessageId, disableTimestamp, deliveryDelay, listener);
+ }
+
+ @Override
+ public void setDeliveryDelay(long deliveryDelay) throws JMSException {
+ checkClosed();
+ this.deliveryDelay = deliveryDelay;
}
@Override
@@ -318,7 +280,7 @@ public class JmsMessageProducer implements AutoCloseable, MessageProducer {
* @return the next logical sequence for a Message sent from this Producer.
*/
protected long getNextMessageSequence() {
- return this.messageSequence.incrementAndGet();
+ return messageSequence.incrementAndGet();
}
protected void checkClosed() throws IllegalStateException {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsProducer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsProducer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsProducer.java
new file mode 100644
index 0000000..a90ec23
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsProducer.java
@@ -0,0 +1,454 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import static org.apache.qpid.jms.message.JmsMessagePropertySupport.checkPropertyNameIsValid;
+import static org.apache.qpid.jms.message.JmsMessagePropertySupport.checkValidObject;
+import static org.apache.qpid.jms.message.JmsMessagePropertySupport.convertPropertyTo;
+
+import java.io.Serializable;
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Set;
+
+import javax.jms.BytesMessage;
+import javax.jms.CompletionListener;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.JMSProducer;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.ObjectMessage;
+import javax.jms.TextMessage;
+
+import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
+import org.apache.qpid.jms.message.JmsMessageTransformation;
+
+public class JmsProducer implements JMSProducer {
+
+ private final JmsSession session;
+ private final JmsMessageProducer producer;
+
+ private CompletionListener completionListener;
+
+ // Message Headers
+ private String correlationId;
+ private String type;
+ private Destination replyTo;
+ private byte[] correlationIdBytes;
+
+ // Producer send configuration
+ private long deliveryDelay = Message.DEFAULT_DELIVERY_DELAY;
+ private int deliveryMode = DeliveryMode.PERSISTENT;
+ private int priority = Message.DEFAULT_PRIORITY;
+ private long timeToLive = Message.DEFAULT_TIME_TO_LIVE;
+ private boolean disableMessageId;
+ private boolean disableTimestamp;
+
+ // Message Properties
+ private final Map<String, Object> messageProperties = new HashMap<String, Object>();
+
+ /**
+ * Create a new JMSProducer instance.
+ *
+ * The producer is backed by the given Session object and uses the shared MessageProducer
+ * instance to send all of its messages.
+ *
+ * @param session
+ * The Session that created this JMSProducer
+ * @param producer
+ * The shared MessageProducer owned by the parent Session.
+ */
+ public JmsProducer(JmsSession session, JmsMessageProducer producer) {
+ this.session = session;
+ this.producer = producer;
+ }
+
+ //----- Send Methods -----------------------------------------------------//
+
+ @Override
+ public JMSProducer send(Destination destination, Message message) {
+ try {
+ doSend(destination, message);
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+
+ return this;
+ }
+
+ /**
+ * Sends a BytesMessage carrying the given payload to the destination.
+ *
+ * A null body results in an empty BytesMessage being sent, as the JMS 2.0
+ * JMSProducer contract describes, instead of passing null to writeBytes.
+ *
+ * @param destination
+ * The Destination to send the message to.
+ * @param body
+ * The bytes to write into the message, or null for an empty message.
+ *
+ * @return this JMSProducer instance.
+ */
+ @Override
+ public JMSProducer send(Destination destination, byte[] body) {
+ try {
+ BytesMessage message = session.createBytesMessage();
+ if (body != null) {
+ message.writeBytes(body);
+ }
+ doSend(destination, message);
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+
+ return this;
+ }
+
+ /**
+ * Sends a MapMessage populated from the given map to the destination.
+ *
+ * A null body results in an empty MapMessage being sent, as the JMS 2.0
+ * JMSProducer contract describes, instead of failing with a NullPointerException.
+ *
+ * @param destination
+ * The Destination to send the message to.
+ * @param body
+ * The entries to copy into the message, or null for an empty message.
+ *
+ * @return this JMSProducer instance.
+ */
+ @Override
+ public JMSProducer send(Destination destination, Map<String, Object> body) {
+ try {
+ MapMessage message = session.createMapMessage();
+ if (body != null) {
+ for (Map.Entry<String, Object> entry : body.entrySet()) {
+ message.setObject(entry.getKey(), entry.getValue());
+ }
+ }
+
+ doSend(destination, message);
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+
+ return this;
+ }
+
+ @Override
+ public JMSProducer send(Destination destination, Serializable body) {
+ try {
+ ObjectMessage message = session.createObjectMessage();
+ message.setObject(body);
+ doSend(destination, message);
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+
+ return this;
+ }
+
+ @Override
+ public JMSProducer send(Destination destination, String body) {
+ try {
+ TextMessage message = session.createTextMessage(body);
+ doSend(destination, message);
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+
+ return this;
+ }
+
+ /**
+ * Applies this producer's stored properties and headers to the message and then
+ * sends it through the session using the shared MessageProducer.
+ *
+ * @param destination
+ * The Destination to send the message to.
+ * @param message
+ * The Message to decorate and send.
+ *
+ * @throws JMSException if setting a property or header fails, or the send fails.
+ */
+ private void doSend(Destination destination, Message message) throws JMSException {
+
+ // Copy every property stored on this producer onto the outgoing message.
+ for (Map.Entry<String, Object> entry : messageProperties.entrySet()) {
+ message.setObjectProperty(entry.getKey(), entry.getValue());
+ }
+
+ // Headers are only written when a value was configured; unset (null) headers
+ // leave whatever the message already carries untouched.
+ if (correlationId != null) {
+ message.setJMSCorrelationID(correlationId);
+ }
+ // When both forms of the correlation ID are configured, both setters are
+ // invoked and the bytes form is applied last.
+ if (correlationIdBytes != null) {
+ message.setJMSCorrelationIDAsBytes(correlationIdBytes);
+ }
+ if (type != null) {
+ message.setJMSType(type);
+ }
+ if (replyTo != null) {
+ message.setJMSReplyTo(replyTo);
+ }
+
+ // The send options come from this JMSProducer's own fields, not from the shared MessageProducer.
+ session.send(producer, destination, message, deliveryMode, priority, timeToLive, disableMessageId, disableTimestamp, deliveryDelay, completionListener);
+ }
+
+ //----- Message Property Methods -----------------------------------------//
+
+ @Override
+ public JMSProducer clearProperties() {
+ messageProperties.clear();
+ return this;
+ }
+
+ @Override
+ public Set<String> getPropertyNames() {
+ return new HashSet<String>(messageProperties.keySet());
+ }
+
+ @Override
+ public boolean propertyExists(String name) {
+ return messageProperties.containsKey(name);
+ }
+
+ @Override
+ public boolean getBooleanProperty(String name) {
+ try {
+ return convertPropertyTo(name, messageProperties.get(name), Boolean.class);
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public byte getByteProperty(String name) {
+ try {
+ return convertPropertyTo(name, messageProperties.get(name), Byte.class);
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public double getDoubleProperty(String name) {
+ try {
+ return convertPropertyTo(name, messageProperties.get(name), Double.class);
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public float getFloatProperty(String name) {
+ try {
+ return convertPropertyTo(name, messageProperties.get(name), Float.class);
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public int getIntProperty(String name) {
+ try {
+ return convertPropertyTo(name, messageProperties.get(name), Integer.class);
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public long getLongProperty(String name) {
+ try {
+ return convertPropertyTo(name, messageProperties.get(name), Long.class);
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public Object getObjectProperty(String name) {
+ return messageProperties.get(name);
+ }
+
+ @Override
+ public short getShortProperty(String name) {
+ try {
+ return convertPropertyTo(name, messageProperties.get(name), Short.class);
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public String getStringProperty(String name) {
+ try {
+ return convertPropertyTo(name, messageProperties.get(name), String.class);
+ } catch (JMSException jmse) {
+ throw JmsExceptionSupport.createRuntimeException(jmse);
+ }
+ }
+
+ @Override
+ public JMSProducer setProperty(String name, boolean value) {
+ return setObjectProperty(name, value);
+ }
+
+ @Override
+ public JMSProducer setProperty(String name, byte value) {
+ return setObjectProperty(name, value);
+ }
+
+ @Override
+ public JMSProducer setProperty(String name, double value) {
+ return setObjectProperty(name, value);
+ }
+
+ @Override
+ public JMSProducer setProperty(String name, float value) {
+ return setObjectProperty(name, value);
+ }
+
+ @Override
+ public JMSProducer setProperty(String name, int value) {
+ return setObjectProperty(name, value);
+ }
+
+ @Override
+ public JMSProducer setProperty(String name, long value) {
+ return setObjectProperty(name, value);
+ }
+
+ @Override
+ public JMSProducer setProperty(String name, Object value) {
+ return setObjectProperty(name, value);
+ }
+
+ @Override
+ public JMSProducer setProperty(String name, short value) {
+ return setObjectProperty(name, value);
+ }
+
+ @Override
+ public JMSProducer setProperty(String name, String value) {
+ return setObjectProperty(name, value);
+ }
+
+ //----- Message Headers --------------------------------------------------//
+
+ @Override
+ public String getJMSCorrelationID() {
+ return correlationId;
+ }
+
+ @Override
+ public JMSProducer setJMSCorrelationID(String correlationId) {
+ this.correlationId = correlationId;
+ return this;
+ }
+
+ @Override
+ public byte[] getJMSCorrelationIDAsBytes() {
+ return correlationIdBytes;
+ }
+
+ @Override
+ public JMSProducer setJMSCorrelationIDAsBytes(byte[] correlationIdBytes) {
+ this.correlationIdBytes = correlationIdBytes;
+ return this;
+ }
+
+ @Override
+ public Destination getJMSReplyTo() {
+ return replyTo;
+ }
+
+ /**
+ * Stores the reply-to destination to apply to messages sent by this producer.
+ *
+ * The destination is passed through JmsMessageTransformation.transformDestination
+ * and the transformed result is retained, so that getJMSReplyTo() and doSend()
+ * observe it. Previously the result was discarded and the value was never stored.
+ *
+ * @param replyTo
+ * The Destination to set as the JMSReplyTo header of sent messages.
+ *
+ * @return this JMSProducer instance.
+ */
+ @Override
+ public JMSProducer setJMSReplyTo(Destination replyTo) {
+ try {
+ this.replyTo = JmsMessageTransformation.transformDestination(session.getConnection(), replyTo);
+ } catch (JMSException e) {
+ throw JmsExceptionSupport.createRuntimeException(e);
+ }
+
+ return this;
+ }
+
+ @Override
+ public String getJMSType() {
+ return type;
+ }
+
+ @Override
+ public JMSProducer setJMSType(String type) {
+ this.type = type;
+ return this;
+ }
+
+ //----- Producer Send Configuration --------------------------------------//
+
+ @Override
+ public CompletionListener getAsync() {
+ return completionListener;
+ }
+
+ @Override
+ public JMSProducer setAsync(CompletionListener completionListener) {
+ this.completionListener = completionListener;
+ return this;
+ }
+
+ @Override
+ public long getDeliveryDelay() {
+ return deliveryDelay;
+ }
+
+ @Override
+ public JMSProducer setDeliveryDelay(long deliveryDelay) {
+ this.deliveryDelay = deliveryDelay;
+ return this;
+ }
+
+ @Override
+ public int getDeliveryMode() {
+ return deliveryMode;
+ }
+
+ @Override
+ public JMSProducer setDeliveryMode(int deliveryMode) {
+ this.deliveryMode = deliveryMode;
+ return this;
+ }
+
+ @Override
+ public boolean getDisableMessageID() {
+ return disableMessageId;
+ }
+
+ @Override
+ public JMSProducer setDisableMessageID(boolean disableMessageId) {
+ this.disableMessageId = disableMessageId;
+ return this;
+ }
+
+ @Override
+ public boolean getDisableMessageTimestamp() {
+ return disableTimestamp;
+ }
+
+ @Override
+ public JMSProducer setDisableMessageTimestamp(boolean disableTimestamp) {
+ this.disableTimestamp = disableTimestamp;
+ return this;
+ }
+
+ @Override
+ public int getPriority() {
+ return priority;
+ }
+
+ @Override
+ public JMSProducer setPriority(int priority) {
+ this.priority = priority;
+ return this;
+ }
+
+ @Override
+ public long getTimeToLive() {
+ return timeToLive;
+ }
+
+ @Override
+ public JMSProducer setTimeToLive(long timeToLive) {
+ this.timeToLive = timeToLive;
+ return this;
+ }
+
+ //----- Internal support methods -----------------------------------------//
+
+ /**
+ * Common implementation behind every setProperty overload: checks the name and
+ * value, then stores the pair in the producer's property map.
+ *
+ * @param name
+ * The property name; checked with checkPropertyNameIsValid using the
+ * connection's validatePropertyNames setting.
+ * @param value
+ * The property value; checked with checkValidObject before being stored.
+ *
+ * @return this JMSProducer instance.
+ */
+ private JMSProducer setObjectProperty(String name, Object value) {
+ try {
+ // Both checks run before the map is touched, so a rejected name or value
+ // leaves the stored properties unchanged.
+ checkPropertyNameIsValid(name, session.getConnection().isValidatePropertyNames());
+ checkValidObject(value);
+ messageProperties.put(name, value);
+ return this;
+ } catch (JMSException e) {
+ throw JmsExceptionSupport.createRuntimeException(e);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
index 817f342..e740ba7 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -34,6 +34,7 @@ import java.util.concurrent.atomic.AtomicReference;
import java.util.concurrent.locks.ReentrantLock;
import javax.jms.BytesMessage;
+import javax.jms.CompletionListener;
import javax.jms.DeliveryMode;
import javax.jms.Destination;
import javax.jms.IllegalStateException;
@@ -471,6 +472,22 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
return result;
}
+ /**
+ * @see javax.jms.Session#createDurableConsumer(javax.jms.Topic, java.lang.String)
+ */
+ @Override
+ public MessageConsumer createDurableConsumer(Topic topic, String name) throws JMSException {
+ return createDurableSubscriber(topic, name, null, false);
+ }
+
+ /**
+ * @see javax.jms.Session#createDurableConsumer(javax.jms.Topic, java.lang.String, java.lang.String, boolean)
+ */
+ @Override
+ public MessageConsumer createDurableConsumer(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
+ return createDurableSubscriber(topic, name, messageSelector, noLocal);
+ }
+
protected void checkClientIDWasSetExplicitly() throws IllegalStateException {
if (!connection.isExplicitClientID()) {
throw new IllegalStateException("You must specify a unique clientID for the Connection to use a DurableSubscriber");
@@ -486,6 +503,46 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
connection.unsubscribe(name);
}
+ /**
+ * @see javax.jms.Session#createSharedConsumer(javax.jms.Topic, java.lang.String)
+ */
+ @Override
+ public MessageConsumer createSharedConsumer(Topic topic, String name) throws JMSException {
+ checkClosed();
+ // TODO Auto-generated method stub
+ throw new JMSException("Not yet implemented");
+ }
+
+ /**
+ * @see javax.jms.Session#createSharedConsumer(javax.jms.Topic, java.lang.String, java.lang.String)
+ */
+ @Override
+ public MessageConsumer createSharedConsumer(Topic topic, String name, String selector) throws JMSException {
+ checkClosed();
+ // TODO Auto-generated method stub
+ throw new JMSException("Not yet implemented");
+ }
+
+ /**
+ * @see javax.jms.Session#createSharedDurableConsumer(javax.jms.Topic, java.lang.String)
+ */
+ @Override
+ public MessageConsumer createSharedDurableConsumer(Topic topic, String name) throws JMSException {
+ checkClosed();
+ // TODO Auto-generated method stub
+ throw new JMSException("Not yet implemented");
+ }
+
+ /**
+ * @see javax.jms.Session#createSharedDurableConsumer(javax.jms.Topic, java.lang.String, java.lang.String)
+ */
+ @Override
+ public MessageConsumer createSharedDurableConsumer(Topic topic, String name, String selector) throws JMSException {
+ checkClosed();
+ // TODO Auto-generated method stub
+ throw new JMSException("Not yet implemented");
+ }
+
//////////////////////////////////////////////////////////////////////////
// Producer creation
//////////////////////////////////////////////////////////////////////////
@@ -653,17 +710,17 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
connection.onException(ex);
}
- protected void send(JmsMessageProducer producer, Destination dest, Message msg, int deliveryMode, int priority, long timeToLive, boolean disableMsgId, boolean disableTimestamp, JmsCompletionListener listener) throws JMSException {
+ protected void send(JmsMessageProducer producer, Destination dest, Message msg, int deliveryMode, int priority, long timeToLive, boolean disableMsgId, boolean disableTimestamp, long deliveryDelay, CompletionListener listener) throws JMSException {
JmsDestination destination = JmsMessageTransformation.transformDestination(connection, dest);
if (destination.isTemporary() && ((JmsTemporaryDestination) destination).isDeleted()) {
throw new IllegalStateException("Temporary destination has been deleted");
}
- send(producer, destination, msg, deliveryMode, priority, timeToLive, disableMsgId, disableTimestamp, listener);
+ send(producer, destination, msg, deliveryMode, priority, timeToLive, disableMsgId, disableTimestamp, deliveryDelay, listener);
}
- private void send(JmsMessageProducer producer, JmsDestination destination, Message original, int deliveryMode, int priority, long timeToLive, boolean disableMsgId, boolean disableTimestamp, JmsCompletionListener listener) throws JMSException {
+ private void send(JmsMessageProducer producer, JmsDestination destination, Message original, int deliveryMode, int priority, long timeToLive, boolean disableMsgId, boolean disableTimestamp, long deliveryDelay, CompletionListener listener) throws JMSException {
sendLock.lock();
try {
original.setJMSDeliveryMode(deliveryMode);
@@ -672,7 +729,8 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
original.setJMSDestination(destination);
long timeStamp = System.currentTimeMillis();
- boolean hasTTL = timeToLive > 0;
+ boolean hasTTL = timeToLive > Message.DEFAULT_TIME_TO_LIVE;
+ boolean hasDelay = deliveryDelay > Message.DEFAULT_DELIVERY_DELAY;
if (!disableTimestamp) {
original.setJMSTimestamp(timeStamp);
@@ -686,6 +744,12 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
original.setJMSExpiration(0);
}
+ // The delivery time is the send timestamp plus the requested delivery delay;
+ // it was previously derived from timeToLive, which yielded the expiration time.
+ if (hasDelay) {
+ original.setJMSDeliveryTime(timeStamp + deliveryDelay);
+ } else {
+ original.setJMSDeliveryTime(0);
+ }
+
boolean isJmsMessage = original instanceof JmsMessage;
long messageSequence = producer.getNextMessageSequence();
@@ -966,6 +1030,10 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
return sessionInfo.getId();
}
+ protected int getSessionMode() {
+ return acknowledgementMode;
+ }
+
protected JmsConsumerId getNextConsumerId() {
return new JmsConsumerId(sessionInfo.getId(), consumerIdGenerator.incrementAndGet());
}
@@ -1272,12 +1340,12 @@ public class JmsSession implements AutoCloseable, Session, QueueSession, TopicSe
private final class SendCompletion {
private final JmsOutboundMessageDispatch envelope;
- private final JmsCompletionListener listener;
+ private final CompletionListener listener;
private Exception failureCause;
private boolean completed;
- public SendCompletion(JmsOutboundMessageDispatch envelope, JmsCompletionListener listener) {
+ public SendCompletion(JmsOutboundMessageDispatch envelope, CompletionListener listener) {
this.envelope = envelope;
this.listener = listener;
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsExceptionSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsExceptionSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsExceptionSupport.java
index 48493bd..5f255f2 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsExceptionSupport.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsExceptionSupport.java
@@ -16,9 +16,29 @@
*/
package org.apache.qpid.jms.exceptions;
+import javax.jms.IllegalStateException;
+import javax.jms.IllegalStateRuntimeException;
+import javax.jms.InvalidClientIDException;
+import javax.jms.InvalidClientIDRuntimeException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.InvalidDestinationRuntimeException;
+import javax.jms.InvalidSelectorException;
+import javax.jms.InvalidSelectorRuntimeException;
import javax.jms.JMSException;
+import javax.jms.JMSRuntimeException;
+import javax.jms.JMSSecurityException;
+import javax.jms.JMSSecurityRuntimeException;
import javax.jms.MessageEOFException;
import javax.jms.MessageFormatException;
+import javax.jms.MessageFormatRuntimeException;
+import javax.jms.MessageNotWriteableException;
+import javax.jms.MessageNotWriteableRuntimeException;
+import javax.jms.ResourceAllocationException;
+import javax.jms.ResourceAllocationRuntimeException;
+import javax.jms.TransactionInProgressException;
+import javax.jms.TransactionInProgressRuntimeException;
+import javax.jms.TransactionRolledBackException;
+import javax.jms.TransactionRolledBackRuntimeException;
/**
* Exception support class.
@@ -143,4 +163,50 @@ public final class JmsExceptionSupport {
exception.initCause(cause);
return exception;
}
+
+    /**
+     * Creates the proper instance of a JMSRuntimeException based on the type
+     * of exception that is passed.
+     * <p>
+     * A {@link JMSException} is mapped onto the runtime exception type that mirrors
+     * it, carrying over its message and error code and keeping the original as the
+     * cause. Any other exception is wrapped in a plain {@link JMSRuntimeException}
+     * with no error code. The result is always returned, never thrown, so the caller
+     * decides whether to throw it or hand it on (e.g. to a listener).
+     *
+     * @param exception
+     *        The exception instance to convert to a JMSRuntimeException
+     *
+     * @return a new {@link JMSRuntimeException} instance that reflects the original error.
+     */
+    public static JMSRuntimeException createRuntimeException(Exception exception) {
+        if (!(exception instanceof JMSException)) {
+            // Return instead of throwing: this is a factory method, and callers that
+            // write "throw createRuntimeException(e)" observe the same outcome.
+            return new JMSRuntimeException(exception.getMessage(), null, exception);
+        }
+
+        final JMSException source = (JMSException) exception;
+        final JMSRuntimeException result;
+
+        if (source instanceof IllegalStateException) {
+            result = new IllegalStateRuntimeException(source.getMessage(), source.getErrorCode(), source);
+        } else if (source instanceof InvalidClientIDException) {
+            result = new InvalidClientIDRuntimeException(source.getMessage(), source.getErrorCode(), source);
+        } else if (source instanceof InvalidDestinationException) {
+            result = new InvalidDestinationRuntimeException(source.getMessage(), source.getErrorCode(), source);
+        } else if (source instanceof InvalidSelectorException) {
+            result = new InvalidSelectorRuntimeException(source.getMessage(), source.getErrorCode(), source);
+        } else if (source instanceof JMSSecurityException) {
+            result = new JMSSecurityRuntimeException(source.getMessage(), source.getErrorCode(), source);
+        } else if (source instanceof MessageFormatException) {
+            result = new MessageFormatRuntimeException(source.getMessage(), source.getErrorCode(), source);
+        } else if (source instanceof MessageNotWriteableException) {
+            result = new MessageNotWriteableRuntimeException(source.getMessage(), source.getErrorCode(), source);
+        } else if (source instanceof ResourceAllocationException) {
+            result = new ResourceAllocationRuntimeException(source.getMessage(), source.getErrorCode(), source);
+        } else if (source instanceof TransactionInProgressException) {
+            result = new TransactionInProgressRuntimeException(source.getMessage(), source.getErrorCode(), source);
+        } else if (source instanceof TransactionRolledBackException) {
+            result = new TransactionRolledBackRuntimeException(source.getMessage(), source.getErrorCode(), source);
+        } else {
+            // No dedicated runtime counterpart matched: fall back to the base type.
+            result = new JMSRuntimeException(source.getMessage(), source.getErrorCode(), source);
+        }
+
+        return result;
+    }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsBytesMessage.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsBytesMessage.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsBytesMessage.java
index 95e51e0..75bc30c 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsBytesMessage.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsBytesMessage.java
@@ -28,6 +28,7 @@ import javax.jms.MessageFormatException;
import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
import org.apache.qpid.jms.message.facade.JmsBytesMessageFacade;
+@SuppressWarnings("unchecked")
public class JmsBytesMessage extends JmsMessage implements BytesMessage {
protected transient DataOutputStream dataOut;
@@ -396,6 +397,20 @@ public class JmsBytesMessage extends JmsMessage implements BytesMessage {
return "JmsBytesMessage { " + facade + " }";
}
+    /**
+     * Reports whether the body can be handed out as the given type: always true
+     * when the facade reports no body, otherwise only when a byte[] is assignable
+     * to the requested type.
+     */
+    @Override
+    public boolean isBodyAssignableTo(@SuppressWarnings("rawtypes") Class target) throws JMSException {
+        if (!facade.hasBody()) {
+            return true;
+        }
+
+        return target.isAssignableFrom(byte[].class);
+    }
+
+    /**
+     * Supplies the body for {@code getBody}: null when the facade reports no body,
+     * otherwise the value produced by the facade's copyBody() cast to the requested type.
+     */
+    @Override
+    protected <T> T doGetBody(Class<T> asType) throws JMSException {
+        return facade.hasBody() ? (T) facade.copyBody() : null;
+    }
+
private void initializeWriting() throws JMSException {
checkReadOnlyBody();
if (this.dataOut == null) {
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMapMessage.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMapMessage.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMapMessage.java
index add21f1..4ec02fe 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMapMessage.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMapMessage.java
@@ -17,6 +17,8 @@
package org.apache.qpid.jms.message;
import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Map;
import javax.jms.JMSException;
import javax.jms.MapMessage;
@@ -27,6 +29,7 @@ import org.apache.qpid.jms.message.facade.JmsMapMessageFacade;
/**
* Implementation of the JMS MapMessage.
*/
+@SuppressWarnings("unchecked")
public class JmsMapMessage extends JmsMessage implements MapMessage {
JmsMapMessageFacade facade;
@@ -299,6 +302,27 @@ public class JmsMapMessage extends JmsMessage implements MapMessage {
return "JmsMapMessage { " + facade + " }";
}
+    /**
+     * Reports whether the body can be handed out as the given type: always true
+     * when the facade reports no body, otherwise only when a {@link Map} is
+     * assignable to the requested type.
+     */
+    @Override
+    public boolean isBodyAssignableTo(@SuppressWarnings("rawtypes") Class target) throws JMSException {
+        if (!facade.hasBody()) {
+            return true;
+        }
+
+        return target.isAssignableFrom(Map.class);
+    }
+
+ @Override
+ protected <T> T doGetBody(Class<T> asType) throws JMSException {
+ if (!facade.hasBody()) {
+ return null;
+ }
+
+ Map<String, Object> copy = new HashMap<String, Object>();
+ Enumeration<String> keys = facade.getMapNames();
+ while (keys.hasMoreElements()) {
+ String key = keys.nextElement();
+ copy.put(key, getObject(key));
+ }
+
+ return (T) copy;
+ }
+
private void put(String name, Object value) throws JMSException {
checkReadOnlyBody();
checkKeyNameIsValid(name);
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/0c39522c/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java
index 68db061..05143d8 100644
--- a/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/message/JmsMessage.java
@@ -16,6 +16,8 @@
*/
package org.apache.qpid.jms.message;
+import static org.apache.qpid.jms.message.JmsMessagePropertySupport.convertPropertyTo;
+
import java.util.Collections;
import java.util.Enumeration;
import java.util.HashSet;
@@ -32,7 +34,6 @@ import org.apache.qpid.jms.JmsAcknowledgeCallback;
import org.apache.qpid.jms.JmsConnection;
import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
import org.apache.qpid.jms.message.facade.JmsMessageFacade;
-import org.apache.qpid.jms.util.TypeConversionSupport;
public class JmsMessage implements javax.jms.Message {
@@ -103,6 +104,24 @@ public class JmsMessage implements javax.jms.Message {
}
@Override
+    public boolean isBodyAssignableTo(@SuppressWarnings("rawtypes") Class target) throws JMSException {
+        // Base implementation accepts every type; body-carrying subclasses narrow this.
+        return true;
+    }
+
+    /**
+     * Returns the message body as the requested type.
+     * <p>
+     * The type check is delegated to {@link #isBodyAssignableTo(Class)} and the actual
+     * retrieval to {@link #doGetBody(Class)}; the method itself is final.
+     *
+     * @param asType
+     *        The type the body should be returned as.
+     *
+     * @return the value produced by {@link #doGetBody(Class)}.
+     *
+     * @throws MessageFormatException if the body is not assignable to the given type.
+     * @throws JMSException if the type check or the body retrieval fails.
+     */
+    @Override
+    public final <T> T getBody(Class<T> asType) throws JMSException {
+        if (!isBodyAssignableTo(asType)) {
+            throw new MessageFormatException("Message body cannot be read as type: " + asType);
+        }
+
+        return doGetBody(asType);
+    }
+
+    /**
+     * Hook used by {@link #getBody(Class)} to obtain the body once the type check
+     * has passed. This base implementation has no body to offer and returns null.
+     */
+    protected <T> T doGetBody(Class<T> asType) throws JMSException {
+        return null;
+    }
+
+ @Override
public void clearBody() throws JMSException {
readOnlyBody = false;
facade.clearBody();
@@ -248,6 +267,16 @@ public class JmsMessage implements javax.jms.Message {
}
@Override
+    public long getJMSDeliveryTime() throws JMSException {
+        // The delivery time is read straight from the message facade.
+        final long deliveryTime = facade.getDeliveryTime();
+        return deliveryTime;
+    }
+
+    /**
+     * Passes the given delivery time through to the message facade unchanged.
+     *
+     * @param deliveryTime
+     *        The delivery time value to store on the facade.
+     */
+    @Override
+    public void setJMSDeliveryTime(long deliveryTime) throws JMSException {
+        this.facade.setDeliveryTime(deliveryTime);
+    }
+
+ @Override
public void clearProperties() throws JMSException {
JmsMessagePropertyIntercepter.clearProperties(this, true);
}
@@ -288,106 +317,42 @@ public class JmsMessage implements javax.jms.Message {
@Override
public boolean getBooleanProperty(String name) throws JMSException {
- Object value = getObjectProperty(name);
- if (value == null) {
- return false;
- }
- Boolean rc = (Boolean) TypeConversionSupport.convert(value, Boolean.class);
- if (rc == null) {
- throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a boolean");
- }
- return rc.booleanValue();
+ return convertPropertyTo(name, getObjectProperty(name), Boolean.class);
}
@Override
public byte getByteProperty(String name) throws JMSException {
- Object value = getObjectProperty(name);
- if (value == null) {
- throw new NumberFormatException("property " + name + " was null");
- }
- Byte rc = (Byte) TypeConversionSupport.convert(value, Byte.class);
- if (rc == null) {
- throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a byte");
- }
- return rc.byteValue();
+ return convertPropertyTo(name, getObjectProperty(name), Byte.class);
}
@Override
public short getShortProperty(String name) throws JMSException {
- Object value = getObjectProperty(name);
- if (value == null) {
- throw new NumberFormatException("property " + name + " was null");
- }
- Short rc = (Short) TypeConversionSupport.convert(value, Short.class);
- if (rc == null) {
- throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a short");
- }
- return rc.shortValue();
+ return convertPropertyTo(name, getObjectProperty(name), Short.class);
}
@Override
public int getIntProperty(String name) throws JMSException {
- Object value = getObjectProperty(name);
- if (value == null) {
- throw new NumberFormatException("property " + name + " was null");
- }
- Integer rc = (Integer) TypeConversionSupport.convert(value, Integer.class);
- if (rc == null) {
- throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as an integer");
- }
- return rc.intValue();
+ return convertPropertyTo(name, getObjectProperty(name), Integer.class);
}
@Override
public long getLongProperty(String name) throws JMSException {
- Object value = getObjectProperty(name);
- if (value == null) {
- throw new NumberFormatException("property " + name + " was null");
- }
- Long rc = (Long) TypeConversionSupport.convert(value, Long.class);
- if (rc == null) {
- throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a long");
- }
- return rc.longValue();
+ return convertPropertyTo(name, getObjectProperty(name), Long.class);
}
@Override
public float getFloatProperty(String name) throws JMSException {
- Object value = getObjectProperty(name);
- if (value == null) {
- throw new NullPointerException("property " + name + " was null");
- }
- Float rc = (Float) TypeConversionSupport.convert(value, Float.class);
- if (rc == null) {
- throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a float");
- }
- return rc.floatValue();
+ return convertPropertyTo(name, getObjectProperty(name), Float.class);
}
@Override
public double getDoubleProperty(String name) throws JMSException {
- Object value = getObjectProperty(name);
- if (value == null) {
- throw new NullPointerException("property " + name + " was null");
- }
- Double rc = (Double) TypeConversionSupport.convert(value, Double.class);
- if (rc == null) {
- throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a double");
- }
- return rc.doubleValue();
+ return convertPropertyTo(name, getObjectProperty(name), Double.class);
}
@Override
public String getStringProperty(String name) throws JMSException {
- Object value = getObjectProperty(name);
- if (value == null) {
- return null;
- }
- String rc = (String) TypeConversionSupport.convert(value, String.class);
- if (rc == null) {
- throw new MessageFormatException("Property " + name + " was a " + value.getClass().getName() + " and cannot be read as a String");
- }
- return rc;
+ return convertPropertyTo(name, getObjectProperty(name), String.class);
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org