You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/07/31 13:44:00 UTC
svn commit: r561299 -
/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/
Author: arnaudsimon
Date: Tue Jul 31 04:43:58 2007
New Revision: 561299
URL: http://svn.apache.org/viewvc?view=rev&rev=561299
Log:
Implemented methods
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueReceiverImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSenderImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicPublisherImpl.java (with props)
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionMetaDataImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageProducerImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSessionImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSessionImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSubscriberImpl.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java?view=diff&rev=561299&r1=561298&r2=561299
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java Tue Jul 31 04:43:58 2007
@@ -20,7 +20,6 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpidity.QpidException;
-import org.apache.qpid.nclient.jms.QueueSessionImpl;
import javax.jms.*;
import javax.jms.IllegalStateException;
@@ -30,7 +29,7 @@
/**
- *
+ * Implements javax.jms.Connection, javax.jms.QueueConnection adn javax.jms.TopicConnection
*/
public class ConnectionImpl implements Connection, QueueConnection, TopicConnection
{
@@ -104,9 +103,22 @@
//---- Interface javax.jms.Connection ---//
- public Session createSession(boolean b, int i) throws JMSException
+ /**
+ * Creates a Session
+ *
+ * @param transacted Indicates whether the session is transacted.
+ * @param acknowledgeMode ignored if the session is transacted. Legal values are <code>Session.AUTO_ACKNOWLEDGE</code>,
+ * <code>Session.CLIENT_ACKNOWLEDGE</code>, and <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
+ * @return A newly created session
+ * @throws JMSException If the Connection object fails to create a session due to some internal error.
+ */
+ public Session createSession(boolean transacted, int acknowledgeMode) throws JMSException
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ checkNotClosed();
+ SessionImpl session = new SessionImpl(this, transacted, acknowledgeMode);
+ // add this session with the list of session that are handled by this connection
+ _sessions.add(session);
+ return session;
}
/**
@@ -334,23 +346,60 @@
return null; //To change body of implemented methods use File | Settings | File Templates.
}
- public ConnectionConsumer createConnectionConsumer(Queue queue, String string, ServerSessionPool serverSessionPool,
- int i) throws JMSException
+ /**
+ * Creates a connection consumer for this connection (optional operation).
+ * This is an expert facility for App server integration.
+ *
+ * @param queue The queue to access.
+ * @param messageSelector Only messages with properties matching the message selector expression are delivered.
+ * @param sessionPool The session pool to associate with this connection consumer.
+ * @param maxMessages The maximum number of messages that can be assigned to a server session at one time.
+ * @return Null for the moment.
+ * @throws JMSException In case of a problem due to some internal error.
+ */
+ public ConnectionConsumer createConnectionConsumer(Queue queue, String messageSelector,
+ ServerSessionPool sessionPool, int maxMessages) throws
+ JMSException
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return createConnectionConsumer((Destination) queue, messageSelector, sessionPool, maxMessages);
}
//-------------- TopicConnection API
-
- public TopicSession createTopicSession(boolean b, int i) throws JMSException
- {
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ /**
+ * Create a TopicSession.
+ *
+ * @param transacted Indicates whether the session is transacted
+ * @param acknowledgeMode Legal values are <code>Session.AUTO_ACKNOWLEDGE</code>, <code>Session.CLIENT_ACKNOWLEDGE</code>, and
+ * <code>Session.DUPS_OK_ACKNOWLEDGE</code>.
+ * @return a newly created topic session
+ * @throws JMSException If creating the session fails due to some internal error.
+ */
+ public TopicSession createTopicSession(boolean transacted, int acknowledgeMode) throws JMSException
+ {
+ checkNotClosed();
+ TopicSessionImpl session = new TopicSessionImpl(this, transacted, acknowledgeMode);
+ // add the session with this Connection's sessions
+ // important for when the Connection is closed.
+ _sessions.add(session);
+ return session;
}
- public ConnectionConsumer createConnectionConsumer(Topic topic, String string, ServerSessionPool serverSessionPool,
- int i) throws JMSException
+ /**
+ * Creates a connection consumer for this connection (optional operation).
+ * This is an expert facility for App server integration.
+ *
+ * @param topic The topic to access.
+ * @param messageSelector Only messages with properties matching the message selector expression are delivered.
+ * @param sessionPool The session pool to associate with this connection consumer.
+ * @param maxMessages The maximum number of messages that can be assigned to a server session at one time.
+ * @return Null for the moment.
+ * @throws JMSException In case of a problem due to some internal error.
+ */
+ public ConnectionConsumer createConnectionConsumer(Topic topic, String messageSelector,
+ ServerSessionPool sessionPool, int maxMessages) throws
+ JMSException
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return createConnectionConsumer((Destination) topic, messageSelector, sessionPool, maxMessages);
}
//-------------- protected and private methods
@@ -375,4 +424,13 @@
}
}
+ /**
+ * Provide access to the underlying qpid Connection.
+ *
+ * @return This JMS connection underlying Qpid Connection.
+ */
+ protected org.apache.qpid.nclient.api.Connection getQpidConnection()
+ {
+ return _qpidConnection;
+ }
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionMetaDataImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionMetaDataImpl.java?view=diff&rev=561299&r1=561298&r2=561299
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionMetaDataImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionMetaDataImpl.java Tue Jul 31 04:43:58 2007
@@ -24,8 +24,8 @@
import java.util.Enumeration;
/**
- * A <CODE>ConnectionMetaDataImpl</CODE> object provides information describing the
- * JMS <CODE>Connection</CODE>.
+ * Implements javax.jms.ConnectionMetaData
+ * A ConnectionMetaDataImpl provides information describing the JMS <code>Connection</code>.
*/
public class ConnectionMetaDataImpl implements ConnectionMetaData
{
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java?view=diff&rev=561299&r1=561298&r2=561299
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java Tue Jul 31 04:43:58 2007
@@ -50,7 +50,7 @@
/**
* The subscription name
*/
- private String _subscriptionName;
+ protected String _subscriptionName;
/**
* A MessageListener set up for this consumer.
@@ -58,14 +58,33 @@
private MessageListener _messageListener = null;
//----- Constructors
+ /**
+ * Create a new MessageProducerImpl.
+ *
+ * @param session The session from which the MessageProducerImpl is instantiated
+ * @param destination The default destination for this MessageProducerImpl
+ * @param messageSelector The message selector for this QueueReceiverImpl.
+ * @param noLocal If true inhibits the delivery of messages published by its own connection.
+ * @param subscriptionName Name of the subscription if this is to be created as a durable subscriber.
+ * If this value is null, a non-durable subscription is created.
+ * @throws JMSException If the MessageProducerImpl cannot be created due to some internal error.
+ */
protected MessageConsumerImpl(SessionImpl session, DestinationImpl destination, String messageSelector,
- boolean noLocal, String subscriptionName)
+ boolean noLocal, String subscriptionName) throws JMSException
{
super(session, destination);
_messageSelector = messageSelector;
_noLocal = noLocal;
_subscriptionName = subscriptionName;
-
+ try
+ {
+ // TODO define the relevant options
+ _qpidReceiver = _session.getQpidSession().createReceiver(destination.getName(), null);
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
}
//----- Message consumer API
@@ -112,22 +131,48 @@
public void setMessageListener(MessageListener messageListener) throws JMSException
{
checkNotClosed();
- // create a message listener wrapper
+ // TODO: create a message listener wrapper
}
+ /**
+ * Receive the next message produced for this message consumer.
+ * <P>This call blocks indefinitely until a message is produced or until this message consumer is closed.
+ *
+ * @return The next message produced for this message consumer, or
+ * null if this message consumer is concurrently closed
+ * @throws JMSException If receiving the next message fails due to some internal error.
+ */
public Message receive() throws JMSException
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return receive(0);
}
- public Message receive(long l) throws JMSException
+ /**
+ * Receive the next message that arrives within the specified timeout interval.
+ * <p> This call blocks until a message arrives, the timeout expires, or this message consumer is closed.
+ * <p> A timeout of zero never expires, and the call blocks indefinitely.
+ * <p> A timeout less than 0 throws a JMSException.
+ *
+ * @param timeout The timeout value (in milliseconds)
+ * @return The next message that arrives within the specified timeout interval.
+ * @throws JMSException If receiving the next message fails due to some internal error.
+ */
+ public Message receive(long timeout) throws JMSException
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ Message message = null;
+ // todo convert this message into a JMS one: _qpidReceiver.receive(-1);
+ return message;
}
+ /**
+ * Receive the next message if one is immediately available.
+ *
+ * @return the next message or null if one is not available.
+ * @throws JMSException If receiving the next message fails due to some internal error.
+ */
public Message receiveNoWait() throws JMSException
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return receive(-1);
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageProducerImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageProducerImpl.java?view=diff&rev=561299&r1=561298&r2=561299
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageProducerImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageProducerImpl.java Tue Jul 31 04:43:58 2007
@@ -23,7 +23,7 @@
import javax.jms.Message;
/**
- *
+ * Implements MessageProducer
*/
public class MessageProducerImpl extends MessageActor implements MessageProducer
{
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueReceiverImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueReceiverImpl.java?view=auto&rev=561299
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueReceiverImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueReceiverImpl.java Tue Jul 31 04:43:58 2007
@@ -0,0 +1,55 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.nclient.jms;
+
+import javax.jms.QueueReceiver;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+
+/**
+ * Implements javax.jms.QueueReceiver
+ */
+public class QueueReceiverImpl extends MessageConsumerImpl implements QueueReceiver
+{
+ //--- Constructor
+ /**
+ * create a new QueueReceiverImpl.
+ *
+ * @param session The session from which the QueueReceiverImpl is instantiated.
+ * @param queue The default queue for this QueueReceiverImpl.
+ * @param messageSelector the message selector for this QueueReceiverImpl.
+ * @throws JMSException If the QueueReceiverImpl cannot be created due to some internal error.
+ */
+ protected QueueReceiverImpl(SessionImpl session, Queue queue, String messageSelector) throws JMSException
+ {
+ super(session, (DestinationImpl) queue, messageSelector, false, null);
+ }
+
+ //--- Interface QueueReceiver
+ /**
+ * Get the Queue associated with this queue receiver.
+ *
+ * @return this receiver's Queue
+ * @throws JMSException If getting the queue for this queue receiver fails due to some internal error.
+ */
+ public Queue getQueue() throws JMSException
+ {
+ checkNotClosed();
+ return (QueueImpl) _destination;
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueReceiverImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSenderImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSenderImpl.java?view=auto&rev=561299
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSenderImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSenderImpl.java Tue Jul 31 04:43:58 2007
@@ -0,0 +1,131 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.nclient.jms;
+
+import javax.jms.QueueSender;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.Message;
+
+/**
+ * Implements javax.jms.QueueSender
+ */
+public class QueueSenderImpl extends MessageProducerImpl implements QueueSender
+{
+ //--- Constructor
+ /**
+ * Create a new QueueSenderImpl.
+ *
+ * @param session the session from which the QueueSenderImpl is instantiated
+ * @param queue the default queue for this QueueSenderImpl
+ * @throws JMSException If the QueueSenderImpl cannot be created due to some internal error.
+ */
+ protected QueueSenderImpl(SessionImpl session, QueueImpl queue) throws JMSException
+ {
+ super(session, queue);
+ }
+
+ //--- Interface javax.jms.QueueSender
+ /**
+ * Get the queue associated with this QueueSender.
+ *
+ * @return This QueueSender's queue
+ * @throws JMSException If getting the queue for this QueueSender fails due to some internal error.
+ */
+ public Queue getQueue() throws JMSException
+ {
+ return (Queue) getDestination();
+ }
+
+ /**
+ * Sends a message to the queue. Uses the <CODE>QueueSender</CODE>'s default delivery mode, priority,
+ * and time to live.
+ *
+ * @param message The message to send.
+ * @throws JMSException if sending the message fails due to some internal error.
+ * @throws javax.jms.MessageFormatException
+ * If an invalid message is specified.
+ * @throws javax.jms.InvalidDestinationException
+ * If the queue is invalid.
+ * @throws java.lang.UnsupportedOperationException
+ * If invoked on QueueSender that did not specify a queue at creation time.
+ */
+ public void send(Message message) throws JMSException
+ {
+ super.send(message);
+ }
+
+ /**
+ * Send a message to the queue, specifying delivery mode, priority, and time to live.
+ *
+ * @param message The message to send
+ * @param deliveryMode The delivery mode to use
+ * @param priority The priority for this message
+ * @param timeToLive The message's lifetime (in milliseconds)
+ *
+ * @throws JMSException if sending the message fails due to some internal error.
+ * @throws javax.jms.MessageFormatException
+ * If an invalid message is specified.
+ * @throws javax.jms.InvalidDestinationException
+ * If the queue is invalid.
+ * @throws java.lang.UnsupportedOperationException
+ * If invoked on QueueSender that did not specify a queue at creation time.
+ */
+ public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
+ {
+ super.send(message, deliveryMode, priority, timeToLive);
+ }
+
+ /**
+ * Send a message to a queue for an unidentified message producer.
+ * Uses the <CODE>QueueSender</CODE>'s default delivery mode, priority,
+ * and time to live.
+ *
+ * @param queue The queue to send this message to
+ * @param message The message to send
+ * @throws JMSException if sending the message fails due to some internal error.
+ * @throws javax.jms.MessageFormatException
+ * If an invalid message is specified.
+ * @throws javax.jms.InvalidDestinationException
+ * If the queue is invalid.
+ */
+ public void send(Queue queue, Message message) throws JMSException
+ {
+ super.send(queue, message);
+ }
+
+ /**
+ * Sends a message to a queue for an unidentified message producer,
+ * specifying delivery mode, priority and time to live.
+ *
+ * @param queue The queue to send this message to
+ * @param message The message to send
+ * @param deliveryMode The delivery mode to use
+ * @param priority The priority for this message
+ * @param timeToLive The message's lifetime (in milliseconds)
+ * @throws JMSException if sending the message fails due to some internal error.
+ * @throws javax.jms.MessageFormatException
+ * If an invalid message is specified.
+ * @throws javax.jms.InvalidDestinationException
+ * If the queue is invalid.
+ */
+ public void send(Queue queue, Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
+ {
+ super.send(queue, message, deliveryMode, priority, timeToLive);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSenderImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSessionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSessionImpl.java?view=diff&rev=561299&r1=561298&r2=561299
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSessionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/QueueSessionImpl.java Tue Jul 31 04:43:58 2007
@@ -17,11 +17,127 @@
*/
package org.apache.qpid.nclient.jms;
-import org.apache.qpid.nclient.jms.SessionImpl;
+import javax.jms.*;
+import javax.jms.IllegalStateException;
/**
- *
+ * Implementation of javax.jms.QueueSession
*/
-public class QueueSessionImpl extends SessionImpl
+public class QueueSessionImpl extends SessionImpl implements QueueSession
{
+ //--- constructor
+ /**
+ * Create a JMS Session
+ *
+ * @param connection The ConnectionImpl object from which the Session is created.
+ * @param transacted Indicates if the session transacted.
+ * @param acknowledgeMode The session's acknowledgement mode. This value is ignored and set to
+ * {@link javax.jms.Session#SESSION_TRANSACTED} if the <code>transacted</code>
+ * parameter is true.
+ * @throws javax.jms.JMSSecurityException If the user could not be authenticated.
+ * @throws javax.jms.JMSException In case of internal error.
+ */
+ protected QueueSessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode) throws JMSException
+ {
+ super(connection, transacted, acknowledgeMode);
+ }
+
+ //-- Overwritten methods
+ /**
+ * Creates a durable subscriber to the specified topic,
+ *
+ * @param topic The non-temporary <CODE>Topic</CODE> to subscribe to.
+ * @param name The name used to identify this subscription.
+ * @return Always throws an exception
+ * @throws IllegalStateException Always
+ */
+ @Override
+ public TopicSubscriber createDurableSubscriber(Topic topic, String name) throws JMSException
+ {
+ throw new IllegalStateException("Cannot invoke createDurableSubscriber from QueueSession");
+ }
+
+ /**
+ * Create a TemporaryTopic.
+ *
+ * @return Always throws an exception
+ * @throws IllegalStateException Always
+ */
+ @Override
+ public TemporaryTopic createTemporaryTopic() throws JMSException
+ {
+ throw new IllegalStateException("Cannot invoke createTemporaryTopic from QueueSession");
+ }
+
+ /**
+ * Creates a topic identity given a Topicname.
+ *
+ * @param topicName The name of this <CODE>Topic</CODE>
+ * @return Always throws an exception
+ * @throws IllegalStateException Always
+ */
+ @Override
+ public Topic createTopic(String topicName) throws JMSException
+ {
+ throw new IllegalStateException("Cannot invoke createTopic from QueueSession");
+ }
+
+ /**
+ * Unsubscribes a durable subscription that has been created by a client.
+ *
+ * @param name the name used to identify this subscription
+ * @throws IllegalStateException Always
+ */
+ @Override
+ public void unsubscribe(String name) throws JMSException
+ {
+ throw new IllegalStateException("Cannot invoke unsubscribe from QueueSession");
+ }
+
+ //--- Interface javax.jms.QueueSession
+ /**
+ * Create a QueueReceiver to receive messages from the specified queue.
+ *
+ * @param queue the <CODE>Queue</CODE> to access
+ * @return A QueueReceiver
+ * @throws JMSException If creating a receiver fails due to some internal error.
+ * @throws InvalidDestinationException If an invalid queue is specified.
+ */
+ public QueueReceiver createReceiver(Queue queue) throws JMSException
+ {
+ return createReceiver(queue, null);
+ }
+
+ /**
+ * Create a QueueReceiver to receive messages from the specified queue for a given message selector.
+ *
+ * @param queue the Queue to access
+ * @param messageSelector A value of null or an empty string indicates that
+ * there is no message selector for the message consumer.
+ * @return A QueueReceiver
+ * @throws JMSException If creating a receiver fails due to some internal error.
+ * @throws InvalidDestinationException If an invalid queue is specified.
+ * @throws InvalidSelectorException If the message selector is invalid.
+ */
+ public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException
+ {
+ checkNotClosed();
+ checkDestination(queue);
+ return new QueueReceiverImpl(this, queue, messageSelector);
+ }
+
+ /**
+ * Create a QueueSender object to send messages to the specified queue.
+ *
+ * @param queue the Queue to access, or null if this is an unidentified producer
+ * @return A QueueSender
+ * @throws JMSException If creating the sender fails due to some internal error.
+ * @throws InvalidDestinationException If an invalid queue is specified.
+ */
+ public QueueSender createSender(Queue queue) throws JMSException
+ {
+ checkNotClosed();
+ // we do not check the destination since unidentified producers are allowed (no default destination).
+ return new QueueSenderImpl(this, (QueueImpl) queue);
+ }
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java?view=diff&rev=561299&r1=561298&r2=561299
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java Tue Jul 31 04:43:58 2007
@@ -87,8 +87,45 @@
* This session connection
*/
private ConnectionImpl _connection;
- //--- javax.jms.Session API
+ //--- Constructor
+ /**
+ * Create a JMS Session
+ *
+ * @param connection The ConnectionImpl object from which the Session is created.
+ * @param transacted Indicates if the session transacted.
+ * @param acknowledgeMode The session's acknowledgement mode. This value is ignored and set to
+ * {@link Session#SESSION_TRANSACTED} if the <code>transacted</code> parameter is true.
+ * @throws JMSSecurityException If the user could not be authenticated.
+ * @throws JMSException In case of internal error.
+ */
+ protected SessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode) throws JMSException
+ {
+ _connection = connection;
+ _transacted = transacted;
+ // for transacted sessions we ignore the acknowledgeMode and use GenericAckMode.SESSION_TRANSACTED
+ if (_transacted)
+ {
+ acknowledgeMode = Session.SESSION_TRANSACTED;
+ }
+ _acknowledgeMode = acknowledgeMode;
+ try
+ {
+ // create the qpid session
+ _qpidSession = _connection.getQpidConnection().createSession(0);
+ // set transacted if required
+ if (_transacted)
+ {
+ _qpidSession.setTransacted();
+ }
+ }
+ catch (QpidException e)
+ {
+ throw ExceptionHelper.convertQpidExceptionToJMSException(e);
+ }
+ }
+
+ //--- javax.jms.Session API
/**
* Creates a <CODE>BytesMessage</CODE> object used to send a message
* containing a stream of uninterpreted bytes.
@@ -245,7 +282,7 @@
try
{
// Note: this operation makes sure that asynch message processing has returned
- _qpidSession.commit();
+ _qpidSession.txCommit();
}
catch (QpidException e)
{
@@ -272,7 +309,7 @@
try
{
// Note: this operation makes sure that asynch message processing has returned
- _qpidSession.rollback();
+ _qpidSession.txRollback();
}
catch (org.apache.qpidity.QpidException e)
{
@@ -482,7 +519,7 @@
}
/**
- * reates a topic identity given a Topicname.
+ * Creates a topic identity given a Topicname.
* <P>This facility is provided for the rare cases where clients need to
* dynamically manipulate queue identity. It allows the creation of a
* queue identity with a provider-specific name. Clients that depend
@@ -575,25 +612,23 @@
return new QueueBrowserImpl(this, queue, messageSelector);
}
- /**
- * Create a TemporaryQueue. Its lifetime will be tha of the Connection unless it is deleted earlier.
- *
- * @return A temporary queue.
- *
- * @exception JMSException If creating the temporary queue fails due to some internal error.
- */
+ /**
+ * Create a TemporaryQueue. Its lifetime will be tha of the Connection unless it is deleted earlier.
+ *
+ * @return A temporary queue.
+ * @throws JMSException If creating the temporary queue fails due to some internal error.
+ */
public TemporaryQueue createTemporaryQueue() throws JMSException
{
return new TemporaryQueueImpl();
}
- /**
- * Create a TemporaryTopic. Its lifetime will be tha of the Connection unless it is deleted earlier.
- *
- * @return A temporary topic.
- *
- * @exception JMSException If creating the temporary topic fails due to some internal error.
- */
+ /**
+ * Create a TemporaryTopic. Its lifetime will be tha of the Connection unless it is deleted earlier.
+ *
+ * @return A temporary topic.
+ * @throws JMSException If creating the temporary topic fails due to some internal error.
+ */
public TemporaryTopic createTemporaryTopic() throws JMSException
{
return new TemporaryTopicImpl();
@@ -665,6 +700,7 @@
/**
* Validate that the destination is valid i.e. it is not null
*
+ * @param dest The destination to be checked
* @throws InvalidDestinationException If the destination not valid.
*/
protected void checkDestination(Destination dest) throws InvalidDestinationException
@@ -727,8 +763,17 @@
//else there is no effect
}
- //------ Private Methods
+ /**
+ * Access to the underlying Qpid Session
+ *
+ * @return The associated Qpid Session.
+ */
+ protected org.apache.qpid.nclient.api.Session getQpidSession()
+ {
+ return _qpidSession;
+ }
+ //------ Private Methods
/**
* Close the producer and the consumers of this session
*
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicPublisherImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicPublisherImpl.java?view=auto&rev=561299
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicPublisherImpl.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicPublisherImpl.java Tue Jul 31 04:43:58 2007
@@ -0,0 +1,128 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.nclient.jms;
+
+import javax.jms.*;
+
+/**
+ * Implements TopicPublisher
+ */
+public class TopicPublisherImpl extends MessageProducerImpl implements TopicPublisher
+{
+ //--- Constructor
+ /**
+ * Create a TopicPublisherImpl.
+ *
+ * @param session The session from which the TopicPublisherImpl is instantiated
+ * @param topic The default topic for this TopicPublisherImpl
+ * @throws JMSException If the TopicPublisherImpl cannot be created due to some internal error.
+ */
+ protected TopicPublisherImpl(SessionImpl session, Topic topic) throws JMSException
+ {
+ super(session, (DestinationImpl) topic);
+ }
+
+ //--- Interface javax.jms.TopicPublisher
+ /**
+ * Get the topic associated with this TopicPublisher.
+ *
+ * @return This publisher's topic
+ * @throws JMSException If getting the topic fails due to some internal error.
+ */
+ public Topic getTopic() throws JMSException
+ {
+ return (Topic) getDestination();
+ }
+
+
+ /**
+ * Publish a message to the topic using the default delivery mode, priority and time to live.
+ *
+ * @param message The message to publish
+ * @throws JMSException If publishing the message fails due to some internal error.
+ * @throws javax.jms.MessageFormatException
+ * If an invalid message is specified.
+ * @throws javax.jms.InvalidDestinationException
+ * If an invalid topic is specified.
+ * @throws java.lang.UnsupportedOperationException
+ * If that publisher topic was not specified at creation time.
+ */
+ public void publish(Message message) throws JMSException
+ {
+ super.send(message);
+ }
+
+ /**
+ * Publish a message to the topic, specifying delivery mode, priority and time to live.
+ *
+ * @param message The message to publish
+ * @param deliveryMode The delivery mode to use
+ * @param priority The priority for this message
+ * @param timeToLive The message's lifetime (in milliseconds)
+ * @throws JMSException If publishing the message fails due to some internal error.
+ * @throws javax.jms.MessageFormatException
+ * If an invalid message is specified.
+ * @throws javax.jms.InvalidDestinationException
+ * If an invalid topic is specified.
+ * @throws java.lang.UnsupportedOperationException
+ * If that publisher topic was not specified at creation time.
+ */
+ public void publish(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException
+ {
+ super.send(message, deliveryMode, priority, timeToLive);
+ }
+
+
+ /**
+ * Publish a message to a topic for an unidentified message producer.
+ * Uses this TopicPublisher's default delivery mode, priority and time to live.
+ *
+ * @param topic The topic to publish this message to
+ * @param message The message to publish
+ * @throws JMSException If publishing the message fails due to some internal error.
+ * @throws javax.jms.MessageFormatException
+ * If an invalid message is specified.
+ * @throws javax.jms.InvalidDestinationException
+ * If an invalid topic is specified.
+ */
+ public void publish(Topic topic, Message message) throws JMSException
+ {
+ super.send(topic, message);
+ }
+
+ /**
+ * Publishes a message to a topic for an unidentified message
+ * producer, specifying delivery mode, priority and time to live.
+ *
+ * @param topic The topic to publish this message to
+ * @param message The message to publish
+ * @param deliveryMode The delivery mode
+ * @param priority The priority for this message
+ * @param timeToLive The message's lifetime (in milliseconds)
+ * @throws JMSException If publishing the message fails due to some internal error.
+ * @throws javax.jms.MessageFormatException
+ * If an invalid message is specified.
+ * @throws javax.jms.InvalidDestinationException
+ * If an invalid topic is specified.
+ */
+ public void publish(Topic topic, Message message, int deliveryMode, int priority, long timeToLive) throws
+ JMSException
+ {
+ super.send(topic, message, deliveryMode, priority, timeToLive);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicPublisherImpl.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSessionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSessionImpl.java?view=diff&rev=561299&r1=561298&r2=561299
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSessionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSessionImpl.java Tue Jul 31 04:43:58 2007
@@ -17,11 +17,128 @@
*/
package org.apache.qpid.nclient.jms;
-import org.apache.qpid.nclient.jms.SessionImpl;
+import javax.jms.*;
+import javax.jms.IllegalStateException;
/**
- *
+ * Implements TopicSession
*/
-public class TopicSessionImpl extends SessionImpl
+public class TopicSessionImpl extends SessionImpl implements TopicSession
{
+ //-- constructor
+ /**
+ * Create a new TopicSessionImpl.
+ *
+ * @param connection The ConnectionImpl object from which the Session is created.
+ * @param transacted Specifiy whether this session is transacted?
+ * @param acknowledgeMode The session's acknowledgement mode. This value is ignored and set to
+ * {@link javax.jms.Session#SESSION_TRANSACTED} if the <code>transacted</code> parameter
+ * is true.
+ * @throws javax.jms.JMSSecurityException If the user could not be authenticated.
+ * @throws javax.jms.JMSException In case of internal error.
+ */
+ protected TopicSessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode) throws JMSException
+ {
+ super(connection, transacted, acknowledgeMode);
+ }
+
+ //-- Overwritten methods
+ /**
+ * Create a QueueBrowser.
+ *
+ * @param queue The <CODE>Queue</CODE> to browse.
+ * @param messageSelector Only messages with properties matching the message selector expression are delivered.
+ * @return Always throws an exception
+ * @throws IllegalStateException Always
+ */
+ @Override
+ public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException
+ {
+ throw new IllegalStateException("Cannot invoke createBrowser from TopicSession");
+ }
+
+ /**
+ * Create a QueueBrowser.
+ *
+ * @param queue The <CODE>Queue</CODE> to browse.
+ * @return Always throws an exception
+ * @throws IllegalStateException Always
+ */
+ @Override
+ public QueueBrowser createBrowser(Queue queue) throws JMSException
+ {
+ throw new IllegalStateException("Cannot invoke createBrowser from TopicSession");
+ }
+
+ /**
+ * Creates a temporary queue.
+ *
+ * @return Always throws an exception
+ * @throws IllegalStateException Always
+ */
+ @Override
+ public TemporaryQueue createTemporaryQueue() throws JMSException
+ {
+ throw new IllegalStateException("Cannot invoke createTemporaryQueue from TopicSession");
+ }
+
+ /**
+ * Creates a queue identity by a given name.
+ *
+ * @param queueName the name of this <CODE>Queue</CODE>
+ * @return Always throws an exception
+ * @throws IllegalStateException Always
+ */
+ @Override
+ public Queue createQueue(String queueName) throws JMSException
+ {
+ throw new IllegalStateException("Cannot invoke createQueue from TopicSession");
+ }
+
+ //--- Interface TopicSession
+ /**
+ * Create a publisher for the specified topic.
+ *
+ * @param topic the <CODE>Topic</CODE> to publish to, or null if this is an unidentified publisher.
+ * @throws JMSException If the creating a publisher fails due to some internal error.
+ * @throws InvalidDestinationException If an invalid topic is specified.
+ */
+ public TopicPublisher createPublisher(Topic topic) throws JMSException
+ {
+
+ checkNotClosed();
+ // we do not check the destination topic here, since unidentified publishers are allowed.
+ return new TopicPublisherImpl(this, topic);
+ }
+
+ /**
+ * Creates a nondurable subscriber to the specified topic.
+ *
+ * @param topic The Topic to subscribe to
+ * @throws JMSException If creating a subscriber fails due to some internal error.
+ * @throws InvalidDestinationException If an invalid topic is specified.
+ */
+ public TopicSubscriber createSubscriber(Topic topic) throws JMSException
+ {
+ return createSubscriber(topic, null, false);
+ }
+
+ /**
+ * Creates a nondurable subscriber to the specified topic, using a
+ * message selector or specifying whether messages published by its
+ * own connection should be delivered to it.
+ *
+ * @param topic The Topic to subscribe to
+ * @param messageSelector A value of null or an empty string indicates that there is no message selector.
+ * @param noLocal If true then inhibits the delivery of messages published by this subscriber's connection.
+ * @throws JMSException If creating a subscriber fails due to some internal error.
+ * @throws InvalidDestinationException If an invalid topic is specified.
+ * @throws InvalidSelectorException If the message selector is invalid.
+ */
+ public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException
+ {
+ checkNotClosed();
+ checkDestination(topic);
+ return new TopicSubscriberImpl(this, topic, messageSelector, noLocal, null);
+ }
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSubscriberImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSubscriberImpl.java?view=diff&rev=561299&r1=561298&r2=561299
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSubscriberImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/TopicSubscriberImpl.java Tue Jul 31 04:43:58 2007
@@ -30,18 +30,18 @@
/**
* Create a new TopicSubscriberImpl.
*
- * @param session The session of this topic subscriber.
- * @param topic The default topic for this TopicSubscriberImpl
- * @param messageSelector The MessageSelector
- * @param noLocal If true inhibits the delivery of messages published by its own connection.
- * @param name Name of the subscription if this is to be created as a durable subscriber. If this value is null,
- * a non-durable subscription is created.
+ * @param session The session of this topic subscriber.
+ * @param topic The default topic for this TopicSubscriberImpl
+ * @param messageSelector The MessageSelector
+ * @param noLocal If true inhibits the delivery of messages published by its own connection.
+ * @param subscriptionName Name of the subscription if this is to be created as a durable subscriber.
+ * If this value is null, a non-durable subscription is created.
* @throws javax.jms.JMSException If the TopicSubscriberImpl cannot be created due to internal error.
*/
protected TopicSubscriberImpl(SessionImpl session, Topic topic, String messageSelector, boolean noLocal,
- String name) throws JMSException
+ String subscriptionName) throws JMSException
{
- super(session, (DestinationImpl) topic, messageSelector, noLocal, name);
+ super(session, (DestinationImpl) topic, messageSelector, noLocal, subscriptionName);
}
//--- javax.jms.TopicSubscriber interface