You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rh...@apache.org on 2007/01/29 17:41:51 UTC
svn commit: r501099 - in
/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server:
AMQChannel.java queue/AMQMessage.java queue/AMQQueueMBean.java
Author: rhs
Date: Mon Jan 29 08:41:50 2007
New Revision: 501099
URL: http://svn.apache.org/viewvc?view=rev&rev=501099
Log:
filled in a bunch of stubs in AMQMessage and made AMQMessage.getContents() return duplicate ByteBuffers
Modified:
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java?view=diff&rev=501099&r1=501098&r2=501099
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/AMQChannel.java Mon Jan 29 08:41:50 2007
@@ -281,7 +281,7 @@
mtb.destination = destination;
ByteBuffer buf = ByteBuffer.allocate((int)msg.getBodySize());
for (ByteBuffer bb : msg.getContents()) {
- buf.put(bb.duplicate());
+ buf.put(bb);
}
buf.flip();
mtb.body = new Content(Content.TypeEnum.INLINE_T, buf);
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java?view=diff&rev=501099&r1=501098&r2=501099
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQMessage.java Mon Jan 29 08:41:50 2007
@@ -30,6 +30,7 @@
import org.apache.qpid.AMQException;
import java.util.ArrayList;
+import java.util.Iterator;
import java.util.List;
import java.util.LinkedList;
import java.util.Set;
@@ -53,6 +54,23 @@
private List<ByteBuffer> _contents;
+ // View over _contents whose iterator returns a duplicate() of each stored
+ // buffer, so a caller that drains a buffer does not move the position of
+ // the buffer held in _contents. Removal through the iterator is rejected.
+ private Iterable<ByteBuffer> _dupContentsIterable = new Iterable<ByteBuffer>() {
+ public Iterator<ByteBuffer> iterator() {
+ return new Iterator<ByteBuffer>() {
+ private Iterator<ByteBuffer> iter = _contents.iterator();
+ public boolean hasNext() {
+ return iter.hasNext();
+ }
+ public ByteBuffer next() {
+ return iter.next().duplicate();
+ }
+ public void remove() {
+ throw new UnsupportedOperationException();
+ }
+ };
+ }
+ };
+
private boolean _redelivered;
private final long _messageId;
@@ -202,7 +220,7 @@
}
public String getCorrelationId() {
- throw new Error("XXX");
+ return _transferBody.getCorrelationId();
}
public void setPriority(byte priority) {
@@ -210,7 +228,7 @@
}
public byte getPriority() {
- throw new Error("XXX");
+ return (byte) _transferBody.getPriority();
}
public void setExpiration(long l) {
@@ -218,7 +236,7 @@
}
public long getExpiration() {
- throw new Error("XXX");
+ return _transferBody.getExpiration();
}
public void setTimestamp(long l) {
@@ -226,19 +244,24 @@
}
public long getTimestamp() {
- throw new Error("XXX");
+ return _transferBody.getTimestamp();
}
public String getContentType() {
- throw new Error("XXX");
+ return _transferBody.getContentType();
}
public String getEncoding() {
- throw new Error("XXX");
+ return _transferBody.getContentEncoding();
}
public byte[] getMessageBytes() {
- throw new Error("XXX");
+ // Concatenate every content chunk into one array of getBodySize() bytes.
+ // getContents() hands out duplicates, so draining them here leaves the
+ // stored buffers' positions untouched.
+ byte[] result = new byte[(int) getBodySize()];
+ int offset = 0;
+ for (ByteBuffer bb : getContents()) {
+ int length = bb.remaining();
+ bb.get(result, offset, length);
+ // Advance past the bytes just copied so the next chunk is appended
+ // rather than written over the start of the array.
+ offset += length;
+ }
+ return result;
}
public void storeMessage() throws AMQException
@@ -254,8 +277,8 @@
return _transferBody;
}
- public List<ByteBuffer> getContents() {
- return _contents;
+ public Iterable<ByteBuffer> getContents() {
+ return _dupContentsIterable;
}
public List<AMQBody> getPayload() {
Modified: incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java?view=diff&rev=501099&r1=501098&r2=501099
==============================================================================
--- incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java (original)
+++ incubator/qpid/branches/qpid.0-9/java/broker/src/main/java/org/apache/qpid/server/queue/AMQQueueMBean.java Mon Jan 29 08:41:50 2007
@@ -274,8 +274,13 @@
{
throw new OperationsException("AMQMessage with message id = " + msgId + " is not in the " + _queueName);
}
+
// get message content
- byte[] msgContent = msg.getMessageBytes();
+ byte[] bytes = msg.getMessageBytes();
+ Byte[] msgContent = new Byte[bytes.length];
+ for (int i = 0; i < bytes.length; i++) {
+ msgContent[i] = Byte.valueOf(bytes[i]);
+ }
// Create header attributes list
String mimeType = msg.getContentType();