You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 03:20:13 UTC
svn commit: r1187150 [33/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/
cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf...
Modified: qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java (original)
+++ qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java Fri Oct 21 01:19:00 2011
@@ -300,6 +300,53 @@ public class ConnectionURLTest extends T
assertTrue(connectionurl.getOption("immediatedelivery").equals("true"));
}
+ public void testSinglevmURL() throws URLSyntaxException
+ {
+ String url = "amqp://guest:guest@/test?brokerlist='vm://:2'";
+
+ ConnectionURL connectionurl = new AMQConnectionURL(url);
+
+ assertTrue(connectionurl.getFailoverMethod() == null);
+ assertTrue(connectionurl.getUsername().equals("guest"));
+ assertTrue(connectionurl.getPassword().equals("guest"));
+ assertTrue(connectionurl.getVirtualHost().equals("/test"));
+
+ assertTrue(connectionurl.getBrokerCount() == 1);
+
+ BrokerDetails service = connectionurl.getBrokerDetails(0);
+
+ assertTrue(service.getTransport().equals("vm"));
+ assertTrue(service.getHost().equals(""));
+ assertTrue(service.getPort() == 2);
+
+ }
+
+ public void testFailoverVMURL() throws URLSyntaxException
+ {
+ String url = "amqp://ritchiem:bob@/test?brokerlist='vm://:2;vm://:3',failover='roundrobin'";
+
+ ConnectionURL connectionurl = new AMQConnectionURL(url);
+
+ assertTrue(connectionurl.getFailoverMethod().equals("roundrobin"));
+ assertTrue(connectionurl.getUsername().equals("ritchiem"));
+ assertTrue(connectionurl.getPassword().equals("bob"));
+ assertTrue(connectionurl.getVirtualHost().equals("/test"));
+
+ assertTrue(connectionurl.getBrokerCount() == 2);
+
+ BrokerDetails service = connectionurl.getBrokerDetails(0);
+
+ assertTrue(service.getTransport().equals("vm"));
+ assertTrue(service.getHost().equals(""));
+ assertTrue(service.getPort() == 2);
+
+ service = connectionurl.getBrokerDetails(1);
+ assertTrue(service.getTransport().equals("vm"));
+ assertTrue(service.getHost().equals(""));
+ assertTrue(service.getPort() == 3);
+ }
+
+
public void testNoVirtualHostURL()
{
String url = "amqp://user@?brokerlist='tcp://localhost:5672'";
@@ -440,6 +487,27 @@ public class ConnectionURLTest extends T
}
+ public void testSocketProtocol() throws URLSyntaxException
+ {
+ String url = "amqp://guest:guest@id/test" + "?brokerlist='socket://VM-Unique-socketID'";
+
+ try
+ {
+ AMQConnectionURL curl = new AMQConnectionURL(url);
+ assertNotNull(curl);
+ assertEquals(1, curl.getBrokerCount());
+ assertNotNull(curl.getBrokerDetails(0));
+ assertEquals(BrokerDetails.SOCKET, curl.getBrokerDetails(0).getTransport());
+ assertEquals("VM-Unique-socketID", curl.getBrokerDetails(0).getHost());
+ assertEquals("URL does not toString as expected",
+ url.replace(":guest", ":********"), curl.toString());
+ }
+ catch (URLSyntaxException e)
+ {
+ fail(e.getMessage());
+ }
+ }
+
public void testSingleTransportMultiOptionOnBrokerURL() throws URLSyntaxException
{
String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672?foo='jim'&bar='bob'&fred='jimmy'',routingkey='jim',timeout='200',immediatedelivery='true'";
@@ -481,37 +549,6 @@ public class ConnectionURLTest extends T
assertTrue("String representation should contain options and values", url.toString().contains("maxprefetch='12345'"));
}
- public void testHostNamesWithUnderScore() throws URLSyntaxException
- {
- String url = "amqp://guest:guest@clientid/test?brokerlist='tcp://under_score:6672'";
-
- ConnectionURL connectionurl = new AMQConnectionURL(url);
-
- assertTrue(connectionurl.getUsername().equals("guest"));
- assertTrue(connectionurl.getPassword().equals("guest"));
- assertTrue(connectionurl.getVirtualHost().equals("/test"));
-
- assertTrue(connectionurl.getBrokerCount() == 1);
- BrokerDetails service = connectionurl.getBrokerDetails(0);
- assertTrue(service.getTransport().equals("tcp"));
- assertTrue(service.getHost().equals("under_score"));
- assertTrue(service.getPort() == 6672);
-
- url = "amqp://guest:guest@clientid/test?brokerlist='tcp://under_score'";
-
- connectionurl = new AMQConnectionURL(url);
-
- assertTrue(connectionurl.getUsername().equals("guest"));
- assertTrue(connectionurl.getPassword().equals("guest"));
- assertTrue(connectionurl.getVirtualHost().equals("/test"));
-
- assertTrue(connectionurl.getBrokerCount() == 1);
- service = connectionurl.getBrokerDetails(0);
- assertTrue(service.getTransport().equals("tcp"));
- assertTrue(service.getHost().equals("under_score"));
- assertTrue(service.getPort() == 5672);
- }
-
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(ConnectionURLTest.class);
Modified: qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java (original)
+++ qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java Fri Oct 21 01:19:00 2011
@@ -21,10 +21,10 @@
package org.apache.qpid.test.unit.jndi;
import junit.framework.TestCase;
-
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.url.URLSyntaxException;
public class ConnectionFactoryTest extends TestCase
{
@@ -34,9 +34,21 @@ public class ConnectionFactoryTest exten
public static final String URL = "amqp://guest:guest@clientID/test?brokerlist='tcp://localhost:5672'";
public static final String URL_STAR_PWD = "amqp://guest:********@clientID/test?brokerlist='tcp://localhost:5672'";
- public void testConnectionURLStringMasksPassword() throws Exception
+ public void testConnectionURLString()
{
- AMQConnectionFactory factory = new AMQConnectionFactory(URL);
+ AMQConnectionFactory factory = new AMQConnectionFactory();
+
+ assertNull("ConnectionURL should have no value at start",
+ factory.getConnectionURL());
+
+ try
+ {
+ factory.setConnectionURLString(URL);
+ }
+ catch (URLSyntaxException e)
+ {
+ fail(e.getMessage());
+ }
//URL will be returned with the password field swapped for '********'
assertEquals("Connection URL not correctly set", URL_STAR_PWD, factory.getConnectionURLString());
Modified: qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java (original)
+++ qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java Fri Oct 21 01:19:00 2011
@@ -24,7 +24,6 @@ import java.util.Properties;
import javax.jms.Queue;
import javax.jms.Topic;
-import javax.naming.ConfigurationException;
import javax.naming.Context;
import javax.naming.InitialContext;
@@ -68,22 +67,4 @@ public class JNDIPropertyFileTest extend
assertEquals("Topic" + i + "WithSpace",bindingKey.asString());
}
}
-
- public void testConfigurationErrors() throws Exception
- {
- Properties properties = new Properties();
- properties.put("java.naming.factory.initial", "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
- properties.put("destination.my-queue","amq.topic/test;create:always}");
-
- try
- {
- ctx = new InitialContext(properties);
- fail("A configuration exception should be thrown with details about the address syntax error");
- }
- catch(ConfigurationException e)
- {
- assertTrue("Incorrect exception", e.getMessage().contains("Failed to parse entry: amq.topic/test;create:always}"));
- }
-
- }
}
Modified: qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java (original)
+++ qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java Fri Oct 21 01:19:00 2011
@@ -20,24 +20,17 @@
*/
package org.apache.qpid.test.unit.message;
-import java.util.Map;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.TemporaryQueue;
-import javax.jms.Topic;
-import javax.jms.TopicSubscriber;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.BasicMessageConsumer_0_8;
-import org.apache.qpid.client.BasicMessageProducer_0_8;
-import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.*;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.AMQException;
+
+import javax.jms.*;
+
+import java.util.Map;
public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
{
@@ -64,12 +57,7 @@ public class TestAMQSession extends AMQS
}
- public void commitImpl() throws AMQException, FailoverException
- {
-
- }
-
- public void acknowledgeImpl()
+ public void sendCommit() throws AMQException, FailoverException
{
}
@@ -129,7 +117,7 @@ public class TestAMQSession extends AMQS
}
- public BasicMessageProducer_0_8 createMessageProducer(Destination destination, boolean mandatory, boolean immediate, long producerId)
+ public BasicMessageProducer_0_8 createMessageProducer(Destination destination, boolean mandatory, boolean immediate, boolean waitUntilSent, long producerId)
{
return null;
}
@@ -207,10 +195,4 @@ public class TestAMQSession extends AMQS
{
return false;
}
-
- @Override
- public AMQException getLastException()
- {
- return null;
- }
}
Modified: qpid/branches/QPID-2519/java/common.xml
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common.xml?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common.xml (original)
+++ qpid/branches/QPID-2519/java/common.xml Fri Oct 21 01:19:00 2011
@@ -23,9 +23,7 @@
<dirname property="project.root" file="${ant.file.common}"/>
<property name="project.name" value="qpid"/>
- <property name="project.version" value="0.13"/>
- <property name="project.url" value="http://qpid.apache.org"/>
- <property name="project.groupid" value="org.apache.qpid"/>
+ <property name="project.version" value="0.9"/>
<property name="project.namever" value="${project.name}-${project.version}"/>
<property name="resources" location="${project.root}/resources"/>
@@ -42,6 +40,7 @@
<property name="build.report" location="${build}/report"/>
<property name="build.release" location="${build}/release"/>
<property name="build.release.prepare" location="${build.release}/prepare"/>
+ <property name="build.data" location="${build.scratch}/data"/>
<property name="build.plugins" location="${build}/lib/plugins"/>
<property name="build.coveragereport" location="${build}/coverage"/>
<property name="build.findbugs" location="${build}/findbugs"/>
@@ -64,11 +63,6 @@
<property name="mllib.dir" value="${project.root}/../python" />
<property name="findbugs.dir" value="${project.root}/lib/findbugs" />
- <!-- properties used to control Ant Eclipse for Eclipse classpath/project files etc -->
- <property name="eclipse.updatealways" value="false"/>
- <property name="eclipse.compilercompliance" value="5.0"/>
- <property name="eclipse.container" value="JVM 1.5"/>
-
<path id="cobertura.classpath">
<fileset dir="${cobertura.dir}">
<include name="cobertura.jar" />
@@ -77,7 +71,6 @@
</path>
<property name="maven.local.repo" value="${build.scratch}/maven-local-repo"/>
- <property name="maven.settings.xml" value="${project.root}/maven-settings.xml"/>
<property name="maven.unique.version" value="false"/>
<property name="maven.snapshot" value="true"/>
<condition property="maven.version.suffix" value="" else="-SNAPSHOT">
@@ -131,6 +124,8 @@
</sequential>
</macrodef>
+
+
<macrodef name="jython">
<attribute name="path"/>
<element name="args"/>
@@ -327,20 +322,6 @@
results directory:
${build.results}
-
- ant eclipse
-
- Generates project and classpath files for the Eclispe IDE. Requires that
- the Ant Eclipse task (http://ant-eclipse.sourceforge.net/) has been installed
- in $ANT_HOME/lib.
-
- The following system properties will be passed to the task. These can be usefully
- overridden from the command line.
-
- eclipse.updatealways - forces Eclipse files to be regenerated even if they are newer then the build.xml (default ${eclipse.updatealways}).
- eclipse.container - controls the Eclipse container (default ${eclipse.container}).
- eclipse.compilercompliance" - controls the Eclipse compiler compliance (default ${eclipse.compilercompliance}).
-
</echo>
</target>
Modified: qpid/branches/QPID-2519/java/common/bin/qpid-run
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/bin/qpid-run?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/bin/qpid-run (original)
+++ qpid/branches/QPID-2519/java/common/bin/qpid-run Fri Oct 21 01:19:00 2011
@@ -77,10 +77,7 @@ fi
#Set the default system properties that we'll use now that they have
#all been initialised
-declare -a SYSTEM_PROPS
-SYSTEM_PROPS[${#SYSTEM_PROPS[@]}]="-Damqj.logging.level=$AMQJ_LOGGING_LEVEL"
-SYSTEM_PROPS[${#SYSTEM_PROPS[@]}]="-DQPID_HOME=$QPID_HOME"
-SYSTEM_PROPS[${#SYSTEM_PROPS[@]}]="-DQPID_WORK=$QPID_WORK"
+SYSTEM_PROPS="-Damqj.logging.level=$AMQJ_LOGGING_LEVEL -DQPID_HOME=$QPID_HOME -DQPID_WORK=$QPID_WORK"
#If logprefix or logsuffix set to use PID make that happen
#Otherwise just pass the value through for these props
@@ -93,7 +90,7 @@ if [ -n "$QPID_LOG_PREFIX" ]; then
log $INFO Using qpid logprefix property
LOG_PREFIX=" -Dlogprefix=$QPID_LOG_PREFIX"
fi
- SYSTEM_PROPS[${#SYSTEM_PROPS[@]}]="${LOG_PREFIX}"
+ SYSTEM_PROPS="${SYSTEM_PROPS} ${LOG_PREFIX}"
fi
if [ -n "$QPID_LOG_SUFFIX" ]; then
@@ -104,10 +101,10 @@ if [ -n "$QPID_LOG_SUFFIX" ]; then
log $INFO Using qpig logsuffix property
LOG_SUFFIX=" -Dlogsuffix=$QPID_LOG_SUFFIX"
fi
- SYSTEM_PROPS[${#SYSTEM_PROPS[@]}]="${LOG_SUFFIX}"
+ SYSTEM_PROPS="${SYSTEM_PROPS} ${LOG_SUFFIX}"
fi
-log $INFO System Properties set to ${SYSTEM_PROPS[@]}
+log $INFO System Properties set to $SYSTEM_PROPS
log $INFO QPID_OPTS set to $QPID_OPTS
program=$(basename $0)
@@ -257,6 +254,6 @@ if $cygwin; then
JAVA=$(cygpath -u $JAVA)
fi
-COMMAND=($JAVA $JAVA_VM $QPID_PNAME $JAVA_GC $JAVA_MEM "${SYSTEM_PROPS[@]}" $JAVA_OPTS $QPID_OPTS "${JAVA_ARGS[@]}")
+COMMAND=($JAVA $JAVA_VM $QPID_PNAME $JAVA_GC $JAVA_MEM $SYSTEM_PROPS $JAVA_OPTS $QPID_OPTS "${JAVA_ARGS[@]}")
DISPATCH
Modified: qpid/branches/QPID-2519/java/common/src/main/java/common.bnd
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/common.bnd?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/common.bnd (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/common.bnd Fri Oct 21 01:19:00 2011
@@ -17,7 +17,7 @@
# under the License.
#
-ver: 0.13.0
+ver: 0.9.0
Bundle-SymbolicName: qpid-common
Bundle-Version: ${ver}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQChannelException.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQChannelException.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQChannelException.java Fri Oct 21 01:19:00 2011
@@ -54,7 +54,6 @@ public class AMQChannelException extends
public AMQFrame getCloseFrame(int channel)
{
MethodRegistry reg = MethodRegistry.getMethodRegistry(new ProtocolVersion(major,minor));
- return new AMQFrame(channel, reg.createChannelCloseBody(getErrorCode() == null ? AMQConstant.INTERNAL_ERROR.getCode() : getErrorCode().getCode(), getMessageAsShortString(),_classId,_methodId));
+ return new AMQFrame(channel, reg.createChannelCloseBody(getErrorCode() == null ? AMQConstant.INTERNAL_ERROR.getCode() : getErrorCode().getCode(), new AMQShortString(getMessage()),_classId,_methodId));
}
-
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java Fri Oct 21 01:19:00 2011
@@ -62,10 +62,9 @@ public class AMQConnectionException exte
MethodRegistry reg = MethodRegistry.getMethodRegistry(new ProtocolVersion(major,minor));
return new AMQFrame(0,
reg.createConnectionCloseBody(getErrorCode().getCode(),
- getMessageAsShortString(),
+ new AMQShortString(getMessage()),
_classId,
_methodId));
}
-
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQException.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQException.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQException.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQException.java Fri Oct 21 01:19:00 2011
@@ -20,7 +20,6 @@
*/
package org.apache.qpid;
-import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.protocol.AMQConstant;
/**
@@ -122,19 +121,4 @@ public class AMQException extends Except
return newAMQE;
}
-
- /**
- * Truncates the exception message to 255 characters if its length exceeds 255.
- *
- * @return exception message
- */
- public AMQShortString getMessageAsShortString()
- {
- String message = getMessage();
- if (message != null && message.length() > AMQShortString.MAX_LENGTH)
- {
- message = message.substring(0, AMQShortString.MAX_LENGTH - 3) + "...";
- }
- return new AMQShortString(message);
- }
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQInvalidArgumentException.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQInvalidArgumentException.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQInvalidArgumentException.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQInvalidArgumentException.java Fri Oct 21 01:19:00 2011
@@ -34,7 +34,7 @@ public class AMQInvalidArgumentException
{
public AMQInvalidArgumentException(String message, Throwable cause)
{
- super(AMQConstant.ARGUMENT_INVALID, message, cause);
+ super(AMQConstant.INVALID_ARGUMENT, message, cause);
}
public boolean isHardError()
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java Fri Oct 21 01:19:00 2011
@@ -20,6 +20,9 @@
*/
package org.apache.qpid.codec;
+import org.apache.mina.filter.codec.ProtocolCodecFactory;
+import org.apache.mina.filter.codec.ProtocolDecoder;
+import org.apache.mina.filter.codec.ProtocolEncoder;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
/**
@@ -28,11 +31,14 @@ import org.apache.qpid.protocol.AMQVersi
*
* <p/><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations.
+ * <tr><td> Supply the protocol encoder. <td> {@link AMQEncoder}
* <tr><td> Supply the protocol decoder. <td> {@link AMQDecoder}
* </table>
*/
-public class AMQCodecFactory
+public class AMQCodecFactory implements ProtocolCodecFactory
{
+ /** Holds the protocol encoder. */
+ private final AMQEncoder _encoder = new AMQEncoder();
/** Holds the protocol decoder. */
private final AMQDecoder _frameDecoder;
@@ -50,6 +56,15 @@ public class AMQCodecFactory
_frameDecoder = new AMQDecoder(expectProtocolInitiation, session);
}
+ /**
+ * Gets the AMQP encoder.
+ *
+ * @return The AMQP encoder.
+ */
+ public ProtocolEncoder getEncoder()
+ {
+ return _encoder;
+ }
/**
* Gets the AMQP decoder.
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java Fri Oct 21 01:19:00 2011
@@ -20,9 +20,13 @@
*/
package org.apache.qpid.codec;
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQDataBlockDecoder;
@@ -50,8 +54,11 @@ import org.apache.qpid.protocol.AMQVersi
* @todo If protocol initiation decoder not needed, then don't create it. Probably not a big deal, but it adds to the
* per-session overhead.
*/
-public class AMQDecoder
+public class AMQDecoder extends CumulativeProtocolDecoder
{
+
+ private static final String BUFFER = AMQDecoder.class.getName() + ".Buffer";
+
/** Holds the 'normal' AMQP data decoder. */
private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder();
@@ -60,11 +67,12 @@ public class AMQDecoder
/** Flag to indicate whether this decoder needs to handle protocol initiation. */
private boolean _expectProtocolInitiation;
+ private boolean firstDecode = true;
private AMQMethodBodyFactory _bodyFactory;
- private List<ByteArrayInputStream> _remainingBufs = new ArrayList<ByteArrayInputStream>();
-
+ private ByteBuffer _remainingBuf;
+
/**
* Creates a new AMQP decoder.
*
@@ -76,7 +84,98 @@ public class AMQDecoder
_bodyFactory = new AMQMethodBodyFactory(session);
}
+ /**
+ * Delegates decoding AMQP from the data buffer that Mina has retrieved from the wire, to the data or protocol
+ * initiation decoders.
+ *
+ * @param session The Mina session.
+ * @param in The raw byte buffer.
+ * @param out The Mina object output gatherer to write decoded objects to.
+ *
+ * @return <tt>true</tt> if the data was decoded, <tt>false</tt> if more is needed and the data should accumulate.
+ *
+ * @throws Exception If the data cannot be decoded for any reason.
+ */
+ protected boolean doDecode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
+ {
+
+ boolean decoded;
+ if (_expectProtocolInitiation
+ || (firstDecode
+ && (in.remaining() > 0)
+ && (in.get(in.position()) == (byte)'A')))
+ {
+ decoded = doDecodePI(session, in, out);
+ }
+ else
+ {
+ decoded = doDecodeDataBlock(session, in, out);
+ }
+ if(firstDecode && decoded)
+ {
+ firstDecode = false;
+ }
+ return decoded;
+ }
+
+ /**
+ * Decodes AMQP data, delegating the decoding to an {@link AMQDataBlockDecoder}.
+ *
+ * @param session The Mina session.
+ * @param in The raw byte buffer.
+ * @param out The Mina object output gatherer to write decoded objects to.
+ *
+ * @return <tt>true</tt> if the data was decoded, <tt>false</tt> if more is needed and the data should accumulate.
+ *
+ * @throws Exception If the data cannot be decoded for any reason.
+ */
+ protected boolean doDecodeDataBlock(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
+ {
+ int pos = in.position();
+ boolean enoughData = _dataBlockDecoder.decodable(in.buf());
+ in.position(pos);
+ if (!enoughData)
+ {
+ // returning false means it will leave the contents in the buffer and
+ // call us again when more data has been read
+ return false;
+ }
+ else
+ {
+ _dataBlockDecoder.decode(session, in, out);
+
+ return true;
+ }
+ }
+
+ /**
+ * Decodes an AMQP initiation, delegating the decoding to a {@link ProtocolInitiation.Decoder}.
+ *
+ * @param session The Mina session.
+ * @param in The raw byte buffer.
+ * @param out The Mina object output gatherer to write decoded objects to.
+ *
+ * @return <tt>true</tt> if the data was decoded, <tt>false</tt> if more is needed and the data should accumulate.
+ *
+ * @throws Exception If the data cannot be decoded for any reason.
+ */
+ private boolean doDecodePI(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
+ {
+ boolean enoughData = _piDecoder.decodable(in.buf());
+ if (!enoughData)
+ {
+ // returning false means it will leave the contents in the buffer and
+ // call us again when more data has been read
+ return false;
+ }
+ else
+ {
+ ProtocolInitiation pi = new ProtocolInitiation(in.buf());
+ out.write(pi);
+ return true;
+ }
+ }
/**
* Sets the protocol initation flag, that determines whether decoding is handled by the data decoder of the protocol
@@ -90,169 +189,152 @@ public class AMQDecoder
_expectProtocolInitiation = expectProtocolInitiation;
}
- private class RemainingByteArrayInputStream extends InputStream
- {
- private int _currentListPos;
- private int _markPos;
-
- @Override
- public int read() throws IOException
+ /**
+ * Cumulates content of <tt>in</tt> into internal buffer and forwards
+ * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
+ * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
+ * and the cumulative buffer is compacted after decoding ends.
+ *
+ * @throws IllegalStateException if your <tt>doDecode()</tt> returned
+ * <tt>true</tt> not consuming the cumulative buffer.
+ */
+ public void decode( IoSession session, ByteBuffer in,
+ ProtocolDecoderOutput out ) throws Exception
+ {
+ ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER );
+ // if we have a session buffer, append data to that otherwise
+ // use the buffer read from the network directly
+ if( buf != null )
{
- ByteArrayInputStream currentStream = _remainingBufs.get(_currentListPos);
- if(currentStream.available() > 0)
- {
- return currentStream.read();
- }
- else if((_currentListPos == _remainingBufs.size())
- || (++_currentListPos == _remainingBufs.size()))
- {
- return -1;
- }
- else
- {
-
- ByteArrayInputStream stream = _remainingBufs.get(_currentListPos);
- stream.mark(0);
- return stream.read();
- }
+ buf.put( in );
+ buf.flip();
}
-
- @Override
- public int read(final byte[] b, final int off, final int len) throws IOException
+ else
{
+ buf = in;
+ }
- if(_currentListPos == _remainingBufs.size())
- {
- return -1;
- }
- else
+ for( ;; )
+ {
+ int oldPos = buf.position();
+ boolean decoded = doDecode( session, buf, out );
+ if( decoded )
{
- ByteArrayInputStream currentStream = _remainingBufs.get(_currentListPos);
- final int available = currentStream.available();
- int read = currentStream.read(b, off, len > available ? available : len);
- if(read < len)
+ if( buf.position() == oldPos )
{
- if(_currentListPos++ != _remainingBufs.size())
- {
- _remainingBufs.get(_currentListPos).mark(0);
- }
- int correctRead = read == -1 ? 0 : read;
- int subRead = read(b, off+correctRead, len-correctRead);
- if(subRead == -1)
- {
- return read;
- }
- else
- {
- return correctRead+subRead;
- }
+ throw new IllegalStateException(
+ "doDecode() can't return true when buffer is not consumed." );
}
- else
+
+ if( !buf.hasRemaining() )
{
- return len;
+ break;
}
}
- }
-
- @Override
- public int available() throws IOException
- {
- int total = 0;
- for(int i = _currentListPos; i < _remainingBufs.size(); i++)
+ else
{
- total += _remainingBufs.get(i).available();
+ break;
}
- return total;
}
- @Override
- public void mark(final int readlimit)
+ // if there is any data left that cannot be decoded, we store
+ // it in a buffer in the session and next time this decoder is
+ // invoked the session buffer gets appended to
+ if ( buf.hasRemaining() )
{
- _markPos = _currentListPos;
- final ByteArrayInputStream stream = _remainingBufs.get(_currentListPos);
- if(stream != null)
- {
- stream.mark(readlimit);
- }
+ storeRemainingInSession( buf, session );
}
+ else
+ {
+ removeSessionBuffer( session );
+ }
+ }
+
+ /**
+ * Releases the cumulative buffer used by the specified <tt>session</tt>.
+ * Please don't forget to call <tt>super.dispose( session )</tt> when
+ * you override this method.
+ */
+ public void dispose( IoSession session ) throws Exception
+ {
+ removeSessionBuffer( session );
+ }
- @Override
- public void reset() throws IOException
+ private void removeSessionBuffer(IoSession session)
+ {
+ ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER );
+ if( buf != null )
{
- _currentListPos = _markPos;
- final int size = _remainingBufs.size();
- if(_currentListPos < size)
- {
- _remainingBufs.get(_currentListPos).reset();
- }
- for(int i = _currentListPos+1; i<size; i++)
- {
- _remainingBufs.get(i).reset();
- }
+ buf.release();
+ session.removeAttribute( BUFFER );
}
}
+ private static final SimpleByteBufferAllocator SIMPLE_BYTE_BUFFER_ALLOCATOR = new SimpleByteBufferAllocator();
+
+ private void storeRemainingInSession(ByteBuffer buf, IoSession session)
+ {
+ ByteBuffer remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate( buf.remaining(), false );
+ remainingBuf.setAutoExpand( true );
+ remainingBuf.put( buf );
+ session.setAttribute( BUFFER, remainingBuf );
+ }
- public ArrayList<AMQDataBlock> decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
+ public ArrayList<AMQDataBlock> decodeBuffer(java.nio.ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException
{
// get prior remaining data from accumulator
ArrayList<AMQDataBlock> dataBlocks = new ArrayList<AMQDataBlock>();
- DataInputStream msg;
-
-
- ByteArrayInputStream bais = new ByteArrayInputStream(buf.array(),buf.arrayOffset()+buf.position(), buf.remaining());
- if(!_remainingBufs.isEmpty())
+ ByteBuffer msg;
+ // if we have a session buffer, append data to that otherwise
+ // use the buffer read from the network directly
+ if( _remainingBuf != null )
{
- _remainingBufs.add(bais);
- msg = new DataInputStream(new RemainingByteArrayInputStream());
+ _remainingBuf.put(buf);
+ _remainingBuf.flip();
+ msg = _remainingBuf;
}
else
{
- msg = new DataInputStream(bais);
+ msg = ByteBuffer.wrap(buf);
}
-
- boolean enoughData = true;
- while (enoughData)
+
+ if (_expectProtocolInitiation
+ || (firstDecode
+ && (msg.remaining() > 0)
+ && (msg.get(msg.position()) == (byte)'A')))
{
- if(!_expectProtocolInitiation)
+ if (_piDecoder.decodable(msg.buf()))
{
- enoughData = _dataBlockDecoder.decodable(msg);
- if (enoughData)
- {
- dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_bodyFactory, msg));
- }
+ dataBlocks.add(new ProtocolInitiation(msg.buf()));
}
- else
+ }
+ else
+ {
+ boolean enoughData = true;
+ while (enoughData)
{
- enoughData = _piDecoder.decodable(msg);
- if (enoughData)
- {
- dataBlocks.add(new ProtocolInitiation(msg));
- }
-
- }
+ int pos = msg.position();
- if(!enoughData)
- {
- if(!_remainingBufs.isEmpty())
+ enoughData = _dataBlockDecoder.decodable(msg);
+ msg.position(pos);
+ if (enoughData)
{
- _remainingBufs.remove(_remainingBufs.size()-1);
- ListIterator<ByteArrayInputStream> iterator = _remainingBufs.listIterator();
- while(iterator.hasNext() && iterator.next().available() == 0)
- {
- iterator.remove();
- }
+ dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_bodyFactory, msg));
}
- if(bais.available()!=0)
+ else
{
- byte[] remaining = new byte[bais.available()];
- bais.read(remaining);
- _remainingBufs.add(new ByteArrayInputStream(remaining));
+ _remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate(msg.remaining(), false);
+ _remainingBuf.setAutoExpand(true);
+ _remainingBuf.put(msg);
}
}
}
+ if(firstDecode && dataBlocks.size() > 0)
+ {
+ firstDecode = false;
+ }
return dataBlocks;
}
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java Fri Oct 21 01:19:00 2011
@@ -23,7 +23,7 @@ package org.apache.qpid.configuration;
*/
public class ClientProperties
{
-
+
/**
* Currently with Qpid it is not possible to change the client ID.
* If one is not specified upon connection construction, an id is generated automatically.
@@ -68,50 +68,67 @@ public class ClientProperties
* by the broker in TuneOK it will be used as the heartbeat interval.
* If not a warning will be printed and the max value specified for
* heartbeat in TuneOK will be used
- *
+ *
* The default idle timeout is set to 120 secs
*/
public static final String IDLE_TIMEOUT_PROP_NAME = "idle_timeout";
public static final long DEFAULT_IDLE_TIMEOUT = 120000;
-
+
public static final String HEARTBEAT = "qpid.heartbeat";
public static final int HEARTBEAT_DEFAULT = 120;
-
+
/**
* This value will be used to determine the default destination syntax type.
* Currently the two types are Binding URL (java only) and the Addressing format (used by
- * all clients).
+ * all clients).
*/
public static final String DEST_SYNTAX = "qpid.dest_syntax";
-
+
public static final String USE_LEGACY_MAP_MESSAGE_FORMAT = "qpid.use_legacy_map_message";
- public static final String AMQP_VERSION = "qpid.amqp.version";
-
- public static final String QPID_VERIFY_CLIENT_ID = "qpid.verify_client_id";
+ /**
+ * ==========================================================
+ * Those properties are used when the io size should be bounded
+ * ==========================================================
+ */
/**
- * System properties to change the default timeout used during
- * synchronous operations.
+ * When set to true, the io layer throttles down producers and consumers
+ * when written or read messages are accumulating and exceeding a certain size.
+ * This is especially useful when the producer rate is greater than the network
+ * speed.
+ * type: boolean
*/
- public static final String QPID_SYNC_OP_TIMEOUT = "qpid.sync_op_timeout";
- public static final String AMQJ_DEFAULT_SYNCWRITE_TIMEOUT = "amqj.default_syncwrite_timeout";
+ public static final String PROTECTIO_PROP_NAME = "protectio";
+ //=== The following properties are only used when the previous one is true.
/**
- * A default timeout value for synchronous operations
+ * Max size of read messages that can be stored within the MINA layer
+ * type: int
*/
- public static final int DEFAULT_SYNC_OPERATION_TIMEOUT = 60000;
+ public static final String READ_BUFFER_LIMIT_PROP_NAME = "qpid.read.buffer.limit";
+ public static final String READ_BUFFER_LIMIT_DEFAULT = "262144";
+ /**
+ * Max size of written messages that can be stored within the MINA layer
+ * type: int
+ */
+ public static final String WRITE_BUFFER_LIMIT_PROP_NAME = "qpid.read.buffer.limit";
+ public static final String WRITE_BUFFER_LIMIT_DEFAULT = "262144";
+ public static final String AMQP_VERSION = "qpid.amqp.version";
+
+ private static ClientProperties _instance = new ClientProperties();
+
/*
- public static final QpidProperty<Boolean> IGNORE_SET_CLIENTID_PROP_NAME =
+ public static final QpidProperty<Boolean> IGNORE_SET_CLIENTID_PROP_NAME =
QpidProperty.booleanProperty(false,"qpid.ignore_set_client_id","ignore_setclientID");
-
+
public static final QpidProperty<Boolean> SYNC_PERSISTENT_PROP_NAME =
QpidProperty.booleanProperty(false,"qpid.sync_persistence","sync_persistence");
-
-
+
+
public static final QpidProperty<Integer> MAX_PREFETCH_PROP_NAME =
QpidProperty.intProperty(500,"qpid.max_prefetch","max_prefetch"); */
-
-
+
+
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java Fri Oct 21 01:19:00 2011
@@ -20,9 +20,7 @@
*/
package org.apache.qpid.framing;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
+import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.AMQException;
@@ -36,7 +34,7 @@ public interface AMQBody
*/
public abstract int getSize();
- public void writePayload(DataOutputStream buffer) throws IOException;
+ public void writePayload(ByteBuffer buffer);
- void handle(final int channelId, final AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException;
+ void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) throws AMQException;
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java Fri Oct 21 01:19:00 2011
@@ -20,10 +20,7 @@
*/
package org.apache.qpid.framing;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
+import org.apache.mina.common.ByteBuffer;
/**
* A data block represents something that has a size in bytes and the ability to write itself to a byte
@@ -42,6 +39,25 @@ public abstract class AMQDataBlock imple
* Writes the datablock to the specified buffer.
* @param buffer
*/
- public abstract void writePayload(DataOutputStream buffer) throws IOException;
+ public abstract void writePayload(ByteBuffer buffer);
+
+ public ByteBuffer toByteBuffer()
+ {
+ final ByteBuffer buffer = ByteBuffer.allocate((int)getSize());
+
+ writePayload(buffer);
+ buffer.flip();
+ return buffer;
+ }
+
+ public java.nio.ByteBuffer toNioByteBuffer()
+ {
+ final java.nio.ByteBuffer buffer = java.nio.ByteBuffer.allocate((int) getSize());
+
+ ByteBuffer buf = ByteBuffer.wrap(buffer);
+ writePayload(buf);
+ buffer.flip();
+ return buffer;
+ }
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java Fri Oct 21 01:19:00 2011
@@ -20,14 +20,18 @@
*/
package org.apache.qpid.framing;
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.DataInputStream;
-import java.io.IOException;
-
public class AMQDataBlockDecoder
{
+ private static final String SESSION_METHOD_BODY_FACTORY = "QPID_SESSION_METHOD_BODY_FACTORY";
private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE];
@@ -43,32 +47,27 @@ public class AMQDataBlockDecoder
public AMQDataBlockDecoder()
{ }
- public boolean decodable(DataInputStream in) throws AMQFrameDecodingException, IOException
+ public boolean decodable(java.nio.ByteBuffer in) throws AMQFrameDecodingException
{
- final int remainingAfterAttributes = in.available() - (1 + 2 + 4 + 1);
+ final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1);
// type, channel, body length and end byte
if (remainingAfterAttributes < 0)
{
return false;
}
- in.mark(8);
- in.skip(1 + 2);
-
-
+ in.position(in.position() + 1 + 2);
// Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt()
- final long bodySize = in.readInt() & 0xffffffffL;
-
- in.reset();
+ final long bodySize = in.getInt() & 0xffffffffL;
return (remainingAfterAttributes >= bodySize);
}
- public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, DataInputStream in)
- throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
+ public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, ByteBuffer in)
+ throws AMQFrameDecodingException, AMQProtocolVersionException
{
- final byte type = in.readByte();
+ final byte type = in.get();
BodyFactory bodyFactory;
if (type == AMQMethodBody.TYPE)
@@ -85,8 +84,8 @@ public class AMQDataBlockDecoder
throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null);
}
- final int channel = in.readUnsignedShort();
- final long bodySize = EncodingUtils.readUnsignedInteger(in);
+ final int channel = in.getUnsignedShort();
+ final long bodySize = in.getUnsignedInt();
// bodySize can be zero
if ((channel < 0) || (bodySize < 0))
@@ -97,7 +96,7 @@ public class AMQDataBlockDecoder
AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory);
- byte marker = in.readByte();
+ byte marker = in.get();
if ((marker & 0xFF) != 0xCE)
{
throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker + " length=" + bodySize
@@ -107,4 +106,26 @@ public class AMQDataBlockDecoder
return frame;
}
+ public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
+ {
+ AMQMethodBodyFactory bodyFactory = (AMQMethodBodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY);
+ if (bodyFactory == null)
+ {
+ AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment();
+ bodyFactory = new AMQMethodBodyFactory(protocolSession);
+ session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory);
+ }
+
+ out.write(createAndPopulateFrame(bodyFactory, in));
+ }
+
+ public boolean decodable(ByteBuffer msg) throws AMQFrameDecodingException
+ {
+ return decodable(msg.buf());
+ }
+
+ public AMQDataBlock createAndPopulateFrame(AMQMethodBodyFactory factory, java.nio.ByteBuffer msg) throws AMQProtocolVersionException, AMQFrameDecodingException
+ {
+ return createAndPopulateFrame(factory, ByteBuffer.wrap(msg));
+ }
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java Fri Oct 21 01:19:00 2011
@@ -20,9 +20,7 @@
*/
package org.apache.qpid.framing;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import org.apache.mina.common.ByteBuffer;
public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
{
@@ -38,7 +36,7 @@ public class AMQFrame extends AMQDataBlo
_bodyFrame = bodyFrame;
}
- public AMQFrame(final DataInputStream in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException, IOException
+ public AMQFrame(final ByteBuffer in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException
{
this._channel = channel;
this._bodyFrame = bodyFactory.createBody(in,bodySize);
@@ -55,13 +53,13 @@ public class AMQFrame extends AMQDataBlo
}
- public void writePayload(DataOutputStream buffer) throws IOException
+ public void writePayload(ByteBuffer buffer)
{
- buffer.writeByte(_bodyFrame.getFrameType());
+ buffer.put(_bodyFrame.getFrameType());
EncodingUtils.writeUnsignedShort(buffer, _channel);
EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize());
_bodyFrame.writePayload(buffer);
- buffer.writeByte(FRAME_END_BYTE);
+ buffer.put(FRAME_END_BYTE);
}
public final int getChannel()
@@ -79,48 +77,48 @@ public class AMQFrame extends AMQDataBlo
return "Frame channelId: " + _channel + ", bodyFrame: " + String.valueOf(_bodyFrame);
}
- public static void writeFrame(DataOutputStream buffer, final int channel, AMQBody body) throws IOException
+ public static void writeFrame(ByteBuffer buffer, final int channel, AMQBody body)
{
- buffer.writeByte(body.getFrameType());
+ buffer.put(body.getFrameType());
EncodingUtils.writeUnsignedShort(buffer, channel);
EncodingUtils.writeUnsignedInteger(buffer, body.getSize());
body.writePayload(buffer);
- buffer.writeByte(FRAME_END_BYTE);
+ buffer.put(FRAME_END_BYTE);
}
- public static void writeFrames(DataOutputStream buffer, final int channel, AMQBody body1, AMQBody body2) throws IOException
+ public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2)
{
- buffer.writeByte(body1.getFrameType());
+ buffer.put(body1.getFrameType());
EncodingUtils.writeUnsignedShort(buffer, channel);
EncodingUtils.writeUnsignedInteger(buffer, body1.getSize());
body1.writePayload(buffer);
- buffer.writeByte(FRAME_END_BYTE);
- buffer.writeByte(body2.getFrameType());
+ buffer.put(FRAME_END_BYTE);
+ buffer.put(body2.getFrameType());
EncodingUtils.writeUnsignedShort(buffer, channel);
EncodingUtils.writeUnsignedInteger(buffer, body2.getSize());
body2.writePayload(buffer);
- buffer.writeByte(FRAME_END_BYTE);
+ buffer.put(FRAME_END_BYTE);
}
- public static void writeFrames(DataOutputStream buffer, final int channel, AMQBody body1, AMQBody body2, AMQBody body3) throws IOException
+ public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2, AMQBody body3)
{
- buffer.writeByte(body1.getFrameType());
+ buffer.put(body1.getFrameType());
EncodingUtils.writeUnsignedShort(buffer, channel);
EncodingUtils.writeUnsignedInteger(buffer, body1.getSize());
body1.writePayload(buffer);
- buffer.writeByte(FRAME_END_BYTE);
- buffer.writeByte(body2.getFrameType());
+ buffer.put(FRAME_END_BYTE);
+ buffer.put(body2.getFrameType());
EncodingUtils.writeUnsignedShort(buffer, channel);
EncodingUtils.writeUnsignedInteger(buffer, body2.getSize());
body2.writePayload(buffer);
- buffer.writeByte(FRAME_END_BYTE);
- buffer.writeByte(body3.getFrameType());
+ buffer.put(FRAME_END_BYTE);
+ buffer.put(body3.getFrameType());
EncodingUtils.writeUnsignedShort(buffer, channel);
EncodingUtils.writeUnsignedInteger(buffer, body3.getSize());
body3.writePayload(buffer);
- buffer.writeByte(FRAME_END_BYTE);
+ buffer.put(FRAME_END_BYTE);
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java Fri Oct 21 01:19:00 2011
@@ -20,14 +20,12 @@
*/
package org.apache.qpid.framing;
+import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
public interface AMQMethodBody extends AMQBody
{
public static final byte TYPE = 1;
@@ -45,12 +43,12 @@ public interface AMQMethodBody extends A
/** @return unsigned short */
public int getMethod();
- public void writeMethodPayload(DataOutputStream buffer) throws IOException;
+ public void writeMethodPayload(ByteBuffer buffer);
public int getSize();
- public void writePayload(DataOutputStream buffer) throws IOException;
+ public void writePayload(ByteBuffer buffer);
//public abstract void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException;
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java Fri Oct 21 01:19:00 2011
@@ -20,14 +20,13 @@
*/
package org.apache.qpid.framing;
+import org.apache.mina.common.ByteBuffer;
+
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.DataInputStream;
-import java.io.IOException;
-
public class AMQMethodBodyFactory implements BodyFactory
{
private static final Logger _log = LoggerFactory.getLogger(AMQMethodBodyFactory.class);
@@ -39,7 +38,7 @@ public class AMQMethodBodyFactory implem
_protocolSession = protocolSession;
}
- public AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException, IOException
+ public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
{
return _protocolSession.getMethodRegistry().convertToBody(in, bodySize);
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java Fri Oct 21 01:19:00 2011
@@ -21,16 +21,13 @@ package org.apache.qpid.framing;
*
*/
+import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
public abstract class AMQMethodBodyImpl implements AMQMethodBody
{
public static final byte TYPE = 1;
@@ -101,7 +98,7 @@ public abstract class AMQMethodBodyImpl
return 2 + 2 + getBodySize();
}
- public void writePayload(DataOutputStream buffer) throws IOException
+ public void writePayload(ByteBuffer buffer)
{
EncodingUtils.writeUnsignedShort(buffer, getClazz());
EncodingUtils.writeUnsignedShort(buffer, getMethod());
@@ -109,12 +106,12 @@ public abstract class AMQMethodBodyImpl
}
- protected byte readByte(DataInputStream buffer) throws IOException
+ protected byte readByte(ByteBuffer buffer)
{
- return buffer.readByte();
+ return buffer.get();
}
- protected AMQShortString readAMQShortString(DataInputStream buffer) throws IOException
+ protected AMQShortString readAMQShortString(ByteBuffer buffer)
{
return EncodingUtils.readAMQShortString(buffer);
}
@@ -124,27 +121,27 @@ public abstract class AMQMethodBodyImpl
return EncodingUtils.encodedShortStringLength(string);
}
- protected void writeByte(DataOutputStream buffer, byte b) throws IOException
+ protected void writeByte(ByteBuffer buffer, byte b)
{
- buffer.writeByte(b);
+ buffer.put(b);
}
- protected void writeAMQShortString(DataOutputStream buffer, AMQShortString string) throws IOException
+ protected void writeAMQShortString(ByteBuffer buffer, AMQShortString string)
{
EncodingUtils.writeShortStringBytes(buffer, string);
}
- protected int readInt(DataInputStream buffer) throws IOException
+ protected int readInt(ByteBuffer buffer)
{
- return buffer.readInt();
+ return buffer.getInt();
}
- protected void writeInt(DataOutputStream buffer, int i) throws IOException
+ protected void writeInt(ByteBuffer buffer, int i)
{
- buffer.writeInt(i);
+ buffer.putInt(i);
}
- protected FieldTable readFieldTable(DataInputStream buffer) throws AMQFrameDecodingException, IOException
+ protected FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException
{
return EncodingUtils.readFieldTable(buffer);
}
@@ -154,19 +151,19 @@ public abstract class AMQMethodBodyImpl
return EncodingUtils.encodedFieldTableLength(table); //To change body of created methods use File | Settings | File Templates.
}
- protected void writeFieldTable(DataOutputStream buffer, FieldTable table) throws IOException
+ protected void writeFieldTable(ByteBuffer buffer, FieldTable table)
{
EncodingUtils.writeFieldTableBytes(buffer, table);
}
- protected long readLong(DataInputStream buffer) throws IOException
+ protected long readLong(ByteBuffer buffer)
{
- return buffer.readLong();
+ return buffer.getLong();
}
- protected void writeLong(DataOutputStream buffer, long l) throws IOException
+ protected void writeLong(ByteBuffer buffer, long l)
{
- buffer.writeLong(l);
+ buffer.putLong(l);
}
protected int getSizeOf(byte[] response)
@@ -174,86 +171,87 @@ public abstract class AMQMethodBodyImpl
return (response == null) ? 4 : response.length + 4;
}
- protected void writeBytes(DataOutputStream buffer, byte[] data) throws IOException
+ protected void writeBytes(ByteBuffer buffer, byte[] data)
{
EncodingUtils.writeBytes(buffer,data);
}
- protected byte[] readBytes(DataInputStream buffer) throws IOException
+ protected byte[] readBytes(ByteBuffer buffer)
{
return EncodingUtils.readBytes(buffer);
}
- protected short readShort(DataInputStream buffer) throws IOException
+ protected short readShort(ByteBuffer buffer)
{
return EncodingUtils.readShort(buffer);
}
- protected void writeShort(DataOutputStream buffer, short s) throws IOException
+ protected void writeShort(ByteBuffer buffer, short s)
{
EncodingUtils.writeShort(buffer, s);
}
- protected Content readContent(DataInputStream buffer)
+ protected Content readContent(ByteBuffer buffer)
{
- return null;
+ return null; //To change body of created methods use File | Settings | File Templates.
}
protected int getSizeOf(Content body)
{
- return 0;
+ return 0; //To change body of created methods use File | Settings | File Templates.
}
- protected void writeContent(DataOutputStream buffer, Content body)
+ protected void writeContent(ByteBuffer buffer, Content body)
{
+ //To change body of created methods use File | Settings | File Templates.
}
- protected byte readBitfield(DataInputStream buffer) throws IOException
+ protected byte readBitfield(ByteBuffer buffer)
{
- return readByte(buffer);
+ return readByte(buffer); //To change body of created methods use File | Settings | File Templates.
}
- protected int readUnsignedShort(DataInputStream buffer) throws IOException
+ protected int readUnsignedShort(ByteBuffer buffer)
{
- return buffer.readUnsignedShort();
+ return buffer.getUnsignedShort(); //To change body of created methods use File | Settings | File Templates.
}
- protected void writeBitfield(DataOutputStream buffer, byte bitfield0) throws IOException
+ protected void writeBitfield(ByteBuffer buffer, byte bitfield0)
{
- buffer.writeByte(bitfield0);
+ buffer.put(bitfield0);
}
- protected void writeUnsignedShort(DataOutputStream buffer, int s) throws IOException
+ protected void writeUnsignedShort(ByteBuffer buffer, int s)
{
EncodingUtils.writeUnsignedShort(buffer, s);
}
- protected long readUnsignedInteger(DataInputStream buffer) throws IOException
+ protected long readUnsignedInteger(ByteBuffer buffer)
{
- return EncodingUtils.readUnsignedInteger(buffer);
+ return buffer.getUnsignedInt();
}
- protected void writeUnsignedInteger(DataOutputStream buffer, long i) throws IOException
+ protected void writeUnsignedInteger(ByteBuffer buffer, long i)
{
EncodingUtils.writeUnsignedInteger(buffer, i);
}
- protected short readUnsignedByte(DataInputStream buffer) throws IOException
+ protected short readUnsignedByte(ByteBuffer buffer)
{
- return (short) buffer.readUnsignedByte();
+ return buffer.getUnsigned();
}
- protected void writeUnsignedByte(DataOutputStream buffer, short unsignedByte) throws IOException
+ protected void writeUnsignedByte(ByteBuffer buffer, short unsignedByte)
{
EncodingUtils.writeUnsignedByte(buffer, unsignedByte);
}
- protected long readTimestamp(DataInputStream buffer) throws IOException
+ protected long readTimestamp(ByteBuffer buffer)
{
return EncodingUtils.readTimestamp(buffer);
}
- protected void writeTimestamp(DataOutputStream buffer, long t) throws IOException
+ protected void writeTimestamp(ByteBuffer buffer, long t)
{
EncodingUtils.writeTimestamp(buffer, t);
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java Fri Oct 21 01:19:00 2011
@@ -21,11 +21,10 @@
package org.apache.qpid.framing;
-import java.io.DataInputStream;
-import java.io.IOException;
+import org.apache.mina.common.ByteBuffer;
public abstract interface AMQMethodBodyInstanceFactory
{
- public AMQMethodBody newInstance(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException;
+ public AMQMethodBody newInstance(ByteBuffer buffer, long size) throws AMQFrameDecodingException;
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java Fri Oct 21 01:19:00 2011
@@ -21,12 +21,11 @@
package org.apache.qpid.framing;
+import org.apache.mina.common.ByteBuffer;
+
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
import java.util.*;
import java.lang.ref.WeakReference;
@@ -38,10 +37,6 @@ import java.lang.ref.WeakReference;
*/
public final class AMQShortString implements CharSequence, Comparable<AMQShortString>
{
- /**
- * The maximum number of octets in AMQ short string as defined in AMQP specification
- */
- public static final int MAX_LENGTH = 255;
private static final byte MINUS = (byte)'-';
private static final byte ZERO = (byte) '0';
@@ -123,19 +118,22 @@ public final class AMQShortString implem
public AMQShortString(byte[] data)
{
- if (data == null)
- {
- throw new NullPointerException("Cannot create AMQShortString with null data[]");
- }
- if (data.length > MAX_LENGTH)
- {
- throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
- }
+
_data = data.clone();
_length = data.length;
_offset = 0;
}
+ public AMQShortString(byte[] data, int pos)
+ {
+ final int size = data[pos++];
+ final byte[] dataCopy = new byte[size];
+ System.arraycopy(data,pos,dataCopy,0,size);
+ _length = size;
+ _data = dataCopy;
+ _offset = 0;
+ }
+
public AMQShortString(String data)
{
this((data == null) ? EMPTY_CHAR_ARRAY : data.toCharArray());
@@ -148,12 +146,7 @@ public final class AMQShortString implem
{
throw new NullPointerException("Cannot create AMQShortString with null char[]");
}
- // the current implementation of 0.8/0.9.x short string encoding
- // supports only ASCII characters
- if (data.length> MAX_LENGTH)
- {
- throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
- }
+
final int length = data.length;
final byte[] stringBytes = new byte[length];
int hash = 0;
@@ -172,17 +165,6 @@ public final class AMQShortString implem
public AMQShortString(CharSequence charSequence)
{
- if (charSequence == null)
- {
- // it should be possible to create short string for null data
- charSequence = "";
- }
- // the current implementation of 0.8/0.9.x short string encoding
- // supports only ASCII characters
- if (charSequence.length() > MAX_LENGTH)
- {
- throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
- }
final int length = charSequence.length();
final byte[] stringBytes = new byte[length];
int hash = 0;
@@ -200,33 +182,31 @@ public final class AMQShortString implem
}
- private AMQShortString(DataInputStream data, final int length) throws IOException
+ private AMQShortString(ByteBuffer data, final int length)
{
- if (length > MAX_LENGTH)
+ if(data.isDirect() || data.isReadOnly())
{
- throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
+ byte[] dataBytes = new byte[length];
+ data.get(dataBytes);
+ _data = dataBytes;
+ _offset = 0;
+ }
+ else
+ {
+
+ _data = data.array();
+ _offset = data.arrayOffset() + data.position();
+ data.skip(length);
+
}
- byte[] dataBytes = new byte[length];
- data.read(dataBytes);
- _data = dataBytes;
- _offset = 0;
_length = length;
}
private AMQShortString(final byte[] data, final int from, final int to)
{
- if (data == null)
- {
- throw new NullPointerException("Cannot create AMQShortString with null data[]");
- }
- int length = to - from;
- if (length > MAX_LENGTH)
- {
- throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
- }
_offset = from;
- _length = length;
+ _length = to - from;
_data = data;
}
@@ -265,9 +245,32 @@ public final class AMQShortString implem
return new CharSubSequence(start, end);
}
- public static AMQShortString readFromBuffer(DataInputStream buffer) throws IOException
+ public int writeToByteArray(byte[] encoding, int pos)
+ {
+ final int size = length();
+ encoding[pos++] = (byte) size;
+ System.arraycopy(_data,_offset,encoding,pos,size);
+ return pos+size;
+ }
+
+ public static AMQShortString readFromByteArray(byte[] byteEncodedDestination, int pos)
+ {
+
+
+ final AMQShortString shortString = new AMQShortString(byteEncodedDestination, pos);
+ if(shortString.length() == 0)
+ {
+ return null;
+ }
+ else
+ {
+ return shortString;
+ }
+ }
+
+ public static AMQShortString readFromBuffer(ByteBuffer buffer)
{
- final int length = buffer.readUnsignedByte();
+ final short length = buffer.getUnsigned();
if (length == 0)
{
return null;
@@ -293,13 +296,13 @@ public final class AMQShortString implem
}
}
- public void writeToBuffer(DataOutputStream buffer) throws IOException
+ public void writeToBuffer(ByteBuffer buffer)
{
final int size = length();
//buffer.setAutoExpand(true);
- buffer.write((byte) size);
- buffer.write(_data, _offset, size);
+ buffer.put((byte) size);
+ buffer.put(_data, _offset, size);
}
@@ -687,10 +690,6 @@ public final class AMQShortString implem
size += term.length();
}
- if (size > MAX_LENGTH)
- {
- throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
- }
byte[] data = new byte[size];
int pos = 0;
final byte[] delimData = delim._data;
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQType.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQType.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQType.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQType.java Fri Oct 21 01:19:00 2011
@@ -20,9 +20,8 @@
*/
package org.apache.qpid.framing;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import org.apache.mina.common.ByteBuffer;
+
import java.math.BigDecimal;
/**
@@ -61,12 +60,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{
EncodingUtils.writeLongStringBytes(buffer, (String) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
return EncodingUtils.readLongString(buffer);
}
@@ -107,12 +106,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{
EncodingUtils.writeUnsignedInteger(buffer, (Long) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
return EncodingUtils.readUnsignedInteger(buffer);
}
@@ -138,7 +137,7 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{
BigDecimal bd = (BigDecimal) value;
@@ -151,7 +150,7 @@ public enum AMQType
EncodingUtils.writeInteger(buffer, unscaled);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
byte places = EncodingUtils.readByte(buffer);
@@ -183,12 +182,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{
EncodingUtils.writeLong(buffer, (Long) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
return EncodingUtils.readLong(buffer);
}
@@ -247,7 +246,7 @@ public enum AMQType
* @param value An instance of the type.
* @param buffer The byte buffer to write it to.
*/
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{
// Ensure that the value is a FieldTable.
if (!(value instanceof FieldTable))
@@ -268,7 +267,7 @@ public enum AMQType
*
* @return An instance of the type.
*/
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
try
{
@@ -302,10 +301,10 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer)
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{ }
- public Object readValueFromBuffer(DataInputStream buffer)
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
return null;
}
@@ -331,12 +330,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{
EncodingUtils.writeLongstr(buffer, (byte[]) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
return EncodingUtils.readLongstr(buffer);
}
@@ -361,12 +360,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{
EncodingUtils.writeLongStringBytes(buffer, (String) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
return EncodingUtils.readLongString(buffer);
}
@@ -392,12 +391,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{
EncodingUtils.writeLongStringBytes(buffer, (String) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
return EncodingUtils.readLongString(buffer);
}
@@ -427,12 +426,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{
EncodingUtils.writeBoolean(buffer, (Boolean) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
return EncodingUtils.readBoolean(buffer);
}
@@ -462,12 +461,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{
EncodingUtils.writeChar(buffer, (Character) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
return EncodingUtils.readChar(buffer);
}
@@ -497,12 +496,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{
EncodingUtils.writeByte(buffer, (Byte) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
return EncodingUtils.readByte(buffer);
}
@@ -536,12 +535,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{
EncodingUtils.writeShort(buffer, (Short) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
return EncodingUtils.readShort(buffer);
}
@@ -578,12 +577,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{
EncodingUtils.writeInteger(buffer, (Integer) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
return EncodingUtils.readInteger(buffer);
}
@@ -625,12 +624,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{
EncodingUtils.writeLong(buffer, (Long) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
return EncodingUtils.readLong(buffer);
}
@@ -660,12 +659,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{
EncodingUtils.writeFloat(buffer, (Float) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
return EncodingUtils.readFloat(buffer);
}
@@ -699,12 +698,12 @@ public enum AMQType
}
}
- public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+ public void writeValueImpl(Object value, ByteBuffer buffer)
{
EncodingUtils.writeDouble(buffer, (Double) value);
}
- public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+ public Object readValueFromBuffer(ByteBuffer buffer)
{
return EncodingUtils.readDouble(buffer);
}
@@ -771,9 +770,9 @@ public enum AMQType
* @param value An instance of the type.
* @param buffer The byte buffer to write it to.
*/
- public void writeToBuffer(Object value, DataOutputStream buffer) throws IOException
+ public void writeToBuffer(Object value, ByteBuffer buffer)
{
- buffer.writeByte(identifier());
+ buffer.put(identifier());
writeValueImpl(value, buffer);
}
@@ -783,7 +782,7 @@ public enum AMQType
* @param value An instance of the type.
* @param buffer The byte buffer to write it to.
*/
- abstract void writeValueImpl(Object value, DataOutputStream buffer) throws IOException;
+ abstract void writeValueImpl(Object value, ByteBuffer buffer);
/**
* Reads an instance of the type from a specified byte buffer.
@@ -792,5 +791,5 @@ public enum AMQType
*
* @return An instance of the type.
*/
- abstract Object readValueFromBuffer(DataInputStream buffer) throws IOException;
+ abstract Object readValueFromBuffer(ByteBuffer buffer);
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java Fri Oct 21 01:19:00 2011
@@ -20,9 +20,8 @@
*/
package org.apache.qpid.framing;
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import org.apache.mina.common.ByteBuffer;
+
import java.util.Date;
import java.util.Map;
import java.math.BigDecimal;
@@ -61,7 +60,7 @@ public class AMQTypedValue
_value = type.toNativeValue(value);
}
- private AMQTypedValue(AMQType type, DataInputStream buffer) throws IOException
+ private AMQTypedValue(AMQType type, ByteBuffer buffer)
{
_type = type;
_value = type.readValueFromBuffer(buffer);
@@ -77,7 +76,7 @@ public class AMQTypedValue
return _value;
}
- public void writeToBuffer(DataOutputStream buffer) throws IOException
+ public void writeToBuffer(ByteBuffer buffer)
{
_type.writeToBuffer(_value, buffer);
}
@@ -87,9 +86,9 @@ public class AMQTypedValue
return _type.getEncodingSize(_value);
}
- public static AMQTypedValue readFromBuffer(DataInputStream buffer) throws IOException
+ public static AMQTypedValue readFromBuffer(ByteBuffer buffer)
{
- AMQType type = AMQTypeMap.getType(buffer.readByte());
+ AMQType type = AMQTypeMap.getType(buffer.get());
return new AMQTypedValue(type, buffer);
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org