You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/09/19 13:36:26 UTC

svn commit: r577253 [1/7] - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity: nclient/ nclient/impl/ nclient/util/ njms/ njms/message/

Author: arnaudsimon
Date: Wed Sep 19 04:36:23 2007
New Revision: 577253

URL: http://svn.apache.org/viewvc?rev=577253&view=rev
Log:
renamed qpidity.jms to qpidity.njms and qpidity.client to qpidity.nclient

Added:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ExceptionListener.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessageListener.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ReadOnlyMessage.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionFactoryImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionMetaDataImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/CustomJMSXProperty.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/DestinationImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ExceptionHelper.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageActor.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageProducerImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QpidBrowserListener.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QpidExceptionListenerImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueBrowserImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueConnectionImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueReceiverImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueSenderImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueSessionImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TemporaryDestination.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TemporaryQueueImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TemporaryTopicImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicConnectionImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicPublisherImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicSessionImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicSubscriberImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAConnectionImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAQueueConnectionImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAQueueSessionImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAResourceImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XASessionImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XATopicConnectionImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XATopicSessionImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/message/
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/message/BytesMessageImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/message/MapMessageImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/message/MessageFactory.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/message/MessageHelper.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/message/MessageImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/message/ObjectMessageImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/message/QpidMessage.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/message/StreamMessageImpl.java   (with props)
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/message/TextMessageImpl.java   (with props)

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,128 @@
+package org.apache.qpidity.nclient;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.qpidity.BrokerDetails;
+import org.apache.qpidity.ErrorCode;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.nclient.impl.ClientSession;
+import org.apache.qpidity.nclient.impl.ClientSessionDelegate;
+import org.apache.qpidity.transport.Channel;
+import org.apache.qpidity.transport.Connection;
+import org.apache.qpidity.transport.ConnectionClose;
+import org.apache.qpidity.transport.ConnectionDelegate;
+import org.apache.qpidity.transport.ConnectionEvent;
+import org.apache.qpidity.transport.ProtocolHeader;
+import org.apache.qpidity.transport.SessionDelegate;
+import org.apache.qpidity.transport.network.mina.MinaHandler;
+import org.apache.qpidity.url.QpidURL;
+
+
+public class Client implements org.apache.qpidity.nclient.Connection
+{
+    private AtomicInteger _channelNo = new AtomicInteger();
+    private Connection _conn;
+    private ExceptionListener _exceptionListner;
+    private final Lock _lock = new ReentrantLock();
+
+    /**
+     *
+     * @return returns a new connection to the broker.
+     */
+    public static org.apache.qpidity.nclient.Connection createConnection()
+    {
+        return new Client();
+    }
+
+    public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException
+    {
+        Condition negotiationComplete = _lock.newCondition();
+        _lock.lock();
+
+        ConnectionDelegate connectionDelegate = new ConnectionDelegate()
+        {
+            public SessionDelegate getSessionDelegate()
+            {
+                return new ClientSessionDelegate();
+            }
+
+            @Override public void connectionClose(Channel context, ConnectionClose connectionClose)
+            {
+                _exceptionListner.onException(
+                        new QpidException("Server closed the connection: Reason " +
+                                           connectionClose.getReplyText(),
+                                           ErrorCode.get(connectionClose.getReplyCode()),
+                                           null));
+            }
+        };
+
+        connectionDelegate.setCondition(_lock,negotiationComplete);
+        connectionDelegate.setUsername(username);
+        connectionDelegate.setPassword(password);
+        connectionDelegate.setVirtualHost(virtualHost);
+
+        _conn = MinaHandler.connect(host, port,connectionDelegate);
+
+        // XXX: hardcoded version numbers
+        _conn.send(new ConnectionEvent(0, new ProtocolHeader(1, 0, 10)));
+
+        try
+        {
+            negotiationComplete.await();
+        }
+        catch (Exception e)
+        {
+            //
+        }
+        finally
+        {
+            _lock.unlock();
+        }
+    }
+
+    /*
+     * Until the dust settles with the URL disucssion
+     * I am not going to implement this.
+     */
+    public void connect(QpidURL url) throws QpidException
+    {
+        // temp impl to tests
+        BrokerDetails details = url.getAllBrokerDetails().get(0);
+        connect(details.getHost(),
+                details.getPort(),
+                details.getVirtualHost(),
+                details.getUserName(),
+                details.getPassword());
+    }
+
+    public void close() throws QpidException
+    {
+        Channel ch = _conn.getChannel(0);
+        ch.connectionClose(0, "client is closing", 0, 0);
+        //need to close the connection underneath as well
+    }
+
+    public Session createSession(long expiryInSeconds)
+    {
+        Channel ch = _conn.getChannel(_channelNo.incrementAndGet());
+        ClientSession ssn = new ClientSession();
+        ssn.attach(ch);
+        ssn.sessionOpen(expiryInSeconds);
+        return ssn;
+    }
+
+    public DtxSession createDTXSession(int expiryInSeconds)
+    {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    public void setExceptionListener(ExceptionListener exceptionListner)
+    {
+        _exceptionListner = exceptionListner;
+    }
+
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.nclient;
+
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.url.QpidURL;
+
+/**
+ * This represents a physical connection to a broker.
+ */
+public interface Connection
+{
+   /**
+    * Establish the connection using the given parameters
+    * 
+    * @param host
+    * @param port
+    * @param username
+    * @param password
+    * @throws QpidException
+    */ 
+   public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException;
+    
+    /**
+     * Establish the connection with the broker identified by the provided URL.
+     *
+     * @param url The URL of the broker.
+     * @throws QpidException If the communication layer fails to connect with the broker.
+     */
+    public void connect(QpidURL url) throws QpidException;
+    
+    /**
+     * Close this connection.
+     *
+     * @throws QpidException if the communication layer fails to close the connection.
+     */
+    public void close() throws QpidException;
+
+
+    /**
+     * Create a session for this connection.
+     * <p> The retuned session is suspended
+     * (i.e. this session is not attached with an underlying channel)
+     *
+     * @param expiryInSeconds Expiry time expressed in seconds, if the value is <= 0 then the session does not expire.
+     * @return A Newly created (suspended) session.
+     */
+    public Session createSession(long expiryInSeconds);
+
+    /**
+     * Create a DtxSession for this connection.
+     * <p> A Dtx Session must be used when resources have to be manipulated as
+     * part of a global transaction.
+     * <p> The retuned DtxSession is suspended
+     * (i.e. this session is not attached with an underlying channel)
+     *
+     * @param expiryInSeconds Expiry time expressed in seconds, if the value is <= 0 then the session does not expire.
+     * @return A Newly created (suspended) DtxSession.
+     */
+    public DtxSession createDTXSession(int expiryInSeconds);
+
+    /**
+     * If the communication layer detects a serious problem with a connection, it
+     * informs the connection's ExceptionListener
+     *
+     * @param exceptionListner The execptionListener
+     */
+    
+    public void setExceptionListener(ExceptionListener exceptionListner);
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.nclient;
+
+import org.apache.qpidity.transport.DtxCoordinationCommitResult;
+import org.apache.qpidity.transport.DtxCoordinationGetTimeoutResult;
+import org.apache.qpidity.transport.DtxCoordinationPrepareResult;
+import org.apache.qpidity.transport.DtxCoordinationRecoverResult;
+import org.apache.qpidity.transport.DtxCoordinationRollbackResult;
+import org.apache.qpidity.transport.DtxDemarcationEndResult;
+import org.apache.qpidity.transport.DtxDemarcationStartResult;
+import org.apache.qpidity.transport.Future;
+import org.apache.qpidity.transport.Option;
+
+/**
+ * This session�s resources are control under the scope of a distributed transaction.
+ */
+public interface DtxSession extends Session
+{
+
+    /**
+     * This method is called when messages should be produced and consumed on behalf a transaction
+     * branch identified by xid.
+     * possible options are:
+     * <ul>
+     * <li> {@link Option#JOIN}:  Indicate that the start applies to joining a transaction previously seen.
+     * <li> {@link Option#RESUME}: Indicate that the start applies to resuming a suspended transaction branch specified.
+     * </ul>
+     *
+     * @param xid     Specifies the xid of the transaction branch to be started.
+     * @param options Possible options are: {@link Option#JOIN} and {@link Option#RESUME}.
+     * @return Confirms to the client that the transaction branch is started or specify the error condition.
+     */
+    public Future<DtxDemarcationStartResult> dtxDemarcationStart(String xid, Option... options);
+
+    /**
+     * This method is called when the work done on behalf a transaction branch finishes or needs to
+     * be suspended.
+     * possible options are:
+     * <ul>
+     * <li> {@link Option#FAIL}: indicates that this portion of work has failed;
+     * otherwise this portion of work has
+     * completed successfully.
+     * <li> {@link Option#SUSPEND}: Indicates that the transaction branch is
+     * temporarily suspended in an incomplete state.
+     * </ul>
+     *
+     * @param xid     Specifies the xid of the transaction branch to be ended.
+     * @param options Available options are: {@link Option#FAIL} and {@link Option#SUSPEND}.
+     * @return Confirms to the client that the transaction branch is ended or specify the error condition.
+     */
+    public Future<DtxDemarcationEndResult> dtxDemarcationEnd(String xid, Option... options);
+
+    /**
+     * Commit the work done on behalf a transaction branch. This method commits the work associated
+     * with xid. Any produced messages are made available and any consumed messages are discarded.
+     * possible option is:
+     * <ul>
+     * <li> {@link Option#ONE_PHASE}: When set then one-phase commit optimization is used.
+     * </ul>
+     *
+     * @param xid     Specifies the xid of the transaction branch to be committed.
+     * @param options Available option is: {@link Option#ONE_PHASE}
+     * @return Confirms to the client that the transaction branch is committed or specify the error condition.
+     */
+    public Future<DtxCoordinationCommitResult> dtxCoordinationCommit(String xid, Option... options);
+
+    /**
+     * This method is called to forget about a heuristically completed transaction branch.
+     *
+     * @param xid Specifies the xid of the transaction branch to be forgotten.
+     */
+    public void dtxCoordinationForget(String xid);
+
+    /**
+     * This method obtains the current transaction timeout value in seconds. If set-timeout was not
+     * used prior to invoking this method, the return value is the default timeout; otherwise, the
+     * value used in the previous set-timeout call is returned.
+     *
+     * @param xid Specifies the xid of the transaction branch for getting the timeout.
+     * @return The current transaction timeout value in seconds.
+     */
+    public Future<DtxCoordinationGetTimeoutResult> dtxCoordinationGetTimeout(String xid);
+
+    /**
+     * This method prepares for commitment any message produced or consumed on behalf of xid.
+     *
+     * @param xid Specifies the xid of the transaction branch that can be prepared.
+     * @return The status of the prepare operation: can be one of those:
+     *         xa-ok: Normal execution.
+     *         <p/>
+     *         xa-rdonly: The transaction branch was read-only and has been committed.
+     *         <p/>
+     *         xa-rbrollback: The broker marked the transaction branch rollback-only for an unspecified
+     *         reason.
+     *         <p/>
+     *         xa-rbtimeout: The work represented by this transaction branch took too long.
+     */
+    public Future<DtxCoordinationPrepareResult> dtxCoordinationPrepare(String xid);
+
+    /**
+     * This method is called to obtain a list of transaction branches that are in a prepared or
+     * heuristically completed state.
+     * Todo The options ahould be removed once the xml is updated
+     * @return a array of xids to be recovered.
+     */
+    public Future<DtxCoordinationRecoverResult> dtxCoordinationRecover(Option... options);
+
+    /**
+     * This method rolls back the work associated with xid. Any produced messages are discarded and
+     * any consumed messages are re-enqueued.
+     *
+     * @param xid Specifies the xid of the transaction branch that can be rolled back.
+     * @return Confirms to the client that the transaction branch is rolled back or specify the error condition.
+     */
+    public Future<DtxCoordinationRollbackResult> dtxCoordinationRollback(String xid);
+
+    /**
+     * Sets the specified transaction branch timeout value in seconds.
+     *
+     * @param xid     Specifies the xid of the transaction branch for setting the timeout.
+     * @param timeout The transaction timeout value in seconds.
+     */
+    public void dtxCoordinationSetTimeout(String xid, long timeout);
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ExceptionListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ExceptionListener.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ExceptionListener.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ExceptionListener.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.nclient;
+
+import org.apache.qpidity.QpidException;
+
+/**
+ * If the communication layer detects a serious problem with a <CODE>connection</CODE>, it
+ * informs the connection's ExceptionListener
+ */
+public interface ExceptionListener
+{
+    /**
+     * If the communication layer detects a serious problem with a connection, it
+     * informs the connection's ExceptionListener
+     *
+     * @param exception The exception comming from the communication layer.
+     * @see Connection
+     */
+    public void onException(QpidException exception);
+}
\ No newline at end of file

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ExceptionListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,34 @@
+package org.apache.qpidity.nclient;
+
+import org.apache.qpidity.njms.ConnectionFactoryImpl;
+import org.apache.qpidity.njms.TopicImpl;
+
+public class JMSTestCase
+{
+    public static void main(String[] args)
+    {
+        try
+        {
+            javax.jms.Connection con = (new ConnectionFactoryImpl("localhost",5672, "test", "guest","guest")).createConnection();
+            con.start();
+            
+            javax.jms.Session ssn = con.createSession(false, 1);
+            
+            javax.jms.Destination dest = new TopicImpl("myTopic");
+            javax.jms.MessageProducer prod = ssn.createProducer(dest);
+            javax.jms.MessageConsumer cons = ssn.createConsumer(dest); 
+            
+            javax.jms.BytesMessage msg = ssn.createBytesMessage();
+            msg.writeInt(123);
+            prod.send(msg);
+            
+            javax.jms.BytesMessage m = (javax.jms.BytesMessage)cons.receive();
+            System.out.println("Data : " + m.readInt());
+            
+        }
+        catch(Exception e)
+        {
+            e.printStackTrace();
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,64 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.nclient;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpidity.transport.Header;
+
+/**
+ * Assembles message parts.
+ * <p> The sequence of event for transferring a message is as follows:
+ * <ul>
+ * <li> messageHeaders
+ * <li> n calls to addData
+ * <li> messageReceived
+ * </ul>
+ * This is up to the implementation to assembled the message when the different parts
+ * are transferred.
+ */
+public interface MessagePartListener
+{    
+    /**
+     * Indicates the Message transfer has started.
+     * 
+     * @param transferId
+     */
+    public void messageTransfer(long transferId);
+    
+    /**
+     * Add the following headers ( {@link org.apache.qpidity.DeliveryProperties}
+     * or {@link org.apache.qpidity.ApplicationProperties} ) to the message being received.
+     *
+     * @param headers Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code>
+     */
+    public void messageHeader(Header header);
+
+    /**
+     * Add the following byte array to the content of the message being received
+     *
+     * @param data Data to be added or streamed.
+     */
+    public void data(ByteBuffer src);
+
+    /**
+     * Indicates that the message has been fully received. 
+     */
+    public void messageReceived();
+
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,614 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.nclient;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.qpidity.transport.*;
+import org.apache.qpidity.api.Message;
+
+/**
+ * <p>A session is associated with a connection.
+ * When created a Session is not attached with an underlying channel.
+ * Session is single threaded </p>
+ * <p/>
+ * All the Session commands are asynchronous, synchronous invocation is achieved through invoking the sync method.
+ * That is to say that <code>command1</code> will be synchronously invoked using the following sequence:
+ * <ul>
+ * <li> <code>session.command1()</code>
+ * <li> <code>session.sync()</code>
+ * </ul>
+ */
+public interface Session
+{
+    public static final short TRANSFER_ACQUIRE_MODE_NO_ACQUIRE = 0;
+    public static final short TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE = 1;
+    public static final short TRANSFER_CONFIRM_MODE_REQUIRED = 1;
+    public static final short TRANSFER_CONFIRM_MODE_NOT_REQUIRED = 0;
+    public static final short MESSAGE_FLOW_MODE_CREDIT = 0;
+    public static final short MESSAGE_FLOW_MODE_WINDOW = 1;
+    public static final short MESSAGE_FLOW_UNIT_MESSAGE = 0;
+    public static final short MESSAGE_FLOW_UNIT_BYTE = 1;
+    public static final short MESSAGE_REJECT_CODE_GENERIC = 0;
+    public static final short MESSAGE_REJECT_CODE_IMMEDIATE_DELIVERY_FAILED = 1;
+    public static final short MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE = 0;
+    public static final short MESSAGE_ACQUIRE_MESSAGES_IF_ALL_ARE_AVAILABLE = 1;
+
+    //------------------------------------------------------
+    //                 Session housekeeping methods
+    //------------------------------------------------------
+
+    /**
+     * Sync method will block until all outstanding commands
+     * are executed.
+     */
+    public void sync();
+
+    /**
+     * Close this session and any associated resources.
+     */
+    public void sessionClose();
+
+    /**
+     * Suspend this session resulting in interrupting the traffic with the broker.
+     * <p> The session timer will start to tick in suspend.
+     * <p> When a session is suspend any operation of this session and of the associated resources are unavailable.
+     */
+    public void sessionSuspend();
+
+    //------------------------------------------------------ 
+    //                 Messaging methods
+    //                   Producer           
+    //------------------------------------------------------
+    /**
+     * Transfer the given message to a specified exchange.
+     * <p/>
+     * <p>This is a convinience method for providing a complete message
+     * using a single method which internaly is mapped to messageTransfer(), headers() followed
+     * by data() and an endData().
+     * <b><i>This method should only be used by small messages</b></i></p>
+     *
+     * @param destination The exchange the message is being sent.
+     * @param msg         The Message to be sent
+     * @param confirmMode <ul> </li>off ({@link Session#TRANSFER_CONFIRM_MODE_NOT_REQUIRED}): confirmation
+     *                    is not required, once a message has been transferred in pre-acquire
+     *                    mode (or once acquire has been sent in no-acquire mode) the message is considered
+     *                    transferred
+     *                    <p/>
+     *                    <li> on  ({@link Session#TRANSFER_CONFIRM_MODE_REQUIRED}): an acquired message
+     *                    (whether acquisition was implicit as in pre-acquire mode or
+     *                    explicit as in no-acquire mode) is not considered transferred until the original
+     *                    transfer is complete (signaled via execution.complete)
+     *                    </ul>
+     * @param acquireMode <ul>
+     *                    <li> no-acquire  ({@link Session#TRANSFER_ACQUIRE_MODE_NO_ACQUIRE}): the message
+     *                    must be explicitly acquired
+     *                    <li> pre-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE}): the message is
+     *                    acquired when the transfer starts
+     *                    </ul>
+     * @throws java.io.IOException If transferring a message fails due to some internal communication error.
+     */
+    public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode)
+            throws IOException;
+
+    /**
+     * <p>This is a convinience method for streaming a message using pull semantics
+     * using a single method as opposed to doing a push using messageTransfer(), headers() followed
+     * by a series of data() and an endData().</p>
+     * <p>Internally data will be pulled from Message object(which wrap's a data stream) using it's read()
+     * and pushed using messageTransfer(), headers() followed by a series of data() and an endData().
+     * <br><b><i>This method should only be used by large messages</b></i><br>
+     * There are two convinience Message classes provided to facilitate this.
+     * <ul>
+     * <li> <code>{@link org.apache.qpidity.nclient.util.FileMessage}</code>
+     * <li> <code>{@link org.apache.qpidity.nclient.util.StreamingMessage}</code>
+     * </ul>
+     * You could also implement a the <code>Message</code> interface to and wrap any
+     * data stream you want.
+     * </p>
+     *
+     * @param destination The exchange the message is being sent.
+     * @param msg         The Message to be sent
+     * @param confirmMode <ul> </li>off ({@link Session#TRANSFER_CONFIRM_MODE_NOT_REQUIRED}): confirmation
+     *                    is not required, once a message has been transferred in pre-acquire
+     *                    mode (or once acquire has been sent in no-acquire mode) the message is considered
+     *                    transferred
+     *                    <p/>
+     *                    <li> on  ({@link Session#TRANSFER_CONFIRM_MODE_REQUIRED}): an acquired message
+     *                    (whether acquisition was implicit as in pre-acquire mode or
+     *                    explicit as in no-acquire mode) is not considered transferred until the original
+     *                    transfer is complete (signaled via execution.complete)
+     *                    </ul>
+     * @param acquireMode <ul>
+     *                    <li> no-acquire  ({@link Session#TRANSFER_ACQUIRE_MODE_NO_ACQUIRE}): the message
+     *                    must be explicitly acquired
+     *                    <li> pre-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE}): the message
+     *                    is acquired when the transfer starts
+     *                    </ul>
+     * @throws java.io.IOException If transferring a message fails due to some internal communication error.
+     */
+    public void messageStream(String destination, Message msg, short confirmMode, short acquireMode) throws IOException;
+
+    /**
+     * Declare the beginning of a message transfer operation. This operation must
+     * be followed by {@link Session#header} then followed by any number of {@link Session#data}.
+     * The transfer is ended by {@link Session#endData}.
+     * <p> This way of transferring messages is useful when streaming large messages
+     * <p> In the interval [messageTransfer endData] any attempt to call a method other than
+     * {@link Session#header}, {@link Session#endData} ore {@link Session#sessionClose}
+     * will result in an exception being thrown.
+     *
+     * @param destination The exchange the message is being sent.
+     * @param confirmMode <ul> </li>off ({@link Session#TRANSFER_CONFIRM_MODE_NOT_REQUIRED}): confirmation
+     *                    is not required, once a message has been transferred in pre-acquire
+     *                    mode (or once acquire has been sent in no-acquire mode) the message is considered
+     *                    transferred
+     *                    <p/>
+     *                    <li> on  ({@link Session#TRANSFER_CONFIRM_MODE_REQUIRED}): an acquired message
+     *                    (whether acquisition was implicit as in pre-acquire mode or
+     *                    explicit as in no-acquire mode) is not considered transferred until the original
+     *                    transfer is complete (signaled via execution.complete)
+     *                    </ul>
+     * @param acquireMode <ul>
+     *                    <li> no-acquire  ({@link Session#TRANSFER_ACQUIRE_MODE_NO_ACQUIRE}): the message
+     *                    must be explicitly acquired
+     *                    <li> pre-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE}): the message
+     *                    is acquired when the transfer starts
+     *                    </ul>
+     */
+    public void messageTransfer(String destination, short confirmMode, short acquireMode);
+
+    /**
+     * Add a set of headers the following headers to the message being sent.
+     *
+     * @param headers Are either <code>{@link org.apache.qpidity.transport.DeliveryProperties}</code>
+     *                or <code>{@link org.apache.qpidity.transport.MessageProperties}</code>
+     * @see org.apache.qpidity.transport.DeliveryProperties
+     * @see org.apache.qpidity.transport.MessageProperties
+     */
+    public void header(Struct... headers);
+
+    /**
+     * Add the following byte array to the content of the message being sent.
+     *
+     * @param data Data to be added.
+     */
+    public void data(byte[] data);
+
+    /**
+     * Add the following ByteBuffer to the content of the message being sent.
+     * <p> Note that only the data between the buffer current position and the
+     * buffer limit is added.
+     * It is therefore recommended to flip the buffer before adding it to the message,
+     *
+     * @param buf Data to be added.
+     */
+    public void data(ByteBuffer buf);
+
+    /**
+     * Add the following String to the content of the message being sent.
+     *
+     * @param str String to be added.
+     */
+    public void data(String str);
+
+    /**
+     * Signals the end of data for the message.
+     */
+    public void endData();
+
+    //------------------------------------------------------
+    //                 Messaging methods
+    //                   Consumer
+    //------------------------------------------------------
+
+    /**
+     * Associate a message listener with a destination.
+     * <p> The destination is bound to a queue and messages are filtered based
+     * on the provider filter map (message filtering is specific to the provider and may not be handled).
+     * <p> Following are valid options:
+     * <ul>
+     * <li>{@link Option#NO_LOCAL}: <p>If the no-local field is set the server will not send
+     * messages to the connection that
+     * published them.
+     * <li>{@link Option#EXCLUSIVE}: <p> Request exclusive subscription access, meaning only this
+     * ubscription can access the queue.
+     * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an �empty� option.
+     * </ul>
+     *
+     * @param queue       The queue this receiver is receiving messages from.
+     * @param destination The destination for the subscriber ,a.k.a the delivery tag.
+     * @param confirmMode <ul> </li>off ({@link Session#TRANSFER_CONFIRM_MODE_NOT_REQUIRED}): confirmation is not
+     *                    required, once a message has been transferred in pre-acquire
+     *                    mode (or once acquire has been sent in no-acquire mode) the message is considered
+     *                    transferred
+     *                    <p/>
+     *                    <li> on  ({@link Session#TRANSFER_CONFIRM_MODE_REQUIRED}): an acquired message (whether
+     *                    acquisition was implicit as in pre-acquire mode or
+     *                    explicit as in no-acquire mode) is not considered transferred until the original
+     *                    transfer is complete (signaled via execution.complete)
+     *                    </ul>
+     * @param acquireMode <ul>
+     *                    <li> no-acquire  ({@link Session#TRANSFER_ACQUIRE_MODE_NO_ACQUIRE}): the message must
+     *                    be explicitly acquired
+     *                    <li> pre-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE}): the message is
+     *                    acquired when the transfer starts
+     *                    </ul>
+     * @param listener    The listener for this destination. When big message are transfered then
+     *                    it is recommended to use a {@link org.apache.qpidity.nclient.MessagePartListener}.
+     * @param options     Set of Options (valid options are {@link Option#NO_LOCAL}, {@link Option#EXCLUSIVE}
+     *                    and {@link Option#NO_OPTION})
+     * @param filter      A set of filters for the subscription. The syntax and semantics of these filters depends
+     *                    on the providers implementation.
+     */
+    public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode,
+                                 MessagePartListener listener, Map<String, ?> filter, Option... options);
+
+    /**
+     * This method cancels a consumer. This does not affect already delivered messages, but it does
+     * mean the server will not send any more messages for that destination. The client may receive an
+     * arbitrary number of messages in between sending the cancel method and receiving the
+     * notification of completion of the cancel command.
+     *
+     * @param destination The destination for the subscriber used at subscription
+     */
+    public void messageCancel(String destination);
+
+    /**
+     * Associate a message part listener with a destination.
+     * <p> Only one listerner per destination is allowed. This means
+     * that the previous message listener is replaced. This is done gracefully i.e. the message
+     * listener is replaced once it return from the processing of a message.
+     *
+     * @param destination The destination the listener is associated with.
+     * @param listener    The new listener for this destination.
+     */
+    public void setMessageListener(String destination, MessagePartListener listener);
+
+    /**
+     * Sets the mode of flow control used for a given destination.
+     * <p> With credit based flow control, the broker continually maintains its current
+     * credit balance with the recipient. The credit balance consists of two values, a message
+     * count, and a byte count. Whenever message data is sent, both counts must be decremented.
+     * If either value reaches zero, the flow of message data must stop. Additional credit is
+     * received via the {@link Session#messageFlow} method.
+     * <p> Window based flow control is identical to credit based flow control, however message
+     * acknowledgment implicitly grants a single unit of message credit, and the size of the
+     * message in byte credits for each acknowledged message.
+     *
+     * @param destination The destination to set the flow mode on.
+     * @param mode        <ul> <li>credit ({@link Session#MESSAGE_FLOW_MODE_CREDIT}): choose credit based flow control
+     *                    <li> window ({@link Session#MESSAGE_FLOW_MODE_WINDOW}): choose window based flow control</ul>
+     */
+    public void messageFlowMode(String destination, short mode);
+
+
+    /**
+     * This method controls the flow of message data to a given destination. It is used by the
+     * recipient of messages to dynamically match the incoming rate of message flow to its
+     * processing or forwarding capacity. Upon receipt of this method, the sender must add "value"
+     * number of the specified unit to the available credit balance for the specified destination.
+     * A value of 0 indicates an infinite amount of credit. This disables any limit for
+     * the given unit until the credit balance is zeroed with {@link Session#messageStop}
+     * or {@link Session#messageFlush}.
+     *
+     * @param destination The destination to set the flow.
+     * @param unit        Specifies the unit of credit balance.
+     *                    <p/>
+     *                    One of: <ul>
+     *                    <li> message ({@link Session#MESSAGE_FLOW_UNIT_MESSAGE})
+     *                    <li> byte    ({@link Session#MESSAGE_FLOW_UNIT_BYTE})
+     *                    </ul>
+     * @param value       Number of credits, a value of 0 indicates an infinite amount of credit.
+     */
+    public void messageFlow(String destination, short unit, long value);
+
+    /**
+     * Forces the broker to exhaust its credit supply.
+     * <p> The broker's credit will always be zero when
+     * this method completes.
+     *
+     * @param destination The destination to call flush on.
+     */
+    public void messageFlush(String destination);
+
+    /**
+     * On receipt of this method, the brokers MUST set his credit to zero for the given
+     * destination. This obeys the generic semantics of command completion, i.e. when confirmation
+     * is issued credit MUST be zero and no further messages will be sent until such a time as
+     * further credit is received.
+     *
+     * @param destination The destination to stop.
+     */
+    public void messageStop(String destination);
+
+    /**
+     * Acknowledge the receipt of ranges of messages.
+     * <p>Message must have been previously acquired either by receiving them in
+     * pre-acquire mode or by explicitly acquiring them.
+     *
+     * @param ranges Range of acknowledged messages.
+     */
+    public void messageAcknowledge(RangeSet ranges);
+
+    /**
+     * Reject ranges of acquired messages.
+     * <p>  The broker MUST deliver rejected messages to the
+     * alternate-exchange on the queue from which it was delivered. If no alternate-exchange is
+     * defined for that queue the broker MAY discard the message.
+     *
+     * @param ranges Range of rejected messages.
+     * @param code   The reject code must be one of {@link Session#MESSAGE_REJECT_CODE_GENERIC} or
+     *               {@link Session#MESSAGE_REJECT_CODE_IMMEDIATE_DELIVERY_FAILED} (immediate delivery was attempted but
+     *               failed).
+     * @param text   String describing the reason for a message transfer rejection.
+     */
+    public void messageReject(RangeSet ranges, int code, String text);
+
+    /**
+     * As it is possible that the broker does not manage to reject some messages, after completion of
+     * {@link Session#messageReject} this method will return the ranges of rejected messages.
+     * <p> Note that {@link Session#messageReject} and this methods are asynchronous therefore for accessing to the
+     * previously rejected messages this method must be invoked in conjunction with {@link Session#sync()}.
+     * <p> A recommended invocation sequence would be:
+     * <ul>
+     * <li> {@link Session#messageReject}
+     * <li> {@link Session#sync()}
+     * <li> {@link Session#getRejectedMessages()}
+     * </ul>
+     *
+     * @return The rejected message ranges
+     */
+    public RangeSet getRejectedMessages();
+
+    /**
+     * Try to acquire ranges of messages hence releasing them form the queue.
+     * This means that once acknowledged, a message will not be delivered to any other receiver.
+     * <p> As those messages may have been consumed by another receivers hence,
+     * message acquisition can fail.
+     * The outcome of the acquisition is returned as an array of ranges of qcquired messages.
+     * <p> This method should only be called on non-acquired messages.
+     *
+     * @param mode   One of: <ul>
+     *               <li> any ({@link Session#MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE}): acquire any available
+     *               messages for consumption
+     *               <li> all ({@link Session#MESSAGE_ACQUIRE_MESSAGES_IF_ALL_ARE_AVAILABLE}): only acquire messages
+     *               if all are available for consumption
+     *               </ul>
+     * @param ranges Ranges of messages to be acquired.
+     */
+    public void messageAcquire(RangeSet ranges, short mode);
+
+    /**
+     * As it is possible that the broker does not manage to acquire some messages, after completion of
+     * {@link Session#messageAcquire} this method will return the ranges of acquired messages.
+     * <p> Note that {@link Session#messageAcquire} and this methods are asynchronous therefore for accessing to the
+     * previously acquired messages this method must be invoked in conjunction with {@link Session#sync()}.
+     * <p> A recommended invocation sequence would be:
+     * <ul>
+     * <li> {@link Session#messageAcquire}
+     * <li> {@link Session#sync()}
+     * <li> {@link Session#getAccquiredMessages()}
+     * </ul>
+     *
+     * @return returns the message ranges marked by the broker as acquired.
+     */
+    public RangeSet getAccquiredMessages();
+
+    /**
+     * Give up responsibility for processing ranges of messages.
+     * <p> Released messages are re-enqueued.
+     *
+     * @param ranges Ranges of messages to be released.
+     */
+    public void messageRelease(RangeSet ranges);
+
+    // -----------------------------------------------
+    //            Local transaction methods
+    //  ----------------------------------------------
+    /**
+     * Selects the session for local transaction support.
+     */
+    public void txSelect();
+
+    /**
+     * Commit the receipt and the delivery of all messages exchanged by this session resources.
+     *
+     * @throws IllegalStateException If this session is not transacted.
+     */
+    public void txCommit() throws IllegalStateException;
+
+    /**
+     * Rollback the receipt and the delivery of all messages exchanged by this session resources.
+     *
+     * @throws IllegalStateException If this session is not transacted.
+     */
+    public void txRollback() throws IllegalStateException;
+
+    //---------------------------------------------
+    //            Queue methods 
+    //---------------------------------------------
+
+    /**
+     * Declare a queue with the given queueName
+     * <p> Following are the valid options:
+     * <ul>
+     * <li> {@link Option#AUTO_DELETE}: <p> If this field is set and the exclusive field is also set,
+     * then the queue is deleted when the connection closes.
+     * If this field is set and the exclusive field is not set the queue is deleted when all
+     * the consumers have finished using it.
+     * <li> {@link Option#DURABLE}: <p>  If set when creating a new queue,
+     * the queue will be marked as durable. Durable queues
+     * remain active when a server restarts. Non-durable queues (transient queues) are purged
+     * if/when a server restarts. Note that durable queues do not necessarily hold persistent
+     * messages, although it does not make sense to send persistent messages to a transient
+     * queue.
+     * <li> {@link Option#EXCLUSIVE}: <p>  Exclusive queues can only be used from one connection at a time.
+     * Once a connection declares an exclusive queue, that queue cannot be used by any other connections until the
+     * declaring connection closes.
+     * <li> {@link Option#PASSIVE}: <p> If set, the server will not create the queue.
+     * This field allows the client to assert the presence of a queue without modifying the server state.
+     * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an �empty� option.
+     * </ul>
+     * <p>In the absence of a particular option, the defaul value is false for each option
+     *
+     * @param queueName         The name of the delcared queue.
+     * @param alternateExchange If a message is rejected by a queue, then it is sent to the alternate-exchange. A message
+     *                          may be rejected by a queue for the following reasons:
+     *                          <oL> <li> The queue is deleted when it is not empty;
+     *                          <li> Immediate delivery of a message is requested, but there are no consumers connected to
+     *                          the queue. </ol>
+     * @param arguments         Used for backward compatibility
+     * @param options           Set of Options ( valide options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE},
+     *                          {@link Option#EXCLUSIVE}, {@link Option#PASSIVE} and  {@link Option#NO_OPTION})
+     * @see Option
+     */
+    public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments, Option... options);
+
+    /**
+     * Bind a queue with an exchange.
+     *
+     * @param queueName    Specifies the name of the queue to bind. If the queue name is empty, refers to the current
+     *                     queue for the session, which is the last declared queue.
+     * @param exchangeName The exchange name.
+     * @param routingKey   Specifies the routing key for the binding. The routing key is used for routing messages
+     *                     depending on the exchange configuration. Not all exchanges use a routing key - refer to
+     *                     the specific exchange documentation. If the queue name is empty, the server uses the last
+     *                     queue declared on the session. If the routing key is also empty, the server uses this
+     *                     queue name for the routing key as well. If the queue name is provided but the routing key
+     *                     is empty, the server does the binding with that empty routing key. The meaning of empty
+     *                     routing keys depends on the exchange implementation.
+     * @param arguments    Used for backward compatibility
+     */
+    public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments);
+
+    /**
+     * Unbind a queue from an exchange.
+     *
+     * @param queueName    Specifies the name of the queue to unbind.
+     * @param exchangeName The name of the exchange to unbind from.
+     * @param routingKey   Specifies the routing key of the binding to unbind.
+     * @param arguments    Used for backward compatibility
+     */
+    public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments);
+
+    /**
+     * This method removes all messages from a queue. It does not cancel consumers. Purged messages
+     * are deleted without any formal "undo" mechanism.
+     *
+     * @param queueName Specifies the name of the queue to purge. If the queue name is empty, refers to the
+     *                  current queue for the session, which is the last declared queue.
+     */
+    public void queuePurge(String queueName);
+
+    /**
+     * This method deletes a queue. When a queue is deleted any pending messages are sent to a
+     * dead-letter queue if this is defined in the server configuration, and all consumers on the
+     * queue are cancelled.
+     * <p> Following are the valid options:
+     * <ul>
+     * <li> {@link Option#IF_EMPTY}: <p>  If set, the server will only delete the queue if it has no messages.
+     * <li> {@link Option#IF_UNUSED}: <p> If set, the server will only delete the queue if it has no consumers.
+     * If the queue has consumers the server does does not delete it but raises a channel exception instead.
+     * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an �empty� option.
+     * </ul>
+     * </p>
+     * <p/>
+     * <p>In the absence of a particular option, the defaul value is false for each option</p>
+     *
+     * @param queueName Specifies the name of the queue to delete. If the queue name is empty, refers to the
+     *                  current queue for the session, which is the last declared queue.
+     * @param options   Set of options (Valid options are: {@link Option#IF_EMPTY}, {@link Option#IF_UNUSED}
+     *                  and {@link Option#NO_OPTION})
+     * @see Option
+     */
+    public void queueDelete(String queueName, Option... options);
+
+    // --------------------------------------
+    //              exhcange methods 
+    // --------------------------------------
+
+    /**
+     * This method creates an exchange if it does not already exist, and if the exchange exists,
+     * verifies that it is of the correct and expected class.
+     * <p> Following are the valid options:
+     * <ul>
+     * <li> {@link Option#AUTO_DELETE}: <p> If set, the exchange is deleted when all queues have finished using it.
+     * <li> {@link Option#DURABLE}: <p> If set when creating a new exchange, the exchange will
+     * be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient
+     * exchanges) are purged if/when a server restarts.
+     * <li> {@link Option#PASSIVE}: <p> If set, the server will not create the exchange.
+     * The client can use this to check whether an exchange exists without modifying the server state.
+     * <li> {@link Option#NO_OPTION}: <p> Has no effect as it represents an �empty� option.
+     * </ul>
+     * <p>In the absence of a particular option, the defaul value is false for each option</p>
+     *
+     * @param exchangeName      The exchange name.
+     * @param type              Each exchange belongs to one of a set of exchange types implemented by the server. The
+     *                          exchange types define the functionality of the exchange - i.e. how messages are routed
+     *                          through it. It is not valid or meaningful to attempt to change the type of an existing
+     *                          exchange. Default exchange types are: direct, topic, headers and fanout.
+     * @param alternateExchange In the event that a message cannot be routed, this is the name of the exchange to which
+     *                          the message will be sent.
+     * @param options           Set of options (valid options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE},
+     *                          {@link Option#PASSIVE}, {@link Option#NO_OPTION})
+     * @param arguments         Used for backward compatibility
+     * @see Option
+     */
+    public void exchangeDeclare(String exchangeName, String type, String alternateExchange, Map<String, ?> arguments,
+                                Option... options);
+
+    /**
+     * This method deletes an exchange. When an exchange is deleted all queue bindings on the
+     * exchange are cancelled.
+     * <p> Following are the valid options:
+     * <ul>
+     * <li> {@link Option#IF_UNUSED}: <p> If set, the server will only delete the exchange if it has no queue bindings. If the
+     * exchange has queue bindings the server does not delete it but raises a channel exception
+     * instead.
+     * <li> {@link Option#NO_OPTION}: <p> Has no effect as it represents an �empty� option.
+     * </ul>
+     * <p>In the absence of a particular option, the defaul value is false for each option
+     *
+     * @param exchangeName The name of exchange to be deleted.
+     * @param options      Set of options (valid options are:  {@link Option#IF_UNUSED}, {@link Option#NO_OPTION})
+     * @see Option
+     */
+    public void exchangeDelete(String exchangeName, Option... options);
+
+
+    /**
+     * This method is used to request information on a particular exchange.
+     *
+     * @param exchangeName The name of the exchange for which information is requested. If not specified explicitly
+     *                     the default exchange is implied.
+     * @result Information on the specified exchange.
+     */
+    public Future<ExchangeQueryResult> exchangeQuery(String exchangeName);
+
+    /**
+     * If the session receives a sessionClosed with an error code it
+     * informs the session's ExceptionListener
+     *
+     * @param exceptionListner The execptionListener
+     */
+    public void setExceptionListener(ExceptionListener exceptionListner);
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,119 @@
+package org.apache.qpidity.nclient.impl;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpidity.transport.Option;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.transport.Range;
+import org.apache.qpidity.transport.RangeSet;
+import org.apache.qpidity.api.Message;
+import org.apache.qpidity.nclient.ExceptionListener;
+import org.apache.qpidity.nclient.MessagePartListener;
+
+/**
+ * Implements a Qpid Sesion. 
+ */
+public class ClientSession extends org.apache.qpidity.transport.Session implements  org.apache.qpidity.nclient.DtxSession
+{
+    private Map<String,MessagePartListener> _messageListeners = new HashMap<String,MessagePartListener>();
+    private ExceptionListener _exceptionListner;
+    private RangeSet _acquiredMessages;
+    private RangeSet _rejectedMessages;
+        
+    public void messageAcknowledge(RangeSet ranges)
+    {
+        for (Range range : ranges)
+        {
+            for (long l = range.getLower(); l <= range.getUpper(); l++)
+            {
+                System.out.println("Acknowleding transfer id : " + l);
+                super.processed(l);
+            }
+        }
+    }
+
+    public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, MessagePartListener listener, Map<String, ?> filter, Option... options)
+    {
+        setMessageListener(destination,listener);
+        super.messageSubscribe(queue, destination, confirmMode, acquireMode, filter, options);
+    }
+
+    public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode) throws IOException
+    {
+        // The javadoc clearly says that this method is suitable for small messages
+        // therefore reading the content in one shot.
+        super.messageTransfer(destination, confirmMode, acquireMode);
+        super.header(msg.getDeliveryProperties(),msg.getMessageProperties());
+        super.data(msg.readData());
+        super.endData();        
+    }
+    
+    public void messageStream(String destination, Message msg, short confirmMode, short acquireMode) throws IOException
+    {
+        super.messageTransfer(destination, confirmMode, acquireMode);
+        super.header(msg.getDeliveryProperties(),msg.getMessageProperties());
+        boolean b = true;
+        int count = 0;
+        while(b)
+        {   
+            try
+            {
+                System.out.println("count : " + count++);
+                super.data(msg.readData());
+            }
+            catch(EOFException e)
+            {
+                b = false;
+            }
+        }   
+        
+        super.endData();
+    }
+    
+    public RangeSet getAccquiredMessages()
+    {
+        return _acquiredMessages;
+    }
+
+    public RangeSet getRejectedMessages()
+    {
+        return _rejectedMessages;
+    }
+    
+    public void setMessageListener(String destination, MessagePartListener listener)
+    {
+        if (listener == null)
+        {
+            throw new IllegalArgumentException("Cannot set message listener to null");
+        }
+        _messageListeners.put(destination, listener);       
+    }
+    
+    public void setExceptionListener(ExceptionListener exceptionListner)
+    {
+        _exceptionListner = exceptionListner;        
+    }   
+    
+    void setAccquiredMessages(RangeSet acquiredMessages)
+    {
+        _acquiredMessages = acquiredMessages;
+    }
+    
+    void setRejectedMessages(RangeSet rejectedMessages)
+    {
+        _rejectedMessages = rejectedMessages;
+    }
+    
+    void notifyException(QpidException ex)
+    {
+        _exceptionListner.onException(ex);
+    }
+    
+    Map<String,MessagePartListener> getMessageListerners()
+    {
+        return _messageListeners;
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,80 @@
+package org.apache.qpidity.nclient.impl;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpidity.ErrorCode;
+
+import org.apache.qpidity.nclient.MessagePartListener;
+
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.transport.Data;
+import org.apache.qpidity.transport.Header;
+import org.apache.qpidity.transport.MessageAcquired;
+import org.apache.qpidity.transport.MessageReject;
+import org.apache.qpidity.transport.MessageTransfer;
+import org.apache.qpidity.transport.Range;
+import org.apache.qpidity.transport.Session;
+import org.apache.qpidity.transport.SessionClosed;
+import org.apache.qpidity.transport.SessionDelegate;
+
+
+public class ClientSessionDelegate extends SessionDelegate
+{    
+    private MessageTransfer _currentTransfer;
+    private MessagePartListener _currentMessageListener;
+    
+    @Override public void sessionClosed(Session ssn,SessionClosed sessionClosed)
+    {
+        ((ClientSession)ssn).notifyException(new QpidException(sessionClosed.getReplyText(),ErrorCode.get(sessionClosed.getReplyCode()),null));
+    }
+    
+    //  --------------------------------------------
+    //   Message methods
+    // --------------------------------------------
+    @Override public void data(Session ssn, Data data)
+    {
+        for (ByteBuffer b : data.getFragments())
+        {    
+            _currentMessageListener.data(b);
+        }
+        if (data.isLast())
+        {
+            _currentMessageListener.messageReceived();
+        }
+        
+    }
+
+    @Override public void header(Session ssn, Header header)
+    {
+        _currentMessageListener.messageHeader(header);
+    }
+
+
+    @Override public void messageTransfer(Session session, MessageTransfer currentTransfer)
+    {
+        _currentTransfer = currentTransfer;
+        _currentMessageListener = ((ClientSession)session).getMessageListerners().get(currentTransfer.getDestination());
+        _currentMessageListener.messageTransfer(currentTransfer.getId());
+    }
+    
+    @Override public void messageReject(Session session, MessageReject struct) 
+    {
+        for (Range range : struct.getTransfers())
+        {
+            for (long l = range.getLower(); l <= range.getUpper(); l++)
+            {
+                System.out.println("message rejected: " +
+                        session.getCommand((int) l));
+            }
+        }
+        ((ClientSession)session).setRejectedMessages(struct.getTransfers());
+        ((ClientSession)session).notifyException(new QpidException("Message Rejected",ErrorCode.MESSAGE_REJECTED,null));
+        session.processed(struct);
+    }
+    
+    @Override public void messageAcquired(Session session, MessageAcquired struct) 
+    {
+        ((ClientSession)session).setAccquiredMessages(struct.getTransfers());
+        session.processed(struct);
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,89 @@
+package org.apache.qpidity.nclient.impl;
+
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.api.Message;
+import org.apache.qpidity.nclient.Client;
+import org.apache.qpidity.nclient.Connection;
+import org.apache.qpidity.nclient.ExceptionListener;
+import org.apache.qpidity.nclient.Session;
+import org.apache.qpidity.nclient.util.MessageListener;
+import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
+
+public class DemoClient
+{
+    public static MessagePartListenerAdapter createAdapter()
+    {
+        return new MessagePartListenerAdapter(new MessageListener()
+        {
+            public void onMessage(Message m)
+            {
+                System.out.println("\n================== Received Msg ==================");
+                System.out.println("Message Id : " + m.getMessageProperties().getMessageId());
+                System.out.println(m.toString());
+                System.out.println("================== End Msg ==================\n");
+            }
+            
+        });
+    }
+
+    public static final void main(String[] args)
+    {
+        Connection conn = Client.createConnection();
+        try{
+            conn.connect("0.0.0.0", 5672, "test", "guest", "guest");
+        }catch(Exception e){
+            e.printStackTrace();
+        }
+        
+        Session ssn = conn.createSession(50000);
+        ssn.setExceptionListener(new ExceptionListener()
+                {
+                     public void onException(QpidException e)
+                     {
+                         System.out.println(e);
+                     }
+                });
+        ssn.queueDeclare("queue1", null, null);
+        ssn.queueBind("queue1", "amq.direct", "queue1",null);
+        ssn.sync();
+        
+        ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null);
+
+        // queue
+        ssn.messageTransfer("amq.direct", (short) 0, (short) 1);
+        ssn.header(new DeliveryProperties().setRoutingKey("queue1"),new MessageProperties().setMessageId("123"));
+        ssn.data("this is the data");
+        ssn.endData();
+
+        //reject
+        ssn.messageTransfer("amq.direct", (short) 0, (short) 1);
+        ssn.data("this should be rejected");
+        ssn.header(new DeliveryProperties().setRoutingKey("stocks"));
+        ssn.endData();
+        ssn.sync();
+        
+        // topic subs
+        ssn.messageSubscribe("topic1", "myDest2", (short)0, (short)0,createAdapter(), null);
+        ssn.messageSubscribe("topic2", "myDest3", (short)0, (short)0,createAdapter(), null);
+        ssn.messageSubscribe("topic3", "myDest4", (short)0, (short)0,createAdapter(), null);
+        ssn.sync();
+        
+        ssn.queueDeclare("topic1", null, null);
+        ssn.queueBind("topic1", "amq.topic", "stock.*",null);        
+        ssn.queueDeclare("topic2", null, null);
+        ssn.queueBind("topic2", "amq.topic", "stock.us.*",null);
+        ssn.queueDeclare("topic3", null, null);
+        ssn.queueBind("topic3", "amq.topic", "stock.us.rh",null);
+        ssn.sync();
+        
+        // topic
+        ssn.messageTransfer("amq.topic", (short) 0, (short) 1);
+        ssn.data("Topic message");
+        ssn.header(new DeliveryProperties().setRoutingKey("stock.us.ibm"),new MessageProperties().setMessageId("456"));
+        ssn.endData();
+        ssn.sync();
+    }
+    
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,74 @@
+package org.apache.qpidity.nclient.impl;
+
+import java.io.FileInputStream;
+
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.api.Message;
+import org.apache.qpidity.nclient.Client;
+import org.apache.qpidity.nclient.Connection;
+import org.apache.qpidity.nclient.ExceptionListener;
+import org.apache.qpidity.nclient.Session;
+import org.apache.qpidity.nclient.util.FileMessage;
+import org.apache.qpidity.nclient.util.MessageListener;
+import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
+
+public class LargeMsgDemoClient
+{
+    public static MessagePartListenerAdapter createAdapter()
+    {
+        return new MessagePartListenerAdapter(new MessageListener()
+        {
+            public void onMessage(Message m)
+            {
+                System.out.println("\n================== Received Msg ==================");
+                System.out.println("Message Id : " + m.getMessageProperties().getMessageId());
+                System.out.println(m.toString());
+                System.out.println("================== End Msg ==================\n");
+            }
+            
+        });
+    }
+
+    public static final void main(String[] args)
+    {
+        Connection conn = Client.createConnection();
+        try{
+            conn.connect("0.0.0.0", 5672, "test", "guest", "guest");
+        }catch(Exception e){
+            e.printStackTrace();
+        }
+        
+        Session ssn = conn.createSession(50000);
+        ssn.setExceptionListener(new ExceptionListener()
+                {
+                     public void onException(QpidException e)
+                     {
+                         System.out.println(e);
+                     }
+                });
+        ssn.queueDeclare("queue1", null, null);
+        ssn.queueBind("queue1", "amq.direct", "queue1",null);
+        ssn.sync();
+        
+        ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null);
+
+        try
+        {
+           FileMessage msg = new FileMessage(new FileInputStream("/home/rajith/TestFile"),
+                                             1024,
+                                             new DeliveryProperties().setRoutingKey("queue1"),
+                                             new MessageProperties().setMessageId("123"));
+        
+           // queue
+           ssn.messageStream("amq.direct",msg, (short) 0, (short) 1);
+           ssn.sync();
+        }
+        catch(Exception e)
+        {
+            e.printStackTrace();
+        }
+    }
+    
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,150 @@
+package org.apache.qpidity.nclient.util;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.Queue;
+
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
+import org.apache.qpidity.api.Message;
+
+/**
+ * <p>A Simple implementation of the message interface
+ * for small messages. When the readData methods are called
+ * we assume the message is complete. i.e there want be any
+ * appendData operations after that.</p>
+ * 
+ * <p>If you need large message support please see 
+ * <code>FileMessage</code> and <code>StreamingMessage</code>
+ * </p>
+ */
+public class ByteBufferMessage implements Message
+{
+    private Queue<ByteBuffer> _data = new LinkedList<ByteBuffer>();
+    private ByteBuffer _readBuffer;
+    private int _dataSize; 
+    private DeliveryProperties _currentDeliveryProps;
+    private MessageProperties _currentMessageProps;
+    private long _transferId;
+
+    public ByteBufferMessage()
+    {
+        _currentDeliveryProps = new DeliveryProperties();
+        _currentMessageProps = new MessageProperties();
+    }
+
+    public ByteBufferMessage(long transferId)
+    {
+        _transferId = transferId;
+    }    
+    
+    public long getMessageTransferId()
+    {
+        return _transferId;
+    }
+    
+    public void clearData()
+    {
+        _data = new LinkedList<ByteBuffer>();
+        _readBuffer = null;
+    }
+        
+    public void appendData(byte[] src) throws IOException
+    {
+        appendData(ByteBuffer.wrap(src));
+    }
+
+    /**
+     * write the data from the current position up to the buffer limit
+     */
+    public void appendData(ByteBuffer src) throws IOException
+    {
+        _data.offer(src);
+        _dataSize += src.remaining();        
+    }
+    
+    public DeliveryProperties getDeliveryProperties()
+    {
+        return _currentDeliveryProps;
+    }
+
+    public MessageProperties getMessageProperties()
+    {
+        System.out.println("MessageProperties is null ? " + _currentMessageProps == null? "true":"false");
+        return _currentMessageProps;
+    }
+    
+    public void setDeliveryProperties(DeliveryProperties props)
+    {
+        _currentDeliveryProps = props;
+    }
+
+    public void setMessageProperties(MessageProperties props)
+    {
+        _currentMessageProps = props;
+    }
+    
+    public void readData(byte[] target) throws IOException
+    {
+        getReadBuffer().get(target);
+    }
+    
+    public ByteBuffer readData() throws IOException
+    {   
+        return getReadBuffer();
+    }
+    
+    private void buildReadBuffer()
+    {
+        //optimize for the simple cases
+        if(_data.size() == 1)
+        {
+            _readBuffer = _data.element().duplicate();
+        }
+        else
+        {
+            _readBuffer = ByteBuffer.allocate(_dataSize);
+            for(ByteBuffer buf:_data)
+            {
+                _readBuffer.put(buf);
+            }
+        }
+    }
+    
+    private ByteBuffer getReadBuffer() throws IOException
+    {
+        if (_readBuffer != null )
+        {
+           return _readBuffer.slice();
+        }    
+        else
+        {
+            if (_data.size() >0)
+            {
+               buildReadBuffer();
+               return _readBuffer.slice();
+            }
+            else
+            {
+                throw new IOException("No Data to read");
+            }
+        }
+    }
+    
+    //hack for testing
+    @Override public String toString()
+    {
+        try
+        {
+            ByteBuffer temp = getReadBuffer();
+            byte[] b = new byte[temp.remaining()];
+            temp.get(b);
+            return new String(b);
+        }
+        catch(IOException e)
+        {
+            return "No data";
+        }
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,87 @@
+package org.apache.qpidity.nclient.util;
+
+import java.io.EOFException;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
+import org.apache.qpidity.api.Message;
+
+/**
+ * FileMessage provides pull style semantics for
+ * larges messages backed by a disk. 
+ * Instead of loading all data into memeory it uses
+ * FileChannel to map regions of the file into memeory
+ * at a time.
+ * 
+ * The write methods are not supported. 
+ * 
+ * From the standpoint of performance it is generally 
+ * only worth mapping relatively large files into memory.
+ *    
+ * FileMessage msg = new FileMessage(in,delProps,msgProps);
+ * session.messageTransfer(dest,msg,0,0);
+ * 
+ * The messageTransfer method will read the file in chunks
+ * and stream it.
+ *
+ */
+public class FileMessage extends ReadOnlyMessage implements Message
+{
+    private FileChannel _fileChannel;
+    private int _chunkSize;
+    private long _fileSize;
+    private long _pos = 0;
+    
+    public FileMessage(FileInputStream in,int chunkSize,DeliveryProperties deliveryProperties,MessageProperties messageProperties)throws IOException
+    {
+        _messageProperties = messageProperties;
+        _deliveryProperties = deliveryProperties;
+        
+        _fileChannel = in.getChannel();
+        _chunkSize = chunkSize;
+        _fileSize = _fileChannel.size();
+        
+        if (_fileSize <= _chunkSize)
+        {
+            _chunkSize = (int)_fileSize;
+        }
+    }
+
+    public void readData(byte[] target) throws IOException
+    {        
+        throw new UnsupportedOperationException();              
+    }
+    
+    public ByteBuffer readData() throws IOException
+    {
+        if (_pos == _fileSize)
+        {
+            throw new EOFException();
+        }
+        
+        if (_pos + _chunkSize > _fileSize)
+        {
+            _chunkSize = (int)(_fileSize - _pos);
+        }
+        MappedByteBuffer bb = _fileChannel.map(FileChannel.MapMode.READ_ONLY, _pos, _chunkSize);        
+        _pos += _chunkSize;
+        return bb;
+    }
+
+    /**
+     * This message is used by an application user to
+     * provide data to the client library using pull style
+     * semantics. Since the message is not transfered yet, it
+     * does not have a transfer id. Hence this method is not
+     * applicable to this implementation.    
+     */
+    public long getMessageTransferId()
+    {
+        throw new UnsupportedOperationException();
+    }
+}

Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java
------------------------------------------------------------------------------
    svn:eol-style = native