You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ta...@apache.org on 2014/09/23 20:20:50 UTC
[26/27] Initial drop of donated AMQP Client Code.
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
new file mode 100644
index 0000000..b4d03af
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnection.java
@@ -0,0 +1,1128 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import java.io.IOException;
+import java.net.URI;
+import java.util.List;
+import java.util.Map;
+import java.util.Set;
+import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
+import java.util.concurrent.CopyOnWriteArraySet;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.ThreadFactory;
+import java.util.concurrent.ThreadPoolExecutor;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicBoolean;
+import java.util.concurrent.atomic.AtomicLong;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionConsumer;
+import javax.jms.ConnectionMetaData;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueConnection;
+import javax.jms.QueueSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.TemporaryQueue;
+import javax.jms.TemporaryTopic;
+import javax.jms.Topic;
+import javax.jms.TopicConnection;
+import javax.jms.TopicSession;
+import javax.net.ssl.SSLContext;
+
+import org.apache.qpid.jms.exceptions.JmsConnectionFailedException;
+import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+import org.apache.qpid.jms.message.JmsMessage;
+import org.apache.qpid.jms.message.JmsMessageFactory;
+import org.apache.qpid.jms.message.JmsOutboundMessageDispatch;
+import org.apache.qpid.jms.meta.JmsConnectionId;
+import org.apache.qpid.jms.meta.JmsConnectionInfo;
+import org.apache.qpid.jms.meta.JmsConsumerId;
+import org.apache.qpid.jms.meta.JmsResource;
+import org.apache.qpid.jms.meta.JmsSessionId;
+import org.apache.qpid.jms.meta.JmsTransactionId;
+import org.apache.qpid.jms.provider.Provider;
+import org.apache.qpid.jms.provider.ProviderClosedException;
+import org.apache.qpid.jms.provider.ProviderFuture;
+import org.apache.qpid.jms.provider.ProviderListener;
+import org.apache.qpid.jms.provider.ProviderConstants.ACK_TYPE;
+import org.apache.qpid.jms.util.IdGenerator;
+import org.apache.qpid.jms.util.ThreadPoolUtils;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Implementation of a JMS Connection
+ */
+public class JmsConnection implements Connection, TopicConnection, QueueConnection, ProviderListener {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JmsConnection.class);
+
+ private JmsConnectionInfo connectionInfo;
+
+ private final IdGenerator clientIdGenerator;
+ private boolean clientIdSet;
+ private boolean sendAcksAsync;
+ private ExceptionListener exceptionListener;
+ private final List<JmsSession> sessions = new CopyOnWriteArrayList<JmsSession>();
+ private final Map<JmsConsumerId, JmsMessageDispatcher> dispatchers =
+ new ConcurrentHashMap<JmsConsumerId, JmsMessageDispatcher>();
+ private final AtomicBoolean connected = new AtomicBoolean();
+ private final AtomicBoolean closed = new AtomicBoolean();
+ private final AtomicBoolean closing = new AtomicBoolean();
+ private final AtomicBoolean started = new AtomicBoolean();
+ private final AtomicBoolean failed = new AtomicBoolean();
+ private final Object connectLock = new Object();
+ private IOException firstFailureError;
+ private JmsPrefetchPolicy prefetchPolicy = new JmsPrefetchPolicy();
+ private boolean messagePrioritySupported;
+
+ private final ThreadPoolExecutor executor;
+
+ private URI brokerURI;
+ private URI localURI;
+ private SSLContext sslContext;
+ private Provider provider;
+ private final Set<JmsConnectionListener> connectionListeners =
+ new CopyOnWriteArraySet<JmsConnectionListener>();
+ private final Map<JmsDestination, JmsDestination> tempDestinations =
+ new ConcurrentHashMap<JmsDestination, JmsDestination>();
+ private final AtomicLong sessionIdGenerator = new AtomicLong();
+ private final AtomicLong tempDestIdGenerator = new AtomicLong();
+ private final AtomicLong transactionIdGenerator = new AtomicLong();
+ private JmsMessageFactory messageFactory;
+
+ protected JmsConnection(String connectionId, Provider provider, IdGenerator clientIdGenerator) throws JMSException {
+
+ // This executor can be used for dispatching asynchronous tasks that might block or result
+ // in reentrant calls to this Connection that could block. The thread in this executor
+ // will also serve as a means of preventing JVM shutdown should a client application
+ // not have it's own mechanism for doing so.
+ executor = new ThreadPoolExecutor(1, 1, 5, TimeUnit.SECONDS, new LinkedBlockingQueue<Runnable>(), new ThreadFactory() {
+ @Override
+ public Thread newThread(Runnable r) {
+ Thread thread = new Thread(r, "QpidJMS Connection Executor: ");
+ return thread;
+ }
+ });
+
+ this.provider = provider;
+ this.provider.setProviderListener(this);
+ try {
+ this.provider.start();
+ } catch (Exception e) {
+ throw JmsExceptionSupport.create(e);
+ }
+
+ this.clientIdGenerator = clientIdGenerator;
+ this.connectionInfo = new JmsConnectionInfo(new JmsConnectionId(connectionId));
+ }
+
+ /**
+ * @throws JMSException
+ * @see javax.jms.Connection#close()
+ */
+ @Override
+ public void close() throws JMSException {
+ boolean interrupted = Thread.interrupted();
+
+ try {
+
+ if (!closed.get() && !failed.get()) {
+ // do not fail if already closed as specified by the JMS specification.
+ doStop(false);
+ }
+
+ synchronized (this) {
+
+ if (closed.get()) {
+ return;
+ }
+
+ closing.set(true);
+
+ for (JmsSession session : this.sessions) {
+ session.shutdown();
+ }
+
+ this.sessions.clear();
+ this.tempDestinations.clear();
+
+ if (isConnected() && !failed.get()) {
+ ProviderFuture request = new ProviderFuture();
+ try {
+ provider.destroy(connectionInfo, request);
+
+ try {
+ request.sync();
+ } catch (Exception ex) {
+ // TODO - Spec is a bit vague here, we don't fail if already closed but
+ // in this case we really aren't closed yet so there could be an
+ // argument that at this point an exception is still valid.
+ if (ex.getCause() instanceof InterruptedException) {
+ throw (InterruptedException) ex.getCause();
+ }
+ LOG.debug("Failed destroying Connection resource: {}", ex.getMessage());
+ }
+ } catch(ProviderClosedException pce) {
+ LOG.debug("Ignoring provider closed exception during connection close");
+ }
+ }
+
+ connected.set(false);
+ started.set(false);
+ closing.set(false);
+ closed.set(true);
+ }
+ } catch (Exception e) {
+ throw JmsExceptionSupport.create(e);
+ } finally {
+ try {
+ ThreadPoolUtils.shutdown(executor);
+ } catch (Throwable e) {
+ LOG.warn("Error shutting down thread pool: " + executor + ". This exception will be ignored.", e);
+ }
+
+ if (provider != null) {
+ provider.close();
+ provider = null;
+ }
+
+ if (interrupted) {
+ Thread.currentThread().interrupt();
+ }
+ }
+ }
+
+ /**
+ * Called to free all Connection resources.
+ */
+ protected void shutdown() throws JMSException {
+
+ // TODO - Once ConnectionConsumer is added we must shutdown those as well.
+
+ for (JmsSession session : this.sessions) {
+ session.shutdown();
+ }
+
+ if (isConnected() && !failed.get() && !closing.get()) {
+ destroyResource(connectionInfo);
+ }
+
+ if (clientIdSet) {
+ connectionInfo.setClientId(null);
+ clientIdSet = false;
+ }
+
+ tempDestinations.clear();
+ started.set(false);
+ connected.set(false);
+ }
+
+ /**
+ * @param destination
+ * @param messageSelector
+ * @param sessionPool
+ * @param maxMessages
+ * @return ConnectionConsumer
+ * @throws JMSException
+ * @see javax.jms.Connection#createConnectionConsumer(javax.jms.Destination,
+ * java.lang.String, javax.jms.ServerSessionPool, int)
+ */
+ @Override
+ public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
+ ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+ checkClosedOrFailed();
+ connect();
+ throw new JMSException("Not supported");
+ }
+
+ /**
+ * @param topic
+ * @param subscriptionName
+ * @param messageSelector
+ * @param sessionPool
+ * @param maxMessages
+ * @return ConnectionConsumer
+ * @throws JMSException
+ *
+ * @see javax.jms.Connection#createDurableConnectionConsumer(javax.jms.Topic,
+ * java.lang.String, java.lang.String, javax.jms.ServerSessionPool, int)
+ */
+ @Override
+ public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
+ String messageSelector, ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+ checkClosedOrFailed();
+ connect();
+ throw new JMSException("Not supported");
+ }
+
+ /**
+ * @param transacted
+ * @param acknowledgeMode
+ * @return Session
+ * @throws JMSException
+ * @see javax.jms.Connection#createSession(boolean, int)
+ */
+ @Override
+ public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException {
+ checkClosedOrFailed();
+ connect();
+ int ackMode = getSessionAcknowledgeMode(transacted, acknowledgeMode);
+ JmsSession result = new JmsSession(this, getNextSessionId(), ackMode);
+ addSession(result);
+ if (started.get()) {
+ result.start();
+ }
+ return result;
+ }
+
+ /**
+ * @return clientId
+ * @see javax.jms.Connection#getClientID()
+ */
+ @Override
+ public String getClientID() throws JMSException {
+ checkClosedOrFailed();
+ return this.connectionInfo.getClientId();
+ }
+
+ /**
+ * @return connectionInfoData
+ * @see javax.jms.Connection#getMetaData()
+ */
+ @Override
+ public ConnectionMetaData getMetaData() throws JMSException {
+ checkClosedOrFailed();
+ return JmsConnectionMetaData.INSTANCE;
+ }
+
+ /**
+ * @param clientID
+ * @throws JMSException
+ * @see javax.jms.Connection#setClientID(java.lang.String)
+ */
+ @Override
+ public synchronized void setClientID(String clientID) throws JMSException {
+ checkClosedOrFailed();
+
+ if (this.clientIdSet) {
+ throw new IllegalStateException("The clientID has already been set");
+ }
+ if (clientID == null) {
+ throw new IllegalStateException("Cannot have a null clientID");
+ }
+ if (connected.get()) {
+ throw new IllegalStateException("Cannot set the client id once connected.");
+ }
+
+ this.connectionInfo.setClientId(clientID);
+ this.clientIdSet = true;
+
+ //We weren't connected if we got this far, we should now connect now to ensure the clientID is valid.
+ //TODO: determine if any resulting failure is only the result of the ClientID value, or other reasons such as auth.
+ connect();
+ }
+
+ /**
+ * @throws JMSException
+ * @see javax.jms.Connection#start()
+ */
+ @Override
+ public void start() throws JMSException {
+ checkClosedOrFailed();
+ connect();
+ if (this.started.compareAndSet(false, true)) {
+ try {
+ for (JmsSession s : this.sessions) {
+ s.start();
+ }
+ } catch (Exception e) {
+ throw JmsExceptionSupport.create(e);
+ }
+ }
+ }
+
+ /**
+ * @throws JMSException
+ * @see javax.jms.Connection#stop()
+ */
+ @Override
+ public void stop() throws JMSException {
+ doStop(true);
+ }
+
+ /**
+ * @see #stop()
+ * @param checkClosed <tt>true</tt> to check for already closed and throw
+ * {@link java.lang.IllegalStateException} if already closed,
+ * <tt>false</tt> to skip this check
+ * @throws JMSException if the JMS provider fails to stop message delivery due to some internal error.
+ */
+ void doStop(boolean checkClosed) throws JMSException {
+ if (checkClosed) {
+ checkClosedOrFailed();
+ }
+ if (started.compareAndSet(true, false)) {
+ synchronized(sessions) {
+ for (JmsSession s : this.sessions) {
+ s.stop();
+ }
+ }
+ }
+ }
+
+ /**
+ * @param topic
+ * @param messageSelector
+ * @param sessionPool
+ * @param maxMessages
+ * @return ConnectionConsumer
+ * @throws JMSException
+ * @see javax.jms.TopicConnection#createConnectionConsumer(javax.jms.Topic,
+ * java.lang.String, javax.jms.ServerSessionPool, int)
+ */
+ @Override
+ public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector,
+ ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+ checkClosedOrFailed();
+ connect();
+ return null;
+ }
+
+ /**
+ * @param transacted
+ * @param acknowledgeMode
+ * @return TopicSession
+ * @throws JMSException
+ * @see javax.jms.TopicConnection#createTopicSession(boolean, int)
+ */
+ @Override
+ public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException {
+ checkClosedOrFailed();
+ connect();
+ int ackMode = getSessionAcknowledgeMode(transacted, acknowledgeMode);
+ JmsTopicSession result = new JmsTopicSession(this, getNextSessionId(), ackMode);
+ addSession(result);
+ if (started.get()) {
+ result.start();
+ }
+ return result;
+ }
+
+ /**
+ * @param queue
+ * @param messageSelector
+ * @param sessionPool
+ * @param maxMessages
+ * @return ConnectionConsumer
+ * @throws JMSException
+ * @see javax.jms.QueueConnection#createConnectionConsumer(javax.jms.Queue,
+ * java.lang.String, javax.jms.ServerSessionPool, int)
+ */
+ @Override
+ public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector,
+ ServerSessionPool sessionPool, int maxMessages) throws JMSException {
+ checkClosedOrFailed();
+ connect();
+ return null;
+ }
+
+ /**
+ * @param transacted
+ * @param acknowledgeMode
+ * @return QueueSession
+ * @throws JMSException
+ * @see javax.jms.QueueConnection#createQueueSession(boolean, int)
+ */
+ @Override
+ public QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException {
+ checkClosedOrFailed();
+ connect();
+ int ackMode = getSessionAcknowledgeMode(transacted, acknowledgeMode);
+ JmsQueueSession result = new JmsQueueSession(this, getNextSessionId(), ackMode);
+ addSession(result);
+ if (started.get()) {
+ result.start();
+ }
+ return result;
+ }
+
+ /**
+ * @param ex
+ */
+ public void onException(Exception ex) {
+ onException(JmsExceptionSupport.create(ex));
+ }
+
+ /**
+ * @param ex
+ */
+ public void onException(JMSException ex) {
+ ExceptionListener l = this.exceptionListener;
+ if (l != null) {
+ l.onException(JmsExceptionSupport.create(ex));
+ }
+ }
+
+ protected int getSessionAcknowledgeMode(boolean transacted, int acknowledgeMode) throws JMSException {
+ int result = acknowledgeMode;
+ if (!transacted && acknowledgeMode == Session.SESSION_TRANSACTED) {
+ throw new JMSException("acknowledgeMode SESSION_TRANSACTED cannot be used for an non-transacted Session");
+ }
+ if (transacted) {
+ result = Session.SESSION_TRANSACTED;
+ }
+ return result;
+ }
+
+ protected void removeSession(JmsSession session) throws JMSException {
+ this.sessions.remove(session);
+ }
+
+ protected void addSession(JmsSession s) {
+ this.sessions.add(s);
+ }
+
+ protected void addDispatcher(JmsConsumerId consumerId, JmsMessageDispatcher dispatcher) {
+ dispatchers.put(consumerId, dispatcher);
+ }
+
+ protected void removeDispatcher(JmsConsumerId consumerId) {
+ dispatchers.remove(consumerId);
+ }
+
+ private void connect() throws JMSException {
+ synchronized(this.connectLock) {
+ if (isConnected() || closed.get()) {
+ return;
+ }
+
+ if (connectionInfo.getClientId() == null || connectionInfo.getClientId().trim().isEmpty()) {
+ connectionInfo.setClientId(clientIdGenerator.generateId());
+ }
+
+ this.connectionInfo = createResource(connectionInfo);
+ this.connected.set(true);
+ this.messageFactory = provider.getMessageFactory();
+
+ // TODO - Advisory Support.
+ //
+ // Providers should have an interface for adding a listener for temporary
+ // destination advisory messages for create / destroy so we can track them
+ // and throw exceptions when producers try to send to deleted destinations.
+ }
+ }
+
+ /**
+ * @return a newly initialized TemporaryQueue instance.
+ */
+ protected TemporaryQueue createTemporaryQueue() throws JMSException {
+ String destinationName = connectionInfo.getConnectionId() + ":" + tempDestIdGenerator.incrementAndGet();
+ JmsTemporaryQueue queue = new JmsTemporaryQueue(destinationName);
+ queue = createResource(queue);
+ tempDestinations.put(queue, queue);
+ return queue;
+ }
+
+ /**
+ * @return a newly initialized TemporaryTopic instance.
+ */
+ protected TemporaryTopic createTemporaryTopic() throws JMSException {
+ String destinationName = connectionInfo.getConnectionId() + ":" + tempDestIdGenerator.incrementAndGet();
+ JmsTemporaryTopic topic = new JmsTemporaryTopic(destinationName);
+ topic = createResource(topic);
+ tempDestinations.put(topic, topic);
+ return topic;
+ }
+
+ protected void deleteDestination(JmsDestination destination) throws JMSException {
+ checkClosedOrFailed();
+ connect();
+
+ try {
+
+ for (JmsSession session : this.sessions) {
+ if (session.isDestinationInUse(destination)) {
+ throw new JMSException("A consumer is consuming from the temporary destination");
+ }
+ }
+
+ if (destination.isTemporary()) {
+ tempDestinations.remove(destination);
+ }
+
+ destroyResource(destination);
+ } catch (Exception e) {
+ throw JmsExceptionSupport.create(e);
+ }
+ }
+
+ protected void checkClosedOrFailed() throws JMSException {
+ checkClosed();
+ if (failed.get()) {
+ throw new JmsConnectionFailedException(firstFailureError);
+ }
+ }
+
+ protected void checkClosed() throws IllegalStateException {
+ if (this.closed.get()) {
+ throw new IllegalStateException("The Connection is closed");
+ }
+ }
+
+ protected JmsSessionId getNextSessionId() {
+ return new JmsSessionId(connectionInfo.getConnectionId(), sessionIdGenerator.incrementAndGet());
+ }
+
+ protected JmsTransactionId getNextTransactionId() {
+ return new JmsTransactionId(connectionInfo.getConnectionId(), transactionIdGenerator.incrementAndGet());
+ }
+
+ ////////////////////////////////////////////////////////////////////////////
+ // Provider interface methods
+ ////////////////////////////////////////////////////////////////////////////
+
+ <T extends JmsResource> T createResource(T resource) throws JMSException {
+ checkClosedOrFailed();
+
+ try {
+ ProviderFuture request = new ProviderFuture();
+ provider.create(resource, request);
+ request.sync();
+ return resource;
+ } catch (Exception ex) {
+ throw JmsExceptionSupport.create(ex);
+ }
+ }
+
+ void startResource(JmsResource resource) throws JMSException {
+ connect();
+
+ try {
+ ProviderFuture request = new ProviderFuture();
+ provider.start(resource, request);
+ request.sync();
+ } catch (Exception ioe) {
+ throw JmsExceptionSupport.create(ioe);
+ }
+ }
+
+ void destroyResource(JmsResource resource) throws JMSException {
+ connect();
+
+ try {
+ ProviderFuture request = new ProviderFuture();
+ provider.destroy(resource, request);
+ request.sync();
+ } catch (Exception ioe) {
+ throw JmsExceptionSupport.create(ioe);
+ }
+ }
+
+ void send(JmsOutboundMessageDispatch envelope) throws JMSException {
+ checkClosedOrFailed();
+ connect();
+
+ // TODO - We don't currently have a way to say that an operation
+ // should be done asynchronously. A send can be done async
+ // in many cases, such as non-persistent delivery. We probably
+ // don't need to do anything here though just have a way to
+ // configure the provider for async sends which we do in the
+ // JmsConnectionInfo. Here we just need to register a listener
+ // on the request to know when it completes if we want to do
+ // JMS 2.0 style async sends where we signal a callback, then
+ // we can manage order of callback events to async senders at
+ // this level.
+ try {
+ ProviderFuture request = new ProviderFuture();
+ provider.send(envelope, request);
+ request.sync();
+ } catch (Exception ioe) {
+ throw JmsExceptionSupport.create(ioe);
+ }
+ }
+
+ void acknowledge(JmsInboundMessageDispatch envelope, ACK_TYPE ackType) throws JMSException {
+ checkClosedOrFailed();
+ connect();
+
+ try {
+ ProviderFuture request = new ProviderFuture();
+ provider.acknowledge(envelope, ackType, request);
+ request.sync();
+ } catch (Exception ioe) {
+ throw JmsExceptionSupport.create(ioe);
+ }
+ }
+
+ void acknowledge(JmsSessionId sessionId) throws JMSException {
+ checkClosedOrFailed();
+ connect();
+
+ try {
+ ProviderFuture request = new ProviderFuture();
+ provider.acknowledge(sessionId, request);
+ request.sync();
+ } catch (Exception ioe) {
+ throw JmsExceptionSupport.create(ioe);
+ }
+ }
+
+ void unsubscribe(String name) throws JMSException {
+ checkClosedOrFailed();
+ connect();
+
+ try {
+ ProviderFuture request = new ProviderFuture();
+ provider.unsubscribe(name, request);
+ request.sync();
+ } catch (Exception ioe) {
+ throw JmsExceptionSupport.create(ioe);
+ }
+ }
+
+ void commit(JmsSessionId sessionId) throws JMSException {
+ checkClosedOrFailed();
+ connect();
+
+ try {
+ ProviderFuture request = new ProviderFuture();
+ provider.commit(sessionId, request);
+ request.sync();
+ } catch (Exception ioe) {
+ throw JmsExceptionSupport.create(ioe);
+ }
+ }
+
+ void rollback(JmsSessionId sessionId) throws JMSException {
+ checkClosedOrFailed();
+ connect();
+
+ try {
+ ProviderFuture request = new ProviderFuture();
+ provider.rollback(sessionId, request);
+ request.sync();
+ } catch (Exception ioe) {
+ throw JmsExceptionSupport.create(ioe);
+ }
+ }
+
+ void recover(JmsSessionId sessionId) throws JMSException {
+ checkClosedOrFailed();
+ connect();
+
+ try {
+ ProviderFuture request = new ProviderFuture();
+ provider.recover(sessionId, request);
+ request.sync();
+ } catch (Exception ioe) {
+ throw JmsExceptionSupport.create(ioe);
+ }
+ }
+
+ void pull(JmsConsumerId consumerId, long timeout) throws JMSException {
+ checkClosedOrFailed();
+ connect();
+
+ try {
+ ProviderFuture request = new ProviderFuture();
+ provider.pull(consumerId, timeout, request);
+ request.sync();
+ } catch (Exception ioe) {
+ throw JmsExceptionSupport.create(ioe);
+ }
+ }
+
+ ////////////////////////////////////////////////////////////////////////////
+ // Property setters and getters
+ ////////////////////////////////////////////////////////////////////////////
+
+ /**
+ * @return ExceptionListener
+ * @see javax.jms.Connection#getExceptionListener()
+ */
+ @Override
+ public ExceptionListener getExceptionListener() throws JMSException {
+ checkClosedOrFailed();
+ return this.exceptionListener;
+ }
+
+ /**
+ * @param listener
+ * @see javax.jms.Connection#setExceptionListener(javax.jms.ExceptionListener)
+ */
+ @Override
+ public void setExceptionListener(ExceptionListener listener) throws JMSException {
+ checkClosedOrFailed();
+ this.exceptionListener = listener;
+ }
+
+ /**
+ * Adds a JmsConnectionListener so that a client can be notified of events in
+ * the underlying protocol provider.
+ *
+ * @param listener
+ * the new listener to add to the collection.
+ */
+ public void addConnectionListener(JmsConnectionListener listener) {
+ this.connectionListeners.add(listener);
+ }
+
+ /**
+ * Removes a JmsConnectionListener that was previously registered.
+ *
+ * @param listener
+ * the listener to remove from the collection.
+ */
+ public void removeTransportListener(JmsConnectionListener listener) {
+ this.connectionListeners.remove(listener);
+ }
+
+ public boolean isForceAsyncSend() {
+ return connectionInfo.isForceAsyncSend();
+ }
+
+ public void setForceAsyncSend(boolean forceAsyncSend) {
+ connectionInfo.setForceAsyncSends(forceAsyncSend);
+ }
+
+ public boolean isAlwaysSyncSend() {
+ return connectionInfo.isAlwaysSyncSend();
+ }
+
+ public void setAlwaysSyncSend(boolean alwaysSyncSend) {
+ this.connectionInfo.setAlwaysSyncSend(alwaysSyncSend);
+ }
+
+ public String getTopicPrefix() {
+ return connectionInfo.getTopicPrefix();
+ }
+
+ public void setTopicPrefix(String topicPrefix) {
+ connectionInfo.setTopicPrefix(topicPrefix);
+ }
+
+ public String getTempTopicPrefix() {
+ return connectionInfo.getTempTopicPrefix();
+ }
+
+ public void setTempTopicPrefix(String tempTopicPrefix) {
+ connectionInfo.setTempTopicPrefix(tempTopicPrefix);
+ }
+
+ public String getTempQueuePrefix() {
+ return connectionInfo.getTempQueuePrefix();
+ }
+
+ public void setTempQueuePrefix(String tempQueuePrefix) {
+ connectionInfo.setTempQueuePrefix(tempQueuePrefix);
+ }
+
+ public String getQueuePrefix() {
+ return connectionInfo.getQueuePrefix();
+ }
+
+ public void setQueuePrefix(String queuePrefix) {
+ connectionInfo.setQueuePrefix(queuePrefix);
+ }
+
+ public boolean isOmitHost() {
+ return connectionInfo.isOmitHost();
+ }
+
+ public void setOmitHost(boolean omitHost) {
+ connectionInfo.setOmitHost(omitHost);
+ }
+
+ public JmsPrefetchPolicy getPrefetchPolicy() {
+ return prefetchPolicy;
+ }
+
+ public void setPrefetchPolicy(JmsPrefetchPolicy prefetchPolicy) {
+ this.prefetchPolicy = prefetchPolicy;
+ }
+
+ public boolean isMessagePrioritySupported() {
+ return messagePrioritySupported;
+ }
+
+ public void setMessagePrioritySupported(boolean messagePrioritySupported) {
+ this.messagePrioritySupported = messagePrioritySupported;
+ }
+
+ public long getCloseTimeout() {
+ return connectionInfo.getCloseTimeout();
+ }
+
+ public void setCloseTimeout(long closeTimeout) {
+ connectionInfo.setCloseTimeout(closeTimeout);
+ }
+
+ public long getConnectTimeout() {
+ return this.connectionInfo.getConnectTimeout();
+ }
+
+ public void setConnectTimeout(long connectTimeout) {
+ this.connectionInfo.setConnectTimeout(connectTimeout);
+ }
+
+ public long getSendTimeout() {
+ return connectionInfo.getSendTimeout();
+ }
+
+ public void setSendTimeout(long sendTimeout) {
+ connectionInfo.setSendTimeout(sendTimeout);
+ }
+
+ public long getRequestTimeout() {
+ return connectionInfo.getRequestTimeout();
+ }
+
+ public void setRequestTimeout(long requestTimeout) {
+ connectionInfo.setRequestTimeout(requestTimeout);
+ }
+
+ public URI getBrokerURI() {
+ return brokerURI;
+ }
+
+ public void setBrokerURI(URI brokerURI) {
+ this.brokerURI = brokerURI;
+ }
+
+ public URI getLocalURI() {
+ return localURI;
+ }
+
+ public void setLocalURI(URI localURI) {
+ this.localURI = localURI;
+ }
+
+ public SSLContext getSslContext() {
+ return sslContext;
+ }
+
+ public void setSslContext(SSLContext sslContext) {
+ this.sslContext = sslContext;
+ }
+
+ public String getUsername() {
+ return this.connectionInfo.getUsername();
+ }
+
+ public void setUsername(String username) {
+ this.connectionInfo.setUsername(username);;
+ }
+
+ public String getPassword() {
+ return this.connectionInfo.getPassword();
+ }
+
+ public void setPassword(String password) {
+ this.connectionInfo.setPassword(password);
+ }
+
+ public Provider getProvider() {
+ return provider;
+ }
+
+ void setProvider(Provider provider) {
+ this.provider = provider;
+ }
+
+ public boolean isConnected() {
+ return this.connected.get();
+ }
+
+ public boolean isStarted() {
+ return this.started.get();
+ }
+
+ public boolean isClosed() {
+ return this.closed.get();
+ }
+
+ JmsConnectionId getConnectionId() {
+ return this.connectionInfo.getConnectionId();
+ }
+
+ public boolean isWatchRemoteDestinations() {
+ return this.connectionInfo.isWatchRemoteDestinations();
+ }
+
+ public void setWatchRemoteDestinations(boolean watchRemoteDestinations) {
+ this.connectionInfo.setWatchRemoteDestinations(watchRemoteDestinations);
+ }
+
+ public JmsMessageFactory getMessageFactory() {
+ return messageFactory;
+ }
+
+ public boolean isSendAcksAsync() {
+ return sendAcksAsync;
+ }
+
+ public void setSendAcksAsync(boolean sendAcksAsync) {
+ this.sendAcksAsync = sendAcksAsync;
+ }
+
+ @Override
+ public void onMessage(JmsInboundMessageDispatch envelope) {
+
+ JmsMessage incoming = envelope.getMessage();
+ // Ensure incoming Messages are in readonly mode.
+ if (incoming != null) {
+ incoming.setReadOnlyBody(true);
+ incoming.setReadOnlyProperties(true);
+ }
+
+ JmsMessageDispatcher dispatcher = dispatchers.get(envelope.getConsumerId());
+ if (dispatcher != null) {
+ dispatcher.onMessage(envelope);
+ }
+ for (JmsConnectionListener listener : connectionListeners) {
+ listener.onMessage(envelope);
+ }
+ }
+
+ @Override
+ public void onConnectionInterrupted(URI remoteURI) {
+ for (JmsSession session : sessions) {
+ session.onConnectionInterrupted();
+ }
+
+ for (JmsConnectionListener listener : connectionListeners) {
+ listener.onConnectionInterrupted(remoteURI);
+ }
+ }
+
+ @Override
+ public void onConnectionRecovery(Provider provider) throws Exception {
+ // TODO - Recover Advisory Consumer once we can support it.
+
+ LOG.debug("Connection {} is starting recovery.", connectionInfo.getConnectionId());
+
+ ProviderFuture request = new ProviderFuture();
+ provider.create(connectionInfo, request);
+ request.sync();
+
+ for (JmsDestination tempDestination : tempDestinations.values()) {
+ createResource(tempDestination);
+ }
+
+ for (JmsSession session : sessions) {
+ session.onConnectionRecovery(provider);
+ }
+ }
+
+ @Override
+ public void onConnectionRecovered(Provider provider) throws Exception {
+ LOG.debug("Connection {} is finalizing recovery.", connectionInfo.getConnectionId());
+
+ this.messageFactory = provider.getMessageFactory();
+
+ for (JmsSession session : sessions) {
+ session.onConnectionRecovered(provider);
+ }
+ }
+
+ @Override
+ public void onConnectionRestored(URI remoteURI) {
+ for (JmsSession session : sessions) {
+ session.onConnectionRestored();
+ }
+
+ for (JmsConnectionListener listener : connectionListeners) {
+ listener.onConnectionRestored(remoteURI);
+ }
+ }
+
+ @Override
+ public void onConnectionFailure(final IOException ex) {
+ onAsyncException(ex);
+ if (!closing.get() && !closed.get()) {
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ providerFailed(ex);
+ if (provider != null) {
+ try {
+ provider.close();
+ } catch (Throwable error) {
+ LOG.debug("Error while closing failed Provider: {}", error.getMessage());
+ }
+ }
+
+ try {
+ shutdown();
+ } catch (JMSException e) {
+ LOG.warn("Exception during connection cleanup, " + e, e);
+ }
+
+ for (JmsConnectionListener listener : connectionListeners) {
+ listener.onConnectionFailure(ex);
+ }
+ }
+ });
+ }
+ }
+
+ /**
+ * Handles any asynchronous errors that occur from the JMS framework classes.
+ *
+ * If any listeners are registered they will be notified of the error from a thread
+ * in the Connection's Executor service.
+ *
+ * @param error
+ * The exception that triggered this error.
+ */
+ public void onAsyncException(Throwable error) {
+ if (!closed.get() && !closing.get()) {
+ if (this.exceptionListener != null) {
+
+ if (!(error instanceof JMSException)) {
+ error = JmsExceptionSupport.create(error);
+ }
+ final JMSException jmsError = (JMSException)error;
+
+ executor.execute(new Runnable() {
+ @Override
+ public void run() {
+ JmsConnection.this.exceptionListener.onException(jmsError);
+ }
+ });
+ } else {
+ LOG.debug("Async exception with no exception listener: " + error, error);
+ }
+ }
+ }
+
+ protected void providerFailed(IOException error) {
+ failed.set(true);
+ if (firstFailureError == null) {
+ firstFailureError = error;
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
new file mode 100644
index 0000000..1333a5e
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionFactory.java
@@ -0,0 +1,664 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import java.net.URI;
+import java.net.URISyntaxException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.Properties;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.ExceptionListener;
+import javax.jms.JMSException;
+import javax.jms.QueueConnection;
+import javax.jms.QueueConnectionFactory;
+import javax.jms.TopicConnection;
+import javax.jms.TopicConnectionFactory;
+
+import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
+import org.apache.qpid.jms.jndi.JNDIStorable;
+import org.apache.qpid.jms.meta.JmsConnectionInfo;
+import org.apache.qpid.jms.provider.Provider;
+import org.apache.qpid.jms.provider.ProviderFactory;
+import org.apache.qpid.jms.util.IdGenerator;
+import org.apache.qpid.jms.util.PropertyUtil;
+import org.apache.qpid.jms.util.URISupport;
+import org.apache.qpid.jms.util.URISupport.CompositeData;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * JMS ConnectionFactory Implementation.
+ */
+public class JmsConnectionFactory extends JNDIStorable implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JmsConnectionFactory.class);
+
+ private URI brokerURI;
+ private URI localURI;
+ private String username;
+ private String password;
+ private boolean forceAsyncSend;
+ private boolean alwaysSyncSend;
+ private boolean sendAcksAsync;
+ private boolean omitHost;
+ private boolean messagePrioritySupported = true;
+ private String queuePrefix = "queue://";
+ private String topicPrefix = "topic://";
+ private String tempQueuePrefix = "temp-queue://";
+ private String tempTopicPrefix = "temp-topic://";
+ private long sendTimeout = JmsConnectionInfo.DEFAULT_SEND_TIMEOUT;
+ private long requestTimeout = JmsConnectionInfo.DEFAULT_REQUEST_TIMEOUT;
+ private long closeTimeout = JmsConnectionInfo.DEFAULT_CLOSE_TIMEOUT;
+ private long connectTimeout = JmsConnectionInfo.DEFAULT_CONNECT_TIMEOUT;
+ private boolean watchRemoteDestinations = true;
+ private IdGenerator clientIdGenerator;
+ private String clientIDPrefix;
+ private IdGenerator connectionIdGenerator;
+ private String connectionIDPrefix;
+ private ExceptionListener exceptionListener;
+
+ private JmsPrefetchPolicy prefetchPolicy = new JmsPrefetchPolicy();
+
+ public JmsConnectionFactory() {
+ }
+
+ public JmsConnectionFactory(String username, String password) {
+ setUsername(username);
+ setPassword(password);
+ }
+
+ public JmsConnectionFactory(String brokerURI) {
+ this(createURI(brokerURI));
+ }
+
+ public JmsConnectionFactory(URI brokerURI) {
+ setBrokerURI(brokerURI.toString());
+ }
+
+ public JmsConnectionFactory(String userName, String password, URI brokerURI) {
+ setUsername(userName);
+ setPassword(password);
+ setBrokerURI(brokerURI.toString());
+ }
+
+ public JmsConnectionFactory(String userName, String password, String brokerURI) {
+ setUsername(userName);
+ setPassword(password);
+ setBrokerURI(brokerURI);
+ }
+
+ /**
+ * Set properties
+ *
+ * @param props
+ */
+ public void setProperties(Properties props) {
+ Map<String, String> map = new HashMap<String, String>();
+ for (Map.Entry<Object, Object> entry : props.entrySet()) {
+ map.put(entry.getKey().toString(), entry.getValue().toString());
+ }
+ setProperties(map);
+ }
+
+ @Override
+ public void setProperties(Map<String, String> map) {
+ buildFromProperties(map);
+ }
+
+ /**
+ * @param map
+ */
+ @Override
+ protected void buildFromProperties(Map<String, String> map) {
+ PropertyUtil.setProperties(this, map);
+ }
+
+ /**
+ * @param map
+ */
+ @Override
+ protected void populateProperties(Map<String, String> map) {
+ try {
+ Map<String, String> result = PropertyUtil.getProperties(this);
+ map.putAll(result);
+ } catch (Exception e) {
+ e.printStackTrace();
+ }
+ }
+
+ /**
+ * @return a TopicConnection
+ * @throws JMSException
+ * @see javax.jms.TopicConnectionFactory#createTopicConnection()
+ */
+ @Override
+ public TopicConnection createTopicConnection() throws JMSException {
+ return createTopicConnection(getUsername(), getPassword());
+ }
+
+ /**
+ * @param userName
+ * @param password
+ * @return a TopicConnection
+ * @throws JMSException
+ * @see javax.jms.TopicConnectionFactory#createTopicConnection(java.lang.String,
+ * java.lang.String)
+ */
+ @Override
+ public TopicConnection createTopicConnection(String username, String password) throws JMSException {
+ try {
+ String connectionId = getConnectionIdGenerator().generateId();
+ Provider provider = createProvider(brokerURI);
+ JmsTopicConnection result = new JmsTopicConnection(connectionId, provider, getClientIdGenerator());
+ return configureConnection(result, username, password);
+ } catch (Exception e) {
+ throw JmsExceptionSupport.create(e);
+ }
+ }
+
+ /**
+ * @return a Connection
+ * @throws JMSException
+ * @see javax.jms.ConnectionFactory#createConnection()
+ */
+ @Override
+ public Connection createConnection() throws JMSException {
+ return createConnection(getUsername(), getPassword());
+ }
+
+ /**
+ * @param userName
+ * @param password
+ * @return Connection
+ * @throws JMSException
+ * @see javax.jms.ConnectionFactory#createConnection(java.lang.String, java.lang.String)
+ */
+ @Override
+ public Connection createConnection(String username, String password) throws JMSException {
+ try {
+ String connectionId = getConnectionIdGenerator().generateId();
+ Provider provider = createProvider(brokerURI);
+ JmsConnection result = new JmsConnection(connectionId, provider, getClientIdGenerator());
+ return configureConnection(result, username, password);
+ } catch (Exception e) {
+ throw JmsExceptionSupport.create(e);
+ }
+ }
+
+ /**
+ * @return a QueueConnection
+ * @throws JMSException
+ * @see javax.jms.QueueConnectionFactory#createQueueConnection()
+ */
+ @Override
+ public QueueConnection createQueueConnection() throws JMSException {
+ return createQueueConnection(getUsername(), getPassword());
+ }
+
+ /**
+ * @param userName
+ * @param password
+ * @return a QueueConnection
+ * @throws JMSException
+ * @see javax.jms.QueueConnectionFactory#createQueueConnection(java.lang.String,
+ * java.lang.String)
+ */
+ @Override
+ public QueueConnection createQueueConnection(String username, String password) throws JMSException {
+ try {
+ String connectionId = getConnectionIdGenerator().generateId();
+ Provider provider = createProvider(brokerURI);
+ JmsQueueConnection result = new JmsQueueConnection(connectionId, provider, getClientIdGenerator());
+ return configureConnection(result, username, password);
+ } catch (Exception e) {
+ throw JmsExceptionSupport.create(e);
+ }
+ }
+
+ protected <T extends JmsConnection> T configureConnection(T connection, String username, String password) throws JMSException {
+ try {
+ PropertyUtil.setProperties(connection, PropertyUtil.getProperties(this));
+ connection.setExceptionListener(exceptionListener);
+ connection.setUsername(username);
+ connection.setPassword(password);
+ connection.setBrokerURI(brokerURI);
+ return connection;
+ } catch (Exception e) {
+ throw JmsExceptionSupport.create(e);
+ }
+ }
+
+ protected Provider createProvider(URI brokerURI) throws Exception {
+ Provider result = null;
+
+ try {
+ result = ProviderFactory.createAsync(brokerURI);
+ } catch (Exception ex) {
+ LOG.error("Failed to create JMS Provider instance for: {}", brokerURI.getScheme());
+ LOG.trace("Error: ", ex);
+ throw ex;
+ }
+
+ return result;
+ }
+
+ protected static URI createURI(String name) {
+ if (name != null && name.trim().isEmpty() == false) {
+ try {
+ return new URI(name);
+ } catch (URISyntaxException e) {
+ throw (IllegalArgumentException) new IllegalArgumentException("Invalid broker URI: " + name).initCause(e);
+ }
+ }
+ return null;
+ }
+
+ protected synchronized IdGenerator getConnectionIdGenerator() {
+ if (connectionIdGenerator == null) {
+ if (connectionIDPrefix != null) {
+ connectionIdGenerator = new IdGenerator(connectionIDPrefix);
+ } else {
+ connectionIdGenerator = new IdGenerator();
+ }
+ }
+ return connectionIdGenerator;
+ }
+
+ protected void setConnectionIdGenerator(IdGenerator connectionIdGenerator) {
+ this.connectionIdGenerator = connectionIdGenerator;
+ }
+
+ //////////////////////////////////////////////////////////////////////////
+ // Property getters and setters
+ //////////////////////////////////////////////////////////////////////////
+
+ /**
+ * @return the brokerURI
+ */
+ public String getBrokerURI() {
+ return this.brokerURI != null ? this.brokerURI.toString() : "";
+ }
+
+ /**
+ * @param brokerURI
+ * the brokerURI to set
+ */
+ public void setBrokerURI(String brokerURI) {
+ if (brokerURI == null) {
+ throw new IllegalArgumentException("brokerURI cannot be null");
+ }
+ this.brokerURI = createURI(brokerURI);
+
+ try {
+ if (this.brokerURI.getQuery() != null) {
+ Map<String, String> map = PropertyUtil.parseQuery(this.brokerURI.getQuery());
+ Map<String, String> jmsOptionsMap = PropertyUtil.filterProperties(map, "jms.");
+
+ if (!PropertyUtil.setProperties(this, jmsOptionsMap)) {
+ String msg = ""
+ + " Not all jms options could be set on the ConnectionFactory."
+ + " Check the options are spelled correctly."
+ + " Given parameters=[" + jmsOptionsMap + "]."
+ + " This connection factory cannot be started.";
+ throw new IllegalArgumentException(msg);
+ } else {
+ this.brokerURI = PropertyUtil.replaceQuery(this.brokerURI, map);
+ }
+ } else if (URISupport.isCompositeURI(this.brokerURI)) {
+ CompositeData data = URISupport.parseComposite(this.brokerURI);
+ Map<String, String> jmsOptionsMap = PropertyUtil.filterProperties(data.getParameters(), "jms.");
+ if (!PropertyUtil.setProperties(this, jmsOptionsMap)) {
+ String msg = ""
+ + " Not all jms options could be set on the ConnectionFactory."
+ + " Check the options are spelled correctly."
+ + " Given parameters=[" + jmsOptionsMap + "]."
+ + " This connection factory cannot be started.";
+ throw new IllegalArgumentException(msg);
+ } else {
+ this.brokerURI = data.toURI();
+ }
+ }
+ } catch (Exception e) {
+ throw new IllegalArgumentException(e.getMessage());
+ }
+ }
+
+ /**
+ * @return the localURI
+ */
+ public String getLocalURI() {
+ return this.localURI != null ? this.localURI.toString() : "";
+ }
+
+ /**
+ * @param localURI
+ * the localURI to set
+ */
+ public void setLocalURI(String localURI) {
+ this.localURI = createURI(localURI);
+ }
+
+ /**
+ * @return the username
+ */
+ public String getUsername() {
+ return this.username;
+ }
+
+ /**
+ * @param username
+ * the username to set
+ */
+ public void setUsername(String username) {
+ this.username = username;
+ }
+
+ /**
+ * @return the password
+ */
+ public String getPassword() {
+ return this.password;
+ }
+
+ /**
+ * @param password
+ * the password to set
+ */
+ public void setPassword(String password) {
+ this.password = password;
+ }
+
+ public boolean isForceAsyncSend() {
+ return forceAsyncSend;
+ }
+
+ public void setForceAsyncSend(boolean forceAsyncSend) {
+ this.forceAsyncSend = forceAsyncSend;
+ }
+
+ public boolean isOmitHost() {
+ return omitHost;
+ }
+
+ public void setOmitHost(boolean omitHost) {
+ this.omitHost = omitHost;
+ }
+
+ /**
+ * @return the messagePrioritySupported configuration option.
+ */
+ public boolean isMessagePrioritySupported() {
+ return this.messagePrioritySupported;
+ }
+
+ /**
+ * Enables message priority support in MessageConsumer instances. This results
+ * in all prefetched messages being dispatched in priority order.
+ *
+ * @param messagePrioritySupported the messagePrioritySupported to set
+ */
+ public void setMessagePrioritySupported(boolean messagePrioritySupported) {
+ this.messagePrioritySupported = messagePrioritySupported;
+ }
+
+ /**
+ * Returns the prefix applied to Queues that are created by the client.
+ *
+ * @return the currently configured Queue prefix.
+ */
+ public String getQueuePrefix() {
+ return queuePrefix;
+ }
+
+ public void setQueuePrefix(String queuePrefix) {
+ this.queuePrefix = queuePrefix;
+ }
+
+ /**
+ * Returns the prefix applied to Temporary Queues that are created by the client.
+ *
+ * @return the currently configured Temporary Queue prefix.
+ */
+ public String getTempQueuePrefix() {
+ return tempQueuePrefix;
+ }
+
+ public void setTempQueuePrefix(String tempQueuePrefix) {
+ this.tempQueuePrefix = tempQueuePrefix;
+ }
+
+ /**
+ * Returns the prefix applied to Temporary Topics that are created by the client.
+ *
+ * @return the currently configured Temporary Topic prefix.
+ */
+ public String getTempTopicPrefix() {
+ return tempTopicPrefix;
+ }
+
+ public void setTempTopicPrefix(String tempTopicPrefix) {
+ this.tempTopicPrefix = tempTopicPrefix;
+ }
+
+ /**
+ * Returns the prefix applied to Topics that are created by the client.
+ *
+ * @return the currently configured Topic prefix.
+ */
+ public String getTopicPrefix() {
+ return topicPrefix;
+ }
+
+ public void setTopicPrefix(String topicPrefix) {
+ this.topicPrefix = topicPrefix;
+ }
+
+ /**
+ * Gets the currently set close timeout.
+ *
+ * @return the currently set close timeout.
+ */
+ public long getCloseTimeout() {
+ return closeTimeout;
+ }
+
+ /**
+ * Sets the close timeout used to control how long a Connection close will wait for
+ * clean shutdown of the connection before giving up. A negative value means wait
+ * forever.
+ *
+ * Care should be taken in that a very short close timeout can cause the client to
+ * not cleanly shutdown the connection and it's resources.
+ *
+ * @param closeTimeout
+ * time in milliseconds to wait for a clean connection close.
+ */
+ public void setCloseTimeout(long closeTimeout) {
+ this.closeTimeout = closeTimeout;
+ }
+
+ /**
+ * Returns the currently configured wire level connect timeout.
+ *
+ * @return the currently configured wire level connect timeout.
+ */
+ public long getConnectTimeout() {
+ return this.connectTimeout;
+ }
+
+ /**
+ * Sets the timeout value used to control how long a client will wait for a successful
+ * connection to the remote peer to be established before considering the attempt to
+ * have failed. This value does not control socket level connection timeout but rather
+ * connection handshake at the wire level, to control the socket level timeouts use the
+ * standard socket options configuration values.
+ *
+ * @param connectTimeout
+ * the time in milliseconds to wait for the protocol connection handshake to complete.
+ */
+ public void setConnectTimeout(long connectTimeout) {
+ this.connectTimeout = connectTimeout;
+ }
+
+ public long getSendTimeout() {
+ return sendTimeout;
+ }
+
+ public void setSendTimeout(long sendTimeout) {
+ this.sendTimeout = sendTimeout;
+ }
+
+ public long getRequestTimeout() {
+ return requestTimeout;
+ }
+
+ public void setRequestTimeout(long requestTimeout) {
+ this.requestTimeout = requestTimeout;
+ }
+
+ public JmsPrefetchPolicy getPrefetchPolicy() {
+ return prefetchPolicy;
+ }
+
+ public void setPrefetchPolicy(JmsPrefetchPolicy prefetchPolicy) {
+ this.prefetchPolicy = prefetchPolicy;
+ }
+
+ public String getClientIDPrefix() {
+ return clientIDPrefix;
+ }
+
+ /**
+ * Sets the prefix used by auto-generated JMS Client ID values which are used if the JMS
+ * client does not explicitly specify on.
+ *
+ * @param clientIDPrefix
+ */
+ public void setClientIDPrefix(String clientIDPrefix) {
+ this.clientIDPrefix = clientIDPrefix;
+ }
+
+ protected synchronized IdGenerator getClientIdGenerator() {
+ if (clientIdGenerator == null) {
+ if (clientIDPrefix != null) {
+ clientIdGenerator = new IdGenerator(clientIDPrefix);
+ } else {
+ clientIdGenerator = new IdGenerator();
+ }
+ }
+ return clientIdGenerator;
+ }
+
+ protected void setClientIdGenerator(IdGenerator clientIdGenerator) {
+ this.clientIdGenerator = clientIdGenerator;
+ }
+
+ /**
+ * Sets the prefix used by connection id generator.
+ *
+ * @param connectionIDPrefix
+ * The string prefix used on all connection Id's created by this factory.
+ */
+ public void setConnectionIDPrefix(String connectionIDPrefix) {
+ this.connectionIDPrefix = connectionIDPrefix;
+ }
+
+ /**
+ * Gets the currently configured JMS ExceptionListener that will be set on all
+ * new Connection objects created from this factory.
+ *
+ * @return the currently configured JMS ExceptionListener.
+ */
+ public ExceptionListener getExceptionListener() {
+ return exceptionListener;
+ }
+
+ /**
+ * Sets the JMS ExceptionListener that will be set on all new Connection objects
+ * created from this factory.
+ *
+ * @param exceptionListener
+ * the JMS ExceptionListenenr to apply to new Connection's or null to clear.
+ */
+ public void setExceptionListener(ExceptionListener exceptionListener) {
+ this.exceptionListener = exceptionListener;
+ }
+
+ /**
+ * Indicates if the Connection's created from this factory will watch for updates
+ * from the remote peer informing of temporary destination creation and destruction.
+ *
+ * @return true if destination monitoring is enabled.
+ */
+ public boolean isWatchRemoteDestinations() {
+ return watchRemoteDestinations;
+ }
+
+ /**
+ * Enable or disable monitoring of remote temporary destination life-cycles.
+ *
+ * @param watchRemoteDestinations
+ * true if connection instances should monitor remote destination life-cycles.
+ */
+ public void setWatchRemoteDestinations(boolean watchRemoteDestinations) {
+ this.watchRemoteDestinations = watchRemoteDestinations;
+ }
+
+ /**
+ * Returns true if the client should always send messages using a synchronous
+ * send operation regardless of persistence mode, or inside a transaction.
+ *
+ * @return true if sends should always be done synchronously.
+ */
+ public boolean isAlwaysSyncSend() {
+ return alwaysSyncSend;
+ }
+
+ /**
+ * Configures whether or not the client will always send messages synchronously or not
+ * regardless of other factors that might result in an asynchronous send.
+ *
+ * @param alwaysSyncSend
+ * if true sends are always done synchronously.
+ */
+ public void setAlwaysSyncSend(boolean alwaysSyncSend) {
+ this.alwaysSyncSend = alwaysSyncSend;
+ }
+
+ /**
+ * @return true if consumer acknowledgments are sent asynchronously or not.
+ */
+ public boolean isSendAcksAsync() {
+ return sendAcksAsync;
+ }
+
+ /**
+ * Should the message acknowledgments from a consumer be sent synchronously or
+ * asynchronously. Sending the acknowledgments asynchronously can increase the
+ * performance of a consumer but opens up the possibility of a missed message
+ * acknowledge should the connection be unstable.
+ *
+ * @param sendAcksAsync
+ * true to have the client send all message acknowledgments asynchronously.
+ */
+ public void setSendAcksAsync(boolean sendAcksAsync) {
+ this.sendAcksAsync = sendAcksAsync;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java
new file mode 100644
index 0000000..2439760
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionListener.java
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import java.net.URI;
+
+import org.apache.qpid.jms.message.JmsInboundMessageDispatch;
+
+/**
+ * Providers an interface for client's to listener to events related to
+ * an JmsConnection.
+ */
+public interface JmsConnectionListener {
+
+ /**
+ * Called when an unrecoverable error occurs and the Connection must be closed.
+ *
+ * @param error
+ * The error that triggered the failure.
+ */
+ void onConnectionFailure(Throwable error);
+
+ /**
+ * Called when the Connection to the remote peer is lost.
+ *
+ * @param remoteURI
+ * The URI of the Broker previously connected to.
+ */
+ void onConnectionInterrupted(URI remoteURI);
+
+ /**
+ * Called when normal communication has been restored to a remote peer.
+ *
+ * @param remoteURI
+ * The URI of the Broker that this client is now connected to.
+ */
+ void onConnectionRestored(URI remoteURI);
+
+ /**
+ * Called when a Connection is notified that a new Message has arrived for
+ * one of it's currently active subscriptions.
+ *
+ * @param envelope
+ * The envelope that contains the incoming message and it's delivery information.
+ */
+ void onMessage(JmsInboundMessageDispatch envelope);
+
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionMetaData.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionMetaData.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionMetaData.java
new file mode 100644
index 0000000..f674320
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsConnectionMetaData.java
@@ -0,0 +1,164 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import java.io.BufferedReader;
+import java.io.InputStream;
+import java.io.InputStreamReader;
+import java.util.Enumeration;
+import java.util.Vector;
+import java.util.regex.Matcher;
+import java.util.regex.Pattern;
+
+import javax.jms.ConnectionMetaData;
+
+/**
+ * A <CODE>ConnectionMetaData</CODE> object provides information describing
+ * the <CODE>Connection</CODE> object.
+ */
+public final class JmsConnectionMetaData implements ConnectionMetaData {
+
+ public static final String PROVIDER_VERSION;
+ public static final int PROVIDER_MAJOR_VERSION;
+ public static final int PROVIDER_MINOR_VERSION;
+
+ public static final JmsConnectionMetaData INSTANCE = new JmsConnectionMetaData();
+
+ static {
+ String version = null;
+ int major = 0;
+ int minor = 0;
+ try {
+ Package p = Package.getPackage("org.apache.qpid.jms");
+ if (p != null) {
+ version = p.getImplementationVersion();
+ Pattern pattern = Pattern.compile("(\\d+)\\.(\\d+).*");
+ Matcher m = pattern.matcher(version);
+ if (m.matches()) {
+ major = Integer.parseInt(m.group(1));
+ minor = Integer.parseInt(m.group(2));
+ }
+ }
+ } catch (Throwable e) {
+ InputStream in = null;
+ if ((in = JmsConnectionMetaData.class.getResourceAsStream("/org/apache/qpid/jms/version.txt")) != null) {
+ try {
+ BufferedReader reader = new BufferedReader(new InputStreamReader(in));
+ version = reader.readLine();
+ Pattern pattern = Pattern.compile("(\\d+)\\.(\\d+).*");
+ Matcher m = pattern.matcher(version);
+ if (m.matches()) {
+ major = Integer.parseInt(m.group(1));
+ minor = Integer.parseInt(m.group(2));
+ }
+ reader.close();
+ } catch(Throwable err) {
+ }
+ }
+ }
+ PROVIDER_VERSION = version;
+ PROVIDER_MAJOR_VERSION = major;
+ PROVIDER_MINOR_VERSION = minor;
+ }
+
+ private JmsConnectionMetaData() {}
+
+ /**
+ * Gets the JMS API version.
+ *
+ * @return the JMS API version
+ */
+ @Override
+ public String getJMSVersion() {
+ return "1.1";
+ }
+
+ /**
+ * Gets the JMS major version number.
+ *
+ * @return the JMS API major version number
+ */
+ @Override
+ public int getJMSMajorVersion() {
+ return 1;
+ }
+
+ /**
+ * Gets the JMS minor version number.
+ *
+ * @return the JMS API minor version number
+ */
+ @Override
+ public int getJMSMinorVersion() {
+ return 1;
+ }
+
+ /**
+ * Gets the JMS provider name.
+ *
+ * @return the JMS provider name
+ */
+ @Override
+ public String getJMSProviderName() {
+ return "QpidJMS";
+ }
+
+ /**
+ * Gets the JMS provider version.
+ *
+ * @return the JMS provider version
+ */
+ @Override
+ public String getProviderVersion() {
+ return PROVIDER_VERSION;
+ }
+
+ /**
+ * Gets the JMS provider major version number.
+ *
+ * @return the JMS provider major version number
+ */
+ @Override
+ public int getProviderMajorVersion() {
+ return PROVIDER_MAJOR_VERSION;
+ }
+
+ /**
+ * Gets the JMS provider minor version number.
+ *
+ * @return the JMS provider minor version number
+ */
+ @Override
+ public int getProviderMinorVersion() {
+ return PROVIDER_MINOR_VERSION;
+ }
+
+ /**
+ * Gets an enumeration of the JMSX property names.
+ *
+ * @return an Enumeration of JMSX property names
+ */
+ @Override
+ public Enumeration<String> getJMSXPropertyNames() {
+ Vector<String> jmxProperties = new Vector<String>();
+ jmxProperties.add("JMSXUserID");
+ jmxProperties.add("JMSXGroupID");
+ jmxProperties.add("JMSXGroupSeq");
+ jmxProperties.add("JMSXDeliveryCount");
+ return jmxProperties.elements();
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsDestination.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsDestination.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsDestination.java
new file mode 100644
index 0000000..a86e0f7
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsDestination.java
@@ -0,0 +1,189 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import java.io.Externalizable;
+import java.io.IOException;
+import java.io.ObjectInput;
+import java.io.ObjectOutput;
+import java.util.Map;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.jndi.JNDIStorable;
+import org.apache.qpid.jms.meta.JmsResource;
+import org.apache.qpid.jms.meta.JmsResourceVistor;
+
+/**
+ * Jms Destination
+ */
+public abstract class JmsDestination extends JNDIStorable implements JmsResource, Externalizable, javax.jms.Destination, Comparable<JmsDestination> {
+
+ protected transient String name;
+ protected transient boolean topic;
+ protected transient boolean temporary;
+ protected transient int hashValue;
+ protected transient JmsConnection connection;
+
+ protected JmsDestination(String name, boolean topic, boolean temporary) {
+ this.name = name;
+ this.topic = topic;
+ this.temporary = temporary;
+ }
+
+ public abstract JmsDestination copy();
+
+ @Override
+ public String toString() {
+ return name;
+ }
+
+ /**
+ * @return name of destination
+ */
+ public String getName() {
+ return this.name;
+ }
+
+ public void setName(String name) {
+ this.name = name;
+ }
+
+ /**
+ * @return the topic
+ */
+ public boolean isTopic() {
+ return this.topic;
+ }
+
+ /**
+ * @return the temporary
+ */
+ public boolean isTemporary() {
+ return this.temporary;
+ }
+
+ /**
+ * @return true if a Topic
+ */
+ public boolean isQueue() {
+ return !this.topic;
+ }
+
+ /**
+ * @param props
+ */
+ @Override
+ protected void buildFromProperties(Map<String, String> props) {
+ setName(getProperty(props, "name", ""));
+ Boolean bool = Boolean.valueOf(getProperty(props, "topic", Boolean.TRUE.toString()));
+ this.topic = bool.booleanValue();
+ bool = Boolean.valueOf(getProperty(props, "temporary", Boolean.FALSE.toString()));
+ this.temporary = bool.booleanValue();
+ }
+
+ /**
+ * @param props
+ */
+ @Override
+ protected void populateProperties(Map<String, String> props) {
+ props.put("name", getName());
+ props.put("topic", Boolean.toString(isTopic()));
+ props.put("temporary", Boolean.toString(isTemporary()));
+ }
+
+ /**
+ * @param other
+ * the Object to be compared.
+ * @return a negative integer, zero, or a positive integer as this object is
+ * less than, equal to, or greater than the specified object.
+ * @see java.lang.Comparable#compareTo(java.lang.Object)
+ */
+ @Override
+ public int compareTo(JmsDestination other) {
+ if (other != null) {
+ if (this == other) {
+ return 0;
+ }
+ if (isTemporary() == other.isTemporary()) {
+ return getName().compareTo(other.getName());
+ }
+ return -1;
+ }
+ return -1;
+ }
+
+ @Override
+ public boolean equals(Object o) {
+ if (this == o) {
+ return true;
+ }
+ if (o == null || getClass() != o.getClass()) {
+ return false;
+ }
+
+ JmsDestination d = (JmsDestination) o;
+ return getName().equals(d.getName());
+ }
+
+ @Override
+ public int hashCode() {
+ if (hashValue == 0) {
+ hashValue = getName().hashCode();
+ }
+ return hashValue;
+ }
+
+ @Override
+ public void writeExternal(ObjectOutput out) throws IOException {
+ out.writeUTF(getName());
+ out.writeBoolean(isTopic());
+ out.writeBoolean(isTemporary());
+ }
+
+ @Override
+ public void readExternal(ObjectInput in) throws IOException, ClassNotFoundException {
+ setName(in.readUTF());
+ this.topic = in.readBoolean();
+ this.temporary = in.readBoolean();
+ }
+
+ void setConnection(JmsConnection connection) {
+ this.connection = connection;
+ }
+
+ JmsConnection getConnection() {
+ return this.connection;
+ }
+
+ /**
+ * Attempts to delete the destination if there is an assigned Connection object.
+ *
+ * @throws JMSException if an error occurs or the provider doesn't support
+ * delete of destinations from the client.
+ */
+ protected void tryDelete() throws JMSException {
+ if (connection != null) {
+ connection.deleteDestination(this);
+ }
+ }
+
+ @Override
+ public void visit(JmsResourceVistor visitor) throws Exception {
+ visitor.processDestination(this);
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsDurableTopicSubscriber.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsDurableTopicSubscriber.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsDurableTopicSubscriber.java
new file mode 100644
index 0000000..b7ae1b9
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsDurableTopicSubscriber.java
@@ -0,0 +1,47 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.meta.JmsConsumerId;
+
+/**
+ * Implementation of a TopicSubscriber that is Durable
+ */
+public class JmsDurableTopicSubscriber extends JmsTopicSubscriber {
+
+ /**
+ * Creates a durable TopicSubscriber
+ *
+ * @param id
+ * @param s
+ * @param destination
+ * @param name
+ * @param noLocal
+ * @param selector
+ * @throws JMSException
+ */
+ public JmsDurableTopicSubscriber(JmsConsumerId id, JmsSession s, JmsDestination destination, String name, boolean noLocal, String selector) throws JMSException {
+ super(id, s, destination, name, noLocal, selector);
+ }
+
+ @Override
+ public boolean isDurableSubscription() {
+ return true;
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
new file mode 100644
index 0000000..c9395ba
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsLocalTransactionContext.java
@@ -0,0 +1,216 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import java.util.ArrayList;
+import java.util.List;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.jms.exceptions.JmsExceptionSupport;
+import org.apache.qpid.jms.meta.JmsTransactionId;
+import org.apache.qpid.jms.meta.JmsTransactionInfo;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Manages the details of a Session operating inside of a local JMS transaction.
+ */
+public class JmsLocalTransactionContext {
+
+ private static final Logger LOG = LoggerFactory.getLogger(JmsLocalTransactionContext.class);
+
+ private List<JmsTxSynchronization> synchronizations;
+ private final JmsSession session;
+ private final JmsConnection connection;
+ private JmsTransactionId transactionId;
+ private JmsTransactionListener listener;
+
+ public JmsLocalTransactionContext(JmsSession session) {
+ this.session = session;
+ this.connection = session.getConnection();
+ }
+
+ /**
+ * Adds the given Transaction synchronization to the current list.
+ *
+ * @param synchronization
+ * the transaction synchronization to add.
+ */
+ public void addSynchronization(JmsTxSynchronization s) {
+ if (synchronizations == null) {
+ synchronizations = new ArrayList<JmsTxSynchronization>(10);
+ }
+ synchronizations.add(s);
+ }
+
+ /**
+ * Clears the current Transacted state. This is usually done when the client
+ * detects that a failover has occurred and needs to create a new Transaction
+ * for a Session that was previously enlisted in a transaction.
+ */
+ public void clear() {
+ this.transactionId = null;
+ this.synchronizations = null;
+ }
+
+ /**
+ * Start a local transaction.
+ *
+ * @throws javax.jms.JMSException on internal error
+ */
+ public void begin() throws JMSException {
+ if (transactionId == null) {
+ synchronizations = null;
+
+ transactionId = connection.getNextTransactionId();
+ JmsTransactionInfo transaction = new JmsTransactionInfo(session.getSessionId(), transactionId);
+ connection.createResource(transaction);
+
+ if (listener != null) {
+ listener.onTransactionStarted();
+ }
+
+ LOG.debug("Begin: {}", transactionId);
+ }
+ }
+
+ /**
+ * Rolls back any work done in this transaction and releases any locks
+ * currently held.
+ *
+ * @throws JMSException
+ * if the JMS provider fails to roll back the transaction due to some internal error.
+ */
+ public void rollback() throws JMSException {
+ if (transactionId != null) {
+ LOG.debug("Rollback: {} syncCount: {}", transactionId,
+ (synchronizations != null ? synchronizations.size() : 0));
+
+ transactionId = null;
+ connection.rollback(session.getSessionId());
+
+ if (listener != null) {
+ listener.onTransactionRolledBack();
+ }
+ }
+
+ afterRollback();
+ }
+
+ /**
+ * Commits all work done in this transaction and releases any locks
+ * currently held.
+ *
+ * @throws JMSException
+ * if the JMS provider fails to roll back the transaction due to some internal error.
+ */
+ public void commit() throws JMSException {
+ if (transactionId != null) {
+ LOG.debug("Commit: {} syncCount: {}", transactionId,
+ (synchronizations != null ? synchronizations.size() : 0));
+
+ JmsTransactionId oldTransactionId = this.transactionId;
+ transactionId = null;
+ try {
+ connection.commit(session.getSessionId());
+ if (listener != null) {
+ listener.onTransactionCommitted();
+ }
+ afterCommit();
+ } catch (JMSException cause) {
+ LOG.info("Commit failed for transaction: {}", oldTransactionId);
+ if (listener != null) {
+ listener.onTransactionRolledBack();
+ }
+ afterRollback();
+ throw cause;
+ }
+ }
+ }
+
+ @Override
+ public String toString() {
+ return "JmsLocalTransactionContext{transactionId=" + transactionId + "}";
+ }
+
+ //------------- Getters and Setters --------------------------------------//
+
+ public JmsTransactionId getTransactionId() {
+ return this.transactionId;
+ }
+
+ public JmsTransactionListener getListener() {
+ return listener;
+ }
+
+ public void setListener(JmsTransactionListener listener) {
+ this.listener = listener;
+ }
+
+ public boolean isInTransaction() {
+ return this.transactionId != null;
+ }
+
+ //------------- Implementation methods -----------------------------------//
+
+ private void afterRollback() throws JMSException {
+ if (synchronizations == null) {
+ return;
+ }
+
+ Throwable firstException = null;
+ int size = synchronizations.size();
+ for (int i = 0; i < size; i++) {
+ try {
+ synchronizations.get(i).afterRollback();
+ } catch (Throwable thrown) {
+ LOG.debug("Exception from afterRollback on " + synchronizations.get(i), thrown);
+ if (firstException == null) {
+ firstException = thrown;
+ }
+ }
+ }
+ synchronizations = null;
+ if (firstException != null) {
+ throw JmsExceptionSupport.create(firstException);
+ }
+ }
+
+ private void afterCommit() throws JMSException {
+ if (synchronizations == null) {
+ return;
+ }
+
+ Throwable firstException = null;
+ int size = synchronizations.size();
+ for (int i = 0; i < size; i++) {
+ try {
+ synchronizations.get(i).afterCommit();
+ } catch (Throwable thrown) {
+ LOG.debug("Exception from afterCommit on " + synchronizations.get(i), thrown);
+ if (firstException == null) {
+ firstException = thrown;
+ }
+ }
+ }
+ synchronizations = null;
+ if (firstException != null) {
+ throw JmsExceptionSupport.create(firstException);
+ }
+ }
+}
http://git-wip-us.apache.org/repos/asf/qpid-jms/blob/e4decdc1/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageAvailableConsumer.java
----------------------------------------------------------------------
diff --git a/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageAvailableConsumer.java b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageAvailableConsumer.java
new file mode 100644
index 0000000..31f53c6
--- /dev/null
+++ b/qpid-jms-client/src/main/java/org/apache/qpid/jms/JmsMessageAvailableConsumer.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.jms;
+
+import javax.jms.MessageConsumer;
+
+/**
+ * Marker interface used for MessageConsumer instances that support sending
+ * a notification event when a message has arrived when the consumer is not
+ * in asynchronous dispatch mode.
+ */
+public interface JmsMessageAvailableConsumer {
+
+ /**
+ * Sets the listener used to notify synchronous consumers that there is a message
+ * available so that the {@link MessageConsumer#receiveNoWait()} can be called.
+ *
+ * @param availableListener
+ * the JmsMessageAvailableListener instance to signal.
+ */
+ void setAvailableListener(JmsMessageAvailableListener availableListener);
+
+ /**
+ * Gets the listener used to notify synchronous consumers that there is a message
+ * available so that the {@link MessageConsumer#receiveNoWait()} can be called.
+ *
+ * @return the currently configured message available listener instance.
+ */
+ JmsMessageAvailableListener getAvailableListener();
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org