You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2014/09/23 20:20:48 UTC
[24/27] Initial drop of donated AMQP Client Code.
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
new file mode 100644
index 0000000..a7ede86
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSession.java
@@ -0,0 +1,1002 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import java.io.Serializable;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.Map;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.Executor;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+import java.util.concurrent.locks.ReentrantLock;
+
+import javax.jms.BytesMessage;
+import javax.jms.DeliveryMode;
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.InvalidDestinationException;
+import javax.jms.InvalidSelectorException;
+import javax.jms.JMSException;
+import javax.jms.MapMessage;
+import javax.jms.Message;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
+import javax.jms.MessageProducer;
+import javax.jms.ObjectMessage;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.QueueSession;
+import javax.jms.Session;
+import javax.jms.StreamMessage;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.TextMessage;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+import javax.jms.TopicSession;
+import javax.jms.TopicSubscriber;
+
+import org.apache.activemq.apollo.filter.FilterException;
+import org.apache.activemq.apollo.selector.SelectorParser;
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.apache.qpid.jms.message.JmsMessage;
+import org.apache.qpid.jms.message.JmsMessageFactory;
+import org.apache.qpid.jms.message.JmsMessageTransformation;
+import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
+import org.apache.qpid.jms.meta.JmsConsumerId;
+import org.apache.qpid.jms.meta.JmsMessageId;
+import org.apache.qpid.jms.meta.JmsProducerId;
+import org.apache.qpid.jms.meta.JmsSessionId;
+import org.apache.qpid.jms.meta.JmsSessionInfo;
+import org.apache.qpid.jms.provider.Provider;
+import org.apache.qpid.jms.provider.ProviderFuture;
+import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
+
+/**
+ * JMS Session implementation
+ */
+@SuppressWarnings("static-access")
+public class JmsSession implements Session, QueueSession, TopicSession, JmsMessageDispatcher {
+
+ private final JmsConnection connection;
+ private final int acknowledgementMode;
+ private final List<JmsMessageProducer> producers = new CopyOnWriteArrayList<JmsMessageProducer>();
+ private final Map<JmsConsumerId, JmsMessageConsumer> consumers = new ConcurrentHashMap<JmsConsumerId, JmsMessageConsumer>();
+ private MessageListener messageListener;
+ private final AtomicBoolean closed = new AtomicBoolean();
+ private final AtomicBoolean started = new AtomicBoolean();
+ private final LinkedBlockingQueue<JmsInboundMessageDispatch> stoppedMessages =
+ new LinkedBlockingQueue<JmsInboundMessageDispatch>(10000);
+ private JmsPrefetchPolicy prefetchPolicy;
+ private JmsSessionInfo sessionInfo;
+ private ExecutorService executor;
+ private final ReentrantLock sendLock = new ReentrantLock();
+
+ private final AtomicLong consumerIdGenerator = new AtomicLong();
+ private final AtomicLong producerIdGenerator = new AtomicLong();
+ private JmsLocalTransactionContext transactionContext;
+ private JmsMessageFactory messageFactory;
+
+ /**
+ * Creates the session: copies the connection's prefetch policy, installs a local
+ * transaction context, builds the {@link JmsSessionInfo} and hands it to the
+ * connection's createResource call.
+ *
+ * @param connection the parent connection this session belongs to.
+ * @param sessionId the id used to build this session's info object.
+ * @param acknowledgementMode the acknowledgement mode stored for this session.
+ * @throws JMSException declared by the signature; presumably raised when the
+ *         connection cannot create the session resource (TODO confirm).
+ */
+ protected JmsSession(JmsConnection connection, JmsSessionId sessionId, int acknowledgementMode) throws JMSException {
+ this.connection = connection;
+ this.acknowledgementMode = acknowledgementMode;
+ // Session gets its own policy instance built from the connection's policy.
+ this.prefetchPolicy = new JmsPrefetchPolicy(connection.getPrefetchPolicy());
+
+ setTransactionContext(new JmsLocalTransactionContext(this));
+
+ this.sessionInfo = new JmsSessionInfo(sessionId);
+ this.sessionInfo.setAcknowledgementMode(acknowledgementMode);
+ this.sessionInfo.setSendAcksAsync(connection.isSendAcksAsync());
+
+ // The info object returned by the connection replaces the locally built one.
+ this.sessionInfo = connection.createResource(sessionInfo);
+ this.messageFactory = connection.getMessageFactory();
+ }
+
+ int acknowledgementMode() {
+ return this.acknowledgementMode;
+ }
+
+ //////////////////////////////////////////////////////////////////////////
+ // Session methods
+ //////////////////////////////////////////////////////////////////////////
+
+ @Override
+ public int getAcknowledgeMode() throws JMSException {
+ checkClosed();
+ return this.acknowledgementMode;
+ }
+
+ @Override
+ public boolean getTransacted() throws JMSException {
+ checkClosed();
+ return isTransacted();
+ }
+
+ @Override
+ public MessageListener getMessageListener() throws JMSException {
+ checkClosed();
+ return this.messageListener;
+ }
+
+ @Override
+ public void setMessageListener(MessageListener listener) throws JMSException {
+ checkClosed();
+ this.messageListener = listener;
+ }
+
+ @Override
+ public void recover() throws JMSException {
+ checkClosed();
+ if (getTransacted()) {
+ throw new javax.jms.IllegalStateException("Cannot call recover() on a transacted session");
+ }
+
+ this.connection.recover(getSessionId());
+ }
+
+ @Override
+ public void commit() throws JMSException {
+ checkClosed();
+
+ if (!getTransacted()) {
+ throw new javax.jms.IllegalStateException("Not a transacted session");
+ }
+
+ this.transactionContext.commit();
+ }
+
+    /**
+     * Rolls back the current transaction and then schedules, on the session executor,
+     * a pass over all consumers invoking drainMessageQueueToListener on each.
+     *
+     * @throws JMSException if the session is closed, is not transacted, or the
+     *         transaction context fails to roll back.
+     */
+    @Override
+    public void rollback() throws JMSException {
+        checkClosed();
+        if (!getTransacted()) {
+            throw new javax.jms.IllegalStateException("Not a transacted session");
+        }
+
+        this.transactionContext.rollback();
+
+        // Run the drain on the session's executor rather than on the caller's thread.
+        final Runnable drainConsumers = new Runnable() {
+            @Override
+            public void run() {
+                for (JmsMessageConsumer consumer : consumers.values()) {
+                    consumer.drainMessageQueueToListener();
+                }
+            }
+        };
+        getExecutor().execute(drainConsumers);
+    }
+
+ @Override
+ public void run() {
+ try {
+ checkClosed();
+ } catch (IllegalStateException e) {
+ throw new RuntimeException(e);
+ }
+
+ throw new UnsupportedOperationException();
+ }
+
+    /**
+     * Closes the session; a no-op when the session is already marked closed.
+     *
+     * @throws JMSException if the underlying close fails.
+     */
+    @Override
+    public void close() throws JMSException {
+        if (closed.get()) {
+            return;
+        }
+        doClose();
+    }
+
+ /**
+ * Shutdown the Session and release all resources. Once completed the Session can
+ * request that the Provider destroy the Session and it's child resources.
+ *
+ * @throws JMSException
+ */
+ protected void doClose() throws JMSException {
+ boolean interrupted = Thread.interrupted();
+ shutdown();
+ this.connection.removeSession(this);
+ this.connection.destroyResource(sessionInfo);
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+
+ /**
+ * This method should terminate all Session resources and prepare for disposal of the
+ * Session. It is called either from the Session close method or from the Connection
+ * when a close request is made and the Connection wants to cleanup all Session resources.
+ *
+ * This method should not attempt to send a destroy request to the Provider as that
+ * will either be done by another session method or is not needed when done by the parent
+ * Connection.
+ *
+ * @throws JMSException
+ */
+ protected void shutdown() throws JMSException {
+ if (closed.compareAndSet(false, true)) {
+ stop();
+ for (JmsMessageConsumer consumer : new ArrayList<JmsMessageConsumer>(this.consumers.values())) {
+ consumer.shutdown();
+ }
+
+ for (JmsMessageProducer producer : this.producers) {
+ producer.shutdown();
+ }
+
+ try {
+ if (getTransactionContext().isInTransaction()) {
+ rollback();
+ }
+ } catch (JMSException e) {
+ }
+ }
+ }
+
+ //////////////////////////////////////////////////////////////////////////
+ // Consumer creation
+ //////////////////////////////////////////////////////////////////////////
+
+ /**
+ * @param destination
+ * @return a MessageConsumer
+ * @throws JMSException
+ * @see javax.jms.Session#createConsumer(javax.jms.Destination)
+ */
+ @Override
+ public MessageConsumer createConsumer(Destination destination) throws JMSException {
+ return createConsumer(destination, null);
+ }
+
+ /**
+ * @param destination
+ * @param messageSelector
+ * @return MessageConsumer
+ * @throws JMSException
+ * @see javax.jms.Session#createConsumer(javax.jms.Destination,
+ * java.lang.String)
+ */
+ @Override
+ public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
+ return createConsumer(destination, messageSelector, false);
+ }
+
+ /**
+ * @param destination
+ * @param messageSelector
+ * @param NoLocal
+ * @return the MessageConsumer
+ * @throws JMSException
+ * @see javax.jms.Session#createConsumer(javax.jms.Destination,
+ * java.lang.String, boolean)
+ */
+    @Override
+    public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean NoLocal) throws JMSException {
+        checkClosed();
+        checkDestination(destination);
+        // checkSelector validates the selector and maps a blank one to null.
+        final String selector = checkSelector(messageSelector);
+        final JmsDestination target = JmsMessageTransformation.transformDestination(connection, destination);
+        final JmsTopicSubscriber subscriber = new JmsTopicSubscriber(getNextConsumerId(), this, target, NoLocal, selector);
+        subscriber.init();
+        return subscriber;
+    }
+
+    /**
+     * Creates a receiver for the given queue without a message selector.
+     *
+     * @param queue the queue to receive from; must not be null.
+     * @return QueueReceiver
+     * @throws JMSException if the session is closed, the queue is null or the
+     *         receiver cannot be initialised.
+     * @see javax.jms.QueueSession#createReceiver(javax.jms.Queue)
+     */
+    @Override
+    public QueueReceiver createReceiver(Queue queue) throws JMSException {
+        // Delegate with a null selector so "no selector" is represented the same way
+        // as in the other factory methods, instead of passing an empty string.
+        return createReceiver(queue, null);
+    }
+
+ /**
+ * @param queue
+ * @param messageSelector
+ * @return QueueReceiver
+ * @throws JMSException
+ * @see javax.jms.QueueSession#createReceiver(javax.jms.Queue,
+ * java.lang.String)
+ */
+    @Override
+    public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
+        checkClosed();
+        checkDestination(queue);
+        // checkSelector validates the selector and maps a blank one to null.
+        final String selector = checkSelector(messageSelector);
+        final JmsDestination target = JmsMessageTransformation.transformDestination(connection, queue);
+        final JmsQueueReceiver receiver = new JmsQueueReceiver(getNextConsumerId(), this, target, selector);
+        receiver.init();
+        return receiver;
+    }
+
+ /**
+ * @param destination
+ * @return QueueBrowser
+ * @throws JMSException
+ * @see javax.jms.Session#createBrowser(javax.jms.Queue)
+ */
+ @Override
+ public QueueBrowser createBrowser(Queue destination) throws JMSException {
+ return createBrowser(destination, null);
+ }
+
+ /**
+ * @param destination
+ * @param messageSelector
+ * @return QueueBrowser
+ * @throws JMSException
+ * @see javax.jms.Session#createBrowser(javax.jms.Queue, java.lang.String)
+ */
+    @Override
+    public QueueBrowser createBrowser(Queue destination, String messageSelector) throws JMSException {
+        checkClosed();
+        checkDestination(destination);
+        // checkSelector validates the selector and maps a blank one to null.
+        final String selector = checkSelector(messageSelector);
+        final JmsDestination target = JmsMessageTransformation.transformDestination(connection, destination);
+        return new JmsQueueBrowser(this, target, selector);
+    }
+
+ /**
+ * @param topic
+ * @return TopicSubscriber
+ * @throws JMSException
+ * @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic)
+ */
+ @Override
+ public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
+ return createSubscriber(topic, null, false);
+ }
+
+ /**
+ * @param topic
+ * @param messageSelector
+ * @param noLocal
+ * @return TopicSubscriber
+ * @throws JMSException
+ * @see javax.jms.TopicSession#createSubscriber(javax.jms.Topic,
+ * java.lang.String, boolean)
+ */
+    @Override
+    public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
+        checkClosed();
+        checkDestination(topic);
+        // checkSelector validates the selector and maps a blank one to null.
+        final String selector = checkSelector(messageSelector);
+        final JmsDestination target = JmsMessageTransformation.transformDestination(connection, topic);
+        final JmsTopicSubscriber subscriber = new JmsTopicSubscriber(getNextConsumerId(), this, target, noLocal, selector);
+        subscriber.init();
+        return subscriber;
+    }
+
+ /**
+ * @param topic
+ * @param name
+ * @return a TopicSubscriber
+ * @throws JMSException
+ * @see javax.jms.Session#createDurableSubscriber(javax.jms.Topic,
+ * java.lang.String)
+ */
+ @Override
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException {
+ return createDurableSubscriber(topic, name, null, false);
+ }
+
+ /**
+ * @param topic
+ * @param name
+ * @param messageSelector
+ * @param noLocal
+ * @return TopicSubscriber
+ * @throws JMSException
+ * @see javax.jms.Session#createDurableSubscriber(javax.jms.Topic,
+ * java.lang.String, java.lang.String, boolean)
+ */
+    @Override
+    public TopicSubscriber createDurableSubscriber(Topic topic, String name, String messageSelector, boolean noLocal) throws JMSException {
+        checkClosed();
+        checkDestination(topic);
+        // checkSelector validates the selector and maps a blank one to null.
+        messageSelector = checkSelector(messageSelector);
+        JmsDestination dest = JmsMessageTransformation.transformDestination(connection, topic);
+        // Forward the caller's noLocal flag; it was previously dropped in favour of a
+        // hard-coded false, so noLocal durable subscriptions were never requested.
+        JmsTopicSubscriber result = new JmsDurableTopicSubscriber(getNextConsumerId(), this, dest, name, noLocal, messageSelector);
+        result.init();
+        return result;
+    }
+
+ /**
+ * @param name
+ * @throws JMSException
+ * @see javax.jms.Session#unsubscribe(java.lang.String)
+ */
+ @Override
+ public void unsubscribe(String name) throws JMSException {
+ checkClosed();
+ this.connection.unsubscribe(name);
+ }
+
+ //////////////////////////////////////////////////////////////////////////
+ // Producer creation
+ //////////////////////////////////////////////////////////////////////////
+
+ /**
+ * @param destination
+ * @return MessageProducer
+ * @throws JMSException
+ * @see javax.jms.Session#createProducer(javax.jms.Destination)
+ */
+    @Override
+    public MessageProducer createProducer(Destination destination) throws JMSException {
+        checkClosed();
+        final JmsDestination target = JmsMessageTransformation.transformDestination(connection, destination);
+        final JmsMessageProducer producer = new JmsMessageProducer(getNextProducerId(), this, target);
+        // Track the producer so session-wide operations can reach it.
+        add(producer);
+        return producer;
+    }
+
+ /**
+ * @param queue
+ * @return QueueSender
+ * @throws JMSException
+ * @see javax.jms.QueueSession#createSender(javax.jms.Queue)
+ */
+    @Override
+    public QueueSender createSender(Queue queue) throws JMSException {
+        checkClosed();
+        JmsDestination dest = JmsMessageTransformation.transformDestination(connection, queue);
+        JmsQueueSender result = new JmsQueueSender(getNextProducerId(), this, dest);
+        // Register the sender with the session, as createProducer and createPublisher
+        // do; otherwise it is missed by shutdown and the connection-recovery callbacks.
+        add(result);
+        return result;
+    }
+
+ /**
+ * @param topic
+ * @return TopicPublisher
+ * @throws JMSException
+ * @see javax.jms.TopicSession#createPublisher(javax.jms.Topic)
+ */
+    @Override
+    public TopicPublisher createPublisher(Topic topic) throws JMSException {
+        checkClosed();
+        final JmsDestination target = JmsMessageTransformation.transformDestination(connection, topic);
+        final JmsTopicPublisher publisher = new JmsTopicPublisher(getNextProducerId(), this, target);
+        // Track the publisher so session-wide operations can reach it.
+        add(publisher);
+        return publisher;
+    }
+
+ //////////////////////////////////////////////////////////////////////////
+ // Message creation
+ //////////////////////////////////////////////////////////////////////////
+
+ @Override
+ public BytesMessage createBytesMessage() throws JMSException {
+ checkClosed();
+ return init(messageFactory.createBytesMessage());
+ }
+
+ @Override
+ public MapMessage createMapMessage() throws JMSException {
+ checkClosed();
+ return init(messageFactory.createMapMessage());
+ }
+
+ @Override
+ public Message createMessage() throws JMSException {
+ checkClosed();
+ return init(messageFactory.createMessage());
+ }
+
+ @Override
+ public ObjectMessage createObjectMessage() throws JMSException {
+ checkClosed();
+ return init(messageFactory.createObjectMessage(null));
+ }
+
+ @Override
+ public ObjectMessage createObjectMessage(Serializable object) throws JMSException {
+ checkClosed();
+ return init(messageFactory.createObjectMessage(object));
+ }
+
+ @Override
+ public StreamMessage createStreamMessage() throws JMSException {
+ checkClosed();
+ return init(messageFactory.createStreamMessage());
+ }
+
+ @Override
+ public TextMessage createTextMessage() throws JMSException {
+ checkClosed();
+ return init(messageFactory.createTextMessage(null));
+ }
+
+ @Override
+ public TextMessage createTextMessage(String text) throws JMSException {
+ checkClosed();
+ return init(messageFactory.createTextMessage(text));
+ }
+
+ //////////////////////////////////////////////////////////////////////////
+ // Destination creation
+ //////////////////////////////////////////////////////////////////////////
+
+ /**
+ * @param queueName
+ * @return Queue
+ * @throws JMSException
+ * @see javax.jms.Session#createQueue(java.lang.String)
+ */
+ @Override
+ public Queue createQueue(String queueName) throws JMSException {
+ checkClosed();
+ return new JmsQueue(queueName);
+ }
+
+ /**
+ * @param topicName
+ * @return Topic
+ * @throws JMSException
+ * @see javax.jms.Session#createTopic(java.lang.String)
+ */
+ @Override
+ public Topic createTopic(String topicName) throws JMSException {
+ checkClosed();
+ return new JmsTopic(topicName);
+ }
+
+ /**
+ * @return TemporaryQueue
+ * @throws JMSException
+ * @see javax.jms.Session#createTemporaryQueue()
+ */
+ @Override
+ public TemporaryQueue createTemporaryQueue() throws JMSException {
+ checkClosed();
+ return connection.createTemporaryQueue();
+ }
+
+ /**
+ * @return TemporaryTopic
+ * @throws JMSException
+ * @see javax.jms.Session#createTemporaryTopic()
+ */
+ @Override
+ public TemporaryTopic createTemporaryTopic() throws JMSException {
+ checkClosed();
+ return connection.createTemporaryTopic();
+ }
+
+ //////////////////////////////////////////////////////////////////////////
+ // Session Implementation methods
+ //////////////////////////////////////////////////////////////////////////
+
+ protected void add(JmsMessageConsumer consumer) throws JMSException {
+ this.consumers.put(consumer.getConsumerId(), consumer);
+ this.connection.addDispatcher(consumer.getConsumerId(), this);
+
+ if (started.get()) {
+ consumer.start();
+ }
+ }
+
+ protected void remove(JmsMessageConsumer consumer) throws JMSException {
+ this.connection.removeDispatcher(consumer.getConsumerId());
+ this.consumers.remove(consumer.getConsumerId());
+ }
+
+ protected void add(JmsMessageProducer producer) {
+ this.producers.add(producer);
+ }
+
+ protected void remove(MessageProducer producer) {
+ this.producers.remove(producer);
+ }
+
+ protected void onException(Exception ex) {
+ this.connection.onException(ex);
+ }
+
+ protected void onException(JMSException ex) {
+ this.connection.onException(ex);
+ }
+
+ protected void send(JmsMessageProducer producer, Destination dest, Message msg, int deliveryMode, int priority, long timeToLive, boolean disableMsgId, boolean disableTimestamp) throws JMSException {
+ JmsDestination destination = JmsMessageTransformation.transformDestination(connection, dest);
+ send(producer, destination, msg, deliveryMode, priority, timeToLive, disableMsgId, disableTimestamp);
+ }
+
+ /**
+ * Stamps the JMS headers on the caller's message, transforms it into a
+ * {@link JmsMessage} copy, wraps the copy in an outbound dispatch and passes it to
+ * the connection. The whole operation runs while holding sendLock.
+ *
+ * @param producer the producer on whose behalf the message is sent.
+ * @param destination the already transformed destination.
+ * @param original the caller's message; its JMS headers are updated in place.
+ * @param deliveryMode value written to JMSDeliveryMode; also feeds the sync/async decision.
+ * @param priority value written to JMSPriority.
+ * @param timeToLive when greater than zero, JMSExpiration is set to timestamp + timeToLive.
+ * @param disableMsgId when true no new message id is generated or assigned.
+ * @param disableTimestamp when true (and no TTL is set) JMSTimestamp is written as 0.
+ * @throws JMSException if a header cannot be set or the connection send fails.
+ */
+ private void send(JmsMessageProducer producer, JmsDestination destination, Message original, int deliveryMode, int priority, long timeToLive, boolean disableMsgId, boolean disableTimestamp) throws JMSException {
+ sendLock.lock();
+ try {
+ // Begins a transaction first when the session is transacted.
+ startNextTransaction();
+
+ original.setJMSDeliveryMode(deliveryMode);
+ original.setJMSPriority(priority);
+ original.setJMSRedelivered(false);
+
+ // A timestamp is needed either because timestamps are enabled or because the
+ // expiration below is computed from it.
+ long timeStamp = 0;
+ boolean hasTTL = timeToLive > 0;
+ if (!disableTimestamp || hasTTL) {
+ timeStamp = System.currentTimeMillis();
+ }
+
+ original.setJMSTimestamp(timeStamp);
+
+ // NOTE(review): without a TTL the expiration is left untouched, so a re-sent
+ // message keeps whatever JMSExpiration it already carried — confirm intended.
+ if (hasTTL) {
+ original.setJMSExpiration(timeStamp + timeToLive);
+ }
+
+ JmsMessageId msgId = null;
+ if (!disableMsgId) {
+ msgId = getNextMessageId(producer);
+ }
+
+ // Native messages take the typed id and destination before being transformed.
+ boolean isJmsMessageType = original instanceof JmsMessage;
+ if (isJmsMessageType) {
+ ((JmsMessage) original).setConnection(connection);
+ if (!disableMsgId) {
+ ((JmsMessage) original).setJMSMessageID(msgId);
+ }
+ original.setJMSDestination(destination);
+ }
+
+ JmsMessage copy = JmsMessageTransformation.transformMessage(connection, original);
+
+ // Ensure original message gets the destination and message ID as per spec.
+ // Foreign messages only accept the id in String form; the copy gets the typed id.
+ if (!isJmsMessageType) {
+ if (!disableMsgId) {
+ original.setJMSMessageID(msgId.toString());
+ copy.setJMSMessageID(msgId);
+ }
+ original.setJMSDestination(destination);
+ copy.setJMSDestination(destination);
+ }
+
+ // Synchronous when the connection always sync-sends, or when async is not forced
+ // and the message is PERSISTENT on a non-transacted session.
+ boolean sync = connection.isAlwaysSyncSend() ||
+ (!connection.isForceAsyncSend() && deliveryMode == DeliveryMode.PERSISTENT && !getTransacted());
+
+ copy.onSend();
+ JmsOutboundMessageDispatch envelope = new JmsOutboundMessageDispatch();
+ envelope.setMessage(copy);
+ envelope.setProducerId(producer.getProducerId());
+ envelope.setDestination(destination);
+ envelope.setSendAsync(!sync);
+
+ this.connection.send(envelope);
+ } finally {
+ sendLock.unlock();
+ }
+ }
+
+ void acknowledge(JmsInboundMessageDispatch envelope, ACK_TYPE ackType) throws JMSException {
+ startNextTransaction();
+ this.connection.acknowledge(envelope, ackType);
+ }
+
+ /**
+ * Acknowledge all previously delivered messages in this Session as consumed. This
+ * method is usually only called when the Session is in the CLIENT_ACKNOWLEDGE mode.
+ *
+ * @throws JMSException if an error occurs while the acknowledge is processed.
+ */
+ void acknowledge() throws JMSException {
+ this.connection.acknowledge(sessionInfo.getSessionId());
+ }
+
+ public boolean isClosed() {
+ return this.closed.get();
+ }
+
+ /**
+ * Checks whether the session uses transactions.
+ *
+ * @return true - if the session uses transactions.
+ */
+ public boolean isTransacted() {
+ return this.acknowledgementMode == Session.SESSION_TRANSACTED;
+ }
+
+ /**
+ * Checks whether the session used client acknowledgment.
+ *
+ * @return true - if the session uses client acknowledgment.
+ */
+ protected boolean isClientAcknowledge() {
+ return this.acknowledgementMode == Session.CLIENT_ACKNOWLEDGE;
+ }
+
+ /**
+ * Checks whether the session uses auto acknowledgment.
+ *
+ * @return true - if the session uses auto acknowledgment.
+ */
+ public boolean isAutoAcknowledge() {
+ return acknowledgementMode == Session.AUTO_ACKNOWLEDGE;
+ }
+
+ /**
+ * Checks whether the session uses dups-ok acknowledgment.
+ *
+ * @return true - if the session uses dups-ok acknowledgment.
+ */
+ public boolean isDupsOkAcknowledge() {
+ return acknowledgementMode == Session.DUPS_OK_ACKNOWLEDGE;
+ }
+
+    /**
+     * Guards operations that are illegal on a closed session.
+     *
+     * @throws IllegalStateException (the javax.jms variant) if this session is closed.
+     */
+    protected void checkClosed() throws IllegalStateException {
+        if (this.closed.get()) {
+            // This guard belongs to the Session, so the message must not blame a producer.
+            throw new IllegalStateException("The Session is closed");
+        }
+    }
+
+ // This extra wrapping class around SelectorParser is used to avoid
+ // ClassNotFoundException if SelectorParser is not in the class path.
+ static class OptionalSectorParser {
+ public static void check(String selector) throws InvalidSelectorException {
+ try {
+ SelectorParser.parse(selector);
+ } catch (FilterException e) {
+ throw new InvalidSelectorException(e.getMessage());
+ }
+ }
+ }
+
+ // Shared parser handle. It is left null when the self-test below fails, in which
+ // case checkSelector skips syntax validation.
+ static final OptionalSectorParser SELECTOR_PARSER;
+ static {
+ OptionalSectorParser parser;
+ try {
+ // lets verify it's working..
+ parser = new OptionalSectorParser();
+ parser.check("x=1");
+ } catch (Throwable e) {
+ // Throwable also covers linkage errors - presumably what is raised when the
+ // parser library is missing from the class path (TODO confirm).
+ parser = null;
+ }
+ SELECTOR_PARSER = parser;
+ }
+
+    /**
+     * Normalises and validates a message selector.
+     *
+     * @param selector the selector supplied by the caller, may be null.
+     * @return null when the selector is null or blank, otherwise the selector unchanged.
+     * @throws InvalidSelectorException if a parser is available and rejects the selector.
+     */
+    public static String checkSelector(String selector) throws InvalidSelectorException {
+        if (selector == null) {
+            return null;
+        }
+        if (selector.trim().length() == 0) {
+            return null;
+        }
+        // Validation is skipped when the optional parser could not be loaded.
+        if (SELECTOR_PARSER != null) {
+            SELECTOR_PARSER.check(selector);
+        }
+        return selector;
+    }
+
+ public static void checkDestination(Destination dest) throws InvalidDestinationException {
+ if (dest == null) {
+ throw new InvalidDestinationException("Destination cannot be null");
+ }
+ }
+
+    /**
+     * Marks the session started, delivers the messages queued while it was stopped
+     * and starts every consumer. Does nothing when the session is already started.
+     *
+     * @throws JMSException if a consumer fails to start.
+     */
+    protected void start() throws JMSException {
+        if (!started.compareAndSet(false, true)) {
+            return;
+        }
+
+        for (JmsInboundMessageDispatch pending = this.stoppedMessages.poll();
+             pending != null;
+             pending = this.stoppedMessages.poll()) {
+            deliver(pending);
+        }
+
+        for (JmsMessageConsumer consumer : consumers.values()) {
+            consumer.start();
+        }
+    }
+
+ protected void stop() throws JMSException {
+ started.set(false);
+ if (executor != null) {
+ executor.shutdown();
+ executor = null;
+ }
+ for (JmsMessageConsumer consumer : consumers.values()) {
+ consumer.stop();
+ }
+ }
+
+ protected boolean isStarted() {
+ return this.started.get();
+ }
+
+ public JmsConnection getConnection() {
+ return this.connection;
+ }
+
+ Executor getExecutor() {
+ if (executor == null) {
+ executor = Executors.newSingleThreadExecutor(new ThreadFactory() {
+
+ @Override
+ public Thread newThread(Runnable runner) {
+ Thread executor = new Thread(runner);
+ executor.setName("JmsSession ["+ sessionInfo.getSessionId() + "] dispatcher");
+ executor.setDaemon(true);
+ return executor;
+ }
+ });
+ }
+ return executor;
+ }
+
+ protected JmsSessionInfo getSessionInfo() {
+ return this.sessionInfo;
+ }
+
+ protected JmsSessionId getSessionId() {
+ return this.sessionInfo.getSessionId();
+ }
+
+ protected JmsConsumerId getNextConsumerId() {
+ return new JmsConsumerId(sessionInfo.getSessionId(), consumerIdGenerator.incrementAndGet());
+ }
+
+ protected JmsProducerId getNextProducerId() {
+ return new JmsProducerId(sessionInfo.getSessionId(), producerIdGenerator.incrementAndGet());
+ }
+
+ private JmsMessageId getNextMessageId(JmsMessageProducer producer) {
+ return new JmsMessageId(producer.getProducerId(), producer.getNextMessageSequence());
+ }
+
+ private <T extends JmsMessage> T init(T message) {
+ message.setConnection(connection);
+ return message;
+ }
+
+ private synchronized void startNextTransaction() throws JMSException {
+ if (getTransacted()) {
+ transactionContext.begin();
+ }
+ }
+
+ boolean isDestinationInUse(JmsDestination destination) {
+ for (JmsMessageConsumer consumer : consumers.values()) {
+ if (consumer.isUsingDestination(destination)) {
+ return true;
+ }
+ }
+ return false;
+ }
+
+ void checkMessageListener() throws JMSException {
+ if (messageListener != null) {
+ throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
+ }
+ for (JmsMessageConsumer consumer : consumers.values()) {
+ if (consumer.hasMessageListener()) {
+ throw new IllegalStateException("Cannot synchronously receive a message when a MessageListener is set");
+ }
+ }
+ }
+
+ public JmsPrefetchPolicy getPrefetchPolicy() {
+ return prefetchPolicy;
+ }
+
+ public void setPrefetchPolicy(JmsPrefetchPolicy prefetchPolicy) {
+ this.prefetchPolicy = prefetchPolicy;
+ }
+
+ /**
+ * Receives an inbound message: delivers it immediately when the session is started,
+ * otherwise parks it in stoppedMessages, which start() drains.
+ *
+ * @param envelope the inbound dispatch to deliver or hold.
+ */
+ @Override
+ public void onMessage(JmsInboundMessageDispatch envelope) {
+ if (started.get()) {
+ deliver(envelope);
+ } else {
+ // NOTE(review): stoppedMessages is bounded (10000) and add() throws
+ // IllegalStateException when full - confirm that is the intended overflow behavior.
+ this.stoppedMessages.add(envelope);
+ }
+ }
+
+ protected void onConnectionInterrupted() {
+ for (JmsMessageProducer producer : producers) {
+ producer.onConnectionInterrupted();
+ }
+
+ for (JmsMessageConsumer consumer : consumers.values()) {
+ consumer.onConnectionInterrupted();
+ }
+ }
+
+ protected void onConnectionRecovery(Provider provider) throws Exception {
+
+ ProviderFuture request = new ProviderFuture();
+ provider.create(sessionInfo, request);
+ request.sync();
+
+ if (this.acknowledgementMode == SESSION_TRANSACTED) {
+ if (transactionContext.isInTransaction()) {
+ transactionContext.clear();
+ transactionContext.begin();
+ }
+ }
+
+ for (JmsMessageProducer producer : producers) {
+ producer.onConnectionRecovery(provider);
+ }
+
+ for (JmsMessageConsumer consumer : consumers.values()) {
+ consumer.onConnectionRecovery(provider);
+ }
+ }
+
+ protected void onConnectionRecovered(Provider provider) throws Exception {
+
+ this.messageFactory = provider.getMessageFactory();
+
+ for (JmsMessageProducer producer : producers) {
+ producer.onConnectionRecovered(provider);
+ }
+
+ for (JmsMessageConsumer consumer : consumers.values()) {
+ consumer.onConnectionRecovered(provider);
+ }
+ }
+
+ protected void onConnectionRestored() {
+ for (JmsMessageProducer producer : producers) {
+ producer.onConnectionRestored();
+ }
+
+ for (JmsMessageConsumer consumer : consumers.values()) {
+ consumer.onConnectionRestored();
+ }
+ }
+
+ private void deliver(JmsInboundMessageDispatch envelope) {
+ JmsConsumerId id = envelope.getConsumerId();
+ if (id == null) {
+ this.connection.onException(new JMSException("No ConsumerId set for " + envelope.getMessage()));
+ }
+ if (this.messageListener != null) {
+ this.messageListener.onMessage(envelope.getMessage());
+ } else {
+ JmsMessageConsumer consumer = this.consumers.get(id);
+ if (consumer != null) {
+ consumer.onMessage(envelope);
+ }
+ }
+ }
+
+ /**
+ * Sets the transaction context of the session.
+ *
+ * @param transactionContext
+ * provides the means to control a JMS transaction.
+ */
+ public void setTransactionContext(JmsLocalTransactionContext transactionContext) {
+ this.transactionContext = transactionContext;
+ }
+
+ /**
+ * Returns the transaction context of the session.
+ *
+ * @return transactionContext
+ * session's transaction context.
+ */
+ public JmsLocalTransactionContext getTransactionContext() {
+ return transactionContext;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSslConnectionFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSslConnectionFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSslConnectionFactory.java
new file mode 100644
index 0000000..cefe491
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSslConnectionFactory.java
@@ -0,0 +1,93 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import java.net.URI;
+
+import org.apache.qpid.jms.provider.Provider;
+
+/**
+ * SSL Aware Factory class that allows for configuration of the SSL values used
+ * in the Provider transports that are SSL aware.
+ */
+public class JmsSslConnectionFactory extends JmsConnectionFactory {
+
+ private final JmsSslContext configured = JmsSslContext.getCurrentSslContext();
+
+ public JmsSslConnectionFactory() {
+ }
+
+ public JmsSslConnectionFactory(String username, String password) {
+ super(username, password);
+ }
+
+ public JmsSslConnectionFactory(String brokerURI) {
+ super(brokerURI);
+ }
+
+ public JmsSslConnectionFactory(URI brokerURI) {
+ super(brokerURI);
+ }
+
+ public JmsSslConnectionFactory(String username, String password, URI brokerURI) {
+ super(username, password, brokerURI);
+ }
+
+ public JmsSslConnectionFactory(String username, String password, String brokerURI) {
+ super(username, password, brokerURI);
+ }
+
+ @Override
+ protected Provider createProvider(URI brokerURI) throws Exception {
+ // Create and set a new instance as the current JmsSslContext for this thread
+ // based on current configuration settings.
+ JmsSslContext.setCurrentSslContext(configured.copy());
+ return super.createProvider(brokerURI);
+ }
+
+ public String getKeyStoreLocation() {
+ return configured.getKeyStoreLocation();
+ }
+
+ public void setKeyStoreLocation(String keyStoreLocation) {
+ this.configured.setKeyStoreLocation(keyStoreLocation);
+ }
+
+ public String getKeyStorePassword() {
+ return configured.getKeyStorePassword();
+ }
+
+ public void setKeyStorePassword(String keyStorePassword) {
+ this.configured.setKeyStorePassword(keyStorePassword);
+ }
+
+ public String getTrustStoreLocation() {
+ return configured.getTrustStoreLocation();
+ }
+
+ public void setTrustStoreLocation(String trustStoreLocation) {
+ this.configured.setTrustStoreLocation(trustStoreLocation);
+ }
+
+ public String getTrustStorePassword() {
+ return configured.getTrustStorePassword();
+ }
+
+ public void setTrustStorePassword(String trustStorePassword) {
+ this.configured.setTrustStorePassword(trustStorePassword);
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSslContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSslContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSslContext.java
new file mode 100644
index 0000000..feca77c
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsSslContext.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+/**
+ * Provides a wrapper around the SSL settings that are used by Provider transport
+ * instances that use an SSL encryption layer.
+ */
+public class JmsSslContext {
+
+ private String keyStoreLocation;
+ private String keyStorePassword;
+ private String trustStoreLocation;
+ private String trustStorePassword;
+
+ private static final JmsSslContext initial = new JmsSslContext();
+ private static final ThreadLocal<JmsSslContext> current;
+
+ static {
+
+ initial.setKeyStoreLocation(System.getProperty("javax.net.ssl.keyStore"));
+ initial.setKeyStorePassword(System.getProperty("javax.net.ssl.keyStorePassword"));
+ initial.setTrustStoreLocation(System.getProperty("javax.net.ssl.trustStore"));
+ initial.setTrustStorePassword(System.getProperty("javax.net.ssl.keyStorePassword"));
+
+ current = new ThreadLocal<JmsSslContext>() {
+
+ @Override
+ protected JmsSslContext initialValue() {
+ return initial;
+ }
+ };
+ }
+
+ protected JmsSslContext() {
+ }
+
+ public JmsSslContext copy() {
+ JmsSslContext result = new JmsSslContext();
+ result.setKeyStoreLocation(keyStoreLocation);
+ result.setKeyStorePassword(keyStorePassword);
+ result.setTrustStoreLocation(trustStoreLocation);
+ result.setTrustStorePassword(trustStorePassword);
+ return result;
+ }
+
+ static public void setCurrentSslContext(JmsSslContext bs) {
+ current.set(bs);
+ }
+
+ static public JmsSslContext getCurrentSslContext() {
+ return current.get();
+ }
+
+ public String getKeyStoreLocation() {
+ return keyStoreLocation;
+ }
+
+ public void setKeyStoreLocation(String keyStoreLocation) {
+ this.keyStoreLocation = keyStoreLocation;
+ }
+
+ public String getKeyStorePassword() {
+ return keyStorePassword;
+ }
+
+ public void setKeyStorePassword(String keyStorePassword) {
+ this.keyStorePassword = keyStorePassword;
+ }
+
+ public String getTrustStoreLocation() {
+ return trustStoreLocation;
+ }
+
+ public void setTrustStoreLocation(String trustStoreLocation) {
+ this.trustStoreLocation = trustStoreLocation;
+ }
+
+ public String getTrustStorePassword() {
+ return trustStorePassword;
+ }
+
+ public void setTrustStorePassword(String trustStorePassword) {
+ this.trustStorePassword = trustStorePassword;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTemporaryQueue.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTemporaryQueue.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTemporaryQueue.java
new file mode 100644
index 0000000..cff489b
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTemporaryQueue.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.JMSException;
+import javax.jms.TemporaryQueue;
+
+/**
+ * Temporary Queue Object
+ */
+public class JmsTemporaryQueue extends JmsDestination implements TemporaryQueue {
+
+ public JmsTemporaryQueue() {
+ this(null);
+ }
+
+ public JmsTemporaryQueue(String name) {
+ super(name, false, true);
+ }
+
+ @Override
+ public JmsTemporaryQueue copy() {
+ final JmsTemporaryQueue copy = new JmsTemporaryQueue();
+ copy.setProperties(getProperties());
+ return copy;
+ }
+
+ /**
+ * @see javax.jms.TemporaryQueue#delete()
+ */
+ @Override
+ public void delete() {
+ try {
+ tryDelete();
+ } catch (JMSException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * @return name
+ * @see javax.jms.Queue#getQueueName()
+ */
+ @Override
+ public String getQueueName() {
+ return getName();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTemporaryTopic.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTemporaryTopic.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTemporaryTopic.java
new file mode 100644
index 0000000..46dfed3
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTemporaryTopic.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.JMSException;
+import javax.jms.TemporaryTopic;
+
+/**
+ * Temporary Topic Object
+ */
+public class JmsTemporaryTopic extends JmsDestination implements TemporaryTopic {
+
+ public JmsTemporaryTopic() {
+ super(null, true, true);
+ }
+
+ public JmsTemporaryTopic(String name) {
+ super(name, true, true);
+ }
+
+ @Override
+ public JmsTemporaryTopic copy() {
+ final JmsTemporaryTopic copy = new JmsTemporaryTopic();
+ copy.setProperties(getProperties());
+ return copy;
+ }
+
+ /**
+ * @see javax.jms.TemporaryTopic#delete()
+ */
+ @Override
+ public void delete() {
+ try {
+ tryDelete();
+ } catch (JMSException e) {
+ throw new RuntimeException(e);
+ }
+ }
+
+ /**
+ * @return name
+ * @see javax.jms.Topic#getTopicName()
+ */
+ @Override
+ public String getTopicName() {
+ return getName();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopic.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopic.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopic.java
new file mode 100644
index 0000000..1840cc7
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopic.java
@@ -0,0 +1,49 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.Topic;
+
+/**
+ * JMS Topic object.
+ */
+public class JmsTopic extends JmsDestination implements Topic {
+
+ public JmsTopic() {
+ this(null);
+ }
+
+ public JmsTopic(String name) {
+ super(name, true, false);
+ }
+
+ @Override
+ public JmsTopic copy() {
+ final JmsTopic copy = new JmsTopic();
+ copy.setProperties(getProperties());
+ return copy;
+ }
+
+ /**
+ * @return the name
+ * @see javax.jms.Topic#getTopicName()
+ */
+ @Override
+ public String getTopicName() {
+ return getName();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicConnection.java
new file mode 100644
index 0000000..c8fcaba
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicConnection.java
@@ -0,0 +1,43 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.ConnectionConsumer;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueSession;
+import javax.jms.ServerSessionPool;
+
+import org.apache.qpid.jms.provider.Provider;
+import org.apache.qpid.jms.util.IdGenerator;
+
+ /**
+  * JmsConnection variant that rejects the Queue specific factory methods with an
+  * IllegalStateException.
+  */
+ public class JmsTopicConnection extends JmsConnection {
+
+     private static final String UNSUPPORTED = "Operation not supported by a TopicConnection";
+
+     public JmsTopicConnection(String connectionId, Provider provider, IdGenerator clientIdGenerator) throws JMSException {
+         super(connectionId, provider, clientIdGenerator);
+     }
+
+     /**
+      * Always fails: Queue based connection consumers are not available here.
+      *
+      * @throws JMSException always, as a javax.jms.IllegalStateException.
+      */
+     @Override
+     public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+         throw new javax.jms.IllegalStateException(UNSUPPORTED);
+     }
+
+     /**
+      * Always fails: QueueSession instances cannot be created from this connection.
+      *
+      * @throws JMSException always, as a javax.jms.IllegalStateException.
+      */
+     @Override
+     public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
+         throw new javax.jms.IllegalStateException(UNSUPPORTED);
+     }
+ }
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicPublisher.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicPublisher.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicPublisher.java
new file mode 100644
index 0000000..47d5088
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicPublisher.java
@@ -0,0 +1,100 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.Topic;
+import javax.jms.TopicPublisher;
+
+import org.apache.qpid.jms.meta.JmsProducerId;
+
+/**
+ * Implementation of a TopicPublisher
+ */
+public class JmsTopicPublisher extends JmsMessageProducer implements TopicPublisher {
+
+ /**
+ * Constructor
+ *
+ * @param s
+ * @param destination
+ */
+ protected JmsTopicPublisher(JmsProducerId id, JmsSession session, JmsDestination destination) throws JMSException {
+ super(id, session, destination);
+ }
+
+ /**
+ * @return the Topic
+ * @throws IllegalStateException
+ * @see javax.jms.TopicPublisher#getTopic()
+ */
+ @Override
+ public Topic getTopic() throws IllegalStateException {
+ checkClosed();
+ return (Topic) this.producerInfo.getDestination();
+ }
+
+ /**
+ * @param message
+ * @throws JMSException
+ * @see javax.jms.TopicPublisher#publish(javax.jms.Message)
+ */
+ @Override
+ public void publish(Message message) throws JMSException {
+ super.send(message);
+ }
+
+ /**
+ * @param topic
+ * @param message
+ * @throws JMSException
+ * @see javax.jms.TopicPublisher#publish(javax.jms.Topic, javax.jms.Message)
+ */
+ @Override
+ public void publish(Topic topic, Message message) throws JMSException {
+ super.send(topic, message);
+ }
+
+ /**
+ * @param message
+ * @param deliveryMode
+ * @param priority
+ * @param timeToLive
+ * @throws JMSException
+ * @see javax.jms.TopicPublisher#publish(javax.jms.Message, int, int, long)
+ */
+ @Override
+ public void publish(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
+ super.send(message, deliveryMode, priority, timeToLive);
+ }
+
+ /**
+ * @param topic
+ * @param message
+ * @param deliveryMode
+ * @param priority
+ * @param timeToLive
+ * @throws JMSException
+ * @see javax.jms.TopicPublisher#publish(javax.jms.Topic, javax.jms.Message, int, int, long)
+ */
+ @Override
+ public void publish(Topic topic, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
+ super.send(topic, message, deliveryMode, priority, timeToLive);
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicSession.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicSession.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicSession.java
new file mode 100644
index 0000000..ff834aa
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicSession.java
@@ -0,0 +1,163 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.Destination;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.MessageConsumer;
+import javax.jms.MessageProducer;
+import javax.jms.Queue;
+import javax.jms.QueueBrowser;
+import javax.jms.QueueReceiver;
+import javax.jms.QueueSender;
+import javax.jms.TemporaryQueue;
+
+import org.apache.qpid.jms.meta.JmsSessionId;
+
+/**
+ * Implementation of a TopicSession
+ */
+public class JmsTopicSession extends JmsSession {
+
+ protected JmsTopicSession(JmsConnection connection, JmsSessionId sessionId, int acknowledgementMode) throws JMSException {
+ super(connection, sessionId, acknowledgementMode);
+ }
+
+ /**
+ * @param queue
+ * @return
+ * @throws JMSException
+ * @see javax.jms.Session#createBrowser(javax.jms.Queue)
+ */
+ @Override
+ public QueueBrowser createBrowser(Queue queue) throws JMSException {
+ throw new IllegalStateException("Operation not supported by a TopicSession");
+ }
+
+ /**
+ * @param queue
+ * @param messageSelector
+ * @return
+ * @throws JMSException
+ * @see javax.jms.Session#createBrowser(javax.jms.Queue, java.lang.String)
+ */
+ @Override
+ public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
+ throw new IllegalStateException("Operation not supported by a TopicSession");
+ }
+
+ /**
+ * @param destination
+ * @return
+ * @throws JMSException
+ * @see javax.jms.Session#createConsumer(javax.jms.Destination)
+ */
+ @Override
+ public MessageConsumer createConsumer(Destination destination) throws JMSException {
+ if (destination instanceof Queue) {
+ throw new IllegalStateException("Operation not supported by a TopicSession");
+ }
+ return super.createConsumer(destination);
+ }
+
+ /**
+ * @param destination
+ * @param messageSelector
+ * @return
+ * @throws JMSException
+ * @see javax.jms.Session#createConsumer(javax.jms.Destination,
+ * java.lang.String)
+ */
+ @Override
+ public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
+ if (destination instanceof Queue) {
+ throw new IllegalStateException("Operation not supported by a TopicSession");
+ }
+ return super.createConsumer(destination, messageSelector);
+ }
+
+ /**
+ * @param destination
+ * @return
+ * @throws JMSException
+ * @see javax.jms.Session#createProducer(javax.jms.Destination)
+ */
+ @Override
+ public MessageProducer createProducer(Destination destination) throws JMSException {
+ if (destination instanceof Queue) {
+ throw new IllegalStateException("Operation not supported by a TopicSession");
+ }
+ return super.createProducer(destination);
+ }
+
+ /**
+ * @param queueName
+ * @return
+ * @throws JMSException
+ * @see javax.jms.Session#createQueue(java.lang.String)
+ */
+ @Override
+ public Queue createQueue(String queueName) throws JMSException {
+ throw new IllegalStateException("Operation not supported by a TopicSession");
+ }
+
+ /**
+ * @return
+ * @throws JMSException
+ * @see javax.jms.Session#createTemporaryQueue()
+ */
+ @Override
+ public TemporaryQueue createTemporaryQueue() throws JMSException {
+ throw new IllegalStateException("Operation not supported by a TopicSession");
+ }
+
+ /**
+ * @param queue
+ * @return
+ * @throws JMSException
+ * @see javax.jms.QueueSession#createReceiver(javax.jms.Queue)
+ */
+ @Override
+ public QueueReceiver createReceiver(Queue queue) throws JMSException {
+ throw new IllegalStateException("Operation not supported by a TopicSession");
+ }
+
+ /**
+ * @param queue
+ * @param messageSelector
+ * @return
+ * @throws JMSException
+ * @see javax.jms.QueueSession#createReceiver(javax.jms.Queue,
+ * java.lang.String)
+ */
+ @Override
+ public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
+ throw new IllegalStateException("Operation not supported by a TopicSession");
+ }
+
+ /**
+ * @param queue
+ * @return
+ * @throws JMSException
+ * @see javax.jms.QueueSession#createSender(javax.jms.Queue)
+ */
+ @Override
+ public QueueSender createSender(Queue queue) throws JMSException {
+ throw new IllegalStateException("Operation not supported by a TopicSession");
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicSubscriber.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicSubscriber.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicSubscriber.java
new file mode 100644
index 0000000..0ef463a
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTopicSubscriber.java
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+import org.apache.qpid.jms.meta.JmsConsumerId;
+
+/**
+ * Implementation of a TopicSubscriber
+ */
+public class JmsTopicSubscriber extends JmsMessageConsumer implements TopicSubscriber {
+
+ /**
+ * Creates a non-durable TopicSubscriber
+ *
+ * @param id
+ * @param s
+ * @param destination
+ * @param noLocal
+ * @param selector
+ * @throws JMSException
+ */
+ JmsTopicSubscriber(JmsConsumerId id, JmsSession s, JmsDestination destination, boolean noLocal, String selector) throws JMSException {
+ super(id, s, destination, selector, noLocal);
+ }
+
+ /**
+ * Creates a TopicSubscriber that is durable.
+ *
+ * @param id
+ * @param s
+ * @param destination
+ * @param name
+ * @param noLocal
+ * @param selector
+ * @throws JMSException
+ */
+ JmsTopicSubscriber(JmsConsumerId id, JmsSession s, JmsDestination destination, String name, boolean noLocal, String selector) throws JMSException {
+ super(id, s, destination, name, selector, noLocal);
+ }
+
+ /**
+ * @return the Topic
+ * @throws IllegalStateException
+ * @see javax.jms.TopicSubscriber#getTopic()
+ */
+ @Override
+ public Topic getTopic() throws IllegalStateException {
+ checkClosed();
+ return (Topic) this.getDestination();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionListener.java
new file mode 100644
index 0000000..c0704a1
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTransactionListener.java
@@ -0,0 +1,31 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+/**
+ * Allows for a listener to be notified when a transaction is started, commits
+ * or is rolled back.
+ */
+public interface JmsTransactionListener {
+
+ void onTransactionStarted();
+
+ void onTransactionCommitted();
+
+ void onTransactionRolledBack();
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTxSynchronization.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTxSynchronization.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTxSynchronization.java
new file mode 100644
index 0000000..bda7979
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsTxSynchronization.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+/**
+ * Interface for JmsResources that are part of a running transaction to use
+ * to register for notifications of transaction commit and rollback in order
+ * to execute specific actions.
+ *
+ * One such use of this might be for a consumer to register a synchronization
+ * when it is closed while it's parent session is still operating inside a
+ * transaction. The Consumer can close itself following the commit or rollback
+ * of the running Transaction.
+ */
+public abstract class JmsTxSynchronization {
+
+ /**
+ * Called after a successful commit of the current Transaction.
+ *
+ * @throws Exception
+ */
+ public void afterCommit() throws Exception {
+ }
+
+ /**
+ * Called after the current transaction has been rolled back either
+ * by a call to rollback or by a failure to complete a commit operation.
+ *
+ * @throws Exception
+ */
+ public void afterRollback() throws Exception {
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/IdConversionException.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/IdConversionException.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/IdConversionException.java
new file mode 100644
index 0000000..b04b988
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/IdConversionException.java
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.exceptions;
+
+/** A {@link QpidJmsException} specialization, by its name meant for id conversion failures. */
+public class IdConversionException extends QpidJmsException {
+    private static final long serialVersionUID = -2349723813650476823L;
+
+    /** Creates the exception with a reason and no cause. */
+    public IdConversionException(String reason) {
+        super(reason);
+    }
+
+    /** Creates the exception with a reason and the exception that caused it. */
+    public IdConversionException(String reason, Exception cause) {
+        super(reason, cause);
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsConnectionClosedException.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsConnectionClosedException.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsConnectionClosedException.java
new file mode 100644
index 0000000..e58c54f
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsConnectionClosedException.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.exceptions;
+
+import java.io.IOException;
+
+import javax.jms.IllegalStateException;
+
+/**
+ * Thrown when an attempt is made to use a connection that has been closed.
+ */
+public class JmsConnectionClosedException extends IllegalStateException {
+
+    private static final long serialVersionUID = -7975982446284065025L;
+
+    /** Creates the exception with the fixed "already closed" message and error code. */
+    public JmsConnectionClosedException() {
+        super("The JMS connection has been closed", "AlreadyClosed");
+    }
+
+    /** Creates the exception from an I/O failure, which becomes its cause and linked exception. */
+    public JmsConnectionClosedException(IOException cause) {
+        super("The JMS connection has been closed: " + describe(cause));
+        initCause(cause);
+        setLinkedException(cause);
+    }
+
+    /** Returns the cause's message, or its toString() when the message is null or empty. */
+    private static String describe(IOException cause) {
+        String text = cause.getMessage();
+        return (text != null && text.length() > 0) ? text : cause.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsConnectionFailedException.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsConnectionFailedException.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsConnectionFailedException.java
new file mode 100644
index 0000000..e9b7068
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsConnectionFailedException.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.exceptions;
+
+import java.io.IOException;
+
+import javax.jms.IllegalStateException;
+
+/**
+ * Thrown when an attempt is made to use a connection that has already failed.
+ */
+public class JmsConnectionFailedException extends IllegalStateException {
+
+    private static final long serialVersionUID = -3386897790274799220L;
+
+    /** Creates the exception with the fixed transport-problem message and error code. */
+    public JmsConnectionFailedException() {
+        super("The JMS connection has failed due to a Transport problem", "Connection Failed");
+    }
+
+    /** Creates the exception from an I/O failure, which becomes its cause and linked exception. */
+    public JmsConnectionFailedException(IOException cause) {
+        super("The JMS connection has failed: " + describe(cause));
+        initCause(cause);
+        setLinkedException(cause);
+    }
+
+    /** Returns the cause's message, or its toString() when the message is null or empty. */
+    private static String describe(IOException cause) {
+        String text = cause.getMessage();
+        return (text != null && text.length() > 0) ? text : cause.toString();
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsExceptionSupport.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsExceptionSupport.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsExceptionSupport.java
new file mode 100644
index 0000000..81f9ca8
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/JmsExceptionSupport.java
@@ -0,0 +1,103 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.exceptions;
+
+import javax.jms.JMSException;
+import javax.jms.MessageEOFException;
+import javax.jms.MessageFormatException;
+
+/**
+ * Exception support class.
+ *
+ * Factory class for creating JMSException instances based on String messages or by
+ * wrapping other non-JMS exceptions.
+ *
+ * @since 1.0
+ */
+public final class JmsExceptionSupport {
+
+    private JmsExceptionSupport() {}
+
+    /**
+     * Creates a JMSException carrying the given message and cause.
+     *
+     * @param msg the message for the new exception
+     * @param cause the cause; also set as the linked exception when it is an Exception
+     * @return the newly created JMSException
+     */
+    public static JMSException create(String msg, Throwable cause) {
+        return link(new JMSException(msg), cause);
+    }
+
+    /** Behaves exactly like {@link #create(String, Throwable)}. */
+    public static JMSException create(String msg, Exception cause) {
+        return link(new JMSException(msg), cause);
+    }
+
+    /**
+     * Converts the given cause into a JMSException. A cause that is itself a JMSException,
+     * or whose direct cause is one, is returned as-is; anything else is wrapped.
+     *
+     * @param cause the failure to convert, must not be null
+     * @return the existing or newly created JMSException
+     */
+    public static JMSException create(Throwable cause) {
+        // Never double-wrap: an existing JMSException is surfaced unchanged.
+        if (cause instanceof JMSException) {
+            return (JMSException) cause;
+        }
+        if (cause.getCause() instanceof JMSException) {
+            return (JMSException) cause.getCause();
+        }
+        return link(new JMSException(describe(cause)), cause);
+    }
+
+    /** Behaves exactly like {@link #create(Throwable)}. */
+    public static JMSException create(Exception cause) {
+        return create((Throwable) cause);
+    }
+
+    /** Wraps the cause in a MessageEOFException, reusing the cause's message. */
+    public static MessageEOFException createMessageEOFException(Exception cause) {
+        return link(new MessageEOFException(describe(cause)), cause);
+    }
+
+    /** Wraps the cause in a MessageFormatException, reusing the cause's message. */
+    public static MessageFormatException createMessageFormatException(Throwable cause) {
+        return link(new MessageFormatException(describe(cause)), cause);
+    }
+
+    /** Returns the cause's message, or its toString() when the message is null or empty. */
+    private static String describe(Throwable cause) {
+        String msg = cause.getMessage();
+        return (msg == null || msg.length() == 0) ? cause.toString() : msg;
+    }
+
+    /**
+     * Attaches the cause to the exception as its cause and, when the runtime type allows,
+     * as its linked exception, so that getLinkedException() does not depend on the static
+     * type the caller happened to use. Returns the same exception instance.
+     */
+    private static <E extends JMSException> E link(E exception, Throwable cause) {
+        // setLinkedException only accepts an Exception, not an arbitrary Throwable.
+        if (cause instanceof Exception) {
+            exception.setLinkedException((Exception) cause);
+        }
+        exception.initCause(cause);
+        return exception;
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/QpidJmsException.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/QpidJmsException.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/QpidJmsException.java
new file mode 100644
index 0000000..a922530
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/exceptions/QpidJmsException.java
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.jms.exceptions;
+
+import javax.jms.JMSException;
+
+/** JMSException variant that records a supplied cause as both linked exception and cause. */
+public class QpidJmsException extends JMSException {
+    private static final long serialVersionUID = 751932967255393054L;
+
+    /** Creates the exception with a reason and no cause. */
+    public QpidJmsException(String reason) {
+        this(reason, null);
+    }
+
+    /** Creates the exception with a reason and an optional (possibly null) cause. */
+    public QpidJmsException(String reason, Exception cause) {
+        super(reason);
+        if (cause == null) {
+            return;
+        }
+        setLinkedException(cause);
+        initCause(cause);
+    }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/jndi/JNDIReferenceFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/jndi/JNDIReferenceFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/jndi/JNDIReferenceFactory.java
new file mode 100644
index 0000000..5d0d04a
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/jndi/JNDIReferenceFactory.java
@@ -0,0 +1,125 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms.jndi;
+
+import java.util.Enumeration;
+import java.util.HashMap;
+import java.util.Hashtable;
+import java.util.Map;
+
+import javax.naming.Context;
+import javax.naming.Name;
+import javax.naming.NamingException;
+import javax.naming.RefAddr;
+import javax.naming.Reference;
+import javax.naming.StringRefAddr;
+import javax.naming.spi.ObjectFactory;
+
+/**
+ * Converts objects implementing JNDIStorable into property fields so they can be
+ * stored in and regenerated from JNDI.
+ *
+ * @since 1.0
+ */
+public class JNDIReferenceFactory implements ObjectFactory {
+
+    /**
+     * This will be called by a JNDI provider when a Reference is retrieved from
+     * a JNDI store - and generates the original instance.
+     *
+     * @param object the Reference object
+     * @param name the JNDI name
+     * @param nameCtx the context
+     * @param environment the environment settings used by JNDI
+     *
+     * @return the instance built from the Reference object, or null when the referenced
+     *         class is not a JNDIStorable
+     *
+     * @throws Exception
+     *         if building the instance from Reference fails (usually class not
+     *         found); a RuntimeException is thrown when object is not a Reference
+     */
+    @Override
+    public Object getObjectInstance(Object object, Name name, Context nameCtx, Hashtable<?, ?> environment)
+        throws Exception {
+        if (!(object instanceof Reference)) {
+            // NOTE(review): the ObjectFactory contract suggests returning null here so other
+            // factories can be tried; the exception is kept to preserve existing behaviour.
+            throw new RuntimeException("Object " + object + " is not a reference");
+        }
+
+        Reference reference = (Reference) object;
+        Class<?> theClass = loadClass(this, reference.getClassName());
+        if (!JNDIStorable.class.isAssignableFrom(theClass)) {
+            return null;
+        }
+
+        JNDIStorable store = (JNDIStorable) theClass.newInstance();
+        Map<String, String> properties = new HashMap<String, String>();
+        for (Enumeration<RefAddr> iter = reference.getAll(); iter.hasMoreElements();) {
+            // getType() and getContent() are declared on RefAddr, so no StringRefAddr cast
+            // is needed; the cast would only make other RefAddr kinds fail.
+            RefAddr addr = iter.nextElement();
+            Object content = addr.getContent();
+            properties.put(addr.getType(), (content == null) ? "" : content.toString());
+        }
+        store.setProperties(properties);
+        return store;
+    }
+
+    /**
+     * Create a Reference instance from a JNDIStorable object.
+     *
+     * @param instanceClassName the class name recorded in the Reference
+     * @param po the object whose properties become the addresses of the Reference
+     * @return Reference
+     * @throws NamingException if the properties cannot be copied; the failure is its cause
+     */
+    public static Reference createReference(String instanceClassName, JNDIStorable po) throws NamingException {
+        Reference result = new Reference(instanceClassName, JNDIReferenceFactory.class.getName(), null);
+        try {
+            Map<String, String> props = po.getProperties();
+            for (Map.Entry<String, String> entry : props.entrySet()) {
+                result.add(new StringRefAddr(entry.getKey(), entry.getValue()));
+            }
+        } catch (Exception e) {
+            // Keep the original failure (and its stack trace) reachable through getCause().
+            NamingException failure = new NamingException(e.getMessage());
+            failure.initCause(e);
+            throw failure;
+        }
+        return result;
+    }
+
+    /**
+     * Load the named class, trying the class loader of the given object first.
+     *
+     * @param thisObj the object whose class loader is tried first
+     * @param className the name of the class to load
+     * @return the class
+     * @throws ClassNotFoundException if the class cannot be found
+     */
+    public static Class<?> loadClass(Object thisObj, String className) throws ClassNotFoundException {
+        // try local ClassLoader first.
+        ClassLoader loader = thisObj.getClass().getClassLoader();
+        if (loader != null) {
+            return loader.loadClass(className);
+        }
+        // No loader available from the object: use the default classLoader.
+        return Class.forName(className);
+    }
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org