You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ar...@apache.org on 2007/09/19 13:36:26 UTC
svn commit: r577253 [1/7] - in
/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity:
nclient/ nclient/impl/ nclient/util/ njms/ njms/message/
Author: arnaudsimon
Date: Wed Sep 19 04:36:23 2007
New Revision: 577253
URL: http://svn.apache.org/viewvc?rev=577253&view=rev
Log:
renamed qpidity.jms to qpidity.njms and qpidity.client to qpidity.nclient
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ExceptionListener.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessageListener.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/MessagePartListenerAdapter.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ReadOnlyMessage.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/StreamingMessage.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionFactoryImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ConnectionMetaDataImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/CustomJMSXProperty.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/DestinationImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/ExceptionHelper.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageActor.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageConsumerImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/MessageProducerImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QpidBrowserListener.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QpidExceptionListenerImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueBrowserImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueConnectionImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueReceiverImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueSenderImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/QueueSessionImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/SessionImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TemporaryDestination.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TemporaryQueueImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TemporaryTopicImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicConnectionImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicPublisherImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicSessionImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/TopicSubscriberImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAConnectionImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAQueueConnectionImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAQueueSessionImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XAResourceImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XASessionImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XATopicConnectionImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/XATopicSessionImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/message/
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/message/BytesMessageImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/message/MapMessageImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/message/MessageFactory.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/message/MessageHelper.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/message/MessageImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/message/ObjectMessageImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/message/QpidMessage.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/message/StreamMessageImpl.java (with props)
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/njms/message/TextMessageImpl.java (with props)
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,128 @@
+package org.apache.qpidity.nclient;
+
+import java.util.concurrent.atomic.AtomicInteger;
+import java.util.concurrent.locks.Condition;
+import java.util.concurrent.locks.Lock;
+import java.util.concurrent.locks.ReentrantLock;
+
+import org.apache.qpidity.BrokerDetails;
+import org.apache.qpidity.ErrorCode;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.nclient.impl.ClientSession;
+import org.apache.qpidity.nclient.impl.ClientSessionDelegate;
+import org.apache.qpidity.transport.Channel;
+import org.apache.qpidity.transport.Connection;
+import org.apache.qpidity.transport.ConnectionClose;
+import org.apache.qpidity.transport.ConnectionDelegate;
+import org.apache.qpidity.transport.ConnectionEvent;
+import org.apache.qpidity.transport.ProtocolHeader;
+import org.apache.qpidity.transport.SessionDelegate;
+import org.apache.qpidity.transport.network.mina.MinaHandler;
+import org.apache.qpidity.url.QpidURL;
+
+
+public class Client implements org.apache.qpidity.nclient.Connection
+{
+ private AtomicInteger _channelNo = new AtomicInteger();
+ private Connection _conn;
+ private ExceptionListener _exceptionListner;
+ private final Lock _lock = new ReentrantLock();
+
+ /**
+ *
+ * @return returns a new connection to the broker.
+ */
+ public static org.apache.qpidity.nclient.Connection createConnection()
+ {
+ return new Client();
+ }
+
+ public void connect(String host, int port,String virtualHost,String username, String password) throws QpidException
+ {
+ Condition negotiationComplete = _lock.newCondition();
+ _lock.lock();
+
+ ConnectionDelegate connectionDelegate = new ConnectionDelegate()
+ {
+ public SessionDelegate getSessionDelegate()
+ {
+ return new ClientSessionDelegate();
+ }
+
+ @Override public void connectionClose(Channel context, ConnectionClose connectionClose)
+ {
+ _exceptionListner.onException(
+ new QpidException("Server closed the connection: Reason " +
+ connectionClose.getReplyText(),
+ ErrorCode.get(connectionClose.getReplyCode()),
+ null));
+ }
+ };
+
+ connectionDelegate.setCondition(_lock,negotiationComplete);
+ connectionDelegate.setUsername(username);
+ connectionDelegate.setPassword(password);
+ connectionDelegate.setVirtualHost(virtualHost);
+
+ _conn = MinaHandler.connect(host, port,connectionDelegate);
+
+ // XXX: hardcoded version numbers
+ _conn.send(new ConnectionEvent(0, new ProtocolHeader(1, 0, 10)));
+
+ try
+ {
+ negotiationComplete.await();
+ }
+ catch (Exception e)
+ {
+ //
+ }
+ finally
+ {
+ _lock.unlock();
+ }
+ }
+
+ /*
+ * Until the dust settles with the URL disucssion
+ * I am not going to implement this.
+ */
+ public void connect(QpidURL url) throws QpidException
+ {
+ // temp impl to tests
+ BrokerDetails details = url.getAllBrokerDetails().get(0);
+ connect(details.getHost(),
+ details.getPort(),
+ details.getVirtualHost(),
+ details.getUserName(),
+ details.getPassword());
+ }
+
+ public void close() throws QpidException
+ {
+ Channel ch = _conn.getChannel(0);
+ ch.connectionClose(0, "client is closing", 0, 0);
+ //need to close the connection underneath as well
+ }
+
+ public Session createSession(long expiryInSeconds)
+ {
+ Channel ch = _conn.getChannel(_channelNo.incrementAndGet());
+ ClientSession ssn = new ClientSession();
+ ssn.attach(ch);
+ ssn.sessionOpen(expiryInSeconds);
+ return ssn;
+ }
+
+ public DtxSession createDTXSession(int expiryInSeconds)
+ {
+ // TODO Auto-generated method stub
+ return null;
+ }
+
+ public void setExceptionListener(ExceptionListener exceptionListner)
+ {
+ _exceptionListner = exceptionListner;
+ }
+
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Client.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,86 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.nclient;
+
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.url.QpidURL;
+
+/**
+ * A physical connection to a broker.
+ */
+public interface Connection
+{
+    /**
+     * Establish the connection using the given parameters.
+     *
+     * @param host        Host of the broker.
+     * @param port        Port of the broker.
+     * @param virtualHost Name of the virtual host to use.
+     * @param username    User name used to connect.
+     * @param password    Password used to connect.
+     * @throws QpidException If the communication layer fails to connect with the broker.
+     */
+    void connect(String host, int port,String virtualHost,String username, String password) throws QpidException;
+
+    /**
+     * Establish the connection with the broker identified by the provided URL.
+     *
+     * @param url The URL of the broker.
+     * @throws QpidException If the communication layer fails to connect with the broker.
+     */
+    void connect(QpidURL url) throws QpidException;
+
+    /**
+     * Close this connection.
+     *
+     * @throws QpidException If the communication layer fails to close the connection.
+     */
+    void close() throws QpidException;
+
+    /**
+     * Create a session for this connection.
+     * <p> The returned session is suspended
+     * (i.e. this session is not attached with an underlying channel).
+     *
+     * @param expiryInSeconds Expiry time expressed in seconds, if the value is &lt;= 0 then the session does not expire.
+     * @return A newly created (suspended) session.
+     */
+    Session createSession(long expiryInSeconds);
+
+    /**
+     * Create a DtxSession for this connection.
+     * <p> A Dtx Session must be used when resources have to be manipulated as
+     * part of a global transaction.
+     * <p> The returned DtxSession is suspended
+     * (i.e. this session is not attached with an underlying channel).
+     *
+     * @param expiryInSeconds Expiry time expressed in seconds, if the value is &lt;= 0 then the session does not expire.
+     * @return A newly created (suspended) DtxSession.
+     */
+    DtxSession createDTXSession(int expiryInSeconds);
+
+    /**
+     * Register the listener that the communication layer informs when it
+     * detects a serious problem with this connection.
+     *
+     * @param exceptionListner The exception listener.
+     */
+    void setExceptionListener(ExceptionListener exceptionListner);
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Connection.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,141 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.nclient;
+
+import org.apache.qpidity.transport.DtxCoordinationCommitResult;
+import org.apache.qpidity.transport.DtxCoordinationGetTimeoutResult;
+import org.apache.qpidity.transport.DtxCoordinationPrepareResult;
+import org.apache.qpidity.transport.DtxCoordinationRecoverResult;
+import org.apache.qpidity.transport.DtxCoordinationRollbackResult;
+import org.apache.qpidity.transport.DtxDemarcationEndResult;
+import org.apache.qpidity.transport.DtxDemarcationStartResult;
+import org.apache.qpidity.transport.Future;
+import org.apache.qpidity.transport.Option;
+
+/**
+ * A session whose resources are controlled under the scope of a distributed transaction.
+ */
+public interface DtxSession extends Session
+{
+
+    /**
+     * Start producing and consuming messages on behalf of the transaction
+     * branch identified by xid.
+     * Possible options are:
+     * <ul>
+     * <li> {@link Option#JOIN}: the start applies to joining a transaction previously seen.
+     * <li> {@link Option#RESUME}: the start applies to resuming the specified suspended transaction branch.
+     * </ul>
+     *
+     * @param xid     Specifies the xid of the transaction branch to be started.
+     * @param options Possible options are: {@link Option#JOIN} and {@link Option#RESUME}.
+     * @return Confirms to the client that the transaction branch is started or specifies the error condition.
+     */
+    Future<DtxDemarcationStartResult> dtxDemarcationStart(String xid, Option... options);
+
+    /**
+     * End or suspend the work done on behalf of a transaction branch.
+     * Possible options are:
+     * <ul>
+     * <li> {@link Option#FAIL}: this portion of work has failed;
+     * otherwise this portion of work has completed successfully.
+     * <li> {@link Option#SUSPEND}: the transaction branch is
+     * temporarily suspended in an incomplete state.
+     * </ul>
+     *
+     * @param xid     Specifies the xid of the transaction branch to be ended.
+     * @param options Available options are: {@link Option#FAIL} and {@link Option#SUSPEND}.
+     * @return Confirms to the client that the transaction branch is ended or specifies the error condition.
+     */
+    Future<DtxDemarcationEndResult> dtxDemarcationEnd(String xid, Option... options);
+
+    /**
+     * Commit the work associated with xid. Any produced messages are made
+     * available and any consumed messages are discarded.
+     * Possible option is:
+     * <ul>
+     * <li> {@link Option#ONE_PHASE}: when set, the one-phase commit optimization is used.
+     * </ul>
+     *
+     * @param xid     Specifies the xid of the transaction branch to be committed.
+     * @param options Available option is: {@link Option#ONE_PHASE}
+     * @return Confirms to the client that the transaction branch is committed or specifies the error condition.
+     */
+    Future<DtxCoordinationCommitResult> dtxCoordinationCommit(String xid, Option... options);
+
+    /**
+     * Forget about a heuristically completed transaction branch.
+     *
+     * @param xid Specifies the xid of the transaction branch to be forgotten.
+     */
+    void dtxCoordinationForget(String xid);
+
+    /**
+     * Obtain the current transaction timeout value in seconds. If set-timeout
+     * was not used prior to invoking this method, the return value is the
+     * default timeout; otherwise, the value used in the previous set-timeout
+     * call is returned.
+     *
+     * @param xid Specifies the xid of the transaction branch for getting the timeout.
+     * @return The current transaction timeout value in seconds.
+     */
+    Future<DtxCoordinationGetTimeoutResult> dtxCoordinationGetTimeout(String xid);
+
+    /**
+     * Prepare for commitment any message produced or consumed on behalf of xid.
+     *
+     * @param xid Specifies the xid of the transaction branch that can be prepared.
+     * @return The status of the prepare operation, one of:
+     *         <ul>
+     *         <li> xa-ok: Normal execution.
+     *         <li> xa-rdonly: The transaction branch was read-only and has been committed.
+     *         <li> xa-rbrollback: The broker marked the transaction branch rollback-only
+     *         for an unspecified reason.
+     *         <li> xa-rbtimeout: The work represented by this transaction branch took too long.
+     *         </ul>
+     */
+    Future<DtxCoordinationPrepareResult> dtxCoordinationPrepare(String xid);
+
+    /**
+     * Obtain a list of transaction branches that are in a prepared or
+     * heuristically completed state.
+     * Todo The options should be removed once the xml is updated
+     *
+     * @return An array of xids to be recovered.
+     */
+    Future<DtxCoordinationRecoverResult> dtxCoordinationRecover(Option... options);
+
+    /**
+     * Roll back the work associated with xid. Any produced messages are
+     * discarded and any consumed messages are re-enqueued.
+     *
+     * @param xid Specifies the xid of the transaction branch that can be rolled back.
+     * @return Confirms to the client that the transaction branch is rolled back or specifies the error condition.
+     */
+    Future<DtxCoordinationRollbackResult> dtxCoordinationRollback(String xid);
+
+    /**
+     * Set the timeout value, in seconds, of the specified transaction branch.
+     *
+     * @param xid     Specifies the xid of the transaction branch for setting the timeout.
+     * @param timeout The transaction timeout value in seconds.
+     */
+    void dtxCoordinationSetTimeout(String xid, long timeout);
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/DtxSession.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ExceptionListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ExceptionListener.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ExceptionListener.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ExceptionListener.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,37 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.nclient;
+
+import org.apache.qpidity.QpidException;
+
+/**
+ * Callback through which the communication layer reports a serious problem
+ * detected on a <CODE>connection</CODE>.
+ */
+public interface ExceptionListener
+{
+    /**
+     * Called by the communication layer when it detects a serious problem
+     * with a connection.
+     *
+     * @param exception The exception coming from the communication layer.
+     * @see Connection
+     */
+    void onException(QpidException exception);
+}
\ No newline at end of file
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/ExceptionListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,34 @@
+package org.apache.qpidity.nclient;
+
+import org.apache.qpidity.njms.ConnectionFactoryImpl;
+import org.apache.qpidity.njms.TopicImpl;
+
+/**
+ * Manual smoke test: connects to a broker on localhost:5672, publishes one
+ * bytes message carrying an int to the topic "myTopic" and prints the value
+ * read back by a consumer on the same topic.
+ */
+public class JMSTestCase
+{
+    /**
+     * Run the send/receive round trip. Failures are printed to stderr.
+     *
+     * @param args Not used.
+     */
+    public static void main(String[] args)
+    {
+        javax.jms.Connection con = null;
+        try
+        {
+            con = (new ConnectionFactoryImpl("localhost",5672, "test", "guest","guest")).createConnection();
+            con.start();
+
+            // Non transacted session; AUTO_ACKNOWLEDGE is the constant for the former literal 1.
+            javax.jms.Session ssn = con.createSession(false, javax.jms.Session.AUTO_ACKNOWLEDGE);
+
+            javax.jms.Destination dest = new TopicImpl("myTopic");
+            javax.jms.MessageProducer prod = ssn.createProducer(dest);
+            // The consumer is created before sending so that it sees the message.
+            javax.jms.MessageConsumer cons = ssn.createConsumer(dest);
+
+            javax.jms.BytesMessage msg = ssn.createBytesMessage();
+            msg.writeInt(123);
+            prod.send(msg);
+
+            // Blocks until a message arrives.
+            javax.jms.BytesMessage m = (javax.jms.BytesMessage)cons.receive();
+            System.out.println("Data : " + m.readInt());
+
+        }
+        catch(Exception e)
+        {
+            e.printStackTrace();
+        }
+        finally
+        {
+            // Release the connection whether or not the round trip succeeded.
+            if (con != null)
+            {
+                try
+                {
+                    con.close();
+                }
+                catch(javax.jms.JMSException e)
+                {
+                    e.printStackTrace();
+                }
+            }
+        }
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/JMSTestCase.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,64 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.nclient;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpidity.transport.Header;
+
+/**
+ * Receives the parts of a message as they are transferred.
+ * <p> The sequence of events for transferring a message is as follows:
+ * <ul>
+ * <li> messageTransfer
+ * <li> messageHeader
+ * <li> n calls to data
+ * <li> messageReceived
+ * </ul>
+ * It is up to the implementation to assemble the message while the different
+ * parts are transferred.
+ */
+public interface MessagePartListener
+{
+    /**
+     * Indicates that the message transfer has started.
+     *
+     * @param transferId The id of the transfer.
+     */
+    void messageTransfer(long transferId);
+
+    /**
+     * Add the given header (<code>DeliveryProperties</code> or
+     * <code>ApplicationProperties</code>) to the message being received.
+     *
+     * @param header Either <code>DeliveryProperties</code> or <code>ApplicationProperties</code>
+     */
+    void messageHeader(Header header);
+
+    /**
+     * Add the given bytes to the content of the message being received.
+     *
+     * @param src Data to be added or streamed.
+     */
+    void data(ByteBuffer src);
+
+    /**
+     * Indicates that the message has been fully received.
+     */
+    void messageReceived();
+
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/MessagePartListener.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,614 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpidity.nclient;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.Map;
+
+import org.apache.qpidity.transport.*;
+import org.apache.qpidity.api.Message;
+
+/**
+ * <p>A session is associated with a connection.
+ * When created a Session is not attached with an underlying channel.
+ * Session is single threaded </p>
+ * <p/>
+ * All the Session commands are asynchronous, synchronous invocation is achieved through invoking the sync method.
+ * That is to say that <code>command1</code> will be synchronously invoked using the following sequence:
+ * <ul>
+ * <li> <code>session.command1()</code>
+ * <li> <code>session.sync()</code>
+ * </ul>
+ */
+public interface Session
+{
+ public static final short TRANSFER_ACQUIRE_MODE_NO_ACQUIRE = 0;
+ public static final short TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE = 1;
+ public static final short TRANSFER_CONFIRM_MODE_REQUIRED = 1;
+ public static final short TRANSFER_CONFIRM_MODE_NOT_REQUIRED = 0;
+ public static final short MESSAGE_FLOW_MODE_CREDIT = 0;
+ public static final short MESSAGE_FLOW_MODE_WINDOW = 1;
+ public static final short MESSAGE_FLOW_UNIT_MESSAGE = 0;
+ public static final short MESSAGE_FLOW_UNIT_BYTE = 1;
+ public static final short MESSAGE_REJECT_CODE_GENERIC = 0;
+ public static final short MESSAGE_REJECT_CODE_IMMEDIATE_DELIVERY_FAILED = 1;
+ public static final short MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE = 0;
+ public static final short MESSAGE_ACQUIRE_MESSAGES_IF_ALL_ARE_AVAILABLE = 1;
+
+ //------------------------------------------------------
+ // Session housekeeping methods
+ //------------------------------------------------------
+
+ /**
+ * Sync method will block until all outstanding commands
+ * are executed.
+ */
+ public void sync();
+
+ /**
+ * Close this session and any associated resources.
+ */
+ public void sessionClose();
+
+ /**
+ * Suspend this session resulting in interrupting the traffic with the broker.
+ * <p> The session timer will start to tick in suspend.
+ * <p> When a session is suspend any operation of this session and of the associated resources are unavailable.
+ */
+ public void sessionSuspend();
+
+ //------------------------------------------------------
+ // Messaging methods
+ // Producer
+ //------------------------------------------------------
+ /**
+ * Transfer the given message to a specified exchange.
+ * <p/>
+ * <p>This is a convenience method for providing a complete message
+ * using a single method which internally is mapped to messageTransfer(), header() followed
+ * by data() and an endData().
+ * <b><i>This method should only be used for small messages</i></b></p>
+ *
+ * @param destination The exchange the message is being sent.
+ * @param msg The Message to be sent
+ * @param confirmMode <ul> <li>off ({@link Session#TRANSFER_CONFIRM_MODE_NOT_REQUIRED}): confirmation
+ * is not required, once a message has been transferred in pre-acquire
+ * mode (or once acquire has been sent in no-acquire mode) the message is considered
+ * transferred
+ * <p/>
+ * <li> on ({@link Session#TRANSFER_CONFIRM_MODE_REQUIRED}): an acquired message
+ * (whether acquisition was implicit as in pre-acquire mode or
+ * explicit as in no-acquire mode) is not considered transferred until the original
+ * transfer is complete (signaled via execution.complete)
+ * </ul>
+ * @param acquireMode <ul>
+ * <li> no-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_NO_ACQUIRE}): the message
+ * must be explicitly acquired
+ * <li> pre-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE}): the message is
+ * acquired when the transfer starts
+ * </ul>
+ * @throws java.io.IOException If transferring a message fails due to some internal communication error.
+ */
+ public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode)
+ throws IOException;
+
+ /**
+ * <p>This is a convenience method for streaming a message using pull semantics
+ * using a single method as opposed to doing a push using messageTransfer(), header() followed
+ * by a series of data() and an endData().</p>
+ * <p>Internally data will be pulled from the Message object (which wraps a data stream) using its
+ * readData() method and pushed using messageTransfer(), header() followed by a series of data() and an endData().
+ * <br><b><i>This method should only be used for large messages</i></b><br>
+ * There are two convenience Message classes provided to facilitate this.
+ * <ul>
+ * <li> <code>{@link org.apache.qpidity.nclient.util.FileMessage}</code>
+ * <li> <code>{@link org.apache.qpidity.nclient.util.StreamingMessage}</code>
+ * </ul>
+ * You could also implement the <code>Message</code> interface to wrap any
+ * data stream you want.
+ * </p>
+ *
+ * @param destination The exchange the message is being sent.
+ * @param msg The Message to be sent
+ * @param confirmMode <ul> <li>off ({@link Session#TRANSFER_CONFIRM_MODE_NOT_REQUIRED}): confirmation
+ * is not required, once a message has been transferred in pre-acquire
+ * mode (or once acquire has been sent in no-acquire mode) the message is considered
+ * transferred
+ * <p/>
+ * <li> on ({@link Session#TRANSFER_CONFIRM_MODE_REQUIRED}): an acquired message
+ * (whether acquisition was implicit as in pre-acquire mode or
+ * explicit as in no-acquire mode) is not considered transferred until the original
+ * transfer is complete (signaled via execution.complete)
+ * </ul>
+ * @param acquireMode <ul>
+ * <li> no-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_NO_ACQUIRE}): the message
+ * must be explicitly acquired
+ * <li> pre-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE}): the message
+ * is acquired when the transfer starts
+ * </ul>
+ * @throws java.io.IOException If transferring a message fails due to some internal communication error.
+ */
+ public void messageStream(String destination, Message msg, short confirmMode, short acquireMode) throws IOException;
+
+ /**
+ * Declare the beginning of a message transfer operation. This operation must
+ * be followed by {@link Session#header} then followed by any number of {@link Session#data}.
+ * The transfer is ended by {@link Session#endData}.
+ * <p> This way of transferring messages is useful when streaming large messages
+ * <p> In the interval [messageTransfer endData] any attempt to call a method other than
+ * {@link Session#header}, {@link Session#data}, {@link Session#endData} or {@link Session#sessionClose}
+ * will result in an exception being thrown.
+ *
+ * @param destination The exchange the message is being sent.
+ * @param confirmMode <ul> <li>off ({@link Session#TRANSFER_CONFIRM_MODE_NOT_REQUIRED}): confirmation
+ * is not required, once a message has been transferred in pre-acquire
+ * mode (or once acquire has been sent in no-acquire mode) the message is considered
+ * transferred
+ * <p/>
+ * <li> on ({@link Session#TRANSFER_CONFIRM_MODE_REQUIRED}): an acquired message
+ * (whether acquisition was implicit as in pre-acquire mode or
+ * explicit as in no-acquire mode) is not considered transferred until the original
+ * transfer is complete (signaled via execution.complete)
+ * </ul>
+ * @param acquireMode <ul>
+ * <li> no-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_NO_ACQUIRE}): the message
+ * must be explicitly acquired
+ * <li> pre-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE}): the message
+ * is acquired when the transfer starts
+ * </ul>
+ */
+ public void messageTransfer(String destination, short confirmMode, short acquireMode);
+
+ /**
+ * Add the following set of headers to the message being sent.
+ *
+ * @param headers Are either <code>{@link org.apache.qpidity.transport.DeliveryProperties}</code>
+ * or <code>{@link org.apache.qpidity.transport.MessageProperties}</code>
+ * @see org.apache.qpidity.transport.DeliveryProperties
+ * @see org.apache.qpidity.transport.MessageProperties
+ */
+ public void header(Struct... headers);
+
+ /**
+ * Add the following byte array to the content of the message being sent.
+ *
+ * @param data Data to be added.
+ */
+ public void data(byte[] data);
+
+ /**
+ * Add the following ByteBuffer to the content of the message being sent.
+ * <p> Note that only the data between the buffer current position and the
+ * buffer limit is added.
+ * It is therefore recommended to flip the buffer before adding it to the message,
+ *
+ * @param buf Data to be added.
+ */
+ public void data(ByteBuffer buf);
+
+ /**
+ * Add the following String to the content of the message being sent.
+ *
+ * @param str String to be added.
+ */
+ public void data(String str);
+
+ /**
+ * Signals the end of data for the message.
+ */
+ public void endData();
+
+ //------------------------------------------------------
+ // Messaging methods
+ // Consumer
+ //------------------------------------------------------
+
+ /**
+ * Associate a message listener with a destination.
+ * <p> The destination is bound to a queue and messages are filtered based
+ * on the provider filter map (message filtering is specific to the provider and may not be handled).
+ * <p> Following are valid options:
+ * <ul>
+ * <li>{@link Option#NO_LOCAL}: <p>If the no-local field is set the server will not send
+ * messages to the connection that
+ * published them.
+ * <li>{@link Option#EXCLUSIVE}: <p> Request exclusive subscription access, meaning only this
+ * subscription can access the queue.
+ * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an "empty" option.
+ * </ul>
+ *
+ * @param queue The queue this receiver is receiving messages from.
+ * @param destination The destination for the subscriber ,a.k.a the delivery tag.
+ * @param confirmMode <ul> <li>off ({@link Session#TRANSFER_CONFIRM_MODE_NOT_REQUIRED}): confirmation is not
+ * required, once a message has been transferred in pre-acquire
+ * mode (or once acquire has been sent in no-acquire mode) the message is considered
+ * transferred
+ * <p/>
+ * <li> on ({@link Session#TRANSFER_CONFIRM_MODE_REQUIRED}): an acquired message (whether
+ * acquisition was implicit as in pre-acquire mode or
+ * explicit as in no-acquire mode) is not considered transferred until the original
+ * transfer is complete (signaled via execution.complete)
+ * </ul>
+ * @param acquireMode <ul>
+ * <li> no-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_NO_ACQUIRE}): the message must
+ * be explicitly acquired
+ * <li> pre-acquire ({@link Session#TRANSFER_ACQUIRE_MODE_PRE_ACQUIRE}): the message is
+ * acquired when the transfer starts
+ * </ul>
+ * @param listener The listener for this destination. When big message are transfered then
+ * it is recommended to use a {@link org.apache.qpidity.nclient.MessagePartListener}.
+ * @param options Set of Options (valid options are {@link Option#NO_LOCAL}, {@link Option#EXCLUSIVE}
+ * and {@link Option#NO_OPTION})
+ * @param filter A set of filters for the subscription. The syntax and semantics of these filters depends
+ * on the providers implementation.
+ */
+ public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode,
+ MessagePartListener listener, Map<String, ?> filter, Option... options);
+
+ /**
+ * This method cancels a consumer. This does not affect already delivered messages, but it does
+ * mean the server will not send any more messages for that destination. The client may receive an
+ * arbitrary number of messages in between sending the cancel method and receiving the
+ * notification of completion of the cancel command.
+ *
+ * @param destination The destination for the subscriber used at subscription
+ */
+ public void messageCancel(String destination);
+
+ /**
+ * Associate a message part listener with a destination.
+ * <p> Only one listener per destination is allowed. This means
+ * that the previous message listener is replaced. This is done gracefully i.e. the message
+ * listener is replaced once it returns from the processing of a message.
+ *
+ * @param destination The destination the listener is associated with.
+ * @param listener The new listener for this destination.
+ */
+ public void setMessageListener(String destination, MessagePartListener listener);
+
+ /**
+ * Sets the mode of flow control used for a given destination.
+ * <p> With credit based flow control, the broker continually maintains its current
+ * credit balance with the recipient. The credit balance consists of two values, a message
+ * count, and a byte count. Whenever message data is sent, both counts must be decremented.
+ * If either value reaches zero, the flow of message data must stop. Additional credit is
+ * received via the {@link Session#messageFlow} method.
+ * <p> Window based flow control is identical to credit based flow control, however message
+ * acknowledgment implicitly grants a single unit of message credit, and the size of the
+ * message in byte credits for each acknowledged message.
+ *
+ * @param destination The destination to set the flow mode on.
+ * @param mode <ul> <li>credit ({@link Session#MESSAGE_FLOW_MODE_CREDIT}): choose credit based flow control
+ * <li> window ({@link Session#MESSAGE_FLOW_MODE_WINDOW}): choose window based flow control</ul>
+ */
+ public void messageFlowMode(String destination, short mode);
+
+
+ /**
+ * This method controls the flow of message data to a given destination. It is used by the
+ * recipient of messages to dynamically match the incoming rate of message flow to its
+ * processing or forwarding capacity. Upon receipt of this method, the sender must add "value"
+ * number of the specified unit to the available credit balance for the specified destination.
+ * A value of 0 indicates an infinite amount of credit. This disables any limit for
+ * the given unit until the credit balance is zeroed with {@link Session#messageStop}
+ * or {@link Session#messageFlush}.
+ *
+ * @param destination The destination to set the flow.
+ * @param unit Specifies the unit of credit balance.
+ * <p/>
+ * One of: <ul>
+ * <li> message ({@link Session#MESSAGE_FLOW_UNIT_MESSAGE})
+ * <li> byte ({@link Session#MESSAGE_FLOW_UNIT_BYTE})
+ * </ul>
+ * @param value Number of credits, a value of 0 indicates an infinite amount of credit.
+ */
+ public void messageFlow(String destination, short unit, long value);
+
+ /**
+ * Forces the broker to exhaust its credit supply.
+ * <p> The broker's credit will always be zero when
+ * this method completes.
+ *
+ * @param destination The destination to call flush on.
+ */
+ public void messageFlush(String destination);
+
+ /**
+ * On receipt of this method, the broker MUST set its credit to zero for the given
+ * destination. This obeys the generic semantics of command completion, i.e. when confirmation
+ * is issued credit MUST be zero and no further messages will be sent until such a time as
+ * further credit is received.
+ *
+ * @param destination The destination to stop.
+ */
+ public void messageStop(String destination);
+
+ /**
+ * Acknowledge the receipt of ranges of messages.
+ * <p>Message must have been previously acquired either by receiving them in
+ * pre-acquire mode or by explicitly acquiring them.
+ *
+ * @param ranges Range of acknowledged messages.
+ */
+ public void messageAcknowledge(RangeSet ranges);
+
+ /**
+ * Reject ranges of acquired messages.
+ * <p> The broker MUST deliver rejected messages to the
+ * alternate-exchange on the queue from which it was delivered. If no alternate-exchange is
+ * defined for that queue the broker MAY discard the message.
+ *
+ * @param ranges Range of rejected messages.
+ * @param code The reject code must be one of {@link Session#MESSAGE_REJECT_CODE_GENERIC} or
+ * {@link Session#MESSAGE_REJECT_CODE_IMMEDIATE_DELIVERY_FAILED} (immediate delivery was attempted but
+ * failed).
+ * @param text String describing the reason for a message transfer rejection.
+ */
+ public void messageReject(RangeSet ranges, int code, String text);
+
+ /**
+ * As it is possible that the broker does not manage to reject some messages, after completion of
+ * {@link Session#messageReject} this method will return the ranges of rejected messages.
+ * <p> Note that {@link Session#messageReject} and this methods are asynchronous therefore for accessing to the
+ * previously rejected messages this method must be invoked in conjunction with {@link Session#sync()}.
+ * <p> A recommended invocation sequence would be:
+ * <ul>
+ * <li> {@link Session#messageReject}
+ * <li> {@link Session#sync()}
+ * <li> {@link Session#getRejectedMessages()}
+ * </ul>
+ *
+ * @return The rejected message ranges
+ */
+ public RangeSet getRejectedMessages();
+
+ /**
+ * Try to acquire ranges of messages hence releasing them from the queue.
+ * This means that once acknowledged, a message will not be delivered to any other receiver.
+ * <p> As those messages may have been consumed by other receivers,
+ * message acquisition can fail.
+ * The outcome of the acquisition is returned as an array of ranges of acquired messages.
+ * <p> This method should only be called on non-acquired messages.
+ *
+ * @param mode One of: <ul>
+ * <li> any ({@link Session#MESSAGE_ACQUIRE_ANY_AVAILABLE_MESSAGE}): acquire any available
+ * messages for consumption
+ * <li> all ({@link Session#MESSAGE_ACQUIRE_MESSAGES_IF_ALL_ARE_AVAILABLE}): only acquire messages
+ * if all are available for consumption
+ * </ul>
+ * @param ranges Ranges of messages to be acquired.
+ */
+ public void messageAcquire(RangeSet ranges, short mode);
+
+ /**
+ * As it is possible that the broker does not manage to acquire some messages, after completion of
+ * {@link Session#messageAcquire} this method will return the ranges of acquired messages.
+ * <p> Note that {@link Session#messageAcquire} and this methods are asynchronous therefore for accessing to the
+ * previously acquired messages this method must be invoked in conjunction with {@link Session#sync()}.
+ * <p> A recommended invocation sequence would be:
+ * <ul>
+ * <li> {@link Session#messageAcquire}
+ * <li> {@link Session#sync()}
+ * <li> {@link Session#getAccquiredMessages()}
+ * </ul>
+ *
+ * @return returns the message ranges marked by the broker as acquired.
+ */
+ public RangeSet getAccquiredMessages();
+
+ /**
+ * Give up responsibility for processing ranges of messages.
+ * <p> Released messages are re-enqueued.
+ *
+ * @param ranges Ranges of messages to be released.
+ */
+ public void messageRelease(RangeSet ranges);
+
+ // -----------------------------------------------
+ // Local transaction methods
+ // ----------------------------------------------
+ /**
+ * Selects the session for local transaction support.
+ */
+ public void txSelect();
+
+ /**
+ * Commit the receipt and the delivery of all messages exchanged by this session resources.
+ *
+ * @throws IllegalStateException If this session is not transacted.
+ */
+ public void txCommit() throws IllegalStateException;
+
+ /**
+ * Rollback the receipt and the delivery of all messages exchanged by this session resources.
+ *
+ * @throws IllegalStateException If this session is not transacted.
+ */
+ public void txRollback() throws IllegalStateException;
+
+ //---------------------------------------------
+ // Queue methods
+ //---------------------------------------------
+
+ /**
+ * Declare a queue with the given queueName
+ * <p> Following are the valid options:
+ * <ul>
+ * <li> {@link Option#AUTO_DELETE}: <p> If this field is set and the exclusive field is also set,
+ * then the queue is deleted when the connection closes.
+ * If this field is set and the exclusive field is not set the queue is deleted when all
+ * the consumers have finished using it.
+ * <li> {@link Option#DURABLE}: <p> If set when creating a new queue,
+ * the queue will be marked as durable. Durable queues
+ * remain active when a server restarts. Non-durable queues (transient queues) are purged
+ * if/when a server restarts. Note that durable queues do not necessarily hold persistent
+ * messages, although it does not make sense to send persistent messages to a transient
+ * queue.
+ * <li> {@link Option#EXCLUSIVE}: <p> Exclusive queues can only be used from one connection at a time.
+ * Once a connection declares an exclusive queue, that queue cannot be used by any other connections until the
+ * declaring connection closes.
+ * <li> {@link Option#PASSIVE}: <p> If set, the server will not create the queue.
+ * This field allows the client to assert the presence of a queue without modifying the server state.
+ * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an "empty" option.
+ * </ul>
+ * <p>In the absence of a particular option, the default value is false for each option
+ *
+ * @param queueName The name of the declared queue.
+ * @param alternateExchange If a message is rejected by a queue, then it is sent to the alternate-exchange. A message
+ * may be rejected by a queue for the following reasons:
+ * <oL> <li> The queue is deleted when it is not empty;
+ * <li> Immediate delivery of a message is requested, but there are no consumers connected to
+ * the queue. </ol>
+ * @param arguments Used for backward compatibility
+ * @param options Set of Options ( valid options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE},
+ * {@link Option#EXCLUSIVE}, {@link Option#PASSIVE} and {@link Option#NO_OPTION})
+ * @see Option
+ */
+ public void queueDeclare(String queueName, String alternateExchange, Map<String, ?> arguments, Option... options);
+
+ /**
+ * Bind a queue with an exchange.
+ *
+ * @param queueName Specifies the name of the queue to bind. If the queue name is empty, refers to the current
+ * queue for the session, which is the last declared queue.
+ * @param exchangeName The exchange name.
+ * @param routingKey Specifies the routing key for the binding. The routing key is used for routing messages
+ * depending on the exchange configuration. Not all exchanges use a routing key - refer to
+ * the specific exchange documentation. If the queue name is empty, the server uses the last
+ * queue declared on the session. If the routing key is also empty, the server uses this
+ * queue name for the routing key as well. If the queue name is provided but the routing key
+ * is empty, the server does the binding with that empty routing key. The meaning of empty
+ * routing keys depends on the exchange implementation.
+ * @param arguments Used for backward compatibility
+ */
+ public void queueBind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments);
+
+ /**
+ * Unbind a queue from an exchange.
+ *
+ * @param queueName Specifies the name of the queue to unbind.
+ * @param exchangeName The name of the exchange to unbind from.
+ * @param routingKey Specifies the routing key of the binding to unbind.
+ * @param arguments Used for backward compatibility
+ */
+ public void queueUnbind(String queueName, String exchangeName, String routingKey, Map<String, ?> arguments);
+
+ /**
+ * This method removes all messages from a queue. It does not cancel consumers. Purged messages
+ * are deleted without any formal "undo" mechanism.
+ *
+ * @param queueName Specifies the name of the queue to purge. If the queue name is empty, refers to the
+ * current queue for the session, which is the last declared queue.
+ */
+ public void queuePurge(String queueName);
+
+ /**
+ * This method deletes a queue. When a queue is deleted any pending messages are sent to a
+ * dead-letter queue if this is defined in the server configuration, and all consumers on the
+ * queue are cancelled.
+ * <p> Following are the valid options:
+ * <ul>
+ * <li> {@link Option#IF_EMPTY}: <p> If set, the server will only delete the queue if it has no messages.
+ * <li> {@link Option#IF_UNUSED}: <p> If set, the server will only delete the queue if it has no consumers.
+ * If the queue has consumers the server does not delete it but raises a channel exception instead.
+ * <li>{@link Option#NO_OPTION}: <p> Has no effect as it represents an "empty" option.
+ * </ul>
+ * </p>
+ * <p/>
+ * <p>In the absence of a particular option, the default value is false for each option</p>
+ *
+ * @param queueName Specifies the name of the queue to delete. If the queue name is empty, refers to the
+ * current queue for the session, which is the last declared queue.
+ * @param options Set of options (Valid options are: {@link Option#IF_EMPTY}, {@link Option#IF_UNUSED}
+ * and {@link Option#NO_OPTION})
+ * @see Option
+ */
+ public void queueDelete(String queueName, Option... options);
+
+ // --------------------------------------
+ // exchange methods
+ // --------------------------------------
+
+ /**
+ * This method creates an exchange if it does not already exist, and if the exchange exists,
+ * verifies that it is of the correct and expected class.
+ * <p> Following are the valid options:
+ * <ul>
+ * <li> {@link Option#AUTO_DELETE}: <p> If set, the exchange is deleted when all queues have finished using it.
+ * <li> {@link Option#DURABLE}: <p> If set when creating a new exchange, the exchange will
+ * be marked as durable. Durable exchanges remain active when a server restarts. Non-durable exchanges (transient
+ * exchanges) are purged if/when a server restarts.
+ * <li> {@link Option#PASSIVE}: <p> If set, the server will not create the exchange.
+ * The client can use this to check whether an exchange exists without modifying the server state.
+ * <li> {@link Option#NO_OPTION}: <p> Has no effect as it represents an "empty" option.
+ * </ul>
+ * <p>In the absence of a particular option, the default value is false for each option</p>
+ *
+ * @param exchangeName The exchange name.
+ * @param type Each exchange belongs to one of a set of exchange types implemented by the server. The
+ * exchange types define the functionality of the exchange - i.e. how messages are routed
+ * through it. It is not valid or meaningful to attempt to change the type of an existing
+ * exchange. Default exchange types are: direct, topic, headers and fanout.
+ * @param alternateExchange In the event that a message cannot be routed, this is the name of the exchange to which
+ * the message will be sent.
+ * @param options Set of options (valid options are: {@link Option#AUTO_DELETE}, {@link Option#DURABLE},
+ * {@link Option#PASSIVE}, {@link Option#NO_OPTION})
+ * @param arguments Used for backward compatibility
+ * @see Option
+ */
+ public void exchangeDeclare(String exchangeName, String type, String alternateExchange, Map<String, ?> arguments,
+ Option... options);
+
+ /**
+ * This method deletes an exchange. When an exchange is deleted all queue bindings on the
+ * exchange are cancelled.
+ * <p> Following are the valid options:
+ * <ul>
+ * <li> {@link Option#IF_UNUSED}: <p> If set, the server will only delete the exchange if it has no queue bindings. If the
+ * exchange has queue bindings the server does not delete it but raises a channel exception
+ * instead.
+ * <li> {@link Option#NO_OPTION}: <p> Has no effect as it represents an "empty" option.
+ * </ul>
+ * <p>In the absence of a particular option, the default value is false for each option
+ *
+ * @param exchangeName The name of exchange to be deleted.
+ * @param options Set of options (valid options are: {@link Option#IF_UNUSED}, {@link Option#NO_OPTION})
+ * @see Option
+ */
+ public void exchangeDelete(String exchangeName, Option... options);
+
+
+ /**
+ * This method is used to request information on a particular exchange.
+ *
+ * @param exchangeName The name of the exchange for which information is requested. If not specified explicitly
+ * the default exchange is implied.
+ * @return Information on the specified exchange.
+ */
+ public Future<ExchangeQueryResult> exchangeQuery(String exchangeName);
+
+ /**
+ * If the session receives a sessionClosed with an error code it
+ * informs the session's ExceptionListener
+ *
+ * @param exceptionListner The exceptionListener
+ */
+ public void setExceptionListener(ExceptionListener exceptionListner);
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/Session.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,119 @@
+package org.apache.qpidity.nclient.impl;
+
+import java.io.EOFException;
+import java.io.IOException;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.apache.qpidity.transport.Option;
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.transport.Range;
+import org.apache.qpidity.transport.RangeSet;
+import org.apache.qpidity.api.Message;
+import org.apache.qpidity.nclient.ExceptionListener;
+import org.apache.qpidity.nclient.MessagePartListener;
+
+/**
+ * Implements a Qpid Session.
+ * <p> Adds client-side bookkeeping on top of the transport session: the message
+ * listeners registered per destination, the exception listener, and the last
+ * acquired / rejected message ranges stored through the package-private setters.
+ */
+public class ClientSession extends org.apache.qpidity.transport.Session implements org.apache.qpidity.nclient.DtxSession
+{
+    /** Message listeners keyed by the destination they were registered for. */
+    private final Map<String,MessagePartListener> _messageListeners = new HashMap<String,MessagePartListener>();
+    /** Listener informed of session errors; stays null until one is set. */
+    private ExceptionListener _exceptionListner;
+    private RangeSet _acquiredMessages;
+    private RangeSet _rejectedMessages;
+
+    /**
+     * Marks every transfer id contained in the given ranges (bounds included) as processed.
+     *
+     * @param ranges Ranges of transfer ids to acknowledge.
+     */
+    public void messageAcknowledge(RangeSet ranges)
+    {
+        for (Range range : ranges)
+        {
+            for (long l = range.getLower(); l <= range.getUpper(); l++)
+            {
+                super.processed(l);
+            }
+        }
+    }
+
+    /**
+     * Registers the listener for the destination, then issues the subscription.
+     *
+     * @throws IllegalArgumentException If listener is null (raised before anything is sent).
+     */
+    public void messageSubscribe(String queue, String destination, short confirmMode, short acquireMode, MessagePartListener listener, Map<String, ?> filter, Option... options)
+    {
+        setMessageListener(destination,listener);
+        super.messageSubscribe(queue, destination, confirmMode, acquireMode, filter, options);
+    }
+
+    /**
+     * Sends a complete message: transfer start, headers, one data segment and end of data.
+     *
+     * @throws IOException If reading the message content fails.
+     */
+    public void messageTransfer(String destination, Message msg, short confirmMode, short acquireMode) throws IOException
+    {
+        // The javadoc clearly says that this method is suitable for small messages
+        // therefore reading the content in one shot.
+        super.messageTransfer(destination, confirmMode, acquireMode);
+        super.header(msg.getDeliveryProperties(),msg.getMessageProperties());
+        super.data(msg.readData());
+        super.endData();
+    }
+
+    /**
+     * Streams a message: transfer start, headers, then one data segment per
+     * successful <code>readData()</code> call, and finally end of data.
+     *
+     * @throws IOException If reading the message content fails for a reason other than end of stream.
+     */
+    public void messageStream(String destination, Message msg, short confirmMode, short acquireMode) throws IOException
+    {
+        super.messageTransfer(destination, confirmMode, acquireMode);
+        super.header(msg.getDeliveryProperties(),msg.getMessageProperties());
+        while (true)
+        {
+            try
+            {
+                super.data(msg.readData());
+            }
+            catch (EOFException endOfStream)
+            {
+                // readData() reports the end of the content with an EOFException;
+                // this is the normal way out of the loop, not an error.
+                break;
+            }
+        }
+
+        super.endData();
+    }
+
+    /**
+     * @return The ranges last stored through setAccquiredMessages, or null if none were stored.
+     */
+    public RangeSet getAccquiredMessages()
+    {
+        return _acquiredMessages;
+    }
+
+    /**
+     * @return The ranges last stored through setRejectedMessages, or null if none were stored.
+     */
+    public RangeSet getRejectedMessages()
+    {
+        return _rejectedMessages;
+    }
+
+    /**
+     * Associates the listener with the destination, replacing any previous one.
+     *
+     * @throws IllegalArgumentException If listener is null.
+     */
+    public void setMessageListener(String destination, MessagePartListener listener)
+    {
+        if (listener == null)
+        {
+            throw new IllegalArgumentException("Cannot set message listener to null");
+        }
+        _messageListeners.put(destination, listener);
+    }
+
+    /**
+     * Sets the listener that notifyException forwards session errors to.
+     */
+    public void setExceptionListener(ExceptionListener exceptionListner)
+    {
+        _exceptionListner = exceptionListner;
+    }
+
+    void setAccquiredMessages(RangeSet acquiredMessages)
+    {
+        _acquiredMessages = acquiredMessages;
+    }
+
+    void setRejectedMessages(RangeSet rejectedMessages)
+    {
+        _rejectedMessages = rejectedMessages;
+    }
+
+    /**
+     * Forwards the exception to the registered exception listener.
+     * <p> When no listener has been set the exception is dropped rather than
+     * failing the calling thread with a NullPointerException.
+     */
+    void notifyException(QpidException ex)
+    {
+        // Read the field once so the null check and the call see the same listener.
+        ExceptionListener listener = _exceptionListner;
+        if (listener != null)
+        {
+            listener.onException(ex);
+        }
+    }
+
+    Map<String,MessagePartListener> getMessageListerners()
+    {
+        return _messageListeners;
+    }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSession.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,80 @@
+package org.apache.qpidity.nclient.impl;
+
+import java.nio.ByteBuffer;
+
+import org.apache.qpidity.ErrorCode;
+
+import org.apache.qpidity.nclient.MessagePartListener;
+
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.transport.Data;
+import org.apache.qpidity.transport.Header;
+import org.apache.qpidity.transport.MessageAcquired;
+import org.apache.qpidity.transport.MessageReject;
+import org.apache.qpidity.transport.MessageTransfer;
+import org.apache.qpidity.transport.Range;
+import org.apache.qpidity.transport.Session;
+import org.apache.qpidity.transport.SessionClosed;
+import org.apache.qpidity.transport.SessionDelegate;
+
+
+ /**
+  * Client-side {@link SessionDelegate} that forwards incoming transport events
+  * to the owning {@link ClientSession} and to the {@link MessagePartListener}
+  * registered for the destination of the transfer currently being received.
+  *
+  * <p>{@code sessionClosed}, {@code messageTransfer}, {@code messageReject} and
+  * {@code messageAcquired} cast the supplied session to {@link ClientSession}.</p>
+  */
+ public class ClientSessionDelegate extends SessionDelegate
+ {
+     // Last transfer announced through messageTransfer(); stored but not read in this class.
+     private MessageTransfer _currentTransfer;
+     // Listener looked up for the current transfer's destination; null when none is registered.
+     private MessagePartListener _currentMessageListener;
+
+     /**
+      * Reports the close to the session's exception listener as a
+      * {@link QpidException} built from the reply text and reply code.
+      */
+     @Override public void sessionClosed(Session ssn, SessionClosed sessionClosed)
+     {
+         QpidException closed = new QpidException(sessionClosed.getReplyText(),
+                                                  ErrorCode.get(sessionClosed.getReplyCode()),
+                                                  null);
+         ((ClientSession) ssn).notifyException(closed);
+     }
+
+     // --------------------------------------------
+     //   Message methods
+     // --------------------------------------------
+
+     /**
+      * Passes every fragment of the data segment to the current listener and
+      * signals {@code messageReceived()} once the last segment has been seen.
+      * The segment is ignored when no listener is set for the current transfer.
+      */
+     @Override public void data(Session ssn, Data data)
+     {
+         if (_currentMessageListener == null)
+         {
+             // Nothing to deliver to; the missing listener was reported in messageTransfer().
+             return;
+         }
+         for (ByteBuffer b : data.getFragments())
+         {
+             _currentMessageListener.data(b);
+         }
+         if (data.isLast())
+         {
+             _currentMessageListener.messageReceived();
+         }
+     }
+
+     /**
+      * Passes the header to the current listener.
+      * The header is ignored when no listener is set for the current transfer.
+      */
+     @Override public void header(Session ssn, Header header)
+     {
+         if (_currentMessageListener == null)
+         {
+             return;
+         }
+         _currentMessageListener.messageHeader(header);
+     }
+
+     /**
+      * Starts a new incoming message: looks up the listener registered for the
+      * transfer's destination and tells it the transfer id.
+      *
+      * <p>If no listener is registered for the destination, the problem is
+      * written to {@code System.err} and the header and data callbacks of this
+      * transfer are skipped, rather than failing with a NullPointerException.</p>
+      */
+     @Override public void messageTransfer(Session session, MessageTransfer currentTransfer)
+     {
+         _currentTransfer = currentTransfer;
+         _currentMessageListener =
+             ((ClientSession) session).getMessageListerners().get(currentTransfer.getDestination());
+         if (_currentMessageListener == null)
+         {
+             System.err.println("No message listener registered for destination: "
+                                + currentTransfer.getDestination());
+             return;
+         }
+         _currentMessageListener.messageTransfer(currentTransfer.getId());
+     }
+
+     /**
+      * Prints every rejected command, stores the rejected ranges on the session,
+      * notifies the exception listener and marks the reject as processed.
+      */
+     @Override public void messageReject(Session session, MessageReject struct)
+     {
+         for (Range range : struct.getTransfers())
+         {
+             for (long l = range.getLower(); l <= range.getUpper(); l++)
+             {
+                 System.out.println("message rejected: " +
+                                    session.getCommand((int) l));
+             }
+         }
+         ClientSession clientSession = (ClientSession) session;
+         clientSession.setRejectedMessages(struct.getTransfers());
+         clientSession.notifyException(new QpidException("Message Rejected",ErrorCode.MESSAGE_REJECTED,null));
+         session.processed(struct);
+     }
+
+     /**
+      * Stores the acquired ranges on the session and marks the command as processed.
+      */
+     @Override public void messageAcquired(Session session, MessageAcquired struct)
+     {
+         ((ClientSession) session).setAccquiredMessages(struct.getTransfers());
+         session.processed(struct);
+     }
+ }
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/ClientSessionDelegate.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,89 @@
+package org.apache.qpidity.nclient.impl;
+
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.api.Message;
+import org.apache.qpidity.nclient.Client;
+import org.apache.qpidity.nclient.Connection;
+import org.apache.qpidity.nclient.ExceptionListener;
+import org.apache.qpidity.nclient.Session;
+import org.apache.qpidity.nclient.util.MessageListener;
+import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
+
+ /**
+  * Small command-line demo of the client API: declares and binds queues,
+  * subscribes listeners that print what they receive, and publishes a few
+  * messages through the {@code amq.direct} and {@code amq.topic} exchanges.
+  */
+ public class DemoClient
+ {
+     /**
+      * Creates a listener adapter whose message listener prints the message id
+      * and the message's {@code toString()} to standard output.
+      *
+      * @return a new adapter wrapping the printing listener
+      */
+     public static MessagePartListenerAdapter createAdapter()
+     {
+         return new MessagePartListenerAdapter(new MessageListener()
+         {
+             public void onMessage(Message m)
+             {
+                 System.out.println("\n================== Received Msg ==================");
+                 System.out.println("Message Id : " + m.getMessageProperties().getMessageId());
+                 System.out.println(m.toString());
+                 System.out.println("================== End Msg ==================\n");
+             }
+
+         });
+     }
+
+     /**
+      * Runs the demo against a broker on port 5672.
+      * The demo stops immediately if the connection cannot be established.
+      *
+      * @param args not used
+      */
+     public static final void main(String[] args)
+     {
+         Connection conn = Client.createConnection();
+         try
+         {
+             conn.connect("0.0.0.0", 5672, "test", "guest", "guest");
+         }
+         catch (Exception e)
+         {
+             e.printStackTrace();
+             // Without a connection none of the session calls below can work.
+             return;
+         }
+
+         Session ssn = conn.createSession(50000);
+         ssn.setExceptionListener(new ExceptionListener()
+         {
+             public void onException(QpidException e)
+             {
+                 System.out.println(e);
+             }
+         });
+         ssn.queueDeclare("queue1", null, null);
+         ssn.queueBind("queue1", "amq.direct", "queue1",null);
+         ssn.sync();
+
+         ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null);
+
+         // queue
+         ssn.messageTransfer("amq.direct", (short) 0, (short) 1);
+         ssn.header(new DeliveryProperties().setRoutingKey("queue1"),new MessageProperties().setMessageId("123"));
+         ssn.data("this is the data");
+         ssn.endData();
+
+         //reject
+         // NOTE(review): data is sent before the header here and in the topic
+         // message below, unlike the first message - confirm this is intended.
+         ssn.messageTransfer("amq.direct", (short) 0, (short) 1);
+         ssn.data("this should be rejected");
+         ssn.header(new DeliveryProperties().setRoutingKey("stocks"));
+         ssn.endData();
+         ssn.sync();
+
+         // topic subs
+         ssn.messageSubscribe("topic1", "myDest2", (short)0, (short)0,createAdapter(), null);
+         ssn.messageSubscribe("topic2", "myDest3", (short)0, (short)0,createAdapter(), null);
+         ssn.messageSubscribe("topic3", "myDest4", (short)0, (short)0,createAdapter(), null);
+         ssn.sync();
+
+         ssn.queueDeclare("topic1", null, null);
+         ssn.queueBind("topic1", "amq.topic", "stock.*",null);
+         ssn.queueDeclare("topic2", null, null);
+         ssn.queueBind("topic2", "amq.topic", "stock.us.*",null);
+         ssn.queueDeclare("topic3", null, null);
+         ssn.queueBind("topic3", "amq.topic", "stock.us.rh",null);
+         ssn.sync();
+
+         // topic
+         ssn.messageTransfer("amq.topic", (short) 0, (short) 1);
+         ssn.data("Topic message");
+         ssn.header(new DeliveryProperties().setRoutingKey("stock.us.ibm"),new MessageProperties().setMessageId("456"));
+         ssn.endData();
+         ssn.sync();
+     }
+
+ }
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/DemoClient.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,74 @@
+package org.apache.qpidity.nclient.impl;
+
+import java.io.FileInputStream;
+
+import org.apache.qpidity.QpidException;
+import org.apache.qpidity.api.Message;
+import org.apache.qpidity.nclient.Client;
+import org.apache.qpidity.nclient.Connection;
+import org.apache.qpidity.nclient.ExceptionListener;
+import org.apache.qpidity.nclient.Session;
+import org.apache.qpidity.nclient.util.FileMessage;
+import org.apache.qpidity.nclient.util.MessageListener;
+import org.apache.qpidity.nclient.util.MessagePartListenerAdapter;
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
+
+ /**
+  * Command-line demo that streams the content of a file to {@code queue1}
+  * through the {@code amq.direct} exchange using a {@link FileMessage},
+  * and prints whatever the subscribed listener receives.
+  */
+ public class LargeMsgDemoClient
+ {
+     // File streamed when no path is given on the command line.
+     private static final String DEFAULT_FILE = "/home/rajith/TestFile";
+
+     /**
+      * Creates a listener adapter whose message listener prints the message id
+      * and the message's {@code toString()} to standard output.
+      *
+      * @return a new adapter wrapping the printing listener
+      */
+     public static MessagePartListenerAdapter createAdapter()
+     {
+         return new MessagePartListenerAdapter(new MessageListener()
+         {
+             public void onMessage(Message m)
+             {
+                 System.out.println("\n================== Received Msg ==================");
+                 System.out.println("Message Id : " + m.getMessageProperties().getMessageId());
+                 System.out.println(m.toString());
+                 System.out.println("================== End Msg ==================\n");
+             }
+
+         });
+     }
+
+     /**
+      * Runs the demo against a broker on port 5672.
+      * The demo stops immediately if the connection cannot be established.
+      *
+      * @param args optional; {@code args[0]} is the path of the file to send,
+      *             defaulting to {@code /home/rajith/TestFile}
+      */
+     public static final void main(String[] args)
+     {
+         Connection conn = Client.createConnection();
+         try
+         {
+             conn.connect("0.0.0.0", 5672, "test", "guest", "guest");
+         }
+         catch (Exception e)
+         {
+             e.printStackTrace();
+             // Without a connection none of the session calls below can work.
+             return;
+         }
+
+         Session ssn = conn.createSession(50000);
+         ssn.setExceptionListener(new ExceptionListener()
+         {
+             public void onException(QpidException e)
+             {
+                 System.out.println(e);
+             }
+         });
+         ssn.queueDeclare("queue1", null, null);
+         ssn.queueBind("queue1", "amq.direct", "queue1",null);
+         ssn.sync();
+
+         ssn.messageSubscribe("queue1", "myDest", (short)0, (short)0,createAdapter(), null);
+
+         String path = args.length > 0 ? args[0] : DEFAULT_FILE;
+         FileInputStream in = null;
+         try
+         {
+             in = new FileInputStream(path);
+             FileMessage msg = new FileMessage(in,
+                                               1024,
+                                               new DeliveryProperties().setRoutingKey("queue1"),
+                                               new MessageProperties().setMessageId("123"));
+
+             // queue
+             ssn.messageStream("amq.direct",msg, (short) 0, (short) 1);
+             ssn.sync();
+         }
+         catch(Exception e)
+         {
+             e.printStackTrace();
+         }
+         finally
+         {
+             // Release the file handle; regions already mapped stay valid after the channel closes.
+             if (in != null)
+             {
+                 try
+                 {
+                     in.close();
+                 }
+                 catch(Exception e)
+                 {
+                     e.printStackTrace();
+                 }
+             }
+         }
+     }
+
+ }
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/impl/LargeMsgDemoClient.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,150 @@
+package org.apache.qpidity.nclient.util;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.util.LinkedList;
+import java.util.Queue;
+
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
+import org.apache.qpidity.api.Message;
+
+/**
+ * <p>A Simple implementation of the message interface
+ * for small messages. When the readData methods are called
+ * we assume the message is complete. i.e there want be any
+ * appendData operations after that.</p>
+ *
+ * <p>If you need large message support please see
+ * <code>FileMessage</code> and <code>StreamingMessage</code>
+ * </p>
+ */
+public class ByteBufferMessage implements Message
+{
+ private Queue<ByteBuffer> _data = new LinkedList<ByteBuffer>();
+ private ByteBuffer _readBuffer;
+ private int _dataSize;
+ private DeliveryProperties _currentDeliveryProps;
+ private MessageProperties _currentMessageProps;
+ private long _transferId;
+
+ public ByteBufferMessage()
+ {
+ _currentDeliveryProps = new DeliveryProperties();
+ _currentMessageProps = new MessageProperties();
+ }
+
+ public ByteBufferMessage(long transferId)
+ {
+ _transferId = transferId;
+ }
+
+ public long getMessageTransferId()
+ {
+ return _transferId;
+ }
+
+ public void clearData()
+ {
+ _data = new LinkedList<ByteBuffer>();
+ _readBuffer = null;
+ }
+
+ public void appendData(byte[] src) throws IOException
+ {
+ appendData(ByteBuffer.wrap(src));
+ }
+
+ /**
+ * write the data from the current position up to the buffer limit
+ */
+ public void appendData(ByteBuffer src) throws IOException
+ {
+ _data.offer(src);
+ _dataSize += src.remaining();
+ }
+
+ public DeliveryProperties getDeliveryProperties()
+ {
+ return _currentDeliveryProps;
+ }
+
+ public MessageProperties getMessageProperties()
+ {
+ System.out.println("MessageProperties is null ? " + _currentMessageProps == null? "true":"false");
+ return _currentMessageProps;
+ }
+
+ public void setDeliveryProperties(DeliveryProperties props)
+ {
+ _currentDeliveryProps = props;
+ }
+
+ public void setMessageProperties(MessageProperties props)
+ {
+ _currentMessageProps = props;
+ }
+
+ public void readData(byte[] target) throws IOException
+ {
+ getReadBuffer().get(target);
+ }
+
+ public ByteBuffer readData() throws IOException
+ {
+ return getReadBuffer();
+ }
+
+ private void buildReadBuffer()
+ {
+ //optimize for the simple cases
+ if(_data.size() == 1)
+ {
+ _readBuffer = _data.element().duplicate();
+ }
+ else
+ {
+ _readBuffer = ByteBuffer.allocate(_dataSize);
+ for(ByteBuffer buf:_data)
+ {
+ _readBuffer.put(buf);
+ }
+ }
+ }
+
+ private ByteBuffer getReadBuffer() throws IOException
+ {
+ if (_readBuffer != null )
+ {
+ return _readBuffer.slice();
+ }
+ else
+ {
+ if (_data.size() >0)
+ {
+ buildReadBuffer();
+ return _readBuffer.slice();
+ }
+ else
+ {
+ throw new IOException("No Data to read");
+ }
+ }
+ }
+
+ //hack for testing
+ @Override public String toString()
+ {
+ try
+ {
+ ByteBuffer temp = getReadBuffer();
+ byte[] b = new byte[temp.remaining()];
+ temp.get(b);
+ return new String(b);
+ }
+ catch(IOException e)
+ {
+ return "No data";
+ }
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/ByteBufferMessage.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java?rev=577253&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java Wed Sep 19 04:36:23 2007
@@ -0,0 +1,87 @@
+package org.apache.qpidity.nclient.util;
+
+import java.io.EOFException;
+import java.io.FileInputStream;
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.MappedByteBuffer;
+import java.nio.channels.FileChannel;
+
+import org.apache.qpidity.transport.DeliveryProperties;
+import org.apache.qpidity.transport.MessageProperties;
+import org.apache.qpidity.api.Message;
+
+/**
+ * FileMessage provides pull style semantics for
+ * larges messages backed by a disk.
+ * Instead of loading all data into memeory it uses
+ * FileChannel to map regions of the file into memeory
+ * at a time.
+ *
+ * The write methods are not supported.
+ *
+ * From the standpoint of performance it is generally
+ * only worth mapping relatively large files into memory.
+ *
+ * FileMessage msg = new FileMessage(in,delProps,msgProps);
+ * session.messageTransfer(dest,msg,0,0);
+ *
+ * The messageTransfer method will read the file in chunks
+ * and stream it.
+ *
+ */
+public class FileMessage extends ReadOnlyMessage implements Message
+{
+ private FileChannel _fileChannel;
+ private int _chunkSize;
+ private long _fileSize;
+ private long _pos = 0;
+
+ public FileMessage(FileInputStream in,int chunkSize,DeliveryProperties deliveryProperties,MessageProperties messageProperties)throws IOException
+ {
+ _messageProperties = messageProperties;
+ _deliveryProperties = deliveryProperties;
+
+ _fileChannel = in.getChannel();
+ _chunkSize = chunkSize;
+ _fileSize = _fileChannel.size();
+
+ if (_fileSize <= _chunkSize)
+ {
+ _chunkSize = (int)_fileSize;
+ }
+ }
+
+ public void readData(byte[] target) throws IOException
+ {
+ throw new UnsupportedOperationException();
+ }
+
+ public ByteBuffer readData() throws IOException
+ {
+ if (_pos == _fileSize)
+ {
+ throw new EOFException();
+ }
+
+ if (_pos + _chunkSize > _fileSize)
+ {
+ _chunkSize = (int)(_fileSize - _pos);
+ }
+ MappedByteBuffer bb = _fileChannel.map(FileChannel.MapMode.READ_ONLY, _pos, _chunkSize);
+ _pos += _chunkSize;
+ return bb;
+ }
+
+ /**
+ * This message is used by an application user to
+ * provide data to the client library using pull style
+ * semantics. Since the message is not transfered yet, it
+ * does not have a transfer id. Hence this method is not
+ * applicable to this implementation.
+ */
+ public long getMessageTransferId()
+ {
+ throw new UnsupportedOperationException();
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/nclient/util/FileMessage.java
------------------------------------------------------------------------------
svn:eol-style = native