You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/09/19 13:36:26 UTC
svn commit: r577253 [2/7] - in
/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity:
nclient/ nclient/impl/ nclient/util/ njms/ njms/message/
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessageListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessageListener.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessageListener.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessageListener.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,34 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.nclient.util;
+
+import org.apache.qpidity.api.Message;
+
+/**
+ *A message listener
+ */
+public interface MessageListener
+{
+ /**
+ * Process an incoming message.
+ *
+ * @param message The incoming message.
+ */
+ public void onMessage(Message message);
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessageListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,59 @@
+package org.apache.qpidity.nclient.util;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
+import org.apache.qpidity.transport.Header;
+import org.apache.qpidity.nclient.MessagePartListener;
+
+/**
+ * This is a simple message assembler.
+ * Will call onMessage method of the adaptee
+ * when all message data is read.
+ *
+ * This is a good convinience utility for handling
+ * small messages
+ */
+public class MessagePartListenerAdapter implements MessagePartListener
+{
+ MessageListener _adaptee;
+ ByteBufferMessage _currentMsg;
+
+ public MessagePartListenerAdapter(MessageListener listener)
+ {
+ _adaptee = listener;
+ }
+
+ public void messageTransfer(long transferId)
+ {
+ _currentMsg = new ByteBufferMessage(transferId);
+ }
+
+ public void data(ByteBuffer src)
+ {
+ try
+ {
+ _currentMsg.appendData(src);
+ }
+ catch(IOException e)
+ {
+ // A chance for IO exception
+ // doesn't occur as we are using
+ // a ByteBuffer
+ }
+ }
+
+ public void messageHeader(Header header)
+ {
+ _currentMsg.setDeliveryProperties(header.get(DeliveryProperties.class));
+ _currentMsg.setMessageProperties(header.get(MessageProperties.class));
+ }
+
+ public void messageReceived()
+ {
+ _adaptee.onMessage(_currentMsg);
+ }
+
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ReadOnlyMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ReadOnlyMessage.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ReadOnlyMessage.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ReadOnlyMessage.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,38 @@
+package org.apache.qpidity.nclient.util;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
+import org.apache.qpidity.api.Message;
+
+public abstract class ReadOnlyMessage implements Message
+{
+ MessageProperties _messageProperties;
+ DeliveryProperties _deliveryProperties;
+
+ public void appendData(byte[] src)
+ {
+ throw new UnsupportedOperationException("This Message is read only after the initial source");
+ }
+
+ public void appendData(ByteBuffer src)
+ {
+ throw new UnsupportedOperationException("This Message is read only after the initial source");
+ }
+
+ public DeliveryProperties getDeliveryProperties()
+ {
+ return _deliveryProperties;
+ }
+
+ public MessageProperties getMessageProperties()
+ {
+ return _messageProperties;
+ }
+
+ public void clearData()
+ {
+ throw new UnsupportedOperationException("This Message is read only after the initial source, cannot clear data");
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ReadOnlyMessage.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,59 @@
+package org.apache.qpidity.nclient.util;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SocketChannel;
+
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
+import org.apache.qpidity.api.Message;
+
+public class StreamingMessage extends ReadOnlyMessage implements Message
+{
+ SocketChannel _socChannel;
+ private int _chunkSize;
+ private ByteBuffer _readBuf;
+
+ public StreamingMessage(SocketChannel in,int chunkSize,DeliveryProperties deliveryProperties,MessageProperties messageProperties)throws IOException
+ {
+ _messageProperties = messageProperties;
+ _deliveryProperties = deliveryProperties;
+
+ _socChannel = in;
+ _chunkSize = chunkSize;
+ _readBuf = ByteBuffer.allocate(_chunkSize);
+ }
+
+ public void readData(byte[] target) throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public ByteBuffer readData() throws IOException
+ {
+ if(_socChannel.isConnected() && _socChannel.isOpen())
+ {
+ _readBuf.clear();
+ _socChannel.read(_readBuf);
+ }
+ else
+ {
+ throw new EOFException("The underlying socket/channel has closed");
+ }
+
+ return _readBuf.duplicate();
+ }
+
+ /**
+ * This message is used by an application user to
+ * provide data to the client library using pull style
+ * semantics. Since the message is not transfered yet, it
+ * does not have a transfer id. Hence this method is not
+ * applicable to this implementation.
+ */
+ public long getMessageTransferId()
+ {
+ throw new UnsupportedOperationException();
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionFactoryImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionFactoryImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionFactoryImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionFactoryImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,521 @@
+package org.apache.qpidity.njms;
+
+import javax.jms.*;
+import javax.naming.*;
+import javax.naming.spi.ObjectFactory;
+
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.BrokerDetails;
+import org.apache.qpidity.url.QpidURLImpl;
+import org.apache.qpidity.url.QpidURL;
+import org.apache.qpidity.url.BindingURLImpl;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import java.util.Hashtable;
+import java.net.MalformedURLException;
+
+/**
+ * Implements all the JMS connection factories.
+ * <p> In all the implementations in our code base
+ * when we create a Reference we pass in <code>ConnectionFactoryImpl</code> as the
+ * factory for creating the objects. This is the factory (or
+ * {@link ObjectFactory}) that is used to turn the description in to a real object.
+ * <p>In our construction of the Reference the last param. is null,
+ * we could put a url to a jar that contains our {@link ObjectFactory} so that
+ * any of our objects stored in JNDI can be recreated without even having
+ * the classes locally. As it is the <code>ConnectionFactoryImpl</code> must be on the
+ * classpath when you do a lookup in a JNDI context.. else you'll get a
+ * ClassNotFoundEx.
+ */
+public class ConnectionFactoryImpl implements ConnectionFactory, QueueConnectionFactory, TopicConnectionFactory,
+ XATopicConnectionFactory, XAQueueConnectionFactory, XAConnectionFactory,
+ ObjectFactory, Referenceable
+{
+ /**
+ * this ConnectionFactoryImpl's logger
+ */
+ private static final Logger _logger = LoggerFactory.getLogger(ConnectionFactoryImpl.class);
+
+ /**
+ * The virtual host on which the broker is deployed.
+ */
+ private String _host;
+ /**
+ * The port on which the broker is listening for connection.
+ */
+ private int _port;
+ /**
+ * The default user name used of user identification.
+ */
+ private String _defaultUsername;
+ /**
+ * The default password used of user identification.
+ */
+ private String _defaultPassword;
+ /**
+ * The virtual host on which the broker is deployed.
+ */
+ private String _virtualHost;
+ /**
+ * The URL used to build this factory, (not yet supported)
+ */
+ private QpidURL _qpidURL;
+
+ // Undefined at the moment
+ public ConnectionFactoryImpl(QpidURL url)
+ {
+ _qpidURL = url;
+ }
+
+ public ConnectionFactoryImpl(String url) throws MalformedURLException
+ {
+ _qpidURL = new QpidURLImpl(url);
+ BrokerDetails bd = _qpidURL.getAllBrokerDetails().get(0);
+ _host = bd.getHost();
+ _port = bd.getPort();
+ _defaultUsername = bd.getUserName();
+ _defaultPassword = bd.getPassword();
+ _virtualHost = bd.getVirtualHost();
+ }
+
+ /**
+ * Create a connection Factory
+ *
+ * @param host The broker host name.
+ * @param port The port on which the broker is listening for connection.
+ * @param virtualHost The virtual host on which the broker is deployed.
+ * @param defaultUsername The user name used of user identification.
+ * @param defaultPassword The password used of user identification.
+ */
+ public ConnectionFactoryImpl(String host, int port, String virtualHost, String defaultUsername,
+ String defaultPassword)
+ {
+ _host = host;
+ _port = port;
+ _defaultUsername = defaultUsername;
+ _defaultPassword = defaultPassword;
+ _virtualHost = virtualHost;
+ }
+
+ //-- Interface ConnectionFactory
+
+ /**
+ * Creates a connection with the default user identity.
+ * <p> The connection is created in stopped mode. No messages
+ * will be delivered until the <code>Connection.start</code> method
+ * is explicitly called.
+ *
+ * @return A newly created connection.
+ * @throws JMSException If creating the connection fails due to some internal error.
+ * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+ */
+ public Connection createConnection() throws JMSException
+ {
+ try
+ {
+ return new ConnectionImpl(_host, _port, _virtualHost, _defaultUsername, _defaultPassword);
+ }
+ catch (QpidException e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("PRoblem when creating connection", e);
+ }
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+
+ /**
+ * Creates a connection with the specified user identity.
+ * <p> The connection is created in stopped mode. No messages
+ * will be delivered until the <code>Connection.start</code> method
+ * is explicitly called.
+ *
+ * @param username the caller's user name
+ * @param password the caller's password
+ * @return A newly created connection.
+ * @throws JMSException If creating the connection fails due to some internal error.
+ * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+ */
+ public Connection createConnection(String username, String password) throws JMSException
+ {
+ try
+ {
+ return new ConnectionImpl(_host, _port, _virtualHost, username, password);
+ }
+ catch (QpidException e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("PRoblem when creating connection", e);
+ }
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+
+ // ----------------------------------------
+ // Support for JMS 1.0 classes
+ // ----------------------------------------
+ //--- Interface QueueConnection
+ /**
+ * Creates a queueConnection with the default user identity.
+ * <p> The queueConnection is created in stopped mode. No messages
+ * will be delivered until the <code>Connection.start</code> method
+ * is explicitly called.
+ *
+ * @return A newly created queueConnection
+ * @throws JMSException If creating the queueConnection fails due to some internal error.
+ * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+ */
+ public QueueConnection createQueueConnection() throws JMSException
+ {
+ try
+ {
+ return new QueueConnectionImpl(_host, _port, _virtualHost, _defaultUsername, _defaultPassword);
+ }
+ catch (QpidException e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("PRoblem when creating connection", e);
+ }
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+
+ /**
+ * Creates a queueConnection with the specified user identity.
+ * <p> The queueConnection is created in stopped mode. No messages
+ * will be delivered until the <code>Connection.start</code> method
+ * is explicitly called.
+ *
+ * @param username the caller's user name
+ * @param password the caller's password
+ * @return A newly created queueConnection.
+ * @throws JMSException If creating the queueConnection fails due to some internal error.
+ * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+ */
+ public QueueConnection createQueueConnection(String username, String password) throws JMSException
+ {
+ try
+ {
+ return new QueueConnectionImpl(_host, _port, _virtualHost, username, password);
+ }
+ catch (QpidException e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("PRoblem when creating connection", e);
+ }
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+
+ //--- Interface TopicConnection
+ /**
+ * Creates a topicConnection with the default user identity.
+ * <p> The topicConnection is created in stopped mode. No messages
+ * will be delivered until the <code>Connection.start</code> method
+ * is explicitly called.
+ *
+ * @return A newly created topicConnection
+ * @throws JMSException If creating the topicConnection fails due to some internal error.
+ * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+ */
+ public TopicConnection createTopicConnection() throws JMSException
+ {
+ try
+ {
+ return new TopicConnectionImpl(_host, _port, _virtualHost, _defaultUsername, _defaultPassword);
+ }
+ catch (QpidException e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("PRoblem when creating connection", e);
+ }
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+
+ /**
+ * Creates a topicConnection with the specified user identity.
+ * <p> The topicConnection is created in stopped mode. No messages
+ * will be delivered until the <code>Connection.start</code> method
+ * is explicitly called.
+ *
+ * @param username the caller's user name
+ * @param password the caller's password
+ * @return A newly created topicConnection.
+ * @throws JMSException If creating the topicConnection fails due to some internal error.
+ * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+ */
+ public TopicConnection createTopicConnection(String username, String password) throws JMSException
+ {
+ try
+ {
+ return new TopicConnectionImpl(_host, _port, _virtualHost, username, password);
+ }
+ catch (QpidException e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("PRoblem when creating connection", e);
+ }
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+
+ // ---------------------------------------------------------------------------------------------------
+ // the following methods are provided for XA compatibility
+ // ---------------------------------------------------------------------------------------------------
+
+ /**
+ * Creates a XAConnection with the default user identity.
+ * <p> The XAConnection is created in stopped mode. No messages
+ * will be delivered until the <code>Connection.start</code> method
+ * is explicitly called.
+ *
+ * @return A newly created XAConnection
+ * @throws JMSException If creating the XAConnection fails due to some internal error.
+ * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+ */
+ public XAConnection createXAConnection() throws JMSException
+ {
+ try
+ {
+ return new XAConnectionImpl(_host, _port, _virtualHost, _defaultUsername, _defaultPassword);
+ }
+ catch (QpidException e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("PRoblem when creating connection", e);
+ }
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+
+ /**
+ * Creates a XAConnection with the specified user identity.
+ * <p> The XAConnection is created in stopped mode. No messages
+ * will be delivered until the <code>Connection.start</code> method
+ * is explicitly called.
+ *
+ * @param username the caller's user name
+ * @param password the caller's password
+ * @return A newly created XAConnection.
+ * @throws JMSException If creating the XAConnection fails due to some internal error.
+ * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+ */
+ public XAConnection createXAConnection(String username, String password) throws JMSException
+ {
+ try
+ {
+ return new XAConnectionImpl(_host, _port, _virtualHost, username, password);
+ }
+ catch (QpidException e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("PRoblem when creating connection", e);
+ }
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+
+
+ /**
+ * Creates a XATopicConnection with the default user identity.
+ * <p> The XATopicConnection is created in stopped mode. No messages
+ * will be delivered until the <code>Connection.start</code> method
+ * is explicitly called.
+ *
+ * @return A newly created XATopicConnection
+ * @throws JMSException If creating the XATopicConnection fails due to some internal error.
+ * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+ */
+ public XATopicConnection createXATopicConnection() throws JMSException
+ {
+ try
+ {
+ return new XATopicConnectionImpl(_host, _port, _virtualHost, _defaultUsername, _defaultPassword);
+ }
+ catch (QpidException e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("PRoblem when creating connection", e);
+ }
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+
+ /**
+ * Creates a XATopicConnection with the specified user identity.
+ * <p> The XATopicConnection is created in stopped mode. No messages
+ * will be delivered until the <code>Connection.start</code> method
+ * is explicitly called.
+ *
+ * @param username the caller's user name
+ * @param password the caller's password
+ * @return A newly created XATopicConnection.
+ * @throws JMSException If creating the XATopicConnection fails due to some internal error.
+ * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+ */
+ public XATopicConnection createXATopicConnection(String username, String password) throws JMSException
+ {
+ try
+ {
+ return new XATopicConnectionImpl(_host, _port, _virtualHost, username, password);
+ }
+ catch (QpidException e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("PRoblem when creating connection", e);
+ }
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+
+ /**
+ * Creates a XAQueueConnection with the default user identity.
+ * <p> The XAQueueConnection is created in stopped mode. No messages
+ * will be delivered until the <code>Connection.start</code> method
+ * is explicitly called.
+ *
+ * @return A newly created XAQueueConnection
+ * @throws JMSException If creating the XAQueueConnection fails due to some internal error.
+ * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+ */
+ public XAQueueConnection createXAQueueConnection() throws JMSException
+ {
+ try
+ {
+ return new XAQueueConnectionImpl(_host, _port, _virtualHost, _defaultUsername, _defaultPassword);
+ }
+ catch (QpidException e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("PRoblem when creating connection", e);
+ }
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+
+ /**
+ * Creates a XAQueueConnection with the specified user identity.
+ * <p> The XAQueueConnection is created in stopped mode. No messages
+ * will be delivered until the <code>Connection.start</code> method
+ * is explicitly called.
+ *
+ * @param username the caller's user name
+ * @param password the caller's password
+ * @return A newly created XAQueueConnection.
+ * @throws JMSException If creating the XAQueueConnection fails due to some internal error.
+ * @throws JMSSecurityException If client authentication fails due to an invalid user name or password.
+ */
+ public XAQueueConnection createXAQueueConnection(String username, String password) throws JMSException
+ {
+ try
+ {
+ return new XAQueueConnectionImpl(_host, _port, _virtualHost, username, password);
+ }
+ catch (QpidException e)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("PRoblem when creating connection", e);
+ }
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+
+ // ----------------------------------------
+ // Support for JNDI
+ // ----------------------------------------
+
+ /**
+ * Creates an object using the location or reference information
+ * specified.
+ *
+ * @param obj The possibly null object containing location or reference
+ * information that can be used in creating an object.
+ * @param name The name of this object relative to <code>nameCtx</code>,
+ * or null if no name is specified.
+ * @param nameCtx The context relative to which the <code>name</code>
+ * parameter is specified, or null if <code>name</code> is
+ * relative to the default initial context.
+ * @param environment The possibly null environment that is used in
+ * creating the object.
+ * @return The object created; null if an object cannot be created.
+ * @throws Exception if this object factory encountered an exception
+ * while attempting to create an object, and no other object factories are
+ * to be tried.
+ */
+ public Object getObjectInstance(Object obj, Name name, Context nameCtx, Hashtable environment) throws Exception
+ {
+ if (obj instanceof Reference)
+ {
+ Reference ref = (Reference) obj;
+
+ if (ref.getClassName().equals(QueueImpl.class.getName()))
+ {
+ RefAddr addr = ref.get(QueueImpl.class.getName());
+
+ if (addr != null)
+ {
+ return new QueueImpl(new BindingURLImpl((String) addr.getContent()));
+ }
+ }
+
+ if (ref.getClassName().equals(TopicImpl.class.getName()))
+ {
+ RefAddr addr = ref.get(TopicImpl.class.getName());
+
+ if (addr != null)
+ {
+ return new TopicImpl(new BindingURLImpl((String) addr.getContent()));
+ }
+ }
+
+ if (ref.getClassName().equals(DestinationImpl.class.getName()))
+ {
+ RefAddr addr = ref.get(DestinationImpl.class.getName());
+
+ if (addr != null)
+ {
+ return new DestinationImpl(new BindingURLImpl((String) addr.getContent()));
+ }
+ }
+
+ if (ref.getClassName().equals(ConnectionFactoryImpl.class.getName()))
+ {
+ RefAddr addr = ref.get(ConnectionFactoryImpl.class.getName());
+ if (addr != null)
+ {
+ return new ConnectionFactoryImpl(new QpidURLImpl((String) addr.getContent()));
+ }
+ }
+
+ }
+ return null;
+ }
+
+ //-- interface Reference
+ /**
+ * Retrieves the Reference of this object.
+ *
+ * @return The non-null Reference of this object.
+ * @throws NamingException If a naming exception was encountered while retrieving the reference.
+ */
+ public Reference getReference() throws NamingException
+ {
+ return new Reference(ConnectionFactoryImpl.class.getName(),
+ new StringRefAddr(ConnectionFactoryImpl.class.getName(), _qpidURL.getURL()),
+ ConnectionFactoryImpl.class.getName(), null);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionFactoryImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,503 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import java.util.Vector;
+
+import javax.jms.Connection;
+import javax.jms.ConnectionConsumer;
+import javax.jms.ConnectionMetaData;
+import javax.jms.Destination;
+import javax.jms.ExceptionListener;
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.QueueSession;
+import javax.jms.ServerSessionPool;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.jms.TopicSession;
+
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.url.QpidURL;
+import org.apache.qpidity.nclient.Client;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+
+/**
+ * Implements javax.njms.Connection, javax.njms.QueueConnection and javax.njms.TopicConnection
+ */
+public class ConnectionImpl implements Connection
+{
+ /**
+ * This class's logger
+ */
+ private static final Logger _logger = LoggerFactory.getLogger(ConnectionImpl.class);
+
+ /**
+ * Maps from session id (Integer) to SessionImpl instance
+ */
+ protected final Vector<SessionImpl> _sessions = new Vector<SessionImpl>();
+
+ /**
+ * This is the clientID
+ */
+ private String _clientID;
+
+ /**
+ * The Exception listenr get informed when a serious problem is detected
+ */
+ private ExceptionListener _exceptionListener;
+
+ /**
+ * Whether this connection is started, i.e. whether messages are flowing to consumers.
+ * It has no meaning for message publication.
+ */
+ private boolean _started;
+
+ /**
+ * set to true if this Connection has been closed.
+ * <p/>
+ * A closed Connection cannot accept invocations to any of its methods with the exception
+ * of close(). All other methods should throw javax.njms.IllegalStateExceptions if the
+ * Connection has been closed.
+ * <p/>
+ * A Connection is open after creation, but not started. Once it has been closed, a Connection
+ * cannot be reused any more.
+ */
+ private boolean _isClosed = false;
+
+
+ /**
+ * The QpidConeection instance that is mapped with thie JMS connection
+ */
+ org.apache.qpidity.nclient.Connection _qpidConnection;
+
+ /**
+ * This is the exception listener for this qpid connection.
+ * The njms exception listener is registered with this listener.
+ */
+ QpidExceptionListenerImpl _qpidExceptionListener;
+
+ //------ Constructors ---//
+ /**
+ * Create a connection.
+ *
+ * @param host The broker host name.
+ * @param port The port on which the broker is listening for connection.
+ * @param virtualHost The virtual host on which the broker is deployed.
+ * @param username The user name used of user identification.
+ * @param password The password name used of user identification.
+ * @throws QpidException If creating a connection fails due to some internal error.
+ */
+ protected ConnectionImpl(String host, int port, String virtualHost, String username, String password)
+ throws QpidException
+ {
+ _qpidConnection = Client.createConnection();
+ _qpidConnection.connect(host, port, virtualHost, username, password);
+ }
+
+ /**
+ * Create a connection from a QpidURL
+ *
+ * @param qpidURL The url used to create this connection
+ * @throws QpidException If creating a connection fails due to some internal error.
+ */
+ protected ConnectionImpl(QpidURL qpidURL) throws QpidException
+ {
+ _qpidConnection = Client.createConnection();
+ _qpidConnection.connect(qpidURL);
+ }
+
+ //---- Interface javax.njms.Connection ---//
+ /**
+ * Creates a Session
+ *
+ * @param transacted Indicates whether the session is transacted.
+ * @param acknowledgeMode ignored if the session is transacted. Legal values are <code>Session.AUTO_ACKNOWLEDGE</code>,
+ * <code>Session.CLIENT_ACKNOWLEDGE</code>, and <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
+ * @return A newly created session
+ * @throws JMSException If the Connection object fails to create a session due to some internal error.
+ */
+ public synchronized Session createSession(boolean transacted, int acknowledgeMode) throws JMSException
+ {
+ checkNotClosed();
+ SessionImpl session;
+ try
+ {
+ session = new SessionImpl(this, transacted, acknowledgeMode, false);
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ // add this session with the list of session that are handled by this connection
+ _sessions.add(session);
+ return session;
+ }
+
+ /**
+ * Gets the client identifier for this connection.
+ * <P>It is either preconfigured as a JNDI property or assigned dynamically by the application
+ * by calling the <code>setClientID</code> method.
+ * <p/>
+ * TODO: Make sure that the client identifier can be set on the <CODE>ConnectionFactory</CODE>
+ *
+ * @return The unique client identifier.
+ * @throws JMSException If this connection is closed.
+ */
+ public String getClientID() throws JMSException
+ {
+ checkNotClosed();
+ return _clientID;
+ }
+
+ /**
+ * Sets the client identifier for this connection.
+ * <P>The preferred way to assign a JMS client's client identifier is for
+ * it to be configured in a client-specific <CODE>ConnectionFactory</CODE>
+ * object and transparently assigned to the <CODE>Connection</CODE> object
+ * it creates.
+ * <p> In Qpid it is not possible to change the client ID. If one is not specified
+ * upon connection construction, an id is generated automatically. Therefore
+ * we can always throw an exception.
+ * TODO: Make sure that the client identifier can be set on the <CODE>ConnectionFactory</CODE>
+ *
+ * @param clientID the unique client identifier
+ * @throws JMSException Always as clientID is always set at construction time.
+ */
+ public void setClientID(String clientID) throws JMSException
+ {
+ checkNotClosed();
+ throw new IllegalStateException("Client name cannot be changed after being set");
+ }
+
+ /**
+ * Gets the metadata for this connection.
+ *
+ * @return The connection metadata
+ * @throws JMSException If there ie a problem getting the connection metadata for this connection.
+ * @see javax.jms.ConnectionMetaData
+ */
+ public ConnectionMetaData getMetaData() throws JMSException
+ {
+ checkNotClosed();
+ return ConnectionMetaDataImpl.getInstance();
+ }
+
+ /**
+ * Gets the <CODE>ExceptionListener</CODE> object for this connection.
+ *
+ * @return the <CODE>ExceptionListener</CODE> for this connection
+ * @throws JMSException In case of unforeseen problem
+ */
+ public synchronized ExceptionListener getExceptionListener() throws JMSException
+ {
+ checkNotClosed();
+ return _exceptionListener;
+ }
+
+ /**
+ * Sets an exception listener for this connection.
+ * <p/>
+ * <p> The JMS specification says:
+ * <P>If a JMS provider detects a serious problem with a connection, it
+ * informs the connection's <CODE>ExceptionListener</CODE>, if one has been
+ * registered. It does this by calling the listener's
+ * <CODE>onException</CODE> method, passing it a <CODE>JMSException</CODE>
+ * object describing the problem.
+ * <p/>
+ * <P>A connection serializes execution of its
+ * <CODE>ExceptionListener</CODE>.
+ * <p/>
+ * <P>A JMS provider should attempt to resolve connection problems
+ * itself before it notifies the client of them.
+ *
+ * @param exceptionListener The connection listener.
+ * @throws JMSException If the connection is closed.
+ */
+ public synchronized void setExceptionListener(ExceptionListener exceptionListener) throws JMSException
+ {
+ checkNotClosed();
+ _exceptionListener = exceptionListener;
+ _qpidExceptionListener.setJMSExceptionListner(_exceptionListener);
+ }
+
+ /**
+ * Starts (or restarts) a connection's delivery of incoming messages.
+ * A call to start on a connection that has already been
+ * started is ignored.
+ *
+ * @throws JMSException In case of a problem due to some internal error.
+ */
+ public synchronized void start() throws JMSException
+ {
+ checkNotClosed();
+ if (!_started)
+ {
+ // start all the sessions
+ for (SessionImpl session : _sessions)
+ {
+ try
+ {
+ session.start();
+ }
+ catch (Exception e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+ _started = true;
+ }
+ }
+
+ /**
+ * Temporarily stops a connection's delivery of incoming messages.
+ * <p> The JMS specification says:
+ * <p> Delivery can be restarted using the connection's <CODE>start</CODE>
+ * method. When the connection is stopped, delivery to all the connection's message consumers is inhibited:
+ * synchronous receives block, and messages are not delivered to message listeners.
+ * <P>This call blocks until receives and/or message listeners in progress have completed.
+ *
+ * @throws JMSException In case of a problem due to some internal error.
+ */
+ public synchronized void stop() throws JMSException
+ {
+ checkNotClosed();
+ if (_started)
+ {
+ // stop all the sessions
+ for (SessionImpl session : _sessions)
+ {
+ try
+ {
+ session.stop();
+ }
+ catch (Exception e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+ _started = false;
+ }
+ }
+
+ /**
+ * Closes the connection.
+ * <p/>
+ * <p> The JMS specification says:
+ * <P>Since a provider typically allocates significant resources outside
+ * the JVM on behalf of a connection, clients should close these resources
+ * when they are not needed. Relying on garbage collection to eventually
+ * reclaim these resources may not be timely enough.
+ * <P>There is no need to close the sessions, producers, and consumers of a closed connection.
+ * <P>Closing a connection causes all temporary destinations to be deleted.
+ * <P>When this method is invoked, it should not return until message
+ * processing has been shut down in an orderly fashion.
+ *
+ * @throws JMSException In case of a problem due to some internal error.
+ */
+ public synchronized void close() throws JMSException
+ {
+ checkNotClosed();
+ if (!_isClosed)
+ {
+ _isClosed = true;
+ _started = false;
+ // close all the sessions
+ for (SessionImpl session : _sessions)
+ {
+ session.close();
+ }
+ // close the underlaying Qpid connection
+ try
+ {
+ _qpidConnection.close();
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+ }
+
+ /**
+ * Creates a connection consumer for this connection (optional operation).
+ * This is an expert facility for App server integration.
+ *
+ * @param destination The destination to access.
+ * @param messageSelector Only messages with properties matching the message selector expression are delivered.
+ * @param sessionPool The session pool to associate with this connection consumer.
+ * @param maxMessages The maximum number of messages that can be assigned to a server session at one time.
+ * @return Null for the moment.
+ * @throws JMSException In case of a problem due to some internal error.
+ */
+ public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
+ ServerSessionPool sessionPool, int maxMessages)
+ throws JMSException
+ {
+ checkNotClosed();
+ return null;
+ }
+
+ /**
+ * Create a durable connection consumer for this connection (optional operation).
+ *
+ * @param topic The topic to access.
+ * @param subscriptionName Durable subscription name.
+ * @param messageSelector Only messages with properties matching the message selector expression are delivered.
+ * @param sessionPool The server session pool to associate with this durable connection consumer.
+ * @param maxMessages The maximum number of messages that can be assigned to a server session at one time.
+ * @return Null for the moment.
+ * @throws JMSException In case of a problem due to some internal error.
+ */
+ public ConnectionConsumer createDurableConnectionConsumer(Topic topic, String subscriptionName,
+ String messageSelector, ServerSessionPool sessionPool,
+ int maxMessages) throws JMSException
+ {
+ checkNotClosed();
+ return null;
+ }
+
+ //-------------- QueueConnection API
+
+ /**
+ * Create a QueueSession.
+ *
+ * @param transacted Indicates whether the session is transacted.
+ * @param acknowledgeMode Indicates whether the consumer or the
+ * client will acknowledge any messages it receives; ignored if the session
+ * is transacted. Legal values are <code>Session.AUTO_ACKNOWLEDGE</code>,
+ * <code>Session.CLIENT_ACKNOWLEDGE</code> and <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
+ * @return A queueSession object/
+ * @throws JMSException If creating a QueueSession fails due to some internal error.
+ */
+ public synchronized QueueSession createQueueSession(boolean transacted, int acknowledgeMode) throws JMSException
+ {
+ checkNotClosed();
+ QueueSessionImpl queueSession;
+ try
+ {
+ queueSession = new QueueSessionImpl(this, transacted, acknowledgeMode);
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ // add this session to the list of handled sessions.
+ _sessions.add(queueSession);
+ return queueSession;
+ }
+
+ /**
+ * Creates a connection consumer for this connection (optional operation).
+ * This is an expert facility for App server integration.
+ *
+ * @param queue The queue to access.
+ * @param messageSelector Only messages with properties matching the message selector expression are delivered.
+ * @param sessionPool The session pool to associate with this connection consumer.
+ * @param maxMessages The maximum number of messages that can be assigned to a server session at one time.
+ * @return Null for the moment.
+ * @throws JMSException In case of a problem due to some internal error.
+ */
+ public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector,
+ ServerSessionPool sessionPool, int maxMessages)
+ throws JMSException
+ {
+ return createConnectionConsumer((Destination) queue, messageSelector, sessionPool, maxMessages);
+ }
+
+ //-------------- TopicConnection API
+ /**
+ * Create a TopicSession.
+ *
+ * @param transacted Indicates whether the session is transacted
+ * @param acknowledgeMode Legal values are <code>Session.AUTO_ACKNOWLEDGE</code>, <code>Session.CLIENT_ACKNOWLEDGE</code>, and
+ * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
+ * @return a newly created topic session
+ * @throws JMSException If creating the session fails due to some internal error.
+ */
+ public synchronized TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException
+ {
+ checkNotClosed();
+ TopicSessionImpl session;
+ try
+ {
+ session = new TopicSessionImpl(this, transacted, acknowledgeMode);
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ // add the session with this Connection's sessions
+ // important for when the Connection is closed.
+ _sessions.add(session);
+ return session;
+ }
+
+ /**
+ * Creates a connection consumer for this connection (optional operation).
+ * This is an expert facility for App server integration.
+ *
+ * @param topic The topic to access.
+ * @param messageSelector Only messages with properties matching the message selector expression are delivered.
+ * @param sessionPool The session pool to associate with this connection consumer.
+ * @param maxMessages The maximum number of messages that can be assigned to a server session at one time.
+ * @return Null for the moment.
+ * @throws JMSException In case of a problem due to some internal error.
+ */
+ public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector,
+ ServerSessionPool sessionPool, int maxMessages)
+ throws JMSException
+ {
+ return createConnectionConsumer((Destination) topic, messageSelector, sessionPool, maxMessages);
+ }
+
+ //-------------- protected and private methods
+ /**
+ * Validate that the Connection is not closed.
+ * <p/>
+ * If the Connection has been closed, throw a IllegalStateException. This behaviour is
+ * required by the JMS specification.
+ *
+ * @throws IllegalStateException If the session is closed.
+ */
+ protected synchronized void checkNotClosed() throws IllegalStateException
+ {
+ if (_isClosed)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Connection has been closed. Cannot invoke any further operations.");
+ }
+ throw new javax.jms.IllegalStateException(
+ "Connection has been closed. Cannot invoke any further operations.");
+ }
+ }
+
+ /**
+ * Provide access to the underlying qpid Connection.
+ *
+ * @return This JMS connection underlying Qpid Connection.
+ */
+ protected org.apache.qpidity.nclient.Connection getQpidConnection()
+ {
+ return _qpidConnection;
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionMetaDataImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionMetaDataImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionMetaDataImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionMetaDataImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,165 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import org.apache.qpid.common.QpidProperties;
+
+import javax.jms.ConnectionMetaData;
+import javax.jms.JMSException;
+import java.util.Enumeration;
+
+/**
+ * Implements javax.njms.ConnectionMetaData
+ * A ConnectionMetaDataImpl provides information describing the JMS <code>Connection</code>.
+ */
+public class ConnectionMetaDataImpl implements ConnectionMetaData
+{
+
+ /**
+ * A singleton instance.
+ */
+ static ConnectionMetaDataImpl _singleton = new ConnectionMetaDataImpl();
+
+ // ------------------------ The metadata
+ // JMS major version
+ private static final int JMS_MAJOR_VERSION = 1;
+ // JMS minor version
+ private static final int JMS_MINOR_VERSION = 1;
+ // JMS version
+ private static final String JMS_VERSION = "1.1";
+ // Provider name
+ private static final String PROVIDER_NAME = "Apache " + QpidProperties.getProductName();
+ // Provider major version
+ private static final int PROVIDER_MAJOR_VERSION = 0;
+ // Provider minor version
+ private static final int PROVIDER_MINOR_VERSION = 10;
+ // Provider version
+ private static final String PROVIDER_VERSION = QpidProperties.getProductName() + " (Client: [" + QpidProperties.getBuildVersion() + "] ; Protocol: [ 0.10 ] )";
+
+ /**
+ * Prevent instantiation.
+ */
+ private ConnectionMetaDataImpl()
+ {
+ }
+
+ /**
+ * Get the singleton instance of ConnectionMetaDataImpl.
+ *
+ * @return the singleton instance of ConnectionMetaDataImpl.
+ */
+ public static ConnectionMetaDataImpl getInstance()
+ {
+ return _singleton;
+ }
+
+ //-- Connection MetaData API
+
+ /**
+ * Gets the JMS API version.
+ *
+ * @return the JMS API version
+ * @throws JMSException Never
+ */
+ public String getJMSVersion() throws JMSException
+ {
+ return JMS_VERSION;
+ }
+
+
+ /**
+ * Gets the JMS major version number.
+ *
+ * @return the JMS API major version number
+ * @throws JMSException Never
+ */
+ public int getJMSMajorVersion() throws JMSException
+ {
+ return JMS_MAJOR_VERSION;
+ }
+
+
+ /**
+ * Gets the JMS minor version number.
+ *
+ * @return the JMS API minor version number
+ * @throws JMSException Never
+ */
+ public int getJMSMinorVersion() throws JMSException
+ {
+ return JMS_MINOR_VERSION;
+ }
+
+
+ /**
+ * Gets Qpid name.
+ *
+ * @return Qpid name
+ * @throws JMSException Never
+ */
+ public String getJMSProviderName() throws JMSException
+ {
+ return PROVIDER_NAME;
+ }
+
+ /**
+ * Gets Qpid version.
+ *
+ * @return Qpid version
+ * @throws JMSException Never
+ */
+ public String getProviderVersion() throws JMSException
+ {
+ return PROVIDER_VERSION;
+ // TODO: We certainly can dynamically get the server version.
+ }
+
+ /**
+ * Gets Qpid major version number.
+ *
+ * @return Qpid major version number
+ * @throws JMSException Never
+ */
+ public int getProviderMajorVersion() throws JMSException
+ {
+ return PROVIDER_MAJOR_VERSION;
+ }
+
+ /**
+ * Gets Qpid minor version number.
+ *
+ * @return Qpid minor version number
+ * @throws JMSException Never
+ */
+ public int getProviderMinorVersion() throws JMSException
+ {
+ return PROVIDER_MINOR_VERSION;
+ }
+
+ /**
+ * Gets an enumeration of the JMSX property names.
+ *
+ * @return an Enumeration of JMSX property names
+ * @throws JMSException if cannot retrieve metadata due to some internal error.
+ */
+ public Enumeration getJMSXPropertyNames() throws JMSException
+ {
+ return CustomJMSXProperty.asEnumeration();
+ }
+
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionMetaDataImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/CustomJMSXProperty.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/CustomJMSXProperty.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/CustomJMSXProperty.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/CustomJMSXProperty.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,47 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import java.util.Enumeration;
+import java.util.ArrayList;
+import java.util.Collections;
+
+public enum CustomJMSXProperty
+{
+ JMS_AMQP_NULL,
+ JMS_QPID_DESTTYPE,
+ JMSXGroupID,
+ JMSXGroupSeq;
+
+ private static Enumeration _names;
+
+ public static synchronized Enumeration asEnumeration()
+ {
+ if (_names == null)
+ {
+ CustomJMSXProperty[] properties = values();
+ ArrayList<String> nameList = new ArrayList<String>(properties.length);
+ for (CustomJMSXProperty property : properties)
+ {
+ nameList.add(property.toString());
+ }
+ _names = Collections.enumeration(nameList);
+ }
+ return _names;
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/CustomJMSXProperty.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/DestinationImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/DestinationImpl.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/DestinationImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/DestinationImpl.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,259 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.url.BindingURL;
+import org.apache.qpidity.url.BindingURLImpl;
+import org.apache.qpidity.url.URLSyntaxException;
+import org.apache.qpid.url.URLHelper;
+
+import javax.jms.Destination;
+import javax.naming.Reference;
+import javax.naming.NamingException;
+import javax.naming.StringRefAddr;
+import javax.naming.Referenceable;
+
+/**
+ * Implementation of the JMS Destination interface
+ */
+public class DestinationImpl implements Destination, Referenceable
+{
+ /**
+ * The destination's name
+ */
+ protected String _destinationName = null;
+
+ /**
+ * The excahnge name
+ */
+ protected String _exchangeName;
+
+ /**
+ * The excahnge class
+ */
+ protected String _exchangeType;
+
+ /**
+ * The queue name
+ */
+ protected String _queueName;
+
+ /**
+ * Indicate whether this destination is exclusive
+ */
+ protected boolean _isExclusive;
+
+ /**
+ * Indicates whether this destination is auto delete.
+ */
+ protected boolean _isAutoDelete;
+
+ /**
+ * Indicates whether this destination is durable
+ */
+ protected boolean _isDurable;
+
+ protected String _routingKey;
+
+ /**
+ * The biding URL used to create this destiantion
+ */
+ protected BindingURL _url;
+
+ //--- Constructor
+
+ protected DestinationImpl(String name) throws QpidException
+ {
+ _queueName = name;
+ _routingKey = name;
+ }
+
+ /**
+ * Create a destiantion from a binding URL
+ *
+ * @param binding The URL
+ * @throws QpidException If the URL is not valid
+ */
+ public DestinationImpl(BindingURL binding) throws QpidException
+ {
+ _exchangeName = binding.getExchangeName();
+ _exchangeType = binding.getExchangeClass();
+ _destinationName = binding.getDestinationName();
+ _isExclusive = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_EXCLUSIVE));
+ _isAutoDelete = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_AUTODELETE));
+ _isDurable = Boolean.parseBoolean(binding.getOption(BindingURL.OPTION_DURABLE));
+ _queueName = binding.getQueueName();
+ _routingKey = binding.getQueueName();
+ _url = binding;
+ }
+
+ //---- Getters and Setters
+ /**
+ * Overrides Object.toString();
+ *
+ * @return Stringified destination representation.
+ */
+ public String toString()
+ {
+ return _destinationName;
+ }
+
+ /**
+ * Get the destination name.
+ *
+ * @return The destination name
+ */
+ public String getDestinationName()
+ {
+ return _destinationName;
+ }
+
+
+ /**
+ * The exchange name
+ *
+ * @return The exchange name
+ */
+ public String getExchangeName()
+ {
+ return _exchangeName;
+ }
+
+ /**
+ * The exchange type.
+ *
+ * @return The exchange type.
+ */
+ public String getExchangeType()
+ {
+ return _exchangeType;
+ }
+
+ /**
+ * The queue name.
+ *
+ * @return The queue name.
+ */
+ public String getQpidQueueName()
+ {
+ return _queueName;
+ }
+
+ /**
+ * Indicates whether this destination is exclusive.
+ *
+ * @return true if this destination is exclusive.
+ */
+ public boolean isExclusive()
+ {
+ return _isExclusive;
+ }
+
+ /**
+ * Indicates whether this destination is AutoDelete.
+ *
+ * @return true if this destination is AutoDelete.
+ */
+ public boolean isAutoDelete()
+ {
+ return _isAutoDelete;
+ }
+
+ public String getRoutingKey()
+ {
+ return _routingKey;
+ }
+
+ /**
+ * Indicates whether this destination is Durable.
+ *
+ * @return true if this destination is Durable.
+ */
+ public boolean isDurable()
+ {
+ return _isDurable;
+ }
+
+ //----- Interface Referenceable
+ public Reference getReference() throws NamingException
+ {
+ return new Reference(this.getClass().getName(), new StringRefAddr(this.getClass().getName(), toURL()),
+ ConnectionFactoryImpl.class.getName(), // factory
+ null); // factory location
+ }
+
+ //--- non public method s
+
+ /**
+ * Get the URL used to create this destiantion
+ *
+ * @return The URL used to create this destiantion
+ */
+ public String toURL()
+ {
+ if (_url == null)
+ {
+ StringBuffer sb = new StringBuffer();
+ sb.append(_exchangeType);
+ sb.append("://");
+ sb.append(_exchangeName);
+ sb.append('/');
+ if (_destinationName != null)
+ {
+ sb.append(_destinationName);
+ }
+ sb.append('/');
+ if (_queueName != null)
+ {
+ sb.append(_queueName);
+ }
+ sb.append('?');
+ if (_isDurable)
+ {
+ sb.append(org.apache.qpid.url.BindingURL.OPTION_DURABLE);
+ sb.append("='true'");
+ sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+ }
+ if (_isExclusive)
+ {
+ sb.append(org.apache.qpid.url.BindingURL.OPTION_EXCLUSIVE);
+ sb.append("='true'");
+ sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+ }
+ if (_isAutoDelete)
+ {
+ sb.append(org.apache.qpid.url.BindingURL.OPTION_AUTODELETE);
+ sb.append("='true'");
+ sb.append(URLHelper.DEFAULT_OPTION_SEPERATOR);
+ }
+ //removeKey the last char '?' if there is no options , ',' if there are.
+ sb.deleteCharAt(sb.length() - 1);
+ try
+ {
+ _url = new BindingURLImpl(sb.toString());
+ }
+ catch (URLSyntaxException e)
+ {
+ // this should not happen.
+ }
+ }
+ return _url.getURL();
+ }
+}
+
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/DestinationImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ExceptionHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ExceptionHelper.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ExceptionHelper.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ExceptionHelper.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,59 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import org.apache.qpidity.QpidException;
+
+import javax.jms.JMSException;
+import javax.transaction.xa.XAException;
+
+/**
+ * Helper class for handling exceptions
+ */
+public class ExceptionHelper
+{
+ static public JMSException convertQpidExceptionToJMSException(Exception exception)
+ {
+ JMSException jmsException = null;
+ if (!(exception instanceof JMSException))
+ {
+ if (exception instanceof QpidException)
+ {
+ jmsException = new JMSException(exception.getMessage(), String.valueOf(((QpidException) exception).getErrorCode()));
+ }
+ else
+ {
+ jmsException = new JMSException(exception.getMessage());
+ }
+ jmsException.setLinkedException(exception);
+ }
+ else
+ {
+ jmsException = (JMSException) exception;
+ }
+ return jmsException;
+ }
+
+ static public XAException convertQpidExceptionToXAException(QpidException exception)
+ {
+ String qpidErrorCode = String.valueOf(exception.getErrorCode());
+ // todo map this error to an XA code
+ int xaCode = XAException.XAER_PROTO;
+ return new XAException(xaCode);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ExceptionHelper.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageActor.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageActor.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageActor.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageActor.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,176 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.njms;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import javax.jms.IllegalStateException;
+import javax.jms.JMSException;
+
+/**
+ * MessageActor is the superclass for MessageProducerImpl and MessageProducerImpl.
+ */
+public abstract class MessageActor
+{
+ /**
+ * Used for debugging.
+ */
+ protected static final Logger _logger = LoggerFactory.getLogger(MessageActor.class);
+
+ /**
+ * Indicates whether this MessageActor is closed.
+ */
+ protected boolean _isClosed = false;
+
+ /**
+ * This messageActor's session
+ */
+ private SessionImpl _session;
+
+ /**
+ * The JMS destination this actor is set for.
+ */
+ DestinationImpl _destination;
+
+ /**
+ * Indicates that this actor is stopped
+ */
+ protected boolean _isStopped;
+
+ /**
+ * The ID of this actor for the session.
+ */
+ private String _messageActorID;
+
+ //-- Constructor
+
+ //TODO define the parameters
+
+ protected MessageActor(String messageActorID)
+ {
+ _messageActorID = messageActorID;
+ }
+
+ protected MessageActor(SessionImpl session, DestinationImpl destination,String messageActorID)
+ {
+ _session = session;
+ _destination = destination;
+ _messageActorID = messageActorID;
+ }
+
+ //--- public methods (part of the njms public API)
+ /**
+ * Closes the MessageActor and deregister it from its session.
+ *
+ * @throws JMSException if the MessaeActor cannot be closed due to some internal error.
+ */
+ public void close() throws JMSException
+ {
+ if (!_isClosed)
+ {
+ closeMessageActor();
+ getSession().getQpidSession().messageCancel(getMessageActorID());
+ //todo: We need to unset the qpid message listener
+ // notify the session that this message actor is closing
+ _session.closeMessageActor(this);
+ }
+ }
+
+ //-- protected methods
+
+ /**
+ * Stop this message actor
+ *
+ * @throws Exception If the consumer cannot be stopped due to some internal error.
+ */
+ protected void stop() throws Exception
+ {
+ _isStopped = true;
+ }
+
+ /**
+ * Start this message Actor
+ *
+ * @throws Exception If the consumer cannot be started due to some internal error.
+ */
+ protected void start() throws Exception
+ {
+
+ _isStopped = false;
+
+ }
+
+ /**
+ * Check if this MessageActor is not closed.
+ * <p> If the MessageActor is closed, throw a javax.njms.IllegalStateException.
+ * <p> The method is not synchronized, since MessageProducers can only be used by a single thread.
+ *
+ * @throws IllegalStateException if the MessageActor is closed
+ */
+ protected void checkNotClosed() throws IllegalStateException
+ {
+ if (_isClosed || _session == null)
+ {
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("Actor " + this + " is already closed");
+ }
+ throw new IllegalStateException("Actor " + this + " is already closed");
+ }
+ _session.checkNotClosed();
+ }
+
+ /**
+ * Closes a MessageActor.
+ * <p> This method is invoked when the session is closing or when this
+ * messageActor is closing.
+ *
+ * @throws JMSException If the MessaeActor cannot be closed due to some internal error.
+ */
+ protected void closeMessageActor() throws JMSException
+ {
+ if (!_isClosed)
+ {
+ getSession().getQpidSession().messageCancel(getMessageActorID());
+ _isClosed = true;
+ }
+ }
+
+ /**
+ * Get the associated session object.
+ *
+ * @return This Actor's Session.
+ */
+ public SessionImpl getSession()
+ {
+ return _session;
+ }
+
+ /**
+ * Get the ID of this actor within its session.
+ *
+ * @return This actor ID.
+ */
+ protected String getMessageActorID()
+ {
+ return _messageActorID;
+ }
+
+
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageActor.java
------------------------------------------------------------------------------
svn:eol-style = native