You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/08/15 15:56:46 UTC
svn commit: r566155 - in /incubator/qpid/trunk/qpid/java:
client/src/main/java/org/apache/qpidity/client/
client/src/main/java/org/apache/qpidity/client/util/
common/src/main/java/org/apache/qpidity/
common/src/main/java/org/apache/qpidity/api/
Author: rajith
Date: Wed Aug 15 06:56:45 2007
New Revision: 566155
URL: http://svn.apache.org/viewvc?view=rev&rev=566155
Log:
Added clearData() and getTransferId() to the Message interface
Added Java doc
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/api/Message.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java?view=diff&rev=566155&r1=566154&r2=566155
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/ClientSessionDelegate.java Wed Aug 15 06:56:45 2007
@@ -52,6 +52,7 @@
{
_currentTransfer = currentTransfer;
_currentMessageListener = ((ClientSession)session).getMessageListerners().get(currentTransfer.getDestination());
+ _currentMessageListener.messageTransfer(currentTransfer.getId());
//a better way is to tell the broker to stop the transfer
if (_currentMessageListener == null && _currentTransfer.getAcquireMode() == 1)
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java?view=diff&rev=566155&r1=566154&r2=566155
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/MessagePartListener.java Wed Aug 15 06:56:45 2007
@@ -35,6 +35,13 @@
public interface MessagePartListener
{
/**
+ * Indicates the Message transfer has started.
+ *
+ * @param transferId
+ */
+ public void messageTransfer(long transferId);
+
+ /**
* Add the following headers ( {@link org.apache.qpidity.DeliveryProperties}
* or {@link org.apache.qpidity.ApplicationProperties} ) to the message being received.
*
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java?view=diff&rev=566155&r1=566154&r2=566155
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ByteBufferMessage.java Wed Aug 15 06:56:45 2007
@@ -23,20 +23,39 @@
{
private Queue<ByteBuffer> _data = new LinkedList<ByteBuffer>();
private ByteBuffer _readBuffer;
- private int dataSize;
+ private int _dataSize;
private DeliveryProperties _currentDeliveryProps;
private MessageProperties _currentMessageProps;
+ private long _transferId;
+ public ByteBufferMessage(long transferId)
+ {
+ _transferId = transferId;
+ }
+
+ public long getMessageTransferId()
+ {
+ return _transferId;
+ }
+ public void clearData()
+ {
+ _data = new LinkedList<ByteBuffer>();
+ _readBuffer = null;
+ }
+
public void appendData(byte[] src) throws IOException
{
appendData(ByteBuffer.wrap(src));
}
+ /**
+ * write the data from the current position up to the buffer limit
+ */
public void appendData(ByteBuffer src) throws IOException
{
_data.offer(src);
- dataSize += src.remaining();
+ _dataSize += src.remaining();
}
public DeliveryProperties getDeliveryProperties()
@@ -88,7 +107,7 @@
}
else
{
- _readBuffer = ByteBuffer.allocate(dataSize);
+ _readBuffer = ByteBuffer.allocate(_dataSize);
for(ByteBuffer buf:_data)
{
_readBuffer.put(buf);
@@ -104,7 +123,7 @@
buildReadBuffer();
}
ByteBuffer temp = _readBuffer.duplicate();
- byte[] b = new byte[temp.limit()];
+ byte[] b = new byte[temp.remaining()];
temp.get(b);
return new String(b);
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java?view=diff&rev=566155&r1=566154&r2=566155
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/FileMessage.java Wed Aug 15 06:56:45 2007
@@ -73,4 +73,15 @@
return bb;
}
+ /**
+ * This message is used by an application user to
+ * provide data to the client library using pull style
+ * semantics. Since the message is not transferred yet, it
+ * does not have a transfer id. Hence this method is not
+ * applicable to this implementation.
+ */
+ public long getMessageTransferId()
+ {
+ throw new UnsupportedOperationException();
+ }
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java?view=diff&rev=566155&r1=566154&r2=566155
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/MessagePartListenerAdapter.java Wed Aug 15 06:56:45 2007
@@ -24,8 +24,12 @@
public MessagePartListenerAdapter(MessageListener listener)
{
- _adaptee = listener;
- _currentMsg = new ByteBufferMessage();
+ _adaptee = listener;
+ }
+
+ public void messageTransfer(long transferId)
+ {
+ _currentMsg = new ByteBufferMessage(transferId);
}
public void addData(ByteBuffer src)
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java?view=diff&rev=566155&r1=566154&r2=566155
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/ReadOnlyMessage.java Wed Aug 15 06:56:45 2007
@@ -29,6 +29,10 @@
public MessageProperties getMessageProperties()
{
return _messageProperties;
- }
-
+ }
+
+ public void clearData()
+ {
+ throw new UnsupportedOperationException("This Message is read only after the initial source, cannot clear data");
+ }
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java?view=diff&rev=566155&r1=566154&r2=566155
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpidity/client/util/StreamingMessage.java Wed Aug 15 06:56:45 2007
@@ -44,5 +44,16 @@
return _readBuf.duplicate();
}
-
+
+ /**
+ * This message is used by an application user to
+ * provide data to the client library using pull style
+ * semantics. Since the message is not transferred yet, it
+ * does not have a transfer id. Hence this method is not
+ * applicable to this implementation.
+ */
+ public long getMessageTransferId()
+ {
+ throw new UnsupportedOperationException();
+ }
}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java?view=diff&rev=566155&r1=566154&r2=566155
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/ToyBroker.java Wed Aug 15 06:56:45 2007
@@ -28,7 +28,6 @@
import java.util.HashMap;
import java.util.List;
import java.util.Map;
-import java.util.Queue;
/**
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/api/Message.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/api/Message.java?view=diff&rev=566155&r1=566154&r2=566155
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/api/Message.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpidity/api/Message.java Wed Aug 15 06:56:45 2007
@@ -42,10 +42,23 @@
* <li> To Disk
* <li> To Socket (Stream)
* </ul>
- * @param src
+ * @param src - the data to append
*/
public void appendData(byte[] src) throws IOException;
+ /**
+ * This will abstract the underlying message data.
+ * The Message implementation may not hold all message
+ * data in memory (especially in the case of large messages)
+ *
+ * The appendData function might write data to
+ * <ul>
+ * <li> Memory (Ex: ByteBuffer)
+ * <li> To Disk
+ * <li> To Socket (Stream)
+ * </ul>
+ * @param src - the data to append
+ */
public void appendData(ByteBuffer src) throws IOException;
/**
@@ -59,10 +72,50 @@
* <li> From Disk
* <li> From Socket as and when it gets streamed
* </ul>
- * @param target
+ * @param target The target byte[] which the data gets copied to
*/
public void readData(byte[] target) throws IOException;
-
+
+ /**
+ * This will abstract the underlying message data.
+ * The Message implementation may not hold all message
+ * data in memory (especially in the case of large messages)
+ *
+ * The read function might copy data from
+ * <ul>
+ * <li> From memory (Ex: ByteBuffer)
+ * <li> From Disk
+ * <li> From Socket as and when it gets streamed
+ * </ul>
+ *
+ * @return A ByteBuffer containing data
+ * @throws IOException
+ */
public ByteBuffer readData() throws IOException;
+
+ /**
+ * This should clear the body of the message.
+ */
+ public void clearData();
+
+ /**
+ * This provides access to the command Id assigned to the
+ * message transfer.
+ * This id is useful when you do
+ * <ul>
+ * <li>For message acquiring - If the transfer happened in no-acquire mode
+ * you could use this id to acquire it.
+ * <li>For releasing a message. You can use this id to release an acquired
+ * message
+ * <li>For Acknowledging a message - You need to pass this ID, in order to
+ * acknowledge the message
+ * <li>For Rejecting a message - You need to pass this ID, in order to reject
+ * the message.
+ * </ul>
+ *
+ * @return the message transfer id.
+ */
+ public long getMessageTransferId();
+
}