You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/08/03 13:34:04 UTC
svn commit: r562414 - in
/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient:
./ api/ impl/ jms/
Author: arnaudsimon
Date: Fri Aug 3 04:34:02 2007
New Revision: 562414
URL: http://svn.apache.org/viewvc?view=rev&rev=562414
Log:
Removed "api" from the package name
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Connection.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/ExceptionListener.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/MessageListener.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Session.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/StreamingMessageListener.java (with props)
Removed:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/api/
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/StreamingListenerAdapter.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Connection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Connection.java?view=auto&rev=562414
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Connection.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Connection.java Fri Aug 3 04:34:02 2007
@@ -0,0 +1,78 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.nclient;
+
+
+import java.net.URL;
+
+import org.apache.qpidity.QpidException;
+
+/**
+ * This represents a physical connection to a broker.
+ */
+public interface Connection
+{
+ /**
+ * Establish the connection with the broker identified by the provided URL.
+ *
+ * @param url The URL of the broker.
+ * @throws QpidException If the communication layer fails to connect with the broker.
+ */
+ public void connect(URL url) throws QpidException;
+
+ /**
+ * Close this connection.
+ *
+ * @throws QpidException If the communication layer fails to close the connection.
+ */
+ public void close() throws QpidException;
+
+
+ /**
+ * Create a session for this connection.
+ * <p> The returned session is suspended
+ * (i.e. this session is not attached to an underlying channel).
+ *
+ * @param expiryInSeconds Expiry time expressed in seconds; if the value is &lt;= 0 then the session does not expire.
+ * @return A newly created (suspended) session.
+ * @throws QpidException If the connection fails to create a session due to some internal error.
+ */
+ public Session createSession(int expiryInSeconds) throws QpidException;
+
+ /**
+ * Create a DtxSession for this connection.
+ * <p> A DtxSession must be used when resources have to be manipulated as
+ * part of a global transaction.
+ * <p> The returned DtxSession is suspended
+ * (i.e. this session is not attached to an underlying channel).
+ *
+ * @param expiryInSeconds Expiry time expressed in seconds; if the value is &lt;= 0 then the session does not expire.
+ * @return A newly created (suspended) DtxSession.
+ * @throws QpidException If the connection fails to create a DtxSession due to some internal error.
+ */
+ public DtxSession createDTXSession(int expiryInSeconds) throws QpidException;
+
+ /**
+ * If the communication layer detects a serious problem with a connection, it
+ * informs the connection's ExceptionListener.
+ *
+ * @param exceptionListner The exception listener to be informed of such problems.
+ */
+ public void setExceptionListener(ExceptionListener exceptionListner);
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Connection.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java?view=auto&rev=562414
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java Fri Aug 3 04:34:02 2007
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.nclient;
+
+import org.apache.qpidity.QpidException;
+
+/**
+ * This session's resources are controlled under the scope of a distributed transaction.
+ */
+public interface DtxSession extends Session
+{
+
+ /**
+ * Get the XA resource associated with this session.
+ *
+ * @return This session's XA resource.
+ * @throws QpidException If the session fails to retrieve its associated XA resource
+ * due to some error.
+ */
+ public javax.transaction.xa.XAResource getDTXResource() throws QpidException;
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/DtxSession.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/ExceptionListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/ExceptionListener.java?view=auto&rev=562414
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/ExceptionListener.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/ExceptionListener.java Fri Aug 3 04:34:02 2007
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.nclient;
+
+import org.apache.qpidity.QpidException;
+
+/**
+ * If the communication layer detects a serious problem with a <CODE>connection</CODE>, it
+ * informs the connection's ExceptionListener.
+ */
+public interface ExceptionListener
+{
+ /**
+ * If the communication layer detects a serious problem with a connection, it
+ * informs the connection's ExceptionListener.
+ *
+ * @param exception The exception coming from the communication layer.
+ * @see Connection
+ */
+ public void onException(QpidException exception);
+}
\ No newline at end of file
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/ExceptionListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/MessageListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/MessageListener.java?view=auto&rev=562414
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/MessageListener.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/MessageListener.java Fri Aug 3 04:34:02 2007
@@ -0,0 +1,38 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.nclient;
+
+import org.apache.qpidity.api.Message;
+
+/**
+ * MessageListeners are used to asynchronously receive messages.
+ */
+public interface MessageListener
+{
+ /**
+ * <p>Transfer a message to the listener.
+ * You will be notified when the whole message is received.
+ * However, underneath, the message might be streamed off disk
+ * or network buffers.
+ * </p>
+ *
+ * @param message The message delivered to the listener.
+ */
+ public void messageTransfer(Message message);
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/MessageListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Session.java?view=auto&rev=562414
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Session.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Session.java Fri Aug 3 04:34:02 2007
@@ -0,0 +1,392 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.nclient;
+
+import java.util.Map;
+
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.Option;
+import org.apache.qpidity.Header;
+import org.apache.qpidity.Range;
+import org.apache.qpidity.api.Message;
+
+/**
+ * <p>A session is associated with a connection.
+ * When created, a Session is not attached to an underlying channel.
+ * Session is single threaded. </p>
+ */
+public interface Session
+{
+
+ //------------------------------------------------------
+ // Session housekeeping methods
+ //------------------------------------------------------
+ /**
+ * Close this session and any associated resources.
+ *
+ * @throws QpidException If the communication layer fails to close this session or if an internal error happens
+ * when closing this session's resources.
+ */
+ public void close() throws QpidException;
+
+ /**
+ * Suspend this session resulting in interrupting the traffic with the broker.
+ * <p> The session timer will start to tick while suspended.
+ * <p> When a session is suspended, all operations of this session and of the associated resources are unavailable.
+ *
+ * @throws QpidException If the communication layer fails to suspend this session
+ */
+ public void suspend() throws QpidException;
+
+ /**
+ * This will resume an existing session.
+ * <p> Upon resume the session is attached to an underlying channel,
+ * hence making operations on this session available.
+ *
+ * @throws QpidException If the communication layer fails to execute this properly
+ */
+ public void resume() throws QpidException;
+
+ //------------------------------------------------------
+ // Messaging methods
+ // Producer
+ //------------------------------------------------------
+ /**
+ * Transfer the given message to a specified exchange.
+ * <p> Following are the valid options for messageTransfer
+ * <ul>
+ * <li> CONFIRM
+ * <li> PRE_ACCQUIRE
+ * </ul>
+ * <p> In the absence of a particular option, the default value is:
+ * <ul>
+ * <li> CONFIRM = false
+ * <li> NO-ACCQUIRE
+ * </ul>
+ *
+ * @param exchange The exchange the message is being sent to.
+ * @param msg The Message to be sent
+ * @param options A list of valid options
+ * @throws QpidException If the session fails to send the message due to some error
+ */
+ public void messageTransfer(String exchange, Message msg, Option... options) throws QpidException;
+
+ /**
+ * Declare the beginning of a message transfer operation. This operation must
+ * be followed by {@link Session#addMessageHeaders} then followed by any number of {@link Session#addData}.
+ * The transfer is ended by endData.
+ * <p> This way of transferring messages is useful when streaming large messages
+ * <p> In the interval [messageTransfer endData] any attempt to call a method other than
+ * {@link Session#addMessageHeaders}, {@link Session#endData} or {@link Session#close}
+ * will result in an exception being thrown.
+ * <p> Following are the valid options for messageTransfer
+ * <ul>
+ * <li> CONFIRM
+ * <li> PRE_ACCQUIRE
+ * </ul>
+ * <p> In the absence of a particular option, the default value is:
+ * <ul>
+ * <li> CONFIRM = false
+ * <li> NO-ACCQUIRE
+ * </ul>
+ *
+ * @param exchange The exchange the message is being sent to.
+ * @param options Set of options.
+ * @throws QpidException If the session fails to send the message due to some error.
+ */
+ public void messageTransfer(String exchange, Option... options) throws QpidException;
+
+ /**
+ * Add the following headers ( {@link org.apache.qpidity.DeliveryProperties}
+ * or {@link org.apache.qpidity.ApplicationProperties} ) to the message being sent.
+ *
+ * @param headers Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code>
+ * @throws QpidException If the session fails to execute the method due to some error
+ * @see org.apache.qpidity.DeliveryProperties
+ * @see org.apache.qpidity.ApplicationProperties
+ */
+ public void addMessageHeaders(Header... headers) throws QpidException;
+
+ /**
+ * Add the following byte array to the content of the message being sent.
+ *
+ * @param data Data to be added.
+ * @param off Offset from which to start reading data
+ * @param len Number of bytes to be read
+ * @throws QpidException If the session fails to execute the method due to some error
+ */
+ public void addData(byte[] data, int off, int len) throws QpidException;
+
+ /**
+ * Signals the end of data for the message.
+ *
+ * @throws QpidException If the session fails to execute the method due to some error
+ */
+ public void endData() throws QpidException;
+
+ //------------------------------------------------------
+ // Messaging methods
+ // Consumer
+ //------------------------------------------------------
+
+ /**
+ * Associate a message listener with a destination.
+ * <p> The destination is bound to a queue and messages are filtered based
+ * on the provider filter map (message filtering is specific to the provider and may not be handled).
+ * <p/>
+ * <p> Following are the valid options
+ * <ul>
+ * <li> NO_LOCAL
+ * <li> EXCLUSIVE
+ * <li> NO_ACQUIRE
+ * <li> CONFIRM
+ * </ul>
+ * <p> In the absence of a particular option, default values are:
+ * <ul>
+ * <li> NO_LOCAL = false
+ * <li> EXCLUSIVE = false
+ * <li> PRE-ACCQUIRE
+ * <li> CONFIRM = false
+ * </ul>
+ *
+ * @param queue The queue this receiver is receiving messages from.
+ * @param destination The destination for the subscriber, a.k.a. the delivery tag.
+ * @param listener The listener for this destination. When big messages are transferred then
+ * it is recommended to use a {@link StreamingMessageListener}.
+ * @param options Set of Options.
+ * @param filter The filters to apply to consumed messages.
+ * @throws QpidException If the session fails to create the receiver due to some error.
+ */
+ public void messageSubscribe(String queue, String destination, MessageListener listener, Map<String, ?> filter,
+ Option... options) throws QpidException;
+
+ /**
+ * Cancels a subscription with a given destination.
+ *
+ * @param destination The destination for the subscriber used at subscription
+ * @throws QpidException If cancelling the subscription fails due to some error.
+ */
+ public void messageCancel(String destination) throws QpidException;
+
+ /**
+ * Associate a message listener with a destination.
+ * We currently allow one listener per destination; this means
+ * that the previous message listener is replaced. This is done gracefully i.e. the message
+ * listener is replaced once it returns from the processing of a message.
+ *
+ * @param destination The destination the listener is associated with.
+ * @param listener The new listener for this destination. When big messages are transferred then
+ * it is recommended to use a {@link StreamingMessageListener}.
+ */
+ public void setMessageListener(String destination, MessageListener listener);
+
+
+ /**
+ * Acknowledge the receipt of ranges of messages.
+ * <p>Messages must have been previously acquired either by receiving them in
+ * pre-acquire mode or by explicitly acquiring them.
+ *
+ * @param range Range of acknowledged messages.
+ * @throws QpidException If the acknowledgement of the messages fails due to some error.
+ */
+ public void messageAcknowledge(Range... range) throws QpidException;
+
+ /**
+ * Reject ranges of acquired messages.
+ * <p> A rejected message will not be delivered to any receiver
+ * and may be either discarded or moved to the broker dead letter queue.
+ *
+ * @param range Range of rejected messages.
+ * @throws QpidException If those messages cannot be rejected due to some error
+ */
+ public void messageReject(Range... range) throws QpidException;
+
+ /**
+ * Try to acquire ranges of messages hence releasing them from the queue.
+ * This means that once acknowledged, a message will not be delivered to any other receiver.
+ * <p> As those messages may have been consumed by other receivers,
+ * message acquisition can fail.
+ * The outcome of the acquisition is returned as an array of ranges of acquired messages.
+ * <p> This method should only be called on non-acquired messages.
+ *
+ * @param range Ranges of messages to be acquired.
+ * @return Ranges of explicitly acquired messages.
+ * @throws QpidException If this message cannot be acquired due to some error
+ */
+ public Range[] messageAcquire(Range... range) throws QpidException;
+
+ /**
+ * Give up responsibility for processing ranges of messages.
+ * <p> Released messages are re-enqueued.
+ *
+ * @param range Ranges of messages to be released.
+ * @throws QpidException If this message cannot be released due to some error.
+ */
+ public void messageRelease(Range... range) throws QpidException;
+
+ // -----------------------------------------------
+ // Local transaction methods
+ // ----------------------------------------------
+ /**
+ * Selects the session for local transaction support.
+ *
+ * @throws QpidException If selecting this session for local transaction support fails due to some error.
+ */
+ public void txSelect() throws QpidException;
+
+ /**
+ * Commit the receipt and the delivery of all messages exchanged by this session's resources.
+ *
+ * @throws QpidException If the session fails to commit due to some error.
+ * @throws IllegalStateException If this session is not transacted.
+ */
+ public void txCommit() throws QpidException, IllegalStateException;
+
+ /**
+ * Rollback the receipt and the delivery of all messages exchanged by this session's resources.
+ *
+ * @throws QpidException If the session fails to rollback due to some error.
+ * @throws IllegalStateException If this session is not transacted.
+ */
+ public void txRollback() throws QpidException, IllegalStateException;
+
+ //---------------------------------------------
+ // Queue methods
+ //---------------------------------------------
+
+ /**
+ * Declare a queue with the given queueName
+ * <p> Following are the valid options for queueDeclare
+ * <ul>
+ * <li> AUTO_DELETE
+ * <li> DURABLE
+ * <li> EXCLUSIVE
+ * <li> NO_WAIT
+ * <li> PASSIVE
+ * </ul>
+ * </p>
+ * <p/>
+ * <p>In the absence of a particular option, the default value is false for each option
+ *
+ * @param queueName The name of the declared queue.
+ * @param alternateExchange Alternate exchange.
+ * @param options Set of Options.
+ * @throws QpidException If the session fails to declare the queue due to some error.
+ * @see Option
+ */
+ public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments,
+ Option... options) throws QpidException;
+
+ /**
+ * Bind a queue with an exchange.
+ *
+ * @param queueName The queue to be bound.
+ * @param exchangeName The exchange name.
+ * @param routingKey The routing key.
+ * @throws QpidException If the session fails to bind the queue due to some error.
+ */
+ public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) throws
+ QpidException;
+
+ /**
+ * Unbind a queue from an exchange.
+ *
+ * @param queueName The queue to be unbound.
+ * @param exchangeName The exchange name.
+ * @param routingKey The routing key.
+ * @throws QpidException If the session fails to unbind the queue due to some error.
+ */
+ public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) throws
+ QpidException;
+
+ /**
+ * Purge a queue, i.e. delete all enqueued messages.
+ *
+ * @param queueName The queue to be purged
+ * @throws QpidException If the session fails to purge the queue due to some error.
+ */
+ public void queuePurge(String queueName) throws QpidException;
+
+ /**
+ * Delete a queue.
+ * <p> Following are the valid options for queueDelete
+ * <ul>
+ * <li> IF_EMPTY
+ * <li> IF_UNUSED
+ * <li> NO_WAIT
+ * </ul>
+ * </p>
+ * <p/>
+ * <p>In the absence of a particular option, the default value is false for each option</p>
+ *
+ * @param queueName The name of the queue to be deleted
+ * @param options Set of options
+ * @throws QpidException If the session fails to delete the queue due to some error.
+ * @see Option
+ * <p/>
+ * Following are the valid options
+ */
+ public void queueDelete(String queueName, Option... options) throws QpidException;
+
+ // --------------------------------------
+ // exchange methods
+ // --------------------------------------
+
+ /**
+ * Declare an exchange.
+ * <p> Following are the valid options for exchangeDeclare
+ * <ul>
+ * <li> AUTO_DELETE
+ * <li> DURABLE
+ * <li> INTERNAL
+ * <li> NO_WAIT
+ * <li> PASSIVE
+ * </ul>
+ * </p>
+ * <p/>
+ * <p>In the absence of a particular option, the default value is false for each option</p>
+ *
+ * @param exchangeName The exchange name.
+ * @param exchangeClass The fully qualified name of the exchange class.
+ * @param options Set of options.
+ * @throws QpidException If the session fails to declare the exchange due to some error.
+ * @see Option
+ */
+ public void exchangeDeclare(String exchangeName, String exchangeClass, String alternateExchange,
+ Map<String, ?> arguments, Option... options) throws QpidException;
+
+ /**
+ * Delete an exchange.
+ * <p> Following are the valid options for exchangeDelete
+ * <ul>
+ * <li> IF_UNUSED
+ * <li> NO_WAIT
+ * </ul>
+ * </p>
+ * <p/>
+ * <p>In the absence of a particular option, the default value is false for each option.
+ * Immediately deleted even if it is used by other resources.</p>
+ *
+ * @param exchangeName The name of exchange to be deleted.
+ * @param options Set of options.
+ * @throws QpidException If the session fails to delete the exchange due to some error.
+ * @see Option
+ */
+ public void exchangeDelete(String exchangeName, Option... options) throws QpidException;
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/Session.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/StreamingMessageListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/StreamingMessageListener.java?view=auto&rev=562414
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/StreamingMessageListener.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/StreamingMessageListener.java Fri Aug 3 04:34:02 2007
@@ -0,0 +1,53 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.nclient;
+
+import org.apache.qpidity.Header;
+
+/**
+ * <p>This message listener is useful if you need to
+ * know when each message part becomes available
+ * as opposed to knowing when the whole message arrives.</p>
+ * <p/>
+ * <p> The sequence of events for transferring a message is as follows:
+ * <ul>
+ * <li> n calls to addMessageHeaders (should be usually one or two)
+ * <li> n calls to addData
+ * <li> {@link org.apache.qpid.nclient.MessageListener#messageTransfer}(<code>null</code>).
+ * </ul>
+ * It is up to the implementation to assemble the message when the different parts
+ * are transferred.
+ */
+public interface StreamingMessageListener extends MessageListener
+{
+ /**
+ * Add the following headers ( {@link org.apache.qpidity.DeliveryProperties}
+ * or {@link org.apache.qpidity.ApplicationProperties} ) to the message being received.
+ *
+ * @param headers Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code>
+ */
+ public void addMessageHeaders(Header... headers);
+
+ /**
+ * Add the following byte array to the content of the message being received.
+ *
+ * @param data Data to be added or streamed.
+ */
+ public void addData(byte[] data);
+
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/StreamingMessageListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java?view=diff&rev=562414&r1=562413&r2=562414
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/ClientSession.java Fri Aug 3 04:34:02 2007
@@ -2,28 +2,149 @@
import java.util.Map;
-import org.apache.qpidity.api.StreamingMessageListener;
-import org.apache.qpid.nclient.api.MessageListener;
-import org.apache.qpidity.Option;
-import org.apache.qpidity.QpidException;
-import org.apache.qpidity.Session;
+import org.apache.qpidity.api.Message;
+import org.apache.qpid.nclient.MessageListener;
+import org.apache.qpidity.*;
-public class ClientSession extends Session implements org.apache.qpid.nclient.api.Session
+/**
+ * Implements a Qpid Session.
+ */
+public class ClientSession implements org.apache.qpid.nclient.Session
{
- public void setMessageListener(String destination,MessageListener listener)
+ //------------------------------------------------------
+ // Session housekeeping methods
+ //------------------------------------------------------
+ public void close() throws QpidException
{
- super.setMessageListener(destination, new StreamingListenerAdapter(listener));
+ //To change body of implemented methods use File | Settings | File Templates.
}
- public void messageSubscribe(String queue, String destination, Map<String, ?> filter, Option... _options) throws QpidException
+ public void suspend() throws QpidException
{
- // TODO
+ //To change body of implemented methods use File | Settings | File Templates.
}
- public void messageSubscribe(String queue, String destination, Map<String, ?> filter, StreamingMessageListener listener, Option... _options) throws QpidException
+ public void resume() throws QpidException
{
- // TODO
+ //To change body of implemented methods use File | Settings | File Templates.
+ }//------------------------------------------------------
+ // Messaging methods
+ // Producer
+ //------------------------------------------------------
+ public void messageTransfer(String exchange, Message msg, Option... options) throws QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void messageTransfer(String exchange, Option... options) throws QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void addMessageHeaders(Header... headers) throws QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void addData(byte[] data, int off, int len) throws QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void endData() throws QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void messageSubscribe(String queue, String destination, MessageListener listener, Map<String, ?> filter,
+ Option... options) throws QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void messageCancel(String destination) throws QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void setMessageListener(String destination, MessageListener listener)
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void messageAcknowledge(Range... range) throws QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void messageReject(Range... range) throws QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public Range[] messageAcquire(Range... range) throws QpidException
+ {
+ return new Range[0]; //To change body of implemented methods use File | Settings | File Templates.
}
+ public void messageRelease(Range... range) throws QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }// -----------------------------------------------
+ // Local transaction methods
+ // ----------------------------------------------
+ public void txSelect() throws QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void txCommit() throws QpidException, IllegalStateException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void txRollback() throws QpidException, IllegalStateException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments,
+ Option... options) throws QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) throws
+ QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments) throws
+ QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void queuePurge(String queueName) throws QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void queueDelete(String queueName, Option... options) throws QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void exchangeDeclare(String exchangeName, String exchangeClass, String alternateExchange,
+ Map<String, ?> arguments, Option... options) throws QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void exchangeDelete(String exchangeName, Option... options) throws QpidException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/StreamingListenerAdapter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/StreamingListenerAdapter.java?view=diff&rev=562414&r1=562413&r2=562414
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/StreamingListenerAdapter.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/impl/StreamingListenerAdapter.java Fri Aug 3 04:34:02 2007
@@ -1,10 +1,10 @@
package org.apache.qpid.nclient.impl;
-import org.apache.qpid.nclient.api.MessageListener;
+import org.apache.qpid.nclient.MessageListener;
+import org.apache.qpid.nclient.StreamingMessageListener;
import org.apache.qpidity.Header;
import org.apache.qpidity.Option;
import org.apache.qpidity.api.Message;
-import org.apache.qpidity.api.StreamingMessageListener;
public class StreamingListenerAdapter implements StreamingMessageListener
{
@@ -16,23 +16,18 @@
_adaptee = l;
}
- public void data(byte[] src)
+ public void addData(byte[] src)
{
_currentMsg.appendData(src);
}
- public void endData()
- {
- _adaptee.onMessage(_currentMsg);
- }
-
- public void messageHeaders(Header... headers)
+ public void addMessageHeaders(Header... headers)
{
//_currentMsg add the headers
}
- public void messageTransfer(String destination, Option... options)
+ public void messageTransfer(Message message)
{
- // _currentMsg create message from factory
+ _adaptee.messageTransfer(_currentMsg);
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java?view=diff&rev=562414&r1=562413&r2=562414
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/ConnectionImpl.java Fri Aug 3 04:34:02 2007
@@ -25,6 +25,7 @@
import javax.jms.IllegalStateException;
import javax.jms.Session;
import javax.jms.ExceptionListener;
+import javax.jms.Connection;
import java.util.Vector;
@@ -85,7 +86,7 @@
/**
* The QpidConeection instance that is mapped with thie JMS connection
*/
- org.apache.qpid.nclient.api.Connection _qpidConnection;
+ org.apache.qpid.nclient.Connection _qpidConnection;
/**
* This is the exception listener for this qpid connection.
@@ -436,7 +437,7 @@
*
* @return This JMS connection underlying Qpid Connection.
*/
- protected org.apache.qpid.nclient.api.Connection getQpidConnection()
+ protected org.apache.qpid.nclient.Connection getQpidConnection()
{
return _qpidConnection;
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java?view=diff&rev=562414&r1=562413&r2=562414
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageActor.java Fri Aug 3 04:34:02 2007
@@ -60,7 +60,6 @@
protected MessageActor(SessionImpl session, DestinationImpl destination)
{
- // TODO create the qpidResource _qpidResource =
_session = session;
_destination = destination;
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java?view=diff&rev=562414&r1=562413&r2=562414
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageConsumerImpl.java Fri Aug 3 04:34:02 2007
@@ -19,6 +19,7 @@
//import org.apache.qpid.nclient.api.MessageReceiver;
import org.apache.qpidity.QpidException;
+import org.apache.qpidity.Option;
import javax.jms.JMSException;
import javax.jms.MessageConsumer;
@@ -30,10 +31,6 @@
*/
public class MessageConsumerImpl extends MessageActor implements MessageConsumer
{
- /**
- * The underlying qpid receiver
- */
- /* private MessageReceiver _qpidReceiver;*/
/**
* This MessageConsumer's messageselector.
@@ -55,7 +52,12 @@
/**
* A MessageListener set up for this consumer.
*/
- private MessageListener _messageListener = null;
+ private MessageListener _messageListener;
+
+ /**
+ * A wrapper around the JMS message listener
+ */
+ private MessageListenerWrapper _messageListenerWrapper;
//----- Constructors
/**
@@ -79,7 +81,8 @@
/*try
{
// TODO define the relevant options
- _qpidReceiver = _session.getQpidSession().createReceiver(destination.getName(), null);
+ _qpidReceiver = _session.getQpidSession().createReceiver(destination.getName(), Option.DURABLE);
+ _qpidResource = _qpidReceiver;
}
catch (QpidException e)
{
@@ -131,7 +134,17 @@
public void setMessageListener(MessageListener messageListener) throws JMSException
{
checkNotClosed();
- // TODO: create a message listener wrapper
+ _messageListener = messageListener;
+ if( messageListener == null )
+ {
+
+ _messageListenerWrapper = null;
+ }
+ else
+ {
+ _messageListenerWrapper = new MessageListenerWrapper(this);
+ //TODO _qpidReceiver.setAsynchronous(_messageListenerWrapper);
+ }
}
/**
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java?view=diff&rev=562414&r1=562413&r2=562414
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/MessageListenerWrapper.java Fri Aug 3 04:34:02 2007
@@ -17,15 +17,13 @@
*/
package org.apache.qpid.nclient.jms;
-import org.apache.qpid.nclient.api.MessageListener;
+import org.apache.qpid.nclient.MessageListener;
import org.apache.qpid.nclient.jms.message.AbstractJMSMessage;
import org.apache.qpid.nclient.jms.message.QpidMessage;
import org.apache.qpidity.api.Message;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import javax.jms.JMSException;
-
/**
* This is a wrapper for the JMS message listener
*/
@@ -57,13 +55,13 @@
_consumer = consumer;
}
- //---- org.apache.qpid.nclient.api.MessageListener API
+ //---- org.apache.qpid.nclient.MessageListener API
/**
* Deliver a message to the listener.
*
* @param message The message delivered to the listner.
*/
- public void onMessage(Message message)
+ public void messageTransfer(Message message)
{
try
{
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java?view=diff&rev=562414&r1=562413&r2=562414
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/jms/SessionImpl.java Fri Aug 3 04:34:02 2007
@@ -21,7 +21,6 @@
import org.slf4j.LoggerFactory;
import org.apache.qpid.nclient.jms.message.*;
import org.apache.qpidity.QpidException;
-import org.apache.qpidity.Option;
import javax.jms.*;
import javax.jms.IllegalStateException;
@@ -72,7 +71,7 @@
/**
* The underlying QpidSession
*/
- private org.apache.qpid.nclient.api.Session _qpidSession;
+ private org.apache.qpid.nclient.Session _qpidSession;
/**
* Indicates whether this session is recovering
@@ -337,7 +336,7 @@
// close the underlaying QpidSession
try
{
- _qpidSession.sessionClose();
+ _qpidSession.close();
}
catch (org.apache.qpidity.QpidException e)
{
@@ -463,7 +462,6 @@
*/
public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
{
-
return createConsumer(destination, messageSelector, false);
}
@@ -664,11 +662,12 @@
/**
* Remove a message actor form this session
* <p> This method is called when an actor is independently closed.
+ *
* @param actor The closed actor.
*/
protected void closeMessageActor(MessageActor actor)
{
- _messageActors.remove(actor);
+ _messageActors.remove(actor);
}
/**
@@ -678,15 +677,7 @@
*/
protected void start() throws JMSException
{
- try
- {
- // TODO: make sure that the correct options are used
- _qpidSession.sessionFlow(Option.SUSPEND);
- }
- catch (QpidException e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
+ // TODO: make sure that the correct options are used
}
/**
@@ -696,15 +687,7 @@
*/
protected void stop() throws JMSException
{
- try
- {
// TODO: make sure that the correct options are used
- _qpidSession.sessionFlow(Option.RESUME);
- }
- catch (QpidException e)
- {
- throw ExceptionHelper.convertQpidExceptionToJMSException(e);
- }
}
/**
@@ -818,7 +801,7 @@
*
* @return The associated Qpid Session.
*/
- protected org.apache.qpid.nclient.api.Session getQpidSession()
+ protected org.apache.qpid.nclient.Session getQpidSession()
{
return _qpidSession;
}