You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kg...@apache.org on 2011/09/19 17:13:38 UTC
svn commit: r1172657 [16/21] - in /qpid/branches/qpid-3346/qpid: ./ cpp/
cpp/bindings/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qpid/dotnet/
cpp/bindings/qpid/dotnet/examples/csharp.direct.receiver/Properties/
cpp/bindings/qpid/dotnet/examples/csha...
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java Mon Sep 19 15:13:18 2011
@@ -20,11 +20,12 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
+import java.io.*;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.nio.ByteBuffer;
import java.nio.charset.Charset;
public class EncodingUtils
@@ -218,7 +219,7 @@ public class EncodingUtils
return 0;
}
- public static void writeShortStringBytes(ByteBuffer buffer, String s)
+ public static void writeShortStringBytes(DataOutputStream buffer, String s) throws IOException
{
if (s != null)
{
@@ -231,18 +232,18 @@ public class EncodingUtils
// TODO: check length fits in an unsigned byte
writeUnsignedByte(buffer, (short)encodedString.length);
- buffer.put(encodedString);
+ buffer.write(encodedString);
}
else
{
// really writing out unsigned byte
- buffer.put((byte) 0);
+ buffer.write((byte) 0);
}
}
- public static void writeShortStringBytes(ByteBuffer buffer, AMQShortString s)
+ public static void writeShortStringBytes(DataOutputStream buffer, AMQShortString s) throws IOException
{
if (s != null)
{
@@ -252,11 +253,11 @@ public class EncodingUtils
else
{
// really writing out unsigned byte
- buffer.put((byte) 0);
+ buffer.write((byte) 0);
}
}
- public static void writeLongStringBytes(ByteBuffer buffer, String s)
+ public static void writeLongStringBytes(DataOutputStream buffer, String s) throws IOException
{
assert (s == null) || (s.length() <= 0xFFFE);
if (s != null)
@@ -270,7 +271,7 @@ public class EncodingUtils
encodedString[i] = (byte) cha[i];
}
- buffer.put(encodedString);
+ buffer.write(encodedString);
}
else
{
@@ -278,7 +279,7 @@ public class EncodingUtils
}
}
- public static void writeLongStringBytes(ByteBuffer buffer, char[] s)
+ public static void writeLongStringBytes(DataOutputStream buffer, char[] s) throws IOException
{
assert (s == null) || (s.length <= 0xFFFE);
if (s != null)
@@ -291,7 +292,7 @@ public class EncodingUtils
encodedString[i] = (byte) s[i];
}
- buffer.put(encodedString);
+ buffer.write(encodedString);
}
else
{
@@ -299,13 +300,13 @@ public class EncodingUtils
}
}
- public static void writeLongStringBytes(ByteBuffer buffer, byte[] bytes)
+ public static void writeLongStringBytes(DataOutputStream buffer, byte[] bytes) throws IOException
{
assert (bytes == null) || (bytes.length <= 0xFFFE);
if (bytes != null)
{
writeUnsignedInteger(buffer, bytes.length);
- buffer.put(bytes);
+ buffer.write(bytes);
}
else
{
@@ -313,24 +314,24 @@ public class EncodingUtils
}
}
- public static void writeUnsignedByte(ByteBuffer buffer, short b)
+ public static void writeUnsignedByte(DataOutputStream buffer, short b) throws IOException
{
byte bv = (byte) b;
- buffer.put(bv);
+ buffer.write(bv);
}
- public static void writeUnsignedShort(ByteBuffer buffer, int s)
+ public static void writeUnsignedShort(DataOutputStream buffer, int s) throws IOException
{
// TODO: Is this comparison safe? Do I need to cast RHS to long?
if (s < Short.MAX_VALUE)
{
- buffer.putShort((short) s);
+ buffer.writeShort(s);
}
else
{
short sv = (short) s;
- buffer.put((byte) (0xFF & (sv >> 8)));
- buffer.put((byte) (0xFF & sv));
+ buffer.write((byte) (0xFF & (sv >> 8)));
+ buffer.write((byte) (0xFF & sv));
}
}
@@ -339,12 +340,12 @@ public class EncodingUtils
return 4;
}
- public static void writeUnsignedInteger(ByteBuffer buffer, long l)
+ public static void writeUnsignedInteger(DataOutputStream buffer, long l) throws IOException
{
// TODO: Is this comparison safe? Do I need to cast RHS to long?
if (l < Integer.MAX_VALUE)
{
- buffer.putInt((int) l);
+ buffer.writeInt((int) l);
}
else
{
@@ -352,14 +353,14 @@ public class EncodingUtils
// FIXME: This *may* go faster if we build this into a local 4-byte array and then
// put the array in a single call.
- buffer.put((byte) (0xFF & (iv >> 24)));
- buffer.put((byte) (0xFF & (iv >> 16)));
- buffer.put((byte) (0xFF & (iv >> 8)));
- buffer.put((byte) (0xFF & iv));
+ buffer.write((byte) (0xFF & (iv >> 24)));
+ buffer.write((byte) (0xFF & (iv >> 16)));
+ buffer.write((byte) (0xFF & (iv >> 8)));
+ buffer.write((byte) (0xFF & iv));
}
}
- public static void writeFieldTableBytes(ByteBuffer buffer, FieldTable table)
+ public static void writeFieldTableBytes(DataOutputStream buffer, FieldTable table) throws IOException
{
if (table != null)
{
@@ -371,12 +372,12 @@ public class EncodingUtils
}
}
- public static void writeContentBytes(ByteBuffer buffer, Content content)
+ public static void writeContentBytes(DataOutputStream buffer, Content content)
{
// TODO: New Content class required for AMQP 0-9.
}
- public static void writeBooleans(ByteBuffer buffer, boolean[] values)
+ public static void writeBooleans(DataOutputStream buffer, boolean[] values) throws IOException
{
byte packedValue = 0;
for (int i = 0; i < values.length; i++)
@@ -387,16 +388,16 @@ public class EncodingUtils
}
}
- buffer.put(packedValue);
+ buffer.write(packedValue);
}
- public static void writeBooleans(ByteBuffer buffer, boolean value)
+ public static void writeBooleans(DataOutputStream buffer, boolean value) throws IOException
{
- buffer.put(value ? (byte) 1 : (byte) 0);
+ buffer.write(value ? (byte) 1 : (byte) 0);
}
- public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1)
+ public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1) throws IOException
{
byte packedValue = value0 ? (byte) 1 : (byte) 0;
@@ -405,10 +406,10 @@ public class EncodingUtils
packedValue = (byte) (packedValue | (byte) (1 << 1));
}
- buffer.put(packedValue);
+ buffer.write(packedValue);
}
- public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2)
+ public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2) throws IOException
{
byte packedValue = value0 ? (byte) 1 : (byte) 0;
@@ -422,10 +423,10 @@ public class EncodingUtils
packedValue = (byte) (packedValue | (byte) (1 << 2));
}
- buffer.put(packedValue);
+ buffer.write(packedValue);
}
- public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2, boolean value3)
+ public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3) throws IOException
{
byte packedValue = value0 ? (byte) 1 : (byte) 0;
@@ -444,11 +445,11 @@ public class EncodingUtils
packedValue = (byte) (packedValue | (byte) (1 << 3));
}
- buffer.put(packedValue);
+ buffer.write(packedValue);
}
- public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2, boolean value3,
- boolean value4)
+ public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3,
+ boolean value4) throws IOException
{
byte packedValue = value0 ? (byte) 1 : (byte) 0;
@@ -472,11 +473,11 @@ public class EncodingUtils
packedValue = (byte) (packedValue | (byte) (1 << 4));
}
- buffer.put(packedValue);
+ buffer.write(packedValue);
}
- public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2, boolean value3,
- boolean value4, boolean value5)
+ public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3,
+ boolean value4, boolean value5) throws IOException
{
byte packedValue = value0 ? (byte) 1 : (byte) 0;
@@ -505,11 +506,11 @@ public class EncodingUtils
packedValue = (byte) (packedValue | (byte) (1 << 5));
}
- buffer.put(packedValue);
+ buffer.write(packedValue);
}
- public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2, boolean value3,
- boolean value4, boolean value5, boolean value6)
+ public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3,
+ boolean value4, boolean value5, boolean value6) throws IOException
{
byte packedValue = value0 ? (byte) 1 : (byte) 0;
@@ -543,11 +544,11 @@ public class EncodingUtils
packedValue = (byte) (packedValue | (byte) (1 << 6));
}
- buffer.put(packedValue);
+ buffer.write(packedValue);
}
- public static void writeBooleans(ByteBuffer buffer, boolean value0, boolean value1, boolean value2, boolean value3,
- boolean value4, boolean value5, boolean value6, boolean value7)
+ public static void writeBooleans(DataOutputStream buffer, boolean value0, boolean value1, boolean value2, boolean value3,
+ boolean value4, boolean value5, boolean value6, boolean value7) throws IOException
{
byte packedValue = value0 ? (byte) 1 : (byte) 0;
@@ -586,7 +587,7 @@ public class EncodingUtils
packedValue = (byte) (packedValue | (byte) (1 << 7));
}
- buffer.put(packedValue);
+ buffer.write(packedValue);
}
/**
@@ -595,12 +596,12 @@ public class EncodingUtils
* @param buffer
* @param data
*/
- public static void writeLongstr(ByteBuffer buffer, byte[] data)
+ public static void writeLongstr(DataOutputStream buffer, byte[] data) throws IOException
{
if (data != null)
{
writeUnsignedInteger(buffer, data.length);
- buffer.put(data);
+ buffer.write(data);
}
else
{
@@ -608,14 +609,14 @@ public class EncodingUtils
}
}
- public static void writeTimestamp(ByteBuffer buffer, long timestamp)
+ public static void writeTimestamp(DataOutputStream buffer, long timestamp) throws IOException
{
writeLong(buffer, timestamp);
}
- public static boolean[] readBooleans(ByteBuffer buffer)
+ public static boolean[] readBooleans(DataInputStream buffer) throws IOException
{
- final byte packedValue = buffer.get();
+ final byte packedValue = buffer.readByte();
if (packedValue == 0)
{
return ALL_FALSE_ARRAY;
@@ -640,9 +641,9 @@ public class EncodingUtils
return result;
}
- public static FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException
+ public static FieldTable readFieldTable(DataInputStream buffer) throws AMQFrameDecodingException, IOException
{
- long length = buffer.getUnsignedInt();
+ long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL;
if (length == 0)
{
return null;
@@ -653,21 +654,21 @@ public class EncodingUtils
}
}
- public static Content readContent(ByteBuffer buffer) throws AMQFrameDecodingException
+ public static Content readContent(DataInputStream buffer) throws AMQFrameDecodingException
{
// TODO: New Content class required for AMQP 0-9.
return null;
}
- public static AMQShortString readAMQShortString(ByteBuffer buffer)
+ public static AMQShortString readAMQShortString(DataInputStream buffer) throws IOException
{
return AMQShortString.readFromBuffer(buffer);
}
- public static String readShortString(ByteBuffer buffer)
+ public static String readShortString(DataInputStream buffer) throws IOException
{
- short length = buffer.getUnsigned();
+ short length = (short) (((short)buffer.readByte()) & 0xFF);
if (length == 0)
{
return null;
@@ -680,7 +681,7 @@ public class EncodingUtils
// this approach here is valid since we know that all the chars are
// ASCII (0-127)
byte[] stringBytes = new byte[length];
- buffer.get(stringBytes, 0, length);
+ buffer.read(stringBytes, 0, length);
char[] stringChars = new char[length];
for (int i = 0; i < stringChars.length; i++)
{
@@ -691,9 +692,9 @@ public class EncodingUtils
}
}
- public static String readLongString(ByteBuffer buffer)
+ public static String readLongString(DataInputStream buffer) throws IOException
{
- long length = buffer.getUnsignedInt();
+ long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL;
if (length == 0)
{
return "";
@@ -706,7 +707,7 @@ public class EncodingUtils
// this approach here is valid since we know that all the chars are
// ASCII (0-127)
byte[] stringBytes = new byte[(int) length];
- buffer.get(stringBytes, 0, (int) length);
+ buffer.read(stringBytes, 0, (int) length);
char[] stringChars = new char[(int) length];
for (int i = 0; i < stringChars.length; i++)
{
@@ -717,9 +718,9 @@ public class EncodingUtils
}
}
- public static byte[] readLongstr(ByteBuffer buffer)
+ public static byte[] readLongstr(DataInputStream buffer) throws IOException
{
- long length = buffer.getUnsignedInt();
+ long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL;
if (length == 0)
{
return null;
@@ -727,17 +728,17 @@ public class EncodingUtils
else
{
byte[] result = new byte[(int) length];
- buffer.get(result);
+ buffer.read(result);
return result;
}
}
- public static long readTimestamp(ByteBuffer buffer)
+ public static long readTimestamp(DataInputStream buffer) throws IOException
{
// Discard msb from AMQ timestamp
// buffer.getUnsignedInt();
- return buffer.getLong();
+ return buffer.readLong();
}
static byte[] hexToByteArray(String id)
@@ -817,14 +818,14 @@ public class EncodingUtils
// AMQP_BOOLEAN_PROPERTY_PREFIX
- public static void writeBoolean(ByteBuffer buffer, Boolean aBoolean)
+ public static void writeBoolean(DataOutputStream buffer, Boolean aBoolean) throws IOException
{
- buffer.put((byte) (aBoolean ? 1 : 0));
+ buffer.write(aBoolean ? 1 : 0);
}
- public static boolean readBoolean(ByteBuffer buffer)
+ public static boolean readBoolean(DataInputStream buffer) throws IOException
{
- byte packedValue = buffer.get();
+ byte packedValue = buffer.readByte();
return (packedValue == 1);
}
@@ -835,14 +836,14 @@ public class EncodingUtils
}
// AMQP_BYTE_PROPERTY_PREFIX
- public static void writeByte(ByteBuffer buffer, Byte aByte)
+ public static void writeByte(DataOutputStream buffer, Byte aByte) throws IOException
{
- buffer.put(aByte);
+ buffer.writeByte(aByte);
}
- public static byte readByte(ByteBuffer buffer)
+ public static byte readByte(DataInputStream buffer) throws IOException
{
- return buffer.get();
+ return buffer.readByte();
}
public static int encodedByteLength()
@@ -851,14 +852,14 @@ public class EncodingUtils
}
// AMQP_SHORT_PROPERTY_PREFIX
- public static void writeShort(ByteBuffer buffer, Short aShort)
+ public static void writeShort(DataOutputStream buffer, Short aShort) throws IOException
{
- buffer.putShort(aShort);
+ buffer.writeShort(aShort);
}
- public static short readShort(ByteBuffer buffer)
+ public static short readShort(DataInputStream buffer) throws IOException
{
- return buffer.getShort();
+ return buffer.readShort();
}
public static int encodedShortLength()
@@ -867,14 +868,14 @@ public class EncodingUtils
}
// INTEGER_PROPERTY_PREFIX
- public static void writeInteger(ByteBuffer buffer, Integer aInteger)
+ public static void writeInteger(DataOutputStream buffer, Integer aInteger) throws IOException
{
- buffer.putInt(aInteger);
+ buffer.writeInt(aInteger);
}
- public static int readInteger(ByteBuffer buffer)
+ public static int readInteger(DataInputStream buffer) throws IOException
{
- return buffer.getInt();
+ return buffer.readInt();
}
public static int encodedIntegerLength()
@@ -883,14 +884,14 @@ public class EncodingUtils
}
// AMQP_LONG_PROPERTY_PREFIX
- public static void writeLong(ByteBuffer buffer, Long aLong)
+ public static void writeLong(DataOutputStream buffer, Long aLong) throws IOException
{
- buffer.putLong(aLong);
+ buffer.writeLong(aLong);
}
- public static long readLong(ByteBuffer buffer)
+ public static long readLong(DataInputStream buffer) throws IOException
{
- return buffer.getLong();
+ return buffer.readLong();
}
public static int encodedLongLength()
@@ -899,14 +900,14 @@ public class EncodingUtils
}
// Float_PROPERTY_PREFIX
- public static void writeFloat(ByteBuffer buffer, Float aFloat)
+ public static void writeFloat(DataOutputStream buffer, Float aFloat) throws IOException
{
- buffer.putFloat(aFloat);
+ buffer.writeFloat(aFloat);
}
- public static float readFloat(ByteBuffer buffer)
+ public static float readFloat(DataInputStream buffer) throws IOException
{
- return buffer.getFloat();
+ return buffer.readFloat();
}
public static int encodedFloatLength()
@@ -915,14 +916,14 @@ public class EncodingUtils
}
// Double_PROPERTY_PREFIX
- public static void writeDouble(ByteBuffer buffer, Double aDouble)
+ public static void writeDouble(DataOutputStream buffer, Double aDouble) throws IOException
{
- buffer.putDouble(aDouble);
+ buffer.writeDouble(aDouble);
}
- public static double readDouble(ByteBuffer buffer)
+ public static double readDouble(DataInputStream buffer) throws IOException
{
- return buffer.getDouble();
+ return buffer.readDouble();
}
public static int encodedDoubleLength()
@@ -930,9 +931,9 @@ public class EncodingUtils
return 8;
}
- public static byte[] readBytes(ByteBuffer buffer)
+ public static byte[] readBytes(DataInputStream buffer) throws IOException
{
- long length = buffer.getUnsignedInt();
+ long length = ((long)(buffer.readInt())) & 0xFFFFFFFFL;
if (length == 0)
{
return null;
@@ -940,19 +941,19 @@ public class EncodingUtils
else
{
byte[] dataBytes = new byte[(int)length];
- buffer.get(dataBytes, 0, (int)length);
+ buffer.read(dataBytes, 0, (int) length);
return dataBytes;
}
}
- public static void writeBytes(ByteBuffer buffer, byte[] data)
+ public static void writeBytes(DataOutputStream buffer, byte[] data) throws IOException
{
if (data != null)
{
// TODO: check length fits in an unsigned byte
writeUnsignedInteger(buffer, (long)data.length);
- buffer.put(data);
+ buffer.write(data);
}
else
{
@@ -968,35 +969,35 @@ public class EncodingUtils
return encodedByteLength();
}
- public static char readChar(ByteBuffer buffer)
+ public static char readChar(DataInputStream buffer) throws IOException
{
// This is valid as we know that the Character is ASCII 0..127
- return (char) buffer.get();
+ return (char) buffer.read();
}
- public static void writeChar(ByteBuffer buffer, char character)
+ public static void writeChar(DataOutputStream buffer, char character) throws IOException
{
// This is valid as we know that the Character is ASCII 0..127
writeByte(buffer, (byte) character);
}
- public static long readLongAsShortString(ByteBuffer buffer)
+ public static long readLongAsShortString(DataInputStream buffer) throws IOException
{
- short length = buffer.getUnsigned();
+ short length = (short) buffer.readUnsignedByte();
short pos = 0;
if (length == 0)
{
return 0L;
}
- byte digit = buffer.get();
+ byte digit = buffer.readByte();
boolean isNegative;
long result = 0;
if (digit == (byte) '-')
{
isNegative = true;
pos++;
- digit = buffer.get();
+ digit = buffer.readByte();
}
else
{
@@ -1009,7 +1010,7 @@ public class EncodingUtils
while (pos < length)
{
pos++;
- digit = buffer.get();
+ digit = buffer.readByte();
result = (result << 3) + (result << 1);
result += digit - (byte) '0';
}
@@ -1017,15 +1018,15 @@ public class EncodingUtils
return result;
}
- public static long readUnsignedInteger(ByteBuffer buffer)
+ public static long readUnsignedInteger(DataInputStream buffer) throws IOException
{
- long l = 0xFF & buffer.get();
+ long l = 0xFF & buffer.readByte();
l <<= 8;
- l = l | (0xFF & buffer.get());
+ l = l | (0xFF & buffer.readByte());
l <<= 8;
- l = l | (0xFF & buffer.get());
+ l = l | (0xFF & buffer.readByte());
l <<= 8;
- l = l | (0xFF & buffer.get());
+ l = l | (0xFF & buffer.readByte());
return l;
}
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTable.java Mon Sep 19 15:13:18 2011
@@ -20,12 +20,16 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
import org.apache.qpid.AMQPInvalidClassException;
+import java.io.ByteArrayInputStream;
+import java.io.ByteArrayOutputStream;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.math.BigDecimal;
import java.util.Collections;
import java.util.Enumeration;
@@ -43,8 +47,8 @@ public class FieldTable
private static final String STRICT_AMQP = "STRICT_AMQP";
private final boolean _strictAMQP = Boolean.valueOf(System.getProperty(STRICT_AMQP, "false"));
- private ByteBuffer _encodedForm;
- private LinkedHashMap<AMQShortString, AMQTypedValue> _properties;
+ private byte[] _encodedForm;
+ private LinkedHashMap<AMQShortString, AMQTypedValue> _properties = null;
private long _encodedSize;
private static final int INITIAL_HASHMAP_CAPACITY = 16;
private static final int INITIAL_ENCODED_FORM_SIZE = 256;
@@ -52,9 +56,6 @@ public class FieldTable
public FieldTable()
{
super();
- // _encodedForm = ByteBuffer.allocate(INITIAL_ENCODED_FORM_SIZE);
- // _encodedForm.setAutoExpand(true);
- // _encodedForm.limit(0);
}
/**
@@ -63,16 +64,12 @@ public class FieldTable
* @param buffer the buffer from which to read data. The length byte must be read already
* @param length the length of the field table. Must be > 0.
*/
- public FieldTable(ByteBuffer buffer, long length)
+ public FieldTable(DataInputStream buffer, long length) throws IOException
{
this();
- ByteBuffer encodedForm = buffer.slice();
- encodedForm.limit((int) length);
- _encodedForm = ByteBuffer.allocate((int)length);
- _encodedForm.put(encodedForm);
- _encodedForm.flip();
+ _encodedForm = new byte[(int) length];
+ buffer.read(_encodedForm);
_encodedSize = length;
- buffer.skip((int) length);
}
public AMQTypedValue getProperty(AMQShortString string)
@@ -108,13 +105,19 @@ public class FieldTable
{
try
{
- setFromBuffer(_encodedForm, _encodedSize);
+ setFromBuffer();
}
catch (AMQFrameDecodingException e)
{
_logger.error("Error decoding FieldTable in deferred decoding mode ", e);
throw new IllegalArgumentException(e);
}
+ catch (IOException e)
+ {
+ _logger.error("Unexpected IO exception decoding field table");
+ throw new IllegalArgumentException(e);
+
+ }
}
private AMQTypedValue setProperty(AMQShortString key, AMQTypedValue val)
@@ -766,7 +769,7 @@ public class FieldTable
// ************************* Byte Buffer Processing
- public void writeToBuffer(ByteBuffer buffer)
+ public void writeToBuffer(DataOutputStream buffer) throws IOException
{
final boolean trace = _logger.isDebugEnabled();
@@ -786,17 +789,21 @@ public class FieldTable
public byte[] getDataAsBytes()
{
- final int encodedSize = (int) getEncodedSize();
- final ByteBuffer buffer = ByteBuffer.allocate(encodedSize); // FIXME XXX: Is cast a problem?
-
- putDataInBuffer(buffer);
-
- final byte[] result = new byte[encodedSize];
- buffer.flip();
- buffer.get(result);
- buffer.release();
+ if(_encodedForm == null)
+ {
+ ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ try
+ {
+ putDataInBuffer(new DataOutputStream(baos));
+ return baos.toByteArray();
+ }
+ catch (IOException e)
+ {
+ throw new IllegalArgumentException("IO Exception should never be thrown here");
+ }
- return result;
+ }
+ return _encodedForm.clone();
}
public long getEncodedSize()
@@ -926,15 +933,8 @@ public class FieldTable
public Iterator<Map.Entry<AMQShortString, AMQTypedValue>> iterator()
{
- if(_encodedForm != null)
- {
- return new FieldTableIterator(_encodedForm.duplicate().rewind(),(int)_encodedSize);
- }
- else
- {
- initMapIfNecessary();
- return _properties.entrySet().iterator();
- }
+ initMapIfNecessary();
+ return _properties.entrySet().iterator();
}
public Object get(String key)
@@ -1002,26 +1002,12 @@ public class FieldTable
return _properties.keySet();
}
- private void putDataInBuffer(ByteBuffer buffer)
+ private void putDataInBuffer(DataOutputStream buffer) throws IOException
{
if (_encodedForm != null)
{
- if(buffer.isDirect() || buffer.isReadOnly())
- {
- ByteBuffer encodedForm = _encodedForm.duplicate();
-
- if (encodedForm.position() != 0)
- {
- encodedForm.flip();
- }
-
- buffer.put(encodedForm);
- }
- else
- {
- buffer.put(_encodedForm.array(),_encodedForm.arrayOffset(),(int)_encodedSize);
- }
+ buffer.write(_encodedForm);
}
else if (_properties != null)
{
@@ -1035,41 +1021,27 @@ public class FieldTable
final Map.Entry<AMQShortString, AMQTypedValue> me = it.next();
try
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Writing Property:" + me.getKey() + " Type:" + me.getValue().getType() + " Value:"
- + me.getValue().getValue());
- _logger.debug("Buffer Position:" + buffer.position() + " Remaining:" + buffer.remaining());
- }
-
// Write the actual parameter name
EncodingUtils.writeShortStringBytes(buffer, me.getKey());
me.getValue().writeToBuffer(buffer);
}
catch (Exception e)
{
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Exception thrown:" + e);
- _logger.debug("Writing Property:" + me.getKey() + " Type:" + me.getValue().getType() + " Value:"
- + me.getValue().getValue());
- _logger.debug("Buffer Position:" + buffer.position() + " Remaining:" + buffer.remaining());
- }
-
throw new RuntimeException(e);
}
}
}
}
- private void setFromBuffer(ByteBuffer buffer, long length) throws AMQFrameDecodingException
+ private void setFromBuffer() throws AMQFrameDecodingException, IOException
{
+ final ByteArrayInputStream in = new ByteArrayInputStream(_encodedForm);
+ DataInputStream buffer = new DataInputStream(in);
final boolean trace = _logger.isDebugEnabled();
- if (length > 0)
+ if (_encodedSize > 0)
{
- final int expectedRemaining = buffer.remaining() - (int) length;
_properties = new LinkedHashMap<AMQShortString, AMQTypedValue>(INITIAL_HASHMAP_CAPACITY);
@@ -1077,121 +1049,16 @@ public class FieldTable
{
final AMQShortString key = EncodingUtils.readAMQShortString(buffer);
-
- _logger.debug("FieldTable::PropFieldTable(buffer," + length + "): Read key '" + key);
-
AMQTypedValue value = AMQTypedValue.readFromBuffer(buffer);
-
- if (trace)
- {
- _logger.debug("FieldTable::PropFieldTable(buffer," + length + "): Read type '" + value.getType()
- + "', key '" + key + "', value '" + value.getValue() + "'");
- }
-
_properties.put(key, value);
}
- while (buffer.remaining() > expectedRemaining);
-
- }
-
- _encodedSize = length;
-
- if (trace)
- {
- _logger.debug("FieldTable::FieldTable(buffer," + length + "): Done.");
- }
- }
-
- private static final class FieldTableEntry implements Map.Entry<AMQShortString, AMQTypedValue>
- {
- private final AMQTypedValue _value;
- private final AMQShortString _key;
-
- public FieldTableEntry(final AMQShortString key, final AMQTypedValue value)
- {
- _key = key;
- _value = value;
- }
-
- public AMQShortString getKey()
- {
- return _key;
- }
-
- public AMQTypedValue getValue()
- {
- return _value;
- }
-
- public AMQTypedValue setValue(final AMQTypedValue value)
- {
- throw new UnsupportedOperationException();
- }
-
- public boolean equals(Object o)
- {
- if(o instanceof FieldTableEntry)
- {
- FieldTableEntry other = (FieldTableEntry) o;
- return (_key == null ? other._key == null : _key.equals(other._key))
- && (_value == null ? other._value == null : _value.equals(other._value));
- }
- else
- {
- return false;
- }
- }
-
- public int hashCode()
- {
- return (getKey()==null ? 0 : getKey().hashCode())
- ^ (getValue()==null ? 0 : getValue().hashCode());
- }
-
- }
-
-
- private static final class FieldTableIterator implements Iterator<Map.Entry<AMQShortString, AMQTypedValue>>
- {
+ while (in.available() > 0);
- private final ByteBuffer _buffer;
- private int _expectedRemaining;
-
- public FieldTableIterator(ByteBuffer buffer, int length)
- {
- _buffer = buffer;
- _expectedRemaining = buffer.remaining() - length;
- }
-
- public boolean hasNext()
- {
- return (_buffer.remaining() > _expectedRemaining);
}
- public Map.Entry<AMQShortString, AMQTypedValue> next()
- {
- if(hasNext())
- {
- final AMQShortString key = EncodingUtils.readAMQShortString(_buffer);
- AMQTypedValue value = AMQTypedValue.readFromBuffer(_buffer);
- return new FieldTableEntry(key, value);
- }
- else
- {
- return null;
- }
- }
-
- public void remove()
- {
- throw new UnsupportedOperationException();
- }
}
-
-
-
public int hashCode()
{
initMapIfNecessary();
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/FieldTableFactory.java Mon Sep 19 15:13:18 2011
@@ -20,7 +20,8 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataInputStream;
+import java.io.IOException;
public class FieldTableFactory
{
@@ -29,7 +30,7 @@ public class FieldTableFactory
return new FieldTable();
}
- public static FieldTable newFieldTable(ByteBuffer byteBuffer, long length) throws AMQFrameDecodingException
+ public static FieldTable newFieldTable(DataInputStream byteBuffer, long length) throws AMQFrameDecodingException, IOException
{
return new FieldTable(byteBuffer, length);
}
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBody.java Mon Sep 19 15:13:18 2011
@@ -20,7 +20,10 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.AMQException;
@@ -34,12 +37,12 @@ public class HeartbeatBody implements AM
}
- public HeartbeatBody(ByteBuffer buffer, long size)
+ public HeartbeatBody(DataInputStream buffer, long size) throws IOException
{
if(size > 0)
{
//allow other implementations to have a payload, but ignore it:
- buffer.skip((int) size);
+ buffer.skip(size);
}
}
@@ -53,7 +56,7 @@ public class HeartbeatBody implements AM
return 0;//heartbeats we generate have no payload
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer)
{
}
@@ -63,12 +66,12 @@ public class HeartbeatBody implements AM
session.heartbeatBodyReceived(channelId, this);
}
- protected void populateFromBuffer(ByteBuffer buffer, long size) throws AMQFrameDecodingException
+ protected void populateFromBuffer(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException
{
if(size > 0)
{
//allow other implementations to have a payload, but ignore it:
- buffer.skip((int) size);
+ buffer.skip(size);
}
}
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/HeartbeatBodyFactory.java Mon Sep 19 15:13:18 2011
@@ -20,11 +20,11 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataInputStream;
public class HeartbeatBodyFactory implements BodyFactory
{
- public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
+ public AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException
{
return new HeartbeatBody();
}
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java Mon Sep 19 15:13:18 2011
@@ -22,6 +22,10 @@ package org.apache.qpid.framing;
import org.apache.qpid.AMQException;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
import java.io.UnsupportedEncodingException;
import java.nio.ByteBuffer;
import java.util.Arrays;
@@ -62,35 +66,30 @@ public class ProtocolInitiation extends
pv.equals(ProtocolVersion.v0_91) ? 1 : pv.getMinorVersion());
}
- public ProtocolInitiation(ByteBuffer in)
+ public ProtocolInitiation(DataInputStream in) throws IOException
{
_protocolHeader = new byte[4];
- in.get(_protocolHeader);
+ in.read(_protocolHeader);
- _protocolClass = in.get();
- _protocolInstance = in.get();
- _protocolMajor = in.get();
- _protocolMinor = in.get();
+ _protocolClass = in.readByte();
+ _protocolInstance = in.readByte();
+ _protocolMajor = in.readByte();
+ _protocolMinor = in.readByte();
}
- public void writePayload(org.apache.mina.common.ByteBuffer buffer)
- {
- writePayload(buffer.buf());
- }
-
public long getSize()
{
return 4 + 1 + 1 + 1 + 1;
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
- buffer.put(_protocolHeader);
- buffer.put(_protocolClass);
- buffer.put(_protocolInstance);
- buffer.put(_protocolMajor);
- buffer.put(_protocolMinor);
+ buffer.write(_protocolHeader);
+ buffer.write(_protocolClass);
+ buffer.write(_protocolInstance);
+ buffer.write(_protocolMajor);
+ buffer.write(_protocolMinor);
}
public boolean equals(Object o)
@@ -144,9 +143,9 @@ public class ProtocolInitiation extends
* @return true if we have enough data to decode the PI frame fully, false if more
* data is required
*/
- public boolean decodable(ByteBuffer in)
+ public boolean decodable(DataInputStream in) throws IOException
{
- return (in.remaining() >= 8);
+ return (in.available() >= 8);
}
}
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/SmallCompositeAMQDataBlock.java Mon Sep 19 15:13:18 2011
@@ -21,7 +21,8 @@
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataOutputStream;
+import java.io.IOException;
public class SmallCompositeAMQDataBlock extends AMQDataBlock implements EncodableAMQDataBlock
{
@@ -68,7 +69,7 @@ public class SmallCompositeAMQDataBlock
return frameSize;
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
if (_firstFrame != null)
{
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java Mon Sep 19 15:13:18 2011
@@ -20,7 +20,8 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataInputStream;
+import java.io.IOException;
import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
@@ -144,7 +145,7 @@ public class VersionSpecificRegistry
}
- public AMQMethodBody get(short classID, short methodID, ByteBuffer in, long size) throws AMQFrameDecodingException
+ public AMQMethodBody get(short classID, short methodID, DataInputStream in, long size) throws AMQFrameDecodingException, IOException
{
AMQMethodBodyInstanceFactory bodyFactory;
try
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java Mon Sep 19 15:13:18 2011
@@ -21,12 +21,10 @@
package org.apache.qpid.framing.abstraction;
-import org.apache.mina.common.ByteBuffer;
-
public interface ContentChunk
{
int getSize();
- ByteBuffer getData();
+ byte[] getData();
void reduceToFit();
}
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java Mon Sep 19 15:13:18 2011
@@ -23,8 +23,6 @@ package org.apache.qpid.framing.abstract
import org.apache.qpid.framing.AMQBody;
-import java.nio.ByteBuffer;
-
public interface ProtocolVersionMethodConverter extends MessagePublishInfoConverter
{
AMQBody convertToBody(ContentChunk contentBody);
@@ -32,5 +30,5 @@ public interface ProtocolVersionMethodCo
void configure();
- AMQBody convertToBody(ByteBuffer buf);
+ AMQBody convertToBody(byte[] input);
}
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java Mon Sep 19 15:13:18 2011
@@ -21,16 +21,13 @@
package org.apache.qpid.framing.amqp_0_9;
-import org.apache.mina.common.ByteBuffer;
-
import org.apache.qpid.framing.abstraction.AbstractMethodConverter;
import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_0_9.*;
-import org.apache.qpid.framing.amqp_0_9.BasicPublishBodyImpl;
+
public class MethodConverter_0_9 extends AbstractMethodConverter implements ProtocolVersionMethodConverter
{
@@ -72,9 +69,9 @@ public class MethodConverter_0_9 extends
}
- public AMQBody convertToBody(java.nio.ByteBuffer buf)
+ public AMQBody convertToBody(byte[] data)
{
- return new ContentBody(ByteBuffer.wrap(buf));
+ return new ContentBody(data);
}
public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
@@ -116,9 +113,9 @@ public class MethodConverter_0_9 extends
return _contentBodyChunk.getSize();
}
- public ByteBuffer getData()
+ public byte[] getData()
{
- return _contentBodyChunk.payload;
+ return _contentBodyChunk._payload;
}
public void reduceToFit()
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java Mon Sep 19 15:13:18 2011
@@ -21,8 +21,6 @@
package org.apache.qpid.framing.amqp_0_91;
-import org.apache.mina.common.ByteBuffer;
-
import org.apache.qpid.framing.abstraction.AbstractMethodConverter;
import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.framing.abstraction.ContentChunk;
@@ -70,9 +68,9 @@ public class MethodConverter_0_91 extend
}
- public AMQBody convertToBody(java.nio.ByteBuffer buf)
+ public AMQBody convertToBody(byte[] data)
{
- return new ContentBody(ByteBuffer.wrap(buf));
+ return new ContentBody(data);
}
public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
@@ -114,9 +112,9 @@ public class MethodConverter_0_91 extend
return _contentBodyChunk.getSize();
}
- public ByteBuffer getData()
+ public byte[] getData()
{
- return _contentBodyChunk.payload;
+ return _contentBodyChunk._payload;
}
public void reduceToFit()
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/MethodConverter_8_0.java Mon Sep 19 15:13:18 2011
@@ -26,11 +26,8 @@ import org.apache.qpid.framing.abstracti
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.AbstractMethodConverter;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
-import org.apache.qpid.framing.amqp_8_0.BasicPublishBodyImpl;
import org.apache.qpid.framing.*;
-import org.apache.mina.common.ByteBuffer;
-
public class MethodConverter_8_0 extends AbstractMethodConverter implements ProtocolVersionMethodConverter
{
private int _basicPublishClassId;
@@ -60,9 +57,9 @@ public class MethodConverter_8_0 extends
return contentBodyChunk.getSize();
}
- public ByteBuffer getData()
+ public byte[] getData()
{
- return contentBodyChunk.payload;
+ return contentBodyChunk._payload;
}
public void reduceToFit()
@@ -81,9 +78,9 @@ public class MethodConverter_8_0 extends
}
- public AMQBody convertToBody(java.nio.ByteBuffer buf)
+ public AMQBody convertToBody(byte[] data)
{
- return new ContentBody(ByteBuffer.wrap(buf));
+ return new ContentBody(data);
}
public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java Mon Sep 19 15:13:18 2011
@@ -80,7 +80,7 @@ public final class AMQConstant
/**
* An operator intervened to close the connection for some reason. The client may retry at some later date.
*/
- public static final AMQConstant CONTEXT_IN_USE = new AMQConstant(320, "context in use", true);
+ public static final AMQConstant CONNECTION_FORCED = new AMQConstant(320, "connection forced", true);
/** The client tried to work with an unknown virtual host or cluster. */
public static final AMQConstant INVALID_PATH = new AMQConstant(402, "invalid path", true);
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngine.java Mon Sep 19 15:13:18 2011
@@ -21,8 +21,11 @@
package org.apache.qpid.protocol;
import java.net.SocketAddress;
+import java.nio.ByteBuffer;
import org.apache.qpid.transport.Receiver;
+import org.apache.qpid.transport.Sender;
+import org.apache.qpid.transport.network.NetworkConnection;
/**
* A ProtocolEngine is a Receiver for java.nio.ByteBuffers. It takes the data passed to it in the received
@@ -53,4 +56,6 @@ public interface ProtocolEngine extends
void readerIdle();
+ public void setNetworkConnection(NetworkConnection network, Sender<ByteBuffer> sender);
+
}
\ No newline at end of file
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/protocol/ProtocolEngineFactory.java Mon Sep 19 15:13:18 2011
@@ -26,6 +26,6 @@ public interface ProtocolEngineFactory
{
// Returns a new instance of a ProtocolEngine
- ProtocolEngine newProtocolEngine(NetworkConnection network);
+ ProtocolEngine newProtocolEngine();
}
\ No newline at end of file
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/ssl/SSLContextFactory.java Mon Sep 19 15:13:18 2011
@@ -20,18 +20,17 @@
*/
package org.apache.qpid.ssl;
-import java.io.File;
-import java.io.FileInputStream;
import java.io.IOException;
-import java.io.InputStream;
import java.security.GeneralSecurityException;
import java.security.KeyStore;
import javax.net.ssl.KeyManager;
import javax.net.ssl.KeyManagerFactory;
import javax.net.ssl.SSLContext;
+import javax.net.ssl.TrustManager;
import javax.net.ssl.TrustManagerFactory;
+import org.apache.qpid.transport.network.security.ssl.QpidClientX509KeyManager;
import org.apache.qpid.transport.network.security.ssl.SSLUtil;
/**
@@ -39,157 +38,92 @@ import org.apache.qpid.transport.network
* before this will work.
*
*/
-public class SSLContextFactory {
-
- /**
- * Path to the Java keystore file
- */
- private String _keyStorePath;
-
- /**
- * Password for the keystore
- */
- private String _keyStorePassword;
-
- /**
- * Cert type to use in keystore
- */
- private String _keyStoreCertType;
-
- /**
- * Path to the Java truststore file
- */
- private String _trustStorePath;
-
- /**
- * Password for the truststore
- */
- private String _trustStorePassword;
-
- /**
- * Cert type to use in truststore
- */
- private String _trustStoreCertType;
-
- private KeyManager customKeyManager;
-
- public SSLContextFactory(String trustStorePath, String trustStorePassword,
- String trustStoreCertType)
+public class SSLContextFactory
+{
+ public static final String JAVA_KEY_STORE_CODE = "JKS";
+ public static final String TRANSPORT_LAYER_SECURITY_CODE = "TLS";
+ public static final String KEY_STORE_CERTIFICATE_TYPE = "SunX509";
+
+ private SSLContextFactory()
{
- this(trustStorePath,trustStorePassword,trustStoreCertType,
- trustStorePath,trustStorePassword,trustStoreCertType);
+ //no instances
}
- /**
- * Create a factory instance
- * @param keystorePath path to the Java keystore file
- * @param keystorePassword password for the Java keystore
- * @param certType certificate type
- */
- public SSLContextFactory(String trustStorePath, String trustStorePassword, String trustStoreCertType,
- String keyStorePath, String keyStorePassword, String keyStoreCertType)
- {
-
- _trustStorePath = trustStorePath;
- _trustStorePassword = trustStorePassword;
-
- if (_trustStorePassword != null && _trustStorePassword.equals("none"))
- {
- _trustStorePassword = null;
- }
- _trustStoreCertType = trustStoreCertType;
-
- _keyStorePath = keyStorePath;
- _keyStorePassword = keyStorePassword;
-
- if (_keyStorePassword != null && _keyStorePassword.equals("none"))
- {
- _keyStorePassword = null;
- }
- _keyStoreCertType = keyStoreCertType;
-
- if (_trustStorePath == null) {
- throw new IllegalArgumentException("A TrustStore path or KeyStore path must be specified");
- }
- if (_trustStoreCertType == null) {
- throw new IllegalArgumentException("Cert type must be specified");
- }
- }
-
- public SSLContextFactory(String trustStorePath, String trustStorePassword, String trustStoreCertType,
- KeyManager customKeyManager)
+ public static SSLContext buildServerContext(final String keyStorePath,
+ final String keyStorePassword, final String keyStoreCertType)
+ throws GeneralSecurityException, IOException
{
+ return buildContext(null, null, null, keyStorePath, keyStorePassword,
+ keyStoreCertType, null);
+ }
- _trustStorePath = trustStorePath;
- _trustStorePassword = trustStorePassword;
-
- if (_trustStorePassword != null && _trustStorePassword.equals("none"))
- {
- _trustStorePassword = null;
- }
- _trustStoreCertType = trustStoreCertType;
-
- if (_trustStorePath == null) {
- throw new IllegalArgumentException("A TrustStore path or KeyStore path must be specified");
- }
- if (_trustStoreCertType == null) {
- throw new IllegalArgumentException("Cert type must be specified");
- }
-
- this.customKeyManager = customKeyManager;
+ public static SSLContext buildClientContext(final String trustStorePath,
+ final String trustStorePassword, final String trustStoreCertType,
+ final String keyStorePath, final String keyStorePassword,
+ final String keyStoreCertType, final String certAlias)
+ throws GeneralSecurityException, IOException
+ {
+ return buildContext(trustStorePath, trustStorePassword,
+ trustStoreCertType, keyStorePath, keyStorePassword,
+ keyStoreCertType, certAlias);
}
-
-
- /**
- * Builds a SSLContext appropriate for use with a server
- * @return SSLContext
- * @throws GeneralSecurityException
- * @throws IOException
- */
-
- public SSLContext buildServerContext() throws GeneralSecurityException, IOException
- {
- KeyStore ts = SSLUtil.getInitializedKeyStore(_trustStorePath,_trustStorePassword);
- TrustManagerFactory tmf = TrustManagerFactory.getInstance(_trustStoreCertType);
- tmf.init(ts);
-
+
+ private static SSLContext buildContext(final String trustStorePath,
+ final String trustStorePassword, final String trustStoreCertType,
+ final String keyStorePath, final String keyStorePassword,
+ final String keyStoreCertType, final String certAlias)
+ throws GeneralSecurityException, IOException
+ {
// Initialize the SSLContext to work with our key managers.
- SSLContext sslContext = SSLContext.getInstance("TLS");
-
- if (customKeyManager != null)
+ final SSLContext sslContext = SSLContext
+ .getInstance(TRANSPORT_LAYER_SECURITY_CODE);
+
+ final TrustManager[] trustManagers;
+ final KeyManager[] keyManagers;
+
+ if (trustStorePath != null)
{
- sslContext.init(new KeyManager[]{customKeyManager},
- tmf.getTrustManagers(), null);
-
+ final KeyStore ts = SSLUtil.getInitializedKeyStore(trustStorePath,
+ trustStorePassword);
+ final TrustManagerFactory tmf = TrustManagerFactory
+ .getInstance(trustStoreCertType);
+ tmf.init(ts);
+
+ trustManagers = tmf.getTrustManagers();
}
else
{
- // Create keystore
- KeyStore ks = SSLUtil.getInitializedKeyStore(_keyStorePath,_keyStorePassword);
- // Set up key manager factory to use our key store
- KeyManagerFactory kmf = KeyManagerFactory.getInstance(_keyStoreCertType);
- kmf.init(ks, _keyStorePassword.toCharArray());
+ trustManagers = null;
+ }
- sslContext.init(kmf.getKeyManagers(), tmf.getTrustManagers(), null);
+ if (keyStorePath != null)
+ {
+ if (certAlias != null)
+ {
+ keyManagers = new KeyManager[] { new QpidClientX509KeyManager(
+ certAlias, keyStorePath, keyStorePassword,
+ keyStoreCertType) };
+ }
+ else
+ {
+ final KeyStore ks = SSLUtil.getInitializedKeyStore(
+ keyStorePath, keyStorePassword);
+
+ char[] keyStoreCharPassword = keyStorePassword == null ? null : keyStorePassword.toCharArray();
+ // Set up key manager factory to use our key store
+ final KeyManagerFactory kmf = KeyManagerFactory
+ .getInstance(keyStoreCertType);
+ kmf.init(ks, keyStoreCharPassword);
+ keyManagers = kmf.getKeyManagers();
+ }
}
-
- return sslContext;
- }
-
- /**
- * Creates a SSLContext factory appropriate for use with a client
- * @return SSLContext
- * @throws GeneralSecurityException
- * @throws IOException
- */
- public SSLContext buildClientContext() throws GeneralSecurityException, IOException
- {
- KeyStore ks = SSLUtil.getInitializedKeyStore(_trustStorePath,_trustStorePassword);
- TrustManagerFactory tmf = TrustManagerFactory.getInstance(_trustStoreCertType);
- tmf.init(ks);
- SSLContext context = SSLContext.getInstance("TLS");
- context.init(null, tmf.getTrustManagers(), null);
- return context;
- }
-
+ else
+ {
+ keyManagers = null;
+ }
+
+ sslContext.init(keyManagers, trustManagers, null);
+
+ return sslContext;
+ }
}
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/thread/QpidThreadExecutor.java Mon Sep 19 15:13:18 2011
@@ -23,7 +23,7 @@ package org.apache.qpid.thread;
import org.apache.qpid.thread.Threading;
-import edu.emory.mathcs.backport.java.util.concurrent.Executor;
+import java.util.concurrent.Executor;
public class QpidThreadExecutor implements Executor
{
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/ClientDelegate.java Mon Sep 19 15:13:18 2011
@@ -20,28 +20,20 @@
*/
package org.apache.qpid.transport;
-import org.ietf.jgss.GSSContext;
-import org.ietf.jgss.GSSException;
-import org.ietf.jgss.GSSManager;
-import org.ietf.jgss.GSSName;
-import org.ietf.jgss.Oid;
-
-import org.apache.qpid.security.UsernamePasswordCallbackHandler;
import static org.apache.qpid.transport.Connection.State.OPEN;
import static org.apache.qpid.transport.Connection.State.RESUMING;
-import org.apache.qpid.transport.util.Logger;
-import javax.security.sasl.Sasl;
-import javax.security.sasl.SaslClient;
-import javax.security.sasl.SaslException;
import java.lang.management.ManagementFactory;
import java.lang.management.RuntimeMXBean;
-import java.util.ArrayList;
-import java.util.Arrays;
import java.util.HashMap;
import java.util.List;
import java.util.Map;
+import javax.security.sasl.SaslClient;
+import javax.security.sasl.SaslException;
+
+import org.apache.qpid.transport.util.Logger;
+
/**
* ClientDelegate
@@ -52,31 +44,13 @@ public class ClientDelegate extends Conn
{
private static final Logger log = Logger.get(ClientDelegate.class);
- private static final String KRB5_OID_STR = "1.2.840.113554.1.2.2";
- protected static final Oid KRB5_OID;
- static
- {
- Oid oid;
- try
- {
- oid = new Oid(KRB5_OID_STR);
- }
- catch (GSSException ignore)
- {
- oid = null;
- }
- KRB5_OID = oid;
- }
-
- private List<String> clientMechs;
- private ConnectionSettings conSettings;
+ protected final ConnectionSettings _conSettings;
public ClientDelegate(ConnectionSettings settings)
{
- this.conSettings = settings;
- this.clientMechs = Arrays.asList(settings.getSaslMechs().split(" "));
+ this._conSettings = settings;
}
public void init(Connection conn, ProtocolHeader hdr)
@@ -92,9 +66,9 @@ public class ClientDelegate extends Conn
{
Map<String,Object> clientProperties = new HashMap<String,Object>();
- if(this.conSettings.getClientProperties() != null)
+ if(this._conSettings.getClientProperties() != null)
{
- clientProperties.putAll(this.conSettings.getClientProperties());
+ clientProperties.putAll(_conSettings.getClientProperties());
}
clientProperties.put("qpid.session_flow", 1);
@@ -109,41 +83,12 @@ public class ClientDelegate extends Conn
(clientProperties, null, null, conn.getLocale());
return;
}
-
- List<String> choosenMechs = new ArrayList<String>();
- for (String mech:clientMechs)
- {
- if (brokerMechs.contains(mech))
- {
- choosenMechs.add(mech);
- }
- }
-
- if (choosenMechs.size() == 0)
- {
- conn.exception(new ConnectionException("The following SASL mechanisms " +
- clientMechs.toString() +
- " specified by the client are not supported by the broker"));
- return;
- }
-
- String[] mechs = new String[choosenMechs.size()];
- choosenMechs.toArray(mechs);
-
conn.setServerProperties(start.getServerProperties());
try
{
- Map<String,Object> saslProps = new HashMap<String,Object>();
- if (conSettings.isUseSASLEncryption())
- {
- saslProps.put(Sasl.QOP, "auth-conf");
- }
- UsernamePasswordCallbackHandler handler =
- new UsernamePasswordCallbackHandler();
- handler.initialise(conSettings.getUsername(), conSettings.getPassword());
- SaslClient sc = Sasl.createSaslClient
- (mechs, null, conSettings.getSaslProtocol(), conSettings.getSaslServerName(), saslProps, handler);
+ final SaslClient sc = createSaslClient(brokerMechs);
+
conn.setSaslClient(sc);
byte[] response = sc.hasInitialResponse() ?
@@ -152,12 +97,22 @@ public class ClientDelegate extends Conn
(clientProperties, sc.getMechanismName(), response,
conn.getLocale());
}
+ catch (ConnectionException ce)
+ {
+ conn.exception(ce);
+ }
catch (SaslException e)
{
conn.exception(e);
}
}
+
+ protected SaslClient createSaslClient(List<Object> brokerMechs) throws ConnectionException, SaslException
+ {
+ throw new UnsupportedOperationException();
+ }
+
@Override
public void connectionSecure(Connection conn, ConnectionSecure secure)
{
@@ -176,7 +131,7 @@ public class ClientDelegate extends Conn
@Override
public void connectionTune(Connection conn, ConnectionTune tune)
{
- int hb_interval = calculateHeartbeatInterval(conSettings.getHeartbeatInterval(),
+ int hb_interval = calculateHeartbeatInterval(_conSettings.getHeartbeatInterval(),
tune.getHeartbeatMin(),
tune.getHeartbeatMax()
);
@@ -191,32 +146,12 @@ public class ClientDelegate extends Conn
//(or that forced by protocol limitations [0xFFFF])
conn.setChannelMax(channelMax == 0 ? Connection.MAX_CHANNEL_MAX : channelMax);
- conn.connectionOpen(conSettings.getVhost(), null, Option.INSIST);
+ conn.connectionOpen(_conSettings.getVhost(), null, Option.INSIST);
}
@Override
public void connectionOpenOk(Connection conn, ConnectionOpenOk ok)
{
- SaslClient sc = conn.getSaslClient();
- if (sc != null)
- {
- if (sc.getMechanismName().equals("GSSAPI"))
- {
- String id = getKerberosUser();
- if (id != null)
- {
- conn.setUserID(id);
- }
- }
- else if (sc.getMechanismName().equals("EXTERNAL"))
- {
- if (conn.getSecurityLayer() != null)
- {
- conn.setUserID(conn.getSecurityLayer().getUserID());
- }
- }
- }
-
if (conn.isConnectionResuming())
{
conn.setState(RESUMING);
@@ -286,35 +221,7 @@ public class ClientDelegate extends Conn
}
- private String getKerberosUser()
- {
- log.debug("Obtaining userID from kerberos");
- String service = conSettings.getSaslProtocol() + "@" + conSettings.getSaslServerName();
- GSSManager manager = GSSManager.getInstance();
-
- try
- {
- GSSName acceptorName = manager.createName(service,
- GSSName.NT_HOSTBASED_SERVICE, KRB5_OID);
-
- GSSContext secCtx = manager.createContext(acceptorName,
- KRB5_OID,
- null,
- GSSContext.INDEFINITE_LIFETIME);
- secCtx.initSecContext(new byte[0], 0, 1);
- if (secCtx.getSrcName() != null)
- {
- return secCtx.getSrcName().toString();
- }
- }
- catch (GSSException e)
- {
- log.warn("Unable to retrieve userID from Kerberos due to error",e);
- }
-
- return null;
- }
}
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/Connection.java Mon Sep 19 15:13:18 2011
@@ -25,7 +25,6 @@ import static org.apache.qpid.transport.
import static org.apache.qpid.transport.Connection.State.NEW;
import static org.apache.qpid.transport.Connection.State.OPEN;
import static org.apache.qpid.transport.Connection.State.OPENING;
-import static org.apache.qpid.transport.Connection.State.RESUMING;
import java.nio.ByteBuffer;
import java.util.ArrayList;
@@ -36,18 +35,19 @@ import java.util.List;
import java.util.Map;
import java.util.UUID;
import java.util.concurrent.atomic.AtomicBoolean;
-import java.util.concurrent.atomic.AtomicLong;
import javax.security.sasl.SaslClient;
import javax.security.sasl.SaslServer;
+import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.transport.network.Assembler;
import org.apache.qpid.transport.network.Disassembler;
import org.apache.qpid.transport.network.InputHandler;
import org.apache.qpid.transport.network.NetworkConnection;
import org.apache.qpid.transport.network.OutgoingNetworkTransport;
-import org.apache.qpid.transport.network.io.IoNetworkTransport;
+import org.apache.qpid.transport.network.Transport;
import org.apache.qpid.transport.network.security.SecurityLayer;
+import org.apache.qpid.transport.network.security.SecurityLayerFactory;
import org.apache.qpid.transport.util.Logger;
import org.apache.qpid.transport.util.Waiter;
import org.apache.qpid.util.Strings;
@@ -73,6 +73,7 @@ public class Connection extends Connecti
public static final int MAX_CHANNEL_MAX = 0xFFFF;
public static final int MIN_USABLE_CHANNEL_NUM = 0;
+
public enum State { NEW, CLOSED, OPENING, OPEN, CLOSING, CLOSE_RCVD, RESUMING }
static class DefaultConnectionListener implements ConnectionListener
@@ -120,17 +121,14 @@ public class Connection extends Connecti
private SaslServer saslServer;
private SaslClient saslClient;
private int idleTimeout = 0;
- private String _authorizationID;
private Map<String,Object> _serverProperties;
private String userID;
private ConnectionSettings conSettings;
private SecurityLayer securityLayer;
private String _clientId;
-
- private static final AtomicLong idGenerator = new AtomicLong(0);
- private final long _connectionId = idGenerator.incrementAndGet();
+
private final AtomicBoolean connectionLost = new AtomicBoolean(false);
-
+
public Connection() {}
public void setConnectionDelegate(ConnectionDelegate delegate)
@@ -241,15 +239,23 @@ public class Connection extends Connecti
conSettings = settings;
state = OPENING;
userID = settings.getUsername();
- delegate = new ClientDelegate(settings);
- securityLayer = new SecurityLayer();
- securityLayer.init(this);
+ securityLayer = SecurityLayerFactory.newInstance(getConnectionSettings());
- OutgoingNetworkTransport transport = new IoNetworkTransport();
- Receiver<ByteBuffer> receiver = securityLayer.receiver(new InputHandler(new Assembler(this)));
- NetworkConnection network = transport.connect(settings, receiver, null);
- sender = new Disassembler(securityLayer.sender(network.getSender()), settings.getMaxFrameSize());
+ OutgoingNetworkTransport transport = Transport.getOutgoingTransportInstance(ProtocolVersion.v0_10);
+ Receiver<ByteBuffer> secureReceiver = securityLayer.receiver(new InputHandler(new Assembler(this)));
+ if(secureReceiver instanceof ConnectionListener)
+ {
+ addConnectionListener((ConnectionListener)secureReceiver);
+ }
+
+ NetworkConnection network = transport.connect(settings, secureReceiver, null);
+ final Sender<ByteBuffer> secureSender = securityLayer.sender(network.getSender());
+ if(secureSender instanceof ConnectionListener)
+ {
+ addConnectionListener((ConnectionListener)secureSender);
+ }
+ sender = new Disassembler(secureSender, settings.getMaxFrameSize());
send(new ProtocolHeader(1, 0, 10));
@@ -331,23 +337,31 @@ public class Connection extends Connecti
Waiter w = new Waiter(lock, timeout);
while (w.hasTime() && state != OPEN && error == null)
{
- w.await();
+ w.await();
}
-
+
if (state != OPEN)
{
throw new ConnectionException("Timed out waiting for connection to be ready. Current state is :" + state);
}
-
+
Session ssn = _sessionFactory.newSession(this, name, expiry);
- sessions.put(name, ssn);
+ registerSession(ssn);
map(ssn);
ssn.attach();
return ssn;
}
}
- void removeSession(Session ssn)
+ public void registerSession(Session ssn)
+ {
+ synchronized (lock)
+ {
+ sessions.put(ssn.getName(),ssn);
+ }
+ }
+
+ public void removeSession(Session ssn)
{
synchronized (lock)
{
@@ -362,11 +376,6 @@ public class Connection extends Connecti
_sessionFactory = sessionFactory;
}
- public long getConnectionId()
- {
- return _connectionId;
- }
-
public ConnectionDelegate getConnectionDelegate()
{
return delegate;
@@ -415,7 +424,7 @@ public class Connection extends Connecti
else
{
throw new ProtocolViolationException(
- "Received frames for an already dettached session", null);
+ "Received frames for an already detached session", null);
}
}
@@ -464,7 +473,7 @@ public class Connection extends Connecti
}
}
- protected Session getSession(int channel)
+ public Session getSession(int channel)
{
synchronized (lock)
{
@@ -485,13 +494,13 @@ public class Connection extends Connecti
ssn.setState(Session.State.CLOSED);
}
else
- {
+ {
map(ssn);
ssn.attach();
ssn.resume();
}
}
-
+
for (Binary ssn_name : transactedSessions)
{
sessions.remove(ssn_name);
@@ -582,12 +591,12 @@ public class Connection extends Connecti
{
close(ConnectionCloseCode.NORMAL, null);
}
-
+
public void mgmtClose()
{
close(ConnectionCloseCode.CONNECTION_FORCED, "The connection was closed using the broker's management interface.");
}
-
+
public void close(ConnectionCloseCode replyCode, String replyText, Option ... _options)
{
synchronized (lock)
@@ -661,16 +670,6 @@ public class Connection extends Connecti
return idleTimeout;
}
- public void setAuthorizationID(String authorizationID)
- {
- _authorizationID = authorizationID;
- }
-
- public String getAuthorizationID()
- {
- return _authorizationID;
- }
-
public String getUserID()
{
return userID;
@@ -700,12 +699,12 @@ public class Connection extends Connecti
{
return conSettings;
}
-
+
public SecurityLayer getSecurityLayer()
{
return securityLayer;
}
-
+
public boolean isConnectionResuming()
{
return connectionLost.get();
@@ -715,4 +714,9 @@ public class Connection extends Connecti
{
return channels.values();
}
+
+ public boolean hasSessionWithName(final String name)
+ {
+ return sessions.containsKey(new Binary(name.getBytes()));
+ }
}
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionDelegate.java Mon Sep 19 15:13:18 2011
@@ -85,7 +85,7 @@ public abstract class ConnectionDelegate
@Override public void sessionDetach(Connection conn, SessionDetach dtc)
{
Session ssn = conn.getSession(dtc.getChannel());
- ssn.sessionDetached(dtc.getName(), SessionDetachCode.NORMAL);
+ ssn.sessionDetached(dtc.getName(), ssn.getDetachCode() == null? SessionDetachCode.NORMAL: ssn.getDetachCode());
conn.unmap(ssn);
ssn.closed();
}
@@ -95,6 +95,7 @@ public abstract class ConnectionDelegate
Session ssn = conn.getSession(dtc.getChannel());
if (ssn != null)
{
+ ssn.setDetachCode(dtc.getCode());
conn.unmap(ssn);
ssn.closed();
}
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/ConnectionSettings.java Mon Sep 19 15:13:18 2011
@@ -58,7 +58,7 @@ public class ConnectionSettings
boolean verifyHostname;
// SASL props
- String saslMechs = System.getProperty("qpid.sasl_mechs", "PLAIN");
+ String saslMechs = System.getProperty("qpid.sasl_mechs", null);
String saslProtocol = System.getProperty("qpid.sasl_protocol", "AMQP");
String saslServerName = System.getProperty("qpid.sasl_server_name", "localhost");
boolean useSASLEncryption;
Modified: qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java
URL: http://svn.apache.org/viewvc/qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java?rev=1172657&r1=1172656&r2=1172657&view=diff
==============================================================================
--- qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java (original)
+++ qpid/branches/qpid-3346/qpid/java/common/src/main/java/org/apache/qpid/transport/ServerDelegate.java Mon Sep 19 15:13:18 2011
@@ -75,10 +75,7 @@ public class ServerDelegate extends Conn
if (mechanism == null || mechanism.length() == 0)
{
- conn.connectionTune
- (getChannelMax(),
- org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
- 0, getHeartbeatMax());
+ tuneAuthorizedConnection(conn);
return;
}
@@ -97,8 +94,7 @@ public class ServerDelegate extends Conn
}
catch (SaslException e)
{
- conn.exception(e);
- conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
+ connectionAuthFailed(conn, e);
}
}
@@ -109,33 +105,52 @@ public class ServerDelegate extends Conn
return ss;
}
- private void secure(Connection conn, byte[] response)
+ protected void secure(final SaslServer ss, final Connection conn, final byte[] response)
{
- SaslServer ss = conn.getSaslServer();
try
{
byte[] challenge = ss.evaluateResponse(response);
if (ss.isComplete())
{
ss.dispose();
- conn.connectionTune
- (getChannelMax(),
- org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
- 0, getHeartbeatMax());
- conn.setAuthorizationID(ss.getAuthorizationID());
+ tuneAuthorizedConnection(conn);
}
else
{
- conn.connectionSecure(challenge);
+ connectionAuthContinue(conn, challenge);
}
}
catch (SaslException e)
{
- conn.exception(e);
- conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
+ connectionAuthFailed(conn, e);
}
}
+ protected void connectionAuthFailed(final Connection conn, Exception e)
+ {
+ conn.exception(e);
+ conn.connectionClose(ConnectionCloseCode.CONNECTION_FORCED, e.getMessage());
+ }
+
+ protected void connectionAuthContinue(final Connection conn, byte[] challenge)
+ {
+ conn.connectionSecure(challenge);
+ }
+
+ protected void tuneAuthorizedConnection(final Connection conn)
+ {
+ conn.connectionTune
+ (getChannelMax(),
+ org.apache.qpid.transport.network.ConnectionBinding.MAX_FRAME_SIZE,
+ 0, getHeartbeatMax());
+ }
+
+ protected void secure(final Connection conn, final byte[] response)
+ {
+ final SaslServer ss = conn.getSaslServer();
+ secure(ss, conn, response);
+ }
+
protected int getHeartbeatMax()
{
return 0xFFFF;
@@ -155,22 +170,7 @@ public class ServerDelegate extends Conn
@Override
public void connectionTuneOk(Connection conn, ConnectionTuneOk ok)
{
- int okChannelMax = ok.getChannelMax();
-
- if (okChannelMax > getChannelMax())
- {
- _logger.error("Connection '" + conn.getConnectionId() + "' being severed, " +
- "client connectionTuneOk returned a channelMax (" + okChannelMax +
- ") above the servers offered limit (" + getChannelMax() +")");
- //Due to the error we must forcefully close the connection without negotiation
- conn.getSender().close();
- return;
- }
-
- //0 means no implied limit, except available server resources
- //(or that forced by protocol limitations [0xFFFF])
- conn.setChannelMax(okChannelMax == 0 ? Connection.MAX_CHANNEL_MAX : okChannelMax);
}
@Override
@@ -200,4 +200,11 @@ public class ServerDelegate extends Conn
ssn.sessionAttached(atc.getName());
ssn.setState(Session.State.OPEN);
}
+
+ protected void setConnectionTuneOkChannelMax(final Connection conn, final int okChannelMax)
+ {
+ //0 means no implied limit, except available server resources
+ //(or that forced by protocol limitations [0xFFFF])
+ conn.setChannelMax(okChannelMax == 0 ? Connection.MAX_CHANNEL_MAX : okChannelMax);
+ }
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org