You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2011/12/28 14:02:48 UTC
svn commit: r1225178 [6/8] - in /qpid/trunk/qpid/java: ./ bdbstore/src/main/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/
bdbstore/src/main/java/org/apache/qpid/server/store/berkeleydb/tuples/
bdbstore/src/test/ bdbstore/src/test/jav...
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Wed Dec 28 13:02:41 2011
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.client.protocol;
+import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
import java.io.OutputStream;
@@ -409,10 +410,10 @@ public class AMQProtocolHandler implemen
final ArrayList<AMQDataBlock> dataBlocks = _codecFactory.getDecoder().decodeBuffer(msg);
// Decode buffer
-
- for (AMQDataBlock message : dataBlocks)
+ int size = dataBlocks.size();
+ for (int i = 0; i < size; i++)
{
-
+ AMQDataBlock message = dataBlocks.get(i);
if (PROTOCOL_DEBUG)
{
_protocolLogger.info(String.format("RECV: [%s] %s", this, message));
@@ -420,10 +421,10 @@ public class AMQProtocolHandler implemen
if(message instanceof AMQFrame)
{
- final boolean debug = _logger.isDebugEnabled();
+
final long msgNumber = ++_messageReceivedCount;
- if (debug && ((msgNumber % 1000) == 0))
+ if (((msgNumber % 1000) == 0) && _logger.isDebugEnabled())
{
_logger.debug("Received " + _messageReceivedCount + " protocol messages");
}
@@ -514,12 +515,20 @@ public class AMQProtocolHandler implemen
return getStateManager().createWaiter(states);
}
- public synchronized void writeFrame(AMQDataBlock frame)
+ public void writeFrame(AMQDataBlock frame)
+ {
+ writeFrame(frame, true);
+ }
+
+ public synchronized void writeFrame(AMQDataBlock frame, boolean flush)
{
final ByteBuffer buf = asByteBuffer(frame);
_writtenBytes += buf.remaining();
_sender.send(buf);
- _sender.flush();
+ if(flush)
+ {
+ _sender.flush();
+ }
if (PROTOCOL_DEBUG)
{
@@ -539,35 +548,51 @@ public class AMQProtocolHandler implemen
}
+ private static final int REUSABLE_BYTE_BUFFER_CAPACITY = 65 * 1024;
+ private final byte[] _reusableBytes = new byte[REUSABLE_BYTE_BUFFER_CAPACITY];
+ private final ByteBuffer _reusableByteBuffer = ByteBuffer.wrap(_reusableBytes);
+ private final BytesDataOutput _reusableDataOutput = new BytesDataOutput(_reusableBytes);
+
private ByteBuffer asByteBuffer(AMQDataBlock block)
{
- final ByteBuffer buf = ByteBuffer.allocate((int) block.getSize());
+ final int size = (int) block.getSize();
- try
- {
- block.writePayload(new DataOutputStream(new OutputStream()
- {
+ final byte[] data;
- @Override
- public void write(int b) throws IOException
- {
- buf.put((byte) b);
- }
+ if(size > REUSABLE_BYTE_BUFFER_CAPACITY)
+ {
+ data= new byte[size];
+ }
+ else
+ {
- @Override
- public void write(byte[] b, int off, int len) throws IOException
- {
- buf.put(b, off, len);
- }
- }));
+ data = _reusableBytes;
+ }
+ _reusableDataOutput.setBuffer(data);
+
+ try
+ {
+ block.writePayload(_reusableDataOutput);
}
catch (IOException e)
{
throw new RuntimeException(e);
}
- buf.flip();
+ final ByteBuffer buf;
+
+ if(size < REUSABLE_BYTE_BUFFER_CAPACITY)
+ {
+ buf = _reusableByteBuffer;
+ buf.position(0);
+ }
+ else
+ {
+ buf = ByteBuffer.wrap(data);
+ }
+ buf.limit(_reusableDataOutput.length());
+
return buf;
}
@@ -840,4 +865,160 @@ public class AMQProtocolHandler implemen
return _suggestedProtocolVersion;
}
+ private static class BytesDataOutput implements DataOutput // DataOutput that writes directly into a caller-supplied byte[]; no method here checks capacity or grows the array
+ {
+ int _pos = 0; // index in _buf of the next byte to write; doubles as the number of bytes written so far
+ byte[] _buf; // destination array, replaced only via the constructor or setBuffer()
+ // Creates an output that starts writing at index 0 of buf.
+ public BytesDataOutput(byte[] buf)
+ {
+ _buf = buf;
+ }
+ // Redirects output to a different array and rewinds the write position to 0.
+ public void setBuffer(byte[] buf)
+ {
+ _buf = buf;
+ _pos = 0;
+ }
+ // Rewinds the write position to 0 while keeping the current array; old contents are not cleared.
+ public void reset()
+ {
+ _pos = 0;
+ }
+ // Returns the number of bytes written since the last setBuffer()/reset().
+ public int length()
+ {
+ return _pos;
+ }
+ // Writes the low-order 8 bits of b.
+ public void write(int b)
+ {
+ _buf[_pos++] = (byte) b; // NOTE(review): a full buffer surfaces as ArrayIndexOutOfBoundsException, not IOException
+ }
+ // Copies all of b to the current position.
+ public void write(byte[] b)
+ {
+ System.arraycopy(b, 0, _buf, _pos, b.length);
+ _pos+=b.length;
+ }
+
+ // Copies len bytes of b, starting at off, to the current position.
+ public void write(byte[] b, int off, int len)
+ {
+ System.arraycopy(b, off, _buf, _pos, len);
+ _pos+=len;
+
+ }
+ // Writes a single byte: 1 for true, 0 for false.
+ public void writeBoolean(boolean v)
+ {
+ _buf[_pos++] = v ? (byte) 1 : (byte) 0;
+ }
+ // Writes the low-order 8 bits of v.
+ public void writeByte(int v)
+ {
+ _buf[_pos++] = (byte) v;
+ }
+ // Writes the low-order 16 bits of v, high byte first.
+ public void writeShort(int v)
+ {
+ _buf[_pos++] = (byte) (v >>> 8);
+ _buf[_pos++] = (byte) v;
+ }
+ // Writes the low-order 16 bits of v, high byte first (same bytes as writeShort).
+ public void writeChar(int v)
+ {
+ _buf[_pos++] = (byte) (v >>> 8);
+ _buf[_pos++] = (byte) v;
+ }
+ // Writes v as 4 bytes, most significant byte first.
+ public void writeInt(int v)
+ {
+ _buf[_pos++] = (byte) (v >>> 24);
+ _buf[_pos++] = (byte) (v >>> 16);
+ _buf[_pos++] = (byte) (v >>> 8);
+ _buf[_pos++] = (byte) v;
+ }
+ // Writes v as 8 bytes, most significant byte first.
+ public void writeLong(long v)
+ {
+ _buf[_pos++] = (byte) (v >>> 56);
+ _buf[_pos++] = (byte) (v >>> 48);
+ _buf[_pos++] = (byte) (v >>> 40);
+ _buf[_pos++] = (byte) (v >>> 32);
+ _buf[_pos++] = (byte) (v >>> 24);
+ _buf[_pos++] = (byte) (v >>> 16);
+ _buf[_pos++] = (byte) (v >>> 8);
+ _buf[_pos++] = (byte)v;
+ }
+ // Writes the 4-byte result of Float.floatToIntBits(v) via writeInt.
+ public void writeFloat(float v)
+ {
+ writeInt(Float.floatToIntBits(v));
+ }
+ // Writes the 8-byte result of Double.doubleToLongBits(v) via writeLong.
+ public void writeDouble(double v)
+ {
+ writeLong(Double.doubleToLongBits(v));
+ }
+ // Writes one byte per character of s, keeping only the low-order 8 bits of each char.
+ public void writeBytes(String s)
+ {
+ int len = s.length();
+ for (int i = 0 ; i < len ; i++)
+ {
+ _buf[_pos++] = ((byte)s.charAt(i));
+ }
+ }
+ // Writes two bytes per character of s, high byte first; no length prefix is written.
+ public void writeChars(String s)
+ {
+ int len = s.length();
+ for (int i = 0 ; i < len ; i++)
+ {
+ int v = s.charAt(i);
+ _buf[_pos++] = (byte) (v >>> 8);
+ _buf[_pos++] = (byte) v;
+ }
+ }
+ // Writes a 2-byte length prefix followed by s encoded as 1-, 2- or 3-byte sequences per char (the layout java.io.DataOutput#writeUTF specifies).
+ public void writeUTF(String s)
+ {
+ int strlen = s.length();
+
+ int pos = _pos; // remember where the length prefix goes; it is back-patched once the encoded size is known
+ _pos+=2; // reserve the two prefix bytes
+
+
+ for (int i = 0; i < strlen; i++)
+ {
+ int c = s.charAt(i);
+ if ((c >= 0x0001) && (c <= 0x007F)) // 0x0000 is excluded here, so it falls through to the two-byte branch below
+ {
+ c = s.charAt(i); // NOTE(review): redundant re-read; c already holds this character
+ _buf[_pos++] = (byte) c;
+
+ }
+ else if (c > 0x07FF)
+ {
+ _buf[_pos++] = (byte) (0xE0 | ((c >> 12) & 0x0F));
+ _buf[_pos++] = (byte) (0x80 | ((c >> 6) & 0x3F));
+ _buf[_pos++] = (byte) (0x80 | (c & 0x3F));
+ }
+ else
+ {
+ _buf[_pos++] = (byte) (0xC0 | ((c >> 6) & 0x1F));
+ _buf[_pos++] = (byte) (0x80 | (c & 0x3F));
+ }
+ }
+
+ int len = _pos - (pos + 2); // encoded byte count, excluding the prefix itself
+ // NOTE(review): only the low 16 bits of len are stored below; an encoding longer than 65535 bytes is not rejected here.
+ _buf[pos++] = (byte) (len >>> 8);
+ _buf[pos] = (byte) len;
+ }
+
+ }
+
+
}
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/nclient/util/MessagePartListenerAdapter.java Wed Dec 28 13:02:41 2011
@@ -49,19 +49,9 @@ public class MessagePartListenerAdapter
{
_currentMsg = new ByteBufferMessage(xfr.getId());
- for (Struct st : xfr.getHeader().getStructs())
- {
- if(st instanceof DeliveryProperties)
- {
- _currentMsg.setDeliveryProperties((DeliveryProperties)st);
-
- }
- else if(st instanceof MessageProperties)
- {
- _currentMsg.setMessageProperties((MessageProperties)st);
- }
-
- }
+ Header header = xfr.getHeader();
+ _currentMsg.setDeliveryProperties(header.getDeliveryProperties());
+ _currentMsg.setMessageProperties(header.getMessageProperties());
ByteBuffer body = xfr.getBody();
Propchange: qpid/trunk/qpid/java/client/src/test/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -0,0 +1,2 @@
+*.iml
+
Modified: qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java (original)
+++ qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/client/BasicMessageConsumer_0_8_Test.java Wed Dec 28 13:02:41 2011
@@ -46,8 +46,9 @@ public class BasicMessageConsumer_0_8_Te
AMQBindingURL burl = new AMQBindingURL(url);
AMQDestination queue = new AMQQueue(burl);
- AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> testSession = new TestAMQSession(conn);
- BasicMessageConsumer_0_8 consumer = new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
+ TestAMQSession testSession = new TestAMQSession(conn);
+ BasicMessageConsumer_0_8 consumer =
+ new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
assertEquals("Reject behaviour was was not as expected", RejectBehaviour.SERVER, consumer.getRejectBehaviour());
}
@@ -65,8 +66,9 @@ public class BasicMessageConsumer_0_8_Te
final AMQBindingURL burl = new AMQBindingURL(url);
final AMQDestination queue = new AMQQueue(burl);
- final AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> testSession = new TestAMQSession(conn);
- final BasicMessageConsumer_0_8 consumer = new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
+ final TestAMQSession testSession = new TestAMQSession(conn);
+ final BasicMessageConsumer_0_8 consumer =
+ new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
assertEquals("Reject behaviour was was not as expected", RejectBehaviour.NORMAL, consumer.getRejectBehaviour());
}
@@ -90,8 +92,9 @@ public class BasicMessageConsumer_0_8_Te
assertNull("Reject behaviour should have been null", queue.getRejectBehaviour());
- AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8> testSession = new TestAMQSession(conn);
- BasicMessageConsumer_0_8 consumer = new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
+ TestAMQSession testSession = new TestAMQSession(conn);
+ BasicMessageConsumer_0_8 consumer =
+ new BasicMessageConsumer_0_8(0, conn, queue, "", false, null, testSession, null, null, 10, 5, false, Session.SESSION_TRANSACTED, false, false);
assertEquals("Reject behaviour was was not as expected", RejectBehaviour.NORMAL, consumer.getRejectBehaviour());
}
Propchange: qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -0,0 +1,2 @@
+*.iml
+
Modified: qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java (original)
+++ qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java Wed Dec 28 13:02:41 2011
@@ -29,12 +29,7 @@ import javax.jms.Topic;
import javax.jms.TopicSubscriber;
import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.BasicMessageConsumer_0_8;
-import org.apache.qpid.client.BasicMessageProducer_0_8;
-import org.apache.qpid.client.MockAMQConnection;
+import org.apache.qpid.client.*;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
@@ -42,7 +37,7 @@ import org.apache.qpid.filter.MessageFil
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
+public class TestAMQSession extends AMQSession_0_8
{
public TestAMQSession(AMQConnection connection)
@@ -92,7 +87,7 @@ public class TestAMQSession extends AMQS
return null;
}
- protected void sendRecover() throws AMQException, FailoverException
+ public void sendRecover() throws AMQException, FailoverException
{
}
Propchange: qpid/trunk/qpid/java/client/test/
------------------------------------------------------------------------------
--- svn:ignore (original)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -1 +1,2 @@
+*.iml
TEST-org.apache.qpid.client.AllClientUnitTests.txt
Modified: qpid/trunk/qpid/java/common/Composite.tpl
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/Composite.tpl?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/Composite.tpl (original)
+++ qpid/trunk/qpid/java/common/Composite.tpl Wed Dec 28 13:02:41 2011
@@ -245,6 +245,11 @@ if segments:
return this;
}
+ public int getBodySize() // size of the body without going through getBody()
+ {
+ return this.body == null ? 0 : this.body.remaining(); // 0 when no body is set, otherwise whatever remaining() reports for it
+ }
+
public final ByteBuffer getBody() {
if (this.body == null)
{
Propchange: qpid/trunk/qpid/java/common/src/main/
------------------------------------------------------------------------------
--- svn:ignore (added)
+++ svn:ignore Wed Dec 28 13:02:41 2011
@@ -0,0 +1,2 @@
+*.iml
+
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java Wed Dec 28 13:02:41 2011
@@ -24,12 +24,7 @@ import java.io.*;
import java.nio.ByteBuffer;
import java.util.*;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQDataBlockDecoder;
-import org.apache.qpid.framing.AMQFrameDecodingException;
-import org.apache.qpid.framing.AMQMethodBodyFactory;
-import org.apache.qpid.framing.AMQProtocolVersionException;
-import org.apache.qpid.framing.ProtocolInitiation;
+import org.apache.qpid.framing.*;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
/**
@@ -193,24 +188,41 @@ public class AMQDecoder
}
}
+ private static class SimpleDataInputStream extends DataInputStream implements MarkableDataInput // lets a plain DataInputStream be passed where a MarkableDataInput is required
+ {
+ public SimpleDataInputStream(InputStream in) // wraps in; all reads go through DataInputStream
+ {
+ super(in);
+ }
+ // The only MarkableDataInput method declared in this class; it delegates to EncodingUtils.
+ public AMQShortString readAMQShortString() throws IOException
+ {
+ return EncodingUtils.readAMQShortString(this);
+ }
+
+ }
+
public ArrayList<AMQDataBlock> decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
{
// get prior remaining data from accumulator
ArrayList<AMQDataBlock> dataBlocks = new ArrayList<AMQDataBlock>();
- DataInputStream msg;
+ MarkableDataInput msg;
- ByteArrayInputStream bais = new ByteArrayInputStream(buf.array(),buf.arrayOffset()+buf.position(), buf.remaining());
+ ByteArrayInputStream bais;
+ DataInput di;
if(!_remainingBufs.isEmpty())
{
+ bais = new ByteArrayInputStream(buf.array(),buf.arrayOffset()+buf.position(), buf.remaining());
_remainingBufs.add(bais);
- msg = new DataInputStream(new RemainingByteArrayInputStream());
+ msg = new SimpleDataInputStream(new RemainingByteArrayInputStream());
}
else
{
- msg = new DataInputStream(bais);
+ bais = null;
+ msg = new ByteArrayDataInput(buf.array(),buf.arrayOffset()+buf.position(), buf.remaining());
}
boolean enoughData = true;
@@ -245,11 +257,24 @@ public class AMQDecoder
iterator.remove();
}
}
- if(bais.available()!=0)
+
+ if(bais == null)
+ {
+ if(msg.available()!=0)
+ {
+ byte[] remaining = new byte[msg.available()];
+ msg.read(remaining);
+ _remainingBufs.add(new ByteArrayInputStream(remaining));
+ }
+ }
+ else
{
- byte[] remaining = new byte[bais.available()];
- bais.read(remaining);
- _remainingBufs.add(new ByteArrayInputStream(remaining));
+ if(bais.available()!=0)
+ {
+ byte[] remaining = new byte[bais.available()];
+ bais.read(remaining);
+ _remainingBufs.add(new ByteArrayInputStream(remaining));
+ }
}
}
}
Added: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java?rev=1225178&view=auto
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java (added)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/codec/MarkableDataInput.java Wed Dec 28 13:02:41 2011
@@ -0,0 +1,21 @@
+package org.apache.qpid.codec;
+
+import org.apache.qpid.framing.AMQShortString;
+
+import java.io.DataInput;
+import java.io.IOException;
+
+public interface MarkableDataInput extends DataInput // DataInput extended with mark/reset, available, skip, bulk read and AMQShortString decoding
+{
+ public void mark(int pos); // NOTE(review): presumably same meaning as InputStream.mark(int readlimit) despite the name "pos" — confirm
+ public void reset() throws IOException; // presumably returns to the last mark() — confirm against implementations
+
+ int available() throws IOException; // NOTE(review): presumably the number of bytes still readable — confirm against implementations
+
+ long skip(long i) throws IOException; // presumably skips up to i bytes and returns the count actually skipped — confirm
+
+ int read(byte[] b) throws IOException; // presumably fills b and returns the number of bytes read — confirm
+
+ public AMQShortString readAMQShortString() throws IOException; // reads an AMQShortString from this input; encoding not visible here
+
+}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java Wed Dec 28 13:02:41 2011
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.framing;
+import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -36,7 +37,7 @@ public interface AMQBody
*/
public abstract int getSize();
- public void writePayload(DataOutputStream buffer) throws IOException;
+ public void writePayload(DataOutput buffer) throws IOException;
void handle(final int channelId, final AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException;
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java Wed Dec 28 13:02:41 2011
@@ -21,6 +21,7 @@
package org.apache.qpid.framing;
import java.io.DataInputStream;
+import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -42,6 +43,6 @@ public abstract class AMQDataBlock imple
* Writes the datablock to the specified buffer.
* @param buffer
*/
- public abstract void writePayload(DataOutputStream buffer) throws IOException;
+ public abstract void writePayload(DataOutput buffer) throws IOException;
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java Wed Dec 28 13:02:41 2011
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.framing;
+import org.apache.qpid.codec.MarkableDataInput;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -43,7 +44,7 @@ public class AMQDataBlockDecoder
public AMQDataBlockDecoder()
{ }
- public boolean decodable(DataInputStream in) throws AMQFrameDecodingException, IOException
+ public boolean decodable(MarkableDataInput in) throws AMQFrameDecodingException, IOException
{
final int remainingAfterAttributes = in.available() - (1 + 2 + 4 + 1);
// type, channel, body length and end byte
@@ -65,7 +66,7 @@ public class AMQDataBlockDecoder
}
- public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, DataInputStream in)
+ public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, MarkableDataInput in)
throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
{
final byte type = in.readByte();
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java Wed Dec 28 13:02:41 2011
@@ -20,9 +20,10 @@
*/
package org.apache.qpid.framing;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import org.apache.qpid.codec.MarkableDataInput;
+
+import java.io.*;
+import java.io.DataOutput;
public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
{
@@ -38,7 +39,7 @@ public class AMQFrame extends AMQDataBlo
_bodyFrame = bodyFrame;
}
- public AMQFrame(final DataInputStream in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException, IOException
+ public AMQFrame(final MarkableDataInput in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException, IOException
{
this._channel = channel;
this._bodyFrame = bodyFactory.createBody(in,bodySize);
@@ -55,7 +56,7 @@ public class AMQFrame extends AMQDataBlo
}
- public void writePayload(DataOutputStream buffer) throws IOException
+ public void writePayload(DataOutput buffer) throws IOException
{
buffer.writeByte(_bodyFrame.getFrameType());
EncodingUtils.writeUnsignedShort(buffer, _channel);
@@ -79,7 +80,7 @@ public class AMQFrame extends AMQDataBlo
return "Frame channelId: " + _channel + ", bodyFrame: " + String.valueOf(_bodyFrame);
}
- public static void writeFrame(DataOutputStream buffer, final int channel, AMQBody body) throws IOException
+ public static void writeFrame(DataOutput buffer, final int channel, AMQBody body) throws IOException
{
buffer.writeByte(body.getFrameType());
EncodingUtils.writeUnsignedShort(buffer, channel);
@@ -89,7 +90,7 @@ public class AMQFrame extends AMQDataBlo
}
- public static void writeFrames(DataOutputStream buffer, final int channel, AMQBody body1, AMQBody body2) throws IOException
+ public static void writeFrames(DataOutput buffer, final int channel, AMQBody body1, AMQBody body2) throws IOException
{
buffer.writeByte(body1.getFrameType());
EncodingUtils.writeUnsignedShort(buffer, channel);
@@ -104,7 +105,7 @@ public class AMQFrame extends AMQDataBlo
}
- public static void writeFrames(DataOutputStream buffer, final int channel, AMQBody body1, AMQBody body2, AMQBody body3) throws IOException
+ public static void writeFrames(DataOutput buffer, final int channel, AMQBody body1, AMQBody body2, AMQBody body3) throws IOException
{
buffer.writeByte(body1.getFrameType());
EncodingUtils.writeUnsignedShort(buffer, channel);
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java Wed Dec 28 13:02:41 2011
@@ -25,6 +25,7 @@ import org.apache.qpid.AMQConnectionExce
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
+import java.io.DataOutput;
import java.io.DataOutputStream;
import java.io.IOException;
@@ -45,12 +46,12 @@ public interface AMQMethodBody extends A
/** @return unsigned short */
public int getMethod();
- public void writeMethodPayload(DataOutputStream buffer) throws IOException;
+ public void writeMethodPayload(DataOutput buffer) throws IOException;
public int getSize();
- public void writePayload(DataOutputStream buffer) throws IOException;
+ public void writePayload(DataOutput buffer) throws IOException;
//public abstract void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException;
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java Wed Dec 28 13:02:41 2011
@@ -20,11 +20,13 @@
*/
package org.apache.qpid.framing;
+import org.apache.qpid.codec.MarkableDataInput;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.DataInput;
import java.io.DataInputStream;
import java.io.IOException;
@@ -39,7 +41,7 @@ public class AMQMethodBodyFactory implem
_protocolSession = protocolSession;
}
- public AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException, IOException
+ public AMQBody createBody(MarkableDataInput in, long bodySize) throws AMQFrameDecodingException, IOException
{
return _protocolSession.getMethodRegistry().convertToBody(in, bodySize);
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java Wed Dec 28 13:02:41 2011
@@ -24,11 +24,12 @@ package org.apache.qpid.framing;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.codec.MarkableDataInput;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
public abstract class AMQMethodBodyImpl implements AMQMethodBody
@@ -101,7 +102,7 @@ public abstract class AMQMethodBodyImpl
return 2 + 2 + getBodySize();
}
- public void writePayload(DataOutputStream buffer) throws IOException
+ public void writePayload(DataOutput buffer) throws IOException
{
EncodingUtils.writeUnsignedShort(buffer, getClazz());
EncodingUtils.writeUnsignedShort(buffer, getMethod());
@@ -109,14 +110,15 @@ public abstract class AMQMethodBodyImpl
}
- protected byte readByte(DataInputStream buffer) throws IOException
+ protected byte readByte(DataInput buffer) throws IOException
{
return buffer.readByte();
}
- protected AMQShortString readAMQShortString(DataInputStream buffer) throws IOException
+ protected AMQShortString readAMQShortString(MarkableDataInput buffer) throws IOException
{
- return EncodingUtils.readAMQShortString(buffer);
+ AMQShortString str = buffer.readAMQShortString();
+ return str == null ? null : str.intern(false);
}
protected int getSizeOf(AMQShortString string)
@@ -124,27 +126,27 @@ public abstract class AMQMethodBodyImpl
return EncodingUtils.encodedShortStringLength(string);
}
- protected void writeByte(DataOutputStream buffer, byte b) throws IOException
+ protected void writeByte(DataOutput buffer, byte b) throws IOException
{
buffer.writeByte(b);
}
- protected void writeAMQShortString(DataOutputStream buffer, AMQShortString string) throws IOException
+ protected void writeAMQShortString(DataOutput buffer, AMQShortString string) throws IOException
{
EncodingUtils.writeShortStringBytes(buffer, string);
}
- protected int readInt(DataInputStream buffer) throws IOException
+ protected int readInt(DataInput buffer) throws IOException
{
return buffer.readInt();
}
- protected void writeInt(DataOutputStream buffer, int i) throws IOException
+ protected void writeInt(DataOutput buffer, int i) throws IOException
{
buffer.writeInt(i);
}
- protected FieldTable readFieldTable(DataInputStream buffer) throws AMQFrameDecodingException, IOException
+ protected FieldTable readFieldTable(DataInput buffer) throws AMQFrameDecodingException, IOException
{
return EncodingUtils.readFieldTable(buffer);
}
@@ -154,17 +156,17 @@ public abstract class AMQMethodBodyImpl
return EncodingUtils.encodedFieldTableLength(table); //To change body of created methods use File | Settings | File Templates.
}
- protected void writeFieldTable(DataOutputStream buffer, FieldTable table) throws IOException
+ protected void writeFieldTable(DataOutput buffer, FieldTable table) throws IOException
{
EncodingUtils.writeFieldTableBytes(buffer, table);
}
- protected long readLong(DataInputStream buffer) throws IOException
+ protected long readLong(DataInput buffer) throws IOException
{
return buffer.readLong();
}
- protected void writeLong(DataOutputStream buffer, long l) throws IOException
+ protected void writeLong(DataOutput buffer, long l) throws IOException
{
buffer.writeLong(l);
}
@@ -174,27 +176,27 @@ public abstract class AMQMethodBodyImpl
return (response == null) ? 4 : response.length + 4;
}
- protected void writeBytes(DataOutputStream buffer, byte[] data) throws IOException
+ protected void writeBytes(DataOutput buffer, byte[] data) throws IOException
{
EncodingUtils.writeBytes(buffer,data);
}
- protected byte[] readBytes(DataInputStream buffer) throws IOException
+ protected byte[] readBytes(DataInput buffer) throws IOException
{
return EncodingUtils.readBytes(buffer);
}
- protected short readShort(DataInputStream buffer) throws IOException
+ protected short readShort(DataInput buffer) throws IOException
{
return EncodingUtils.readShort(buffer);
}
- protected void writeShort(DataOutputStream buffer, short s) throws IOException
+ protected void writeShort(DataOutput buffer, short s) throws IOException
{
EncodingUtils.writeShort(buffer, s);
}
- protected Content readContent(DataInputStream buffer)
+ protected Content readContent(DataInput buffer)
{
return null;
}
@@ -204,56 +206,56 @@ public abstract class AMQMethodBodyImpl
return 0;
}
- protected void writeContent(DataOutputStream buffer, Content body)
+ protected void writeContent(DataOutput buffer, Content body)
{
}
- protected byte readBitfield(DataInputStream buffer) throws IOException
+ protected byte readBitfield(DataInput buffer) throws IOException
{
return readByte(buffer);
}
- protected int readUnsignedShort(DataInputStream buffer) throws IOException
+ protected int readUnsignedShort(DataInput buffer) throws IOException
{
return buffer.readUnsignedShort();
}
- protected void writeBitfield(DataOutputStream buffer, byte bitfield0) throws IOException
+ protected void writeBitfield(DataOutput buffer, byte bitfield0) throws IOException
{
buffer.writeByte(bitfield0);
}
- protected void writeUnsignedShort(DataOutputStream buffer, int s) throws IOException
+ protected void writeUnsignedShort(DataOutput buffer, int s) throws IOException
{
EncodingUtils.writeUnsignedShort(buffer, s);
}
- protected long readUnsignedInteger(DataInputStream buffer) throws IOException
+ protected long readUnsignedInteger(DataInput buffer) throws IOException
{
return EncodingUtils.readUnsignedInteger(buffer);
}
- protected void writeUnsignedInteger(DataOutputStream buffer, long i) throws IOException
+ protected void writeUnsignedInteger(DataOutput buffer, long i) throws IOException
{
EncodingUtils.writeUnsignedInteger(buffer, i);
}
- protected short readUnsignedByte(DataInputStream buffer) throws IOException
+ protected short readUnsignedByte(DataInput buffer) throws IOException
{
return (short) buffer.readUnsignedByte();
}
- protected void writeUnsignedByte(DataOutputStream buffer, short unsignedByte) throws IOException
+ protected void writeUnsignedByte(DataOutput buffer, short unsignedByte) throws IOException
{
EncodingUtils.writeUnsignedByte(buffer, unsignedByte);
}
- protected long readTimestamp(DataInputStream buffer) throws IOException
+ protected long readTimestamp(DataInput buffer) throws IOException
{
return EncodingUtils.readTimestamp(buffer);
}
- protected void writeTimestamp(DataOutputStream buffer, long t) throws IOException
+ protected void writeTimestamp(DataOutput buffer, long t) throws IOException
{
EncodingUtils.writeTimestamp(buffer, t);
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java Wed Dec 28 13:02:41 2011
@@ -21,11 +21,12 @@
package org.apache.qpid.framing;
-import java.io.DataInputStream;
+import org.apache.qpid.codec.MarkableDataInput;
+
import java.io.IOException;
public abstract interface AMQMethodBodyInstanceFactory
{
- public AMQMethodBody newInstance(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException;
+ public AMQMethodBody newInstance(MarkableDataInput buffer, long size) throws AMQFrameDecodingException, IOException;
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java Wed Dec 28 13:02:41 2011
@@ -24,8 +24,9 @@ package org.apache.qpid.framing;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.DataInput;
import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataOutput;
import java.io.IOException;
import java.util.*;
import java.lang.ref.WeakReference;
@@ -93,22 +94,44 @@ public final class AMQShortString implem
private AMQShortString substring(final int from, final int to)
{
- return new AMQShortString(_data, from+_offset, to+_offset);
+ return new AMQShortString(_data, from+_offset, to-from);
}
- private static final ThreadLocal<Map<AMQShortString, WeakReference<AMQShortString>>> _localInternMap =
- new ThreadLocal<Map<AMQShortString, WeakReference<AMQShortString>>>()
+ private static final int LOCAL_INTERN_CACHE_SIZE = 2048;
+
+ private static final ThreadLocal<Map<AMQShortString, AMQShortString>> _localInternMap =
+ new ThreadLocal<Map<AMQShortString, AMQShortString>>()
{
- protected Map<AMQShortString, WeakReference<AMQShortString>> initialValue()
+ protected Map<AMQShortString, AMQShortString> initialValue()
{
- return new WeakHashMap<AMQShortString, WeakReference<AMQShortString>>();
+ return new LinkedHashMap<AMQShortString, AMQShortString>()
+ {
+
+ protected boolean removeEldestEntry(Map.Entry<AMQShortString, AMQShortString> eldest)
+ {
+ return size() > LOCAL_INTERN_CACHE_SIZE;
+ }
+ };
};
};
private static final Map<AMQShortString, WeakReference<AMQShortString>> _globalInternMap =
new WeakHashMap<AMQShortString, WeakReference<AMQShortString>>();
+
+ private static final ThreadLocal<Map<String, WeakReference<AMQShortString>>> _localStringMap =
+ new ThreadLocal<Map<String, WeakReference<AMQShortString>>>()
+ {
+ protected Map<String, WeakReference<AMQShortString>> initialValue()
+ {
+ return new WeakHashMap<String, WeakReference<AMQShortString>>();
+ };
+ };
+
+ private static final Map<String, WeakReference<AMQShortString>> _globalStringMap =
+ new WeakHashMap<String, WeakReference<AMQShortString>>();
+
private static final Logger _logger = LoggerFactory.getLogger(AMQShortString.class);
private final byte[] _data;
@@ -200,32 +223,32 @@ public final class AMQShortString implem
}
- private AMQShortString(DataInputStream data, final int length) throws IOException
+ private AMQShortString(DataInput data, final int length) throws IOException
{
if (length > MAX_LENGTH)
{
throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
}
byte[] dataBytes = new byte[length];
- data.read(dataBytes);
+ data.readFully(dataBytes);
_data = dataBytes;
_offset = 0;
_length = length;
}
- private AMQShortString(final byte[] data, final int from, final int to)
+ public AMQShortString(byte[] data, final int offset, final int length)
{
- if (data == null)
- {
- throw new NullPointerException("Cannot create AMQShortString with null data[]");
- }
- int length = to - from;
if (length > MAX_LENGTH)
{
throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
}
- _offset = from;
+ if (data == null)
+ {
+ throw new NullPointerException("Cannot create AMQShortString with null data[]");
+ }
+
+ _offset = offset;
_length = length;
_data = data;
}
@@ -234,9 +257,7 @@ public final class AMQShortString implem
{
if(_data.length != _length)
{
- byte[] dataBytes = new byte[_length];
- System.arraycopy(_data,_offset,dataBytes,0,_length);
- return new AMQShortString(dataBytes,0,_length);
+ return copy();
}
else
{
@@ -265,7 +286,7 @@ public final class AMQShortString implem
return new CharSubSequence(start, end);
}
- public static AMQShortString readFromBuffer(DataInputStream buffer) throws IOException
+ public static AMQShortString readFromBuffer(DataInput buffer) throws IOException
{
final int length = buffer.readUnsignedByte();
if (length == 0)
@@ -293,12 +314,12 @@ public final class AMQShortString implem
}
}
- public void writeToBuffer(DataOutputStream buffer) throws IOException
+ public void writeToBuffer(DataOutput buffer) throws IOException
{
final int size = length();
//buffer.setAutoExpand(true);
- buffer.write((byte) size);
+ buffer.writeByte(size);
buffer.write(_data, _offset, size);
}
@@ -420,7 +441,17 @@ public final class AMQShortString implem
{
if (_asString == null)
{
- _asString = new String(asChars());
+ AMQShortString intern = intern();
+
+ if(intern == this)
+ {
+ _asString = new String(asChars());
+ }
+ else
+ {
+ _asString = intern.asString();
+ }
+
}
return _asString;
}
@@ -609,42 +640,51 @@ public final class AMQShortString implem
public AMQShortString intern()
{
+ return intern(true);
+ }
+
+ public AMQShortString intern(boolean keep)
+ {
hashCode();
- Map<AMQShortString, WeakReference<AMQShortString>> localMap =
+ Map<AMQShortString, AMQShortString> localMap =
_localInternMap.get();
- WeakReference<AMQShortString> ref = localMap.get(this);
- AMQShortString internString;
+ AMQShortString internString = localMap.get(this);
- if(ref != null)
+
+ if(internString != null)
{
- internString = ref.get();
- if(internString != null)
- {
- return internString;
- }
+ return internString;
}
+ WeakReference<AMQShortString> ref;
synchronized(_globalInternMap)
{
ref = _globalInternMap.get(this);
if((ref == null) || ((internString = ref.get()) == null))
{
- internString = shrink();
+ internString = keep ? shrink() : copy();
ref = new WeakReference(internString);
_globalInternMap.put(internString, ref);
}
}
- localMap.put(internString, ref);
+ localMap.put(internString, internString);
return internString;
}
+ private AMQShortString copy()
+ {
+ byte[] dataBytes = new byte[_length];
+ System.arraycopy(_data,_offset,dataBytes,0,_length);
+ return new AMQShortString(dataBytes,0,_length);
+ }
+
private int occurences(final byte delim)
{
int count = 0;
@@ -761,7 +801,46 @@ public final class AMQShortString implem
public static AMQShortString valueOf(Object obj)
{
- return obj == null ? null : new AMQShortString(String.valueOf(obj));
+ return obj == null ? null : AMQShortString.valueOf(String.valueOf(obj));
+ }
+
+ public static AMQShortString valueOf(String obj)
+ {
+ if(obj == null)
+ {
+ return null;
+ }
+
+ Map<String, WeakReference<AMQShortString>> localMap =
+ _localStringMap.get();
+
+ WeakReference<AMQShortString> ref = localMap.get(obj);
+ AMQShortString internString;
+
+ if(ref != null)
+ {
+ internString = ref.get();
+ if(internString != null)
+ {
+ return internString;
+ }
+ }
+
+
+ synchronized(_globalStringMap)
+ {
+
+ ref = _globalStringMap.get(obj);
+ if((ref == null) || ((internString = ref.get()) == null))
+ {
+ internString = (new AMQShortString(obj)).intern();
+ ref = new WeakReference<AMQShortString>(internString);
+ _globalStringMap.put(obj, ref);
+ }
+
+ }
+ localMap.put(obj, ref);
+ return internString;
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQType.java Wed Dec 28 13:02:41 2011
@@ -20,8 +20,8 @@
*/
package org.apache.qpid.framing;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataInput;
+import java.io.DataOutput;
import java.io.IOException;
import java.math.BigDecimal;
@@ -61,12 +61,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, DataOutput buffer) throws IOException
{
EncodingUtils.writeLongStringBytes(buffer, (String) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(DataInput buffer) throws IOException
{
return EncodingUtils.readLongString(buffer);
}
@@ -107,12 +107,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, DataOutput buffer) throws IOException
{
EncodingUtils.writeUnsignedInteger(buffer, (Long) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(DataInput buffer) throws IOException
{
return EncodingUtils.readUnsignedInteger(buffer);
}
@@ -138,7 +138,7 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, DataOutput buffer) throws IOException
{
BigDecimal bd = (BigDecimal) value;
@@ -151,7 +151,7 @@ public enum AMQType
EncodingUtils.writeInteger(buffer, unscaled);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(DataInput buffer) throws IOException
{
byte places = EncodingUtils.readByte(buffer);
@@ -183,12 +183,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, DataOutput buffer) throws IOException
{
EncodingUtils.writeLong(buffer, (Long) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(DataInput buffer) throws IOException
{
return EncodingUtils.readLong(buffer);
}
@@ -247,7 +247,7 @@ public enum AMQType
* @param value An instance of the type.
* @param buffer The byte buffer to write it to.
*/
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, DataOutput buffer) throws IOException
{
// Ensure that the value is a FieldTable.
if (!(value instanceof FieldTable))
@@ -268,7 +268,7 @@ public enum AMQType
*
* @return An instance of the type.
*/
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(DataInput buffer) throws IOException
{
try
{
@@ -302,10 +302,10 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer)
+ public void writeValueImpl(Object value, DataOutput buffer)
{ }
- public Object readValueFromBuffer(DataInputStream buffer)
+ public Object readValueFromBuffer(DataInput buffer)
{
return null;
}
@@ -331,12 +331,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, DataOutput buffer) throws IOException
{
EncodingUtils.writeLongstr(buffer, (byte[]) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(DataInput buffer) throws IOException
{
return EncodingUtils.readLongstr(buffer);
}
@@ -361,12 +361,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, DataOutput buffer) throws IOException
{
EncodingUtils.writeLongStringBytes(buffer, (String) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(DataInput buffer) throws IOException
{
return EncodingUtils.readLongString(buffer);
}
@@ -392,12 +392,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, DataOutput buffer) throws IOException
{
EncodingUtils.writeLongStringBytes(buffer, (String) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(DataInput buffer) throws IOException
{
return EncodingUtils.readLongString(buffer);
}
@@ -427,12 +427,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, DataOutput buffer) throws IOException
{
EncodingUtils.writeBoolean(buffer, (Boolean) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(DataInput buffer) throws IOException
{
return EncodingUtils.readBoolean(buffer);
}
@@ -462,12 +462,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, DataOutput buffer) throws IOException
{
EncodingUtils.writeChar(buffer, (Character) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(DataInput buffer) throws IOException
{
return EncodingUtils.readChar(buffer);
}
@@ -497,12 +497,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, DataOutput buffer) throws IOException
{
EncodingUtils.writeByte(buffer, (Byte) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(DataInput buffer) throws IOException
{
return EncodingUtils.readByte(buffer);
}
@@ -536,12 +536,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, DataOutput buffer) throws IOException
{
EncodingUtils.writeShort(buffer, (Short) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(DataInput buffer) throws IOException
{
return EncodingUtils.readShort(buffer);
}
@@ -578,12 +578,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, DataOutput buffer) throws IOException
{
EncodingUtils.writeInteger(buffer, (Integer) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(DataInput buffer) throws IOException
{
return EncodingUtils.readInteger(buffer);
}
@@ -596,6 +596,22 @@ public enum AMQType
return EncodingUtils.encodedLongLength();
}
+ public int getEncodingSize(long value)
+ {
+ return EncodingUtils.encodedLongLength();
+ }
+
+ public AMQTypedValue asTypedValue(long value)
+ {
+ return AMQTypedValue.createAMQTypedValue(value);
+ }
+
+ public void writeToBuffer(long value, DataOutput buffer) throws IOException
+ {
+ buffer.writeByte(identifier());
+ EncodingUtils.writeLong(buffer, value);
+ }
+
public Object toNativeValue(Object value)
{
if (value instanceof Long)
@@ -625,12 +641,18 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, DataOutput buffer) throws IOException
{
EncodingUtils.writeLong(buffer, (Long) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public long readLongFromBuffer(DataInput buffer) throws IOException
+ {
+ return EncodingUtils.readLong(buffer);
+ }
+
+
+ public Object readValueFromBuffer(DataInput buffer) throws IOException
{
return EncodingUtils.readLong(buffer);
}
@@ -660,12 +682,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, DataOutput buffer) throws IOException
{
EncodingUtils.writeFloat(buffer, (Float) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(DataInput buffer) throws IOException
{
return EncodingUtils.readFloat(buffer);
}
@@ -699,12 +721,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, DataOutput buffer) throws IOException
{
EncodingUtils.writeDouble(buffer, (Double) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(DataInput buffer) throws IOException
{
return EncodingUtils.readDouble(buffer);
}
@@ -761,7 +783,7 @@ public enum AMQType
*/
public AMQTypedValue asTypedValue(Object value)
{
- return new AMQTypedValue(this, toNativeValue(value));
+ return AMQTypedValue.createAMQTypedValue(this, toNativeValue(value));
}
/**
@@ -771,7 +793,7 @@ public enum AMQType
* @param value An instance of the type.
* @param buffer The byte buffer to write it to.
*/
- public void writeToBuffer(Object value, DataOutputStream buffer) throws IOException
+ public void writeToBuffer(Object value, DataOutput buffer) throws IOException
{
buffer.writeByte(identifier());
writeValueImpl(value, buffer);
@@ -783,7 +805,7 @@ public enum AMQType
* @param value An instance of the type.
* @param buffer The byte buffer to write it to.
*/
- abstract void writeValueImpl(Object value, DataOutputStream buffer) throws IOException;
+ abstract void writeValueImpl(Object value, DataOutput buffer) throws IOException;
/**
* Reads an instance of the type from a specified byte buffer.
@@ -792,5 +814,5 @@ public enum AMQType
*
* @return An instance of the type.
*/
- abstract Object readValueFromBuffer(DataInputStream buffer) throws IOException;
+ abstract Object readValueFromBuffer(DataInput buffer) throws IOException;
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java Wed Dec 28 13:02:41 2011
@@ -20,9 +20,7 @@
*/
package org.apache.qpid.framing;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import java.io.*;
import java.util.Date;
import java.util.Map;
import java.math.BigDecimal;
@@ -42,81 +40,218 @@ import java.math.BigDecimal;
* <tr><td> Extract the value from a fully typed AMQP value.
* </table>
*/
-public class AMQTypedValue
+public abstract class AMQTypedValue
{
- /** The type of the value. */
- private final AMQType _type;
- /** The Java native representation of the AMQP typed value. */
- private final Object _value;
+ public abstract AMQType getType();
- public AMQTypedValue(AMQType type, Object value)
+ public abstract Object getValue();
+
+ public abstract void writeToBuffer(DataOutput buffer) throws IOException;
+
+ public abstract int getEncodingSize();
+
+
+ private static final class GenericTypedValue extends AMQTypedValue
{
- if (type == null)
+ /** The type of the value. */
+ private final AMQType _type;
+
+ /** The Java native representation of the AMQP typed value. */
+ private final Object _value;
+
+ private GenericTypedValue(AMQType type, Object value)
{
- throw new NullPointerException("Cannot create a typed value with null type");
+ if (type == null)
+ {
+ throw new NullPointerException("Cannot create a typed value with null type");
+ }
+
+ _type = type;
+ _value = type.toNativeValue(value);
}
- _type = type;
- _value = type.toNativeValue(value);
- }
+ private GenericTypedValue(AMQType type, DataInput buffer) throws IOException
+ {
+ _type = type;
+ _value = type.readValueFromBuffer(buffer);
+ }
+
+
+ public AMQType getType()
+ {
+ return _type;
+ }
+
+ public Object getValue()
+ {
+ return _value;
+ }
+
+ public void writeToBuffer(DataOutput buffer) throws IOException
+ {
+ _type.writeToBuffer(_value, buffer);
+ }
+
+ public int getEncodingSize()
+ {
+ return _type.getEncodingSize(_value);
+ }
+
+
+ public String toString()
+ {
+ return "[" + getType() + ": " + getValue() + "]";
+ }
+
+
+ public boolean equals(Object o)
+ {
+ if(o instanceof GenericTypedValue)
+ {
+ GenericTypedValue other = (GenericTypedValue) o;
+ return _type == other._type && (_value == null ? other._value == null : _value.equals(other._value));
+ }
+ else
+ {
+ return false;
+ }
+ }
+
+ public int hashCode()
+ {
+ return _type.hashCode() ^ (_value == null ? 0 : _value.hashCode());
+ }
- private AMQTypedValue(AMQType type, DataInputStream buffer) throws IOException
- {
- _type = type;
- _value = type.readValueFromBuffer(buffer);
}
- public AMQType getType()
+ /** Typed value for AMQType.LONG that keeps the value as a primitive long. */
+ private static final class LongTypedValue extends AMQTypedValue
{
- return _type;
+
+ private final long _value;
+
+ private LongTypedValue(long value)
+ {
+ _value = value;
+ }
+
+ public LongTypedValue(DataInput buffer) throws IOException
+ {
+ _value = EncodingUtils.readLong(buffer);
+ }
+
+ public AMQType getType()
+ {
+ return AMQType.LONG;
+ }
+
+
+ public Object getValue()
+ {
+ return _value;
+ }
+
+ public void writeToBuffer(DataOutput buffer) throws IOException
+ {
+ EncodingUtils.writeByte(buffer,AMQType.LONG.identifier());
+ EncodingUtils.writeLong(buffer,_value);
+ }
+
+
+ public int getEncodingSize()
+ {
+ return EncodingUtils.encodedLongLength();
+ }
+
+ public String toString()
+ {
+ return "[" + getType() + ": " + getValue() + "]";
+ }
+
+ // Value-based equality: two LONG typed values are equal when they wrap the same long.
+ public boolean equals(Object o)
+ {
+ return (o instanceof LongTypedValue) && (((LongTypedValue) o)._value == _value);
+ }
+
+ // Type hash XOR the value hash, where the value hash is what Long.hashCode() yields.
+ public int hashCode()
+ {
+ return AMQType.LONG.hashCode() ^ (int) (_value ^ (_value >>> 32));
+ }
}
- public Object getValue()
+ /** Typed value for AMQType.INT that keeps the value as a primitive int. */
+ private static final class IntTypedValue extends AMQTypedValue
{
- return _value;
+
+ private final int _value;
+
+ public IntTypedValue(int value)
+ {
+ _value = value;
+ }
+
+ public AMQType getType()
+ {
+ return AMQType.INT;
+ }
+
+
+ public Object getValue()
+ {
+ return _value;
+ }
+
+ public void writeToBuffer(DataOutput buffer) throws IOException
+ {
+ EncodingUtils.writeByte(buffer,AMQType.INT.identifier());
+ EncodingUtils.writeInteger(buffer, _value);
+ }
+
+
+ public int getEncodingSize()
+ {
+ return EncodingUtils.encodedIntegerLength();
+ }
+
+ public String toString()
+ {
+ return "[" + getType() + ": " + getValue() + "]";
+ }
+
+ // Value-based equality: two INT typed values are equal when they wrap the same int.
+ public boolean equals(Object o)
+ {
+ return (o instanceof IntTypedValue) && (((IntTypedValue) o)._value == _value);
+ }
+
+ // Type hash XOR the value hash, where the value hash is what Integer.hashCode() yields.
+ public int hashCode()
+ {
+ return AMQType.INT.hashCode() ^ _value;
+ }
}
- public void writeToBuffer(DataOutputStream buffer) throws IOException
+
+ public static AMQTypedValue readFromBuffer(DataInput buffer) throws IOException
{
- _type.writeToBuffer(_value, buffer);
+ AMQType type = AMQTypeMap.getType(buffer.readByte());
+
+ switch(type)
+ {
+ case LONG:
+ return new LongTypedValue(buffer);
+
+ case INT:
+ int value = EncodingUtils.readInteger(buffer);
+ return createAMQTypedValue(value);
+
+ default:
+ return new GenericTypedValue(type, buffer);
+ }
+
}
- public int getEncodingSize()
+ private static final AMQTypedValue[] INT_VALUES = new AMQTypedValue[16];
+ static
{
- return _type.getEncodingSize(_value);
+ for(int i = 0 ; i < 16; i ++)
+ {
+ INT_VALUES[i] = new IntTypedValue(i);
+ }
}
- public static AMQTypedValue readFromBuffer(DataInputStream buffer) throws IOException
+ public static AMQTypedValue createAMQTypedValue(int i)
{
- AMQType type = AMQTypeMap.getType(buffer.readByte());
-
- return new AMQTypedValue(type, buffer);
+ return (i & 0x0f) == i ? INT_VALUES[i] : new IntTypedValue(i);
}
- public String toString()
+
+ public static AMQTypedValue createAMQTypedValue(long value)
{
- return "[" + getType() + ": " + getValue() + "]";
+ return new LongTypedValue(value);
}
-
- public boolean equals(Object o)
+ public static AMQTypedValue createAMQTypedValue(AMQType type, Object value)
{
- if(o instanceof AMQTypedValue)
+ switch(type)
{
- AMQTypedValue other = (AMQTypedValue) o;
- return _type == other._type && (_value == null ? other._value == null : _value.equals(other._value));
- }
- else
- {
- return false;
+ case LONG:
+ return new LongTypedValue((Long) AMQType.LONG.toNativeValue(value));
+ case INT:
+ return new IntTypedValue((Integer) AMQType.INT.toNativeValue(value));
+
+ default:
+ return new GenericTypedValue(type, value);
}
}
- public int hashCode()
- {
- return _type.hashCode() ^ (_value == null ? 0 : _value.hashCode());
- }
public static AMQTypedValue toTypedValue(Object val)
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BasicContentHeaderProperties.java Wed Dec 28 13:02:41 2011
@@ -20,8 +20,9 @@
*/
package org.apache.qpid.framing;
+import java.io.DataInput;
import java.io.DataInputStream;
-import java.io.DataOutputStream;
+import java.io.DataOutput;
import java.io.IOException;
import org.slf4j.Logger;
@@ -80,6 +81,7 @@ public class BasicContentHeaderPropertie
private static final int USER_ID_MASK = 1 << 4;
private static final int APPLICATION_ID_MASK = 1 << 3;
private static final int CLUSTER_ID_MASK = 1 << 2;
+ private byte[] _encodedForm;
public BasicContentHeaderProperties()
@@ -87,6 +89,12 @@ public class BasicContentHeaderPropertie
public int getPropertyListSize()
{
+ if(_encodedForm != null && (_headers == null || _headers.isClean()))
+ {
+ return _encodedForm.length;
+ }
+ else
+ {
int size = 0;
if ((_propertyFlags & (CONTENT_TYPE_MASK)) > 0)
@@ -167,6 +175,7 @@ public class BasicContentHeaderPropertie
}
return size;
+ }
}
public void setPropertyFlags(int propertyFlags)
@@ -179,87 +188,94 @@ public class BasicContentHeaderPropertie
return _propertyFlags;
}
- public void writePropertyListPayload(DataOutputStream buffer) throws IOException
+ /**
+ * Writes the property list payload to the supplied output.
+ *
+ * If an encoded form is held and the headers are absent or still clean, that byte array is
+ * written as-is; otherwise each property whose flag is set is encoded individually.
+ *
+ * @param buffer the output to write the payload to
+ * @throws IOException if the underlying output fails
+ */
+ public void writePropertyListPayload(DataOutput buffer) throws IOException
{
- if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0)
+ // Must be the same test getPropertyListSize() applies, so the size reported and the
+ // bytes written always describe the same encoding.
+ if(_encodedForm != null && (_headers == null || _headers.isClean()))
{
- EncodingUtils.writeShortStringBytes(buffer, _contentType);
+ buffer.write(_encodedForm);
}
-
- if ((_propertyFlags & ENCODING_MASK) != 0)
+ else
{
- EncodingUtils.writeShortStringBytes(buffer, _encoding);
- }
+ if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _contentType);
+ }
- if ((_propertyFlags & HEADERS_MASK) != 0)
- {
- EncodingUtils.writeFieldTableBytes(buffer, _headers);
- }
+ if ((_propertyFlags & ENCODING_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _encoding);
+ }
- if ((_propertyFlags & DELIVERY_MODE_MASK) != 0)
- {
- buffer.writeByte(_deliveryMode);
- }
+ if ((_propertyFlags & HEADERS_MASK) != 0)
+ {
+ EncodingUtils.writeFieldTableBytes(buffer, _headers);
+ }
- if ((_propertyFlags & PRIORITY_MASK) != 0)
- {
- buffer.writeByte(_priority);
- }
+ if ((_propertyFlags & DELIVERY_MODE_MASK) != 0)
+ {
+ buffer.writeByte(_deliveryMode);
+ }
- if ((_propertyFlags & CORRELATION_ID_MASK) != 0)
- {
- EncodingUtils.writeShortStringBytes(buffer, _correlationId);
- }
+ if ((_propertyFlags & PRIORITY_MASK) != 0)
+ {
+ buffer.writeByte(_priority);
+ }
- if ((_propertyFlags & REPLY_TO_MASK) != 0)
- {
- EncodingUtils.writeShortStringBytes(buffer, _replyTo);
- }
+ if ((_propertyFlags & CORRELATION_ID_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _correlationId);
+ }
- if ((_propertyFlags & EXPIRATION_MASK) != 0)
- {
- if (_expiration == 0L)
+ if ((_propertyFlags & REPLY_TO_MASK) != 0)
{
- EncodingUtils.writeShortStringBytes(buffer, ZERO_STRING);
+ EncodingUtils.writeShortStringBytes(buffer, _replyTo);
}
- else
+
+ if ((_propertyFlags & EXPIRATION_MASK) != 0)
{
- EncodingUtils.writeShortStringBytes(buffer, String.valueOf(_expiration));
+ if (_expiration == 0L)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, ZERO_STRING);
+ }
+ else
+ {
+ EncodingUtils.writeShortStringBytes(buffer, String.valueOf(_expiration));
+ }
}
- }
- if ((_propertyFlags & MESSAGE_ID_MASK) != 0)
- {
- EncodingUtils.writeShortStringBytes(buffer, _messageId);
- }
+ if ((_propertyFlags & MESSAGE_ID_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _messageId);
+ }
- if ((_propertyFlags & TIMESTAMP_MASK) != 0)
- {
- EncodingUtils.writeTimestamp(buffer, _timestamp);
- }
+ if ((_propertyFlags & TIMESTAMP_MASK) != 0)
+ {
+ EncodingUtils.writeTimestamp(buffer, _timestamp);
+ }
- if ((_propertyFlags & TYPE_MASK) != 0)
- {
- EncodingUtils.writeShortStringBytes(buffer, _type);
- }
+ if ((_propertyFlags & TYPE_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _type);
+ }
- if ((_propertyFlags & USER_ID_MASK) != 0)
- {
- EncodingUtils.writeShortStringBytes(buffer, _userId);
- }
+ if ((_propertyFlags & USER_ID_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _userId);
+ }
- if ((_propertyFlags & APPLICATION_ID_MASK) != 0)
- {
- EncodingUtils.writeShortStringBytes(buffer, _appId);
- }
+ if ((_propertyFlags & APPLICATION_ID_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _appId);
+ }
- if ((_propertyFlags & CLUSTER_ID_MASK) != 0)
- {
- EncodingUtils.writeShortStringBytes(buffer, _clusterId);
+ if ((_propertyFlags & CLUSTER_ID_MASK) != 0)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, _clusterId);
+ }
}
}
- public void populatePropertiesFromBuffer(DataInputStream buffer, int propertyFlags, int size) throws AMQFrameDecodingException, IOException
+ public void populatePropertiesFromBuffer(DataInput buffer, int propertyFlags, int size) throws AMQFrameDecodingException, IOException
{
_propertyFlags = propertyFlags;
@@ -268,26 +284,40 @@ public class BasicContentHeaderPropertie
_logger.debug("Property flags: " + _propertyFlags);
}
- decode(buffer);
+ _encodedForm = new byte[size];
+ buffer.readFully(_encodedForm);
+
+ ByteArrayDataInput input = new ByteArrayDataInput(_encodedForm);
+
+ decode(input);
+
}
- private void decode(DataInputStream buffer) throws IOException, AMQFrameDecodingException
+ private void decode(ByteArrayDataInput buffer) throws IOException, AMQFrameDecodingException
{
// ByteBuffer buffer = ByteBuffer.wrap(_encodedForm);
+ int headersOffset = 0;
+
if ((_propertyFlags & (CONTENT_TYPE_MASK)) != 0)
{
- _contentType = EncodingUtils.readAMQShortString(buffer);
+ _contentType = buffer.readAMQShortString();
+ headersOffset += EncodingUtils.encodedShortStringLength(_contentType);
}
if ((_propertyFlags & ENCODING_MASK) != 0)
{
- _encoding = EncodingUtils.readAMQShortString(buffer);
+ _encoding = buffer.readAMQShortString();
+ headersOffset += EncodingUtils.encodedShortStringLength(_encoding);
}
if ((_propertyFlags & HEADERS_MASK) != 0)
{
- _headers = EncodingUtils.readFieldTable(buffer);
+ long length = EncodingUtils.readUnsignedInteger(buffer);
+
+ _headers = new FieldTable(_encodedForm, headersOffset+4, (int)length);
+
+ buffer.skipBytes((int)length);
}
if ((_propertyFlags & DELIVERY_MODE_MASK) != 0)
@@ -302,12 +332,12 @@ public class BasicContentHeaderPropertie
if ((_propertyFlags & CORRELATION_ID_MASK) != 0)
{
- _correlationId = EncodingUtils.readAMQShortString(buffer);
+ _correlationId = buffer.readAMQShortString();
}
if ((_propertyFlags & REPLY_TO_MASK) != 0)
{
- _replyTo = EncodingUtils.readAMQShortString(buffer);
+ _replyTo = buffer.readAMQShortString();
}
if ((_propertyFlags & EXPIRATION_MASK) != 0)
@@ -317,7 +347,7 @@ public class BasicContentHeaderPropertie
if ((_propertyFlags & MESSAGE_ID_MASK) != 0)
{
- _messageId = EncodingUtils.readAMQShortString(buffer);
+ _messageId = buffer.readAMQShortString();
}
if ((_propertyFlags & TIMESTAMP_MASK) != 0)
@@ -327,22 +357,22 @@ public class BasicContentHeaderPropertie
if ((_propertyFlags & TYPE_MASK) != 0)
{
- _type = EncodingUtils.readAMQShortString(buffer);
+ _type = buffer.readAMQShortString();
}
if ((_propertyFlags & USER_ID_MASK) != 0)
{
- _userId = EncodingUtils.readAMQShortString(buffer);
+ _userId = buffer.readAMQShortString();
}
if ((_propertyFlags & APPLICATION_ID_MASK) != 0)
{
- _appId = EncodingUtils.readAMQShortString(buffer);
+ _appId = buffer.readAMQShortString();
}
if ((_propertyFlags & CLUSTER_ID_MASK) != 0)
{
- _clusterId = EncodingUtils.readAMQShortString(buffer);
+ _clusterId = buffer.readAMQShortString();
}
@@ -363,11 +393,12 @@ public class BasicContentHeaderPropertie
{
_propertyFlags |= (CONTENT_TYPE_MASK);
_contentType = contentType;
+ _encodedForm = null;
}
public void setContentType(String contentType)
{
- setContentType((contentType == null) ? null : new AMQShortString(contentType));
+ setContentType((contentType == null) ? null : AMQShortString.valueOf(contentType));
}
public String getEncodingAsString()
@@ -384,13 +415,15 @@ public class BasicContentHeaderPropertie
public void setEncoding(String encoding)
{
_propertyFlags |= ENCODING_MASK;
- _encoding = (encoding == null) ? null : new AMQShortString(encoding);
+ _encoding = (encoding == null) ? null : AMQShortString.valueOf(encoding);
+ _encodedForm = null;
}
public void setEncoding(AMQShortString encoding)
{
_propertyFlags |= ENCODING_MASK;
_encoding = encoding;
+ _encodedForm = null;
}
public FieldTable getHeaders()
@@ -407,6 +440,7 @@ public class BasicContentHeaderPropertie
{
_propertyFlags |= HEADERS_MASK;
_headers = headers;
+ _encodedForm = null;
}
public byte getDeliveryMode()
@@ -418,6 +452,7 @@ public class BasicContentHeaderPropertie
{
_propertyFlags |= DELIVERY_MODE_MASK;
_deliveryMode = deliveryMode;
+ _encodedForm = null;
}
public byte getPriority()
@@ -429,6 +464,7 @@ public class BasicContentHeaderPropertie
{
_propertyFlags |= PRIORITY_MASK;
_priority = priority;
+ _encodedForm = null;
}
public AMQShortString getCorrelationId()
@@ -443,13 +479,14 @@ public class BasicContentHeaderPropertie
public void setCorrelationId(String correlationId)
{
- setCorrelationId((correlationId == null) ? null : new AMQShortString(correlationId));
+ setCorrelationId((correlationId == null) ? null : AMQShortString.valueOf(correlationId));
}
public void setCorrelationId(AMQShortString correlationId)
{
_propertyFlags |= CORRELATION_ID_MASK;
_correlationId = correlationId;
+ _encodedForm = null;
}
public String getReplyToAsString()
@@ -464,13 +501,14 @@ public class BasicContentHeaderPropertie
public void setReplyTo(String replyTo)
{
- setReplyTo((replyTo == null) ? null : new AMQShortString(replyTo));
+ setReplyTo((replyTo == null) ? null : AMQShortString.valueOf(replyTo));
}
public void setReplyTo(AMQShortString replyTo)
{
_propertyFlags |= REPLY_TO_MASK;
_replyTo = replyTo;
+ _encodedForm = null;
}
public long getExpiration()
@@ -482,6 +520,7 @@ public class BasicContentHeaderPropertie
{
_propertyFlags |= EXPIRATION_MASK;
_expiration = expiration;
+ _encodedForm = null;
}
public AMQShortString getMessageId()
@@ -498,12 +537,14 @@ public class BasicContentHeaderPropertie
{
_propertyFlags |= MESSAGE_ID_MASK;
_messageId = (messageId == null) ? null : new AMQShortString(messageId);
+ _encodedForm = null;
}
public void setMessageId(AMQShortString messageId)
{
_propertyFlags |= MESSAGE_ID_MASK;
_messageId = messageId;
+ _encodedForm = null;
}
public long getTimestamp()
@@ -515,6 +556,7 @@ public class BasicContentHeaderPropertie
{
_propertyFlags |= TIMESTAMP_MASK;
_timestamp = timestamp;
+ _encodedForm = null;
}
public String getTypeAsString()
@@ -529,13 +571,14 @@ public class BasicContentHeaderPropertie
public void setType(String type)
{
- setType((type == null) ? null : new AMQShortString(type));
+ setType((type == null) ? null : AMQShortString.valueOf(type));
}
public void setType(AMQShortString type)
{
_propertyFlags |= TYPE_MASK;
_type = type;
+ _encodedForm = null;
}
public String getUserIdAsString()
@@ -550,13 +593,14 @@ public class BasicContentHeaderPropertie
public void setUserId(String userId)
{
- setUserId((userId == null) ? null : new AMQShortString(userId));
+ setUserId((userId == null) ? null : AMQShortString.valueOf(userId));
}
public void setUserId(AMQShortString userId)
{
_propertyFlags |= USER_ID_MASK;
_userId = userId;
+ _encodedForm = null;
}
public String getAppIdAsString()
@@ -571,13 +615,14 @@ public class BasicContentHeaderPropertie
public void setAppId(String appId)
{
- setAppId((appId == null) ? null : new AMQShortString(appId));
+ setAppId((appId == null) ? null : AMQShortString.valueOf(appId));
}
public void setAppId(AMQShortString appId)
{
_propertyFlags |= APPLICATION_ID_MASK;
_appId = appId;
+ _encodedForm = null;
}
public String getClusterIdAsString()
@@ -592,13 +637,14 @@ public class BasicContentHeaderPropertie
public void setClusterId(String clusterId)
{
- setClusterId((clusterId == null) ? null : new AMQShortString(clusterId));
+ setClusterId((clusterId == null) ? null : AMQShortString.valueOf(clusterId));
}
public void setClusterId(AMQShortString clusterId)
{
_propertyFlags |= CLUSTER_ID_MASK;
_clusterId = clusterId;
+ _encodedForm = null;
}
public String toString()
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java?rev=1225178&r1=1225177&r2=1225178&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/BodyFactory.java Wed Dec 28 13:02:41 2011
@@ -20,7 +20,8 @@
*/
package org.apache.qpid.framing;
-import java.io.DataInputStream;
+import org.apache.qpid.codec.MarkableDataInput;
+
import java.io.IOException;
/**
@@ -28,5 +29,5 @@ import java.io.IOException;
*/
public interface BodyFactory
{
- AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException, IOException;
+ AMQBody createBody(MarkableDataInput in, long bodySize) throws AMQFrameDecodingException, IOException;
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org