You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/21 03:20:13 UTC

svn commit: r1187150 [33/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/ cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/ cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/ cpp/bindings/qmf2/python/ cpp/bindings/qmf...

Modified: qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java (original)
+++ qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java Fri Oct 21 01:19:00 2011
@@ -300,6 +300,53 @@ public class ConnectionURLTest extends T
         assertTrue(connectionurl.getOption("immediatedelivery").equals("true"));
     }
 
+    public void testSinglevmURL() throws URLSyntaxException
+    {
+        String url = "amqp://guest:guest@/test?brokerlist='vm://:2'";
+
+        ConnectionURL connectionurl = new AMQConnectionURL(url);
+
+        assertTrue(connectionurl.getFailoverMethod() == null);
+        assertTrue(connectionurl.getUsername().equals("guest"));
+        assertTrue(connectionurl.getPassword().equals("guest"));
+        assertTrue(connectionurl.getVirtualHost().equals("/test"));
+
+        assertTrue(connectionurl.getBrokerCount() == 1);
+
+        BrokerDetails service = connectionurl.getBrokerDetails(0);
+
+        assertTrue(service.getTransport().equals("vm"));
+        assertTrue(service.getHost().equals(""));
+        assertTrue(service.getPort() == 2);
+
+    }
+
+    public void testFailoverVMURL() throws URLSyntaxException
+    {
+        String url = "amqp://ritchiem:bob@/test?brokerlist='vm://:2;vm://:3',failover='roundrobin'";
+
+        ConnectionURL connectionurl = new AMQConnectionURL(url);
+
+        assertTrue(connectionurl.getFailoverMethod().equals("roundrobin"));
+        assertTrue(connectionurl.getUsername().equals("ritchiem"));
+        assertTrue(connectionurl.getPassword().equals("bob"));
+        assertTrue(connectionurl.getVirtualHost().equals("/test"));
+
+        assertTrue(connectionurl.getBrokerCount() == 2);
+
+        BrokerDetails service = connectionurl.getBrokerDetails(0);
+
+        assertTrue(service.getTransport().equals("vm"));
+        assertTrue(service.getHost().equals(""));
+        assertTrue(service.getPort() == 2);
+
+        service = connectionurl.getBrokerDetails(1);
+        assertTrue(service.getTransport().equals("vm"));
+        assertTrue(service.getHost().equals(""));
+        assertTrue(service.getPort() == 3);
+    }
+
+
     public void testNoVirtualHostURL()
     {
         String url = "amqp://user@?brokerlist='tcp://localhost:5672'";
@@ -440,6 +487,27 @@ public class ConnectionURLTest extends T
 
     }
 
+    public void testSocketProtocol() throws URLSyntaxException
+    {
+        String url = "amqp://guest:guest@id/test" + "?brokerlist='socket://VM-Unique-socketID'";
+
+        try
+        {
+            AMQConnectionURL curl = new AMQConnectionURL(url);
+            assertNotNull(curl);
+            assertEquals(1, curl.getBrokerCount());
+            assertNotNull(curl.getBrokerDetails(0));
+            assertEquals(BrokerDetails.SOCKET, curl.getBrokerDetails(0).getTransport());
+            assertEquals("VM-Unique-socketID", curl.getBrokerDetails(0).getHost());
+            assertEquals("URL does not toString as expected",
+                         url.replace(":guest", ":********"), curl.toString());
+        }
+        catch (URLSyntaxException e)
+        {
+            fail(e.getMessage());
+        }
+    }
+
     public void testSingleTransportMultiOptionOnBrokerURL() throws URLSyntaxException
     {
         String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672?foo='jim'&bar='bob'&fred='jimmy'',routingkey='jim',timeout='200',immediatedelivery='true'";
@@ -481,37 +549,6 @@ public class ConnectionURLTest extends T
         assertTrue("String representation should contain options and values", url.toString().contains("maxprefetch='12345'"));
     }
 
-    public void testHostNamesWithUnderScore() throws URLSyntaxException
-    {
-        String url = "amqp://guest:guest@clientid/test?brokerlist='tcp://under_score:6672'";
-
-        ConnectionURL connectionurl = new AMQConnectionURL(url);
-
-        assertTrue(connectionurl.getUsername().equals("guest"));
-        assertTrue(connectionurl.getPassword().equals("guest"));
-        assertTrue(connectionurl.getVirtualHost().equals("/test"));
-
-        assertTrue(connectionurl.getBrokerCount() == 1);
-        BrokerDetails service = connectionurl.getBrokerDetails(0);
-        assertTrue(service.getTransport().equals("tcp"));        
-        assertTrue(service.getHost().equals("under_score"));
-        assertTrue(service.getPort() == 6672);
-        
-        url = "amqp://guest:guest@clientid/test?brokerlist='tcp://under_score'";
-
-        connectionurl = new AMQConnectionURL(url);
-
-        assertTrue(connectionurl.getUsername().equals("guest"));
-        assertTrue(connectionurl.getPassword().equals("guest"));
-        assertTrue(connectionurl.getVirtualHost().equals("/test"));
-
-        assertTrue(connectionurl.getBrokerCount() == 1);
-        service = connectionurl.getBrokerDetails(0);
-        assertTrue(service.getTransport().equals("tcp"));        
-        assertTrue(service.getHost().equals("under_score"));
-        assertTrue(service.getPort() == 5672);
-    }
-    
     public static junit.framework.Test suite()
     {
         return new junit.framework.TestSuite(ConnectionURLTest.class);

Modified: qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java (original)
+++ qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java Fri Oct 21 01:19:00 2011
@@ -21,10 +21,10 @@
 package org.apache.qpid.test.unit.jndi;
 
 import junit.framework.TestCase;
-
 import org.apache.qpid.client.AMQConnectionFactory;
 import org.apache.qpid.jms.BrokerDetails;
 import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.url.URLSyntaxException;
 
 public class ConnectionFactoryTest extends TestCase
 {
@@ -34,9 +34,21 @@ public class ConnectionFactoryTest exten
     public static final String URL = "amqp://guest:guest@clientID/test?brokerlist='tcp://localhost:5672'";
     public static final String URL_STAR_PWD = "amqp://guest:********@clientID/test?brokerlist='tcp://localhost:5672'";
 
-    public void testConnectionURLStringMasksPassword() throws Exception
+    public void testConnectionURLString()
     {
-        AMQConnectionFactory factory = new AMQConnectionFactory(URL);
+        AMQConnectionFactory factory = new AMQConnectionFactory();
+
+        assertNull("ConnectionURL should have no value at start",
+                   factory.getConnectionURL());
+
+        try
+        {
+            factory.setConnectionURLString(URL);
+        }
+        catch (URLSyntaxException e)
+        {
+            fail(e.getMessage());
+        }
 
         //URL will be returned with the password field swapped for '********'
         assertEquals("Connection URL not correctly set", URL_STAR_PWD, factory.getConnectionURLString());

Modified: qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java (original)
+++ qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java Fri Oct 21 01:19:00 2011
@@ -24,7 +24,6 @@ import java.util.Properties;
 
 import javax.jms.Queue;
 import javax.jms.Topic;
-import javax.naming.ConfigurationException;
 import javax.naming.Context;
 import javax.naming.InitialContext;
 
@@ -68,22 +67,4 @@ public class JNDIPropertyFileTest extend
             assertEquals("Topic" + i + "WithSpace",bindingKey.asString());            
         }
     }
-    
-    public void testConfigurationErrors() throws Exception
-    {
-        Properties properties = new Properties();
-        properties.put("java.naming.factory.initial", "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
-        properties.put("destination.my-queue","amq.topic/test;create:always}");
-        
-        try
-        {
-            ctx = new InitialContext(properties);
-            fail("A configuration exception should be thrown with details about the address syntax error");
-        }
-        catch(ConfigurationException e)
-        {
-            assertTrue("Incorrect exception", e.getMessage().contains("Failed to parse entry: amq.topic/test;create:always}"));
-        }
-        
-    }
 }

Modified: qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java (original)
+++ qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java Fri Oct 21 01:19:00 2011
@@ -20,24 +20,17 @@
  */
 package org.apache.qpid.test.unit.message;
 
-import java.util.Map;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.TemporaryQueue;
-import javax.jms.Topic;
-import javax.jms.TopicSubscriber;
-
-import org.apache.qpid.AMQException;
-import org.apache.qpid.client.AMQDestination;
-import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.client.BasicMessageConsumer_0_8;
-import org.apache.qpid.client.BasicMessageProducer_0_8;
-import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.*;
 import org.apache.qpid.client.message.AMQMessageDelegateFactory;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.AMQException;
+
+import javax.jms.*;
+
+import java.util.Map;
 
 public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
 {
@@ -64,12 +57,7 @@ public class TestAMQSession extends AMQS
 
     }
 
-    public void commitImpl() throws AMQException, FailoverException
-    {
-
-    }
-
-    public void acknowledgeImpl()
+    public void sendCommit() throws AMQException, FailoverException
     {
 
     }
@@ -129,7 +117,7 @@ public class TestAMQSession extends AMQS
 
     }
 
-    public BasicMessageProducer_0_8 createMessageProducer(Destination destination, boolean mandatory, boolean immediate, long producerId)
+    public BasicMessageProducer_0_8 createMessageProducer(Destination destination, boolean mandatory, boolean immediate, boolean waitUntilSent, long producerId)
     {
         return null;
     }
@@ -207,10 +195,4 @@ public class TestAMQSession extends AMQS
     {
         return false;
     }
-
-    @Override
-    public AMQException getLastException()
-    {
-        return null;
-    }
 }

Modified: qpid/branches/QPID-2519/java/common.xml
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common.xml?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common.xml (original)
+++ qpid/branches/QPID-2519/java/common.xml Fri Oct 21 01:19:00 2011
@@ -23,9 +23,7 @@
   <dirname property="project.root" file="${ant.file.common}"/>
 
   <property name="project.name"          value="qpid"/>
-  <property name="project.version"       value="0.13"/>
-  <property name="project.url"           value="http://qpid.apache.org"/>
-  <property name="project.groupid"       value="org.apache.qpid"/>
+  <property name="project.version"       value="0.9"/>
   <property name="project.namever"       value="${project.name}-${project.version}"/>
 
   <property name="resources"             location="${project.root}/resources"/>
@@ -42,6 +40,7 @@
   <property name="build.report"          location="${build}/report"/>
   <property name="build.release"         location="${build}/release"/>
   <property name="build.release.prepare" location="${build.release}/prepare"/>
+  <property name="build.data"            location="${build.scratch}/data"/>
   <property name="build.plugins"         location="${build}/lib/plugins"/>
   <property name="build.coveragereport"  location="${build}/coverage"/>
   <property name="build.findbugs"        location="${build}/findbugs"/>
@@ -64,11 +63,6 @@
   <property name="mllib.dir" value="${project.root}/../python" />
   <property name="findbugs.dir" value="${project.root}/lib/findbugs" />
 
-  <!-- properties used to control Ant Eclipse for Eclipse classpath/project files etc -->
-  <property name="eclipse.updatealways" value="false"/>
-  <property name="eclipse.compilercompliance" value="5.0"/>
-  <property name="eclipse.container" value="JVM 1.5"/>
-
   <path id="cobertura.classpath">
     <fileset dir="${cobertura.dir}">
         <include name="cobertura.jar" />
@@ -77,7 +71,6 @@
   </path>
 
   <property name="maven.local.repo"      value="${build.scratch}/maven-local-repo"/>
-  <property name="maven.settings.xml"    value="${project.root}/maven-settings.xml"/>
   <property name="maven.unique.version"  value="false"/>
   <property name="maven.snapshot"        value="true"/>
   <condition property="maven.version.suffix" value="" else="-SNAPSHOT">
@@ -131,6 +124,8 @@
  	</sequential>
   </macrodef>
 
+
+
   <macrodef name="jython">
     <attribute name="path"/>
     <element name="args"/>
@@ -327,20 +322,6 @@
     results directory:
 
       ${build.results}
-
-  ant eclipse
-
-    Generates project and classpath files for the Eclispe IDE.  Requires that
-    the Ant Eclipse task (http://ant-eclipse.sourceforge.net/) has been installed
-    in $ANT_HOME/lib.
-
-    The following system properties will be passed to the task. These can be usefully
-    overridden from the command line.
-
-    eclipse.updatealways - forces Eclipse files to be regenerated even if they are newer then the build.xml (default ${eclipse.updatealways}).
-    eclipse.container - controls the Eclipse container (default ${eclipse.container}).
-    eclipse.compilercompliance" - controls the Eclipse compiler compliance (default ${eclipse.compilercompliance}).
-
     </echo>
   </target>
 

Modified: qpid/branches/QPID-2519/java/common/bin/qpid-run
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/bin/qpid-run?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/bin/qpid-run (original)
+++ qpid/branches/QPID-2519/java/common/bin/qpid-run Fri Oct 21 01:19:00 2011
@@ -77,10 +77,7 @@ fi
 
 #Set the default system properties that we'll use now that they have
 #all been initialised
-declare -a SYSTEM_PROPS
-SYSTEM_PROPS[${#SYSTEM_PROPS[@]}]="-Damqj.logging.level=$AMQJ_LOGGING_LEVEL"
-SYSTEM_PROPS[${#SYSTEM_PROPS[@]}]="-DQPID_HOME=$QPID_HOME"
-SYSTEM_PROPS[${#SYSTEM_PROPS[@]}]="-DQPID_WORK=$QPID_WORK"
+SYSTEM_PROPS="-Damqj.logging.level=$AMQJ_LOGGING_LEVEL -DQPID_HOME=$QPID_HOME -DQPID_WORK=$QPID_WORK"
 
 #If logprefix or logsuffix set to use PID make that happen
 #Otherwise just pass the value through for these props
@@ -93,7 +90,7 @@ if [ -n "$QPID_LOG_PREFIX" ]; then
         log $INFO Using qpid logprefix property
         LOG_PREFIX=" -Dlogprefix=$QPID_LOG_PREFIX"
     fi
-    SYSTEM_PROPS[${#SYSTEM_PROPS[@]}]="${LOG_PREFIX}"
+    SYSTEM_PROPS="${SYSTEM_PROPS} ${LOG_PREFIX}"
 fi
 
 if [ -n "$QPID_LOG_SUFFIX" ]; then
@@ -104,10 +101,10 @@ if [ -n "$QPID_LOG_SUFFIX" ]; then
         log $INFO Using qpig logsuffix property
         LOG_SUFFIX=" -Dlogsuffix=$QPID_LOG_SUFFIX"
     fi
-    SYSTEM_PROPS[${#SYSTEM_PROPS[@]}]="${LOG_SUFFIX}"
+    SYSTEM_PROPS="${SYSTEM_PROPS} ${LOG_SUFFIX}"
 fi
 
-log $INFO System Properties set to ${SYSTEM_PROPS[@]}
+log $INFO System Properties set to $SYSTEM_PROPS
 log $INFO QPID_OPTS set to $QPID_OPTS
 
 program=$(basename $0)
@@ -257,6 +254,6 @@ if $cygwin; then
   JAVA=$(cygpath -u $JAVA)
 fi
 
-COMMAND=($JAVA $JAVA_VM $QPID_PNAME $JAVA_GC $JAVA_MEM "${SYSTEM_PROPS[@]}" $JAVA_OPTS $QPID_OPTS "${JAVA_ARGS[@]}")
+COMMAND=($JAVA $JAVA_VM $QPID_PNAME $JAVA_GC $JAVA_MEM $SYSTEM_PROPS $JAVA_OPTS $QPID_OPTS "${JAVA_ARGS[@]}")
 
 DISPATCH

Modified: qpid/branches/QPID-2519/java/common/src/main/java/common.bnd
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/common.bnd?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/common.bnd (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/common.bnd Fri Oct 21 01:19:00 2011
@@ -17,7 +17,7 @@
 # under the License.
 #
 
-ver: 0.13.0
+ver: 0.9.0
 
 Bundle-SymbolicName: qpid-common
 Bundle-Version: ${ver}

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQChannelException.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQChannelException.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQChannelException.java Fri Oct 21 01:19:00 2011
@@ -54,7 +54,6 @@ public class AMQChannelException extends
     public AMQFrame getCloseFrame(int channel)
     {
         MethodRegistry reg = MethodRegistry.getMethodRegistry(new ProtocolVersion(major,minor));
-        return new AMQFrame(channel, reg.createChannelCloseBody(getErrorCode() == null ? AMQConstant.INTERNAL_ERROR.getCode() : getErrorCode().getCode(), getMessageAsShortString(),_classId,_methodId));
+        return new AMQFrame(channel, reg.createChannelCloseBody(getErrorCode() == null ? AMQConstant.INTERNAL_ERROR.getCode() : getErrorCode().getCode(), new AMQShortString(getMessage()),_classId,_methodId));
     }
-
 }

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java Fri Oct 21 01:19:00 2011
@@ -62,10 +62,9 @@ public class AMQConnectionException exte
         MethodRegistry reg = MethodRegistry.getMethodRegistry(new ProtocolVersion(major,minor));
         return new AMQFrame(0,
                             reg.createConnectionCloseBody(getErrorCode().getCode(),
-                                                          getMessageAsShortString(),
+                                                          new AMQShortString(getMessage()),
                                                           _classId,
                                                           _methodId));
 
     }
-
 }

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQException.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQException.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQException.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQException.java Fri Oct 21 01:19:00 2011
@@ -20,7 +20,6 @@
  */
 package org.apache.qpid;
 
-import org.apache.qpid.framing.AMQShortString;
 import org.apache.qpid.protocol.AMQConstant;
 
 /**
@@ -122,19 +121,4 @@ public class AMQException extends Except
 
         return newAMQE;
     }
-
-    /**
-     * Truncates the exception message to 255 characters if its length exceeds 255.
-     *
-     * @return exception message
-     */
-    public AMQShortString getMessageAsShortString()
-    {
-        String message = getMessage();
-        if (message != null && message.length() > AMQShortString.MAX_LENGTH)
-        {
-            message = message.substring(0, AMQShortString.MAX_LENGTH - 3) + "...";
-        }
-        return new AMQShortString(message);
-    }
 }

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQInvalidArgumentException.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQInvalidArgumentException.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQInvalidArgumentException.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQInvalidArgumentException.java Fri Oct 21 01:19:00 2011
@@ -34,7 +34,7 @@ public class AMQInvalidArgumentException
 {
     public AMQInvalidArgumentException(String message, Throwable cause)
     {
-        super(AMQConstant.ARGUMENT_INVALID, message, cause);
+        super(AMQConstant.INVALID_ARGUMENT, message, cause);
     }
 
     public boolean isHardError()

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java Fri Oct 21 01:19:00 2011
@@ -20,6 +20,9 @@
  */
 package org.apache.qpid.codec;
 
+import org.apache.mina.filter.codec.ProtocolCodecFactory;
+import org.apache.mina.filter.codec.ProtocolDecoder;
+import org.apache.mina.filter.codec.ProtocolEncoder;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 
 /**
@@ -28,11 +31,14 @@ import org.apache.qpid.protocol.AMQVersi
  *
  * <p/><table id="crc"><caption>CRC Card</caption>
  * <tr><th> Responsibilities <th> Collaborations.
+ * <tr><td> Supply the protocol encoder. <td> {@link AMQEncoder}
  * <tr><td> Supply the protocol decoder. <td> {@link AMQDecoder}
  * </table>
  */
-public class AMQCodecFactory
+public class AMQCodecFactory implements ProtocolCodecFactory
 {
+    /** Holds the protocol encoder. */
+    private final AMQEncoder _encoder = new AMQEncoder();
 
     /** Holds the protocol decoder. */
     private final AMQDecoder _frameDecoder;
@@ -50,6 +56,15 @@ public class AMQCodecFactory
         _frameDecoder = new AMQDecoder(expectProtocolInitiation, session);
     }
 
+    /**
+     * Gets the AMQP encoder.
+     *
+     * @return The AMQP encoder.
+     */
+    public ProtocolEncoder getEncoder()
+    {
+        return _encoder;
+    }
 
     /**
      * Gets the AMQP decoder.

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java Fri Oct 21 01:19:00 2011
@@ -20,9 +20,13 @@
  */
 package org.apache.qpid.codec;
 
-import java.io.*;
-import java.nio.ByteBuffer;
-import java.util.*;
+import java.util.ArrayList;
+
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.common.SimpleByteBufferAllocator;
+import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
 
 import org.apache.qpid.framing.AMQDataBlock;
 import org.apache.qpid.framing.AMQDataBlockDecoder;
@@ -50,8 +54,11 @@ import org.apache.qpid.protocol.AMQVersi
  * @todo If protocol initiation decoder not needed, then don't create it. Probably not a big deal, but it adds to the
  *       per-session overhead.
  */
-public class AMQDecoder
+public class AMQDecoder extends CumulativeProtocolDecoder
 {
+
+    private static final String BUFFER = AMQDecoder.class.getName() + ".Buffer";
+
     /** Holds the 'normal' AMQP data decoder. */
     private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder();
 
@@ -60,11 +67,12 @@ public class AMQDecoder
 
     /** Flag to indicate whether this decoder needs to handle protocol initiation. */
     private boolean _expectProtocolInitiation;
+    private boolean firstDecode = true;
 
     private AMQMethodBodyFactory _bodyFactory;
 
-    private List<ByteArrayInputStream> _remainingBufs = new ArrayList<ByteArrayInputStream>();
-
+    private ByteBuffer _remainingBuf;
+    
     /**
      * Creates a new AMQP decoder.
      *
@@ -76,7 +84,98 @@ public class AMQDecoder
         _bodyFactory = new AMQMethodBodyFactory(session);
     }
 
+    /**
+     * Delegates decoding AMQP from the data buffer that Mina has retrieved from the wire, to the data or protocol
+     * initiation decoders.
+     *
+     * @param session The Mina session.
+     * @param in      The raw byte buffer.
+     * @param out     The Mina object output gatherer to write decoded objects to.
+     *
+     * @return <tt>true</tt> if the data was decoded, <tt>false</tt> if more is needed and the data should accumulate.
+     *
+     * @throws Exception If the data cannot be decoded for any reason.
+     */
+    protected boolean doDecode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
+    {
+
+        boolean decoded;
+        if (_expectProtocolInitiation  
+            || (firstDecode
+                && (in.remaining() > 0)
+                && (in.get(in.position()) == (byte)'A')))
+        {
+            decoded = doDecodePI(session, in, out);
+        }
+        else
+        {
+            decoded = doDecodeDataBlock(session, in, out);
+        }
+        if(firstDecode && decoded)
+        {
+            firstDecode = false;
+        }
+        return decoded;
+    }
+
+    /**
+     * Decodes AMQP data, delegating the decoding to an {@link AMQDataBlockDecoder}.
+     *
+     * @param session The Mina session.
+     * @param in      The raw byte buffer.
+     * @param out     The Mina object output gatherer to write decoded objects to.
+     *
+     * @return <tt>true</tt> if the data was decoded, <tt>false</tt> if more is needed and the data should accumulate.
+     *
+     * @throws Exception If the data cannot be decoded for any reason.
+     */
+    protected boolean doDecodeDataBlock(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
+    {
+        int pos = in.position();
+        boolean enoughData = _dataBlockDecoder.decodable(in.buf());
+        in.position(pos);
+        if (!enoughData)
+        {
+            // returning false means it will leave the contents in the buffer and
+            // call us again when more data has been read
+            return false;
+        }
+        else
+        {
+            _dataBlockDecoder.decode(session, in, out);
+
+            return true;
+        }
+    }
+
+    /**
+     * Decodes an AMQP initiation, delegating the decoding to a {@link ProtocolInitiation.Decoder}.
+     *
+     * @param session The Mina session.
+     * @param in      The raw byte buffer.
+     * @param out     The Mina object output gatherer to write decoded objects to.
+     *
+     * @return <tt>true</tt> if the data was decoded, <tt>false</tt> if more is needed and the data should accumulate.
+     *
+     * @throws Exception If the data cannot be decoded for any reason.
+     */
+    private boolean doDecodePI(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
+    {
+        boolean enoughData = _piDecoder.decodable(in.buf());
+        if (!enoughData)
+        {
+            // returning false means it will leave the contents in the buffer and
+            // call us again when more data has been read
+            return false;
+        }
+        else
+        {
+            ProtocolInitiation pi = new ProtocolInitiation(in.buf());
+            out.write(pi);
 
+            return true;
+        }
+    }
 
     /**
      * Sets the protocol initation flag, that determines whether decoding is handled by the data decoder of the protocol
@@ -90,169 +189,152 @@ public class AMQDecoder
         _expectProtocolInitiation = expectProtocolInitiation;
     }
 
-    private class RemainingByteArrayInputStream extends InputStream
-    {
-        private int _currentListPos;
-        private int _markPos;
-
 
-        @Override
-        public int read() throws IOException
+    /**
+     * Cumulates content of <tt>in</tt> into internal buffer and forwards
+     * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
+     * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
+     * and the cumulative buffer is compacted after decoding ends.
+     *
+     * @throws IllegalStateException if your <tt>doDecode()</tt> returned
+     *                               <tt>true</tt> not consuming the cumulative buffer.
+     */
+    public void decode( IoSession session, ByteBuffer in,
+                        ProtocolDecoderOutput out ) throws Exception
+    {
+        ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER );
+        // if we have a session buffer, append data to that otherwise
+        // use the buffer read from the network directly
+        if( buf != null )
         {
-            ByteArrayInputStream currentStream = _remainingBufs.get(_currentListPos);
-            if(currentStream.available() > 0)
-            {
-                return currentStream.read();
-            }
-            else if((_currentListPos == _remainingBufs.size())
-                    || (++_currentListPos == _remainingBufs.size()))
-            {
-                return -1;
-            }
-            else
-            {
-
-                ByteArrayInputStream stream = _remainingBufs.get(_currentListPos);
-                stream.mark(0);
-                return stream.read();
-            }
+            buf.put( in );
+            buf.flip();
         }
-
-        @Override
-        public int read(final byte[] b, final int off, final int len) throws IOException
+        else
         {
+            buf = in;
+        }
 
-            if(_currentListPos == _remainingBufs.size())
-            {
-                return -1;
-            }
-            else
+        for( ;; )
+        {
+            int oldPos = buf.position();
+            boolean decoded = doDecode( session, buf, out );
+            if( decoded )
             {
-                ByteArrayInputStream currentStream = _remainingBufs.get(_currentListPos);
-                final int available = currentStream.available();
-                int read = currentStream.read(b, off, len > available ? available : len);
-                if(read < len)
+                if( buf.position() == oldPos )
                 {
-                    if(_currentListPos++ != _remainingBufs.size())
-                    {
-                        _remainingBufs.get(_currentListPos).mark(0);
-                    }
-                    int correctRead = read == -1 ? 0 : read;
-                    int subRead = read(b, off+correctRead, len-correctRead);
-                    if(subRead == -1)
-                    {
-                        return read;
-                    }
-                    else
-                    {
-                        return correctRead+subRead;
-                    }
+                    throw new IllegalStateException(
+                            "doDecode() can't return true when buffer is not consumed." );
                 }
-                else
+
+                if( !buf.hasRemaining() )
                 {
-                    return len;
+                    break;
                 }
             }
-        }
-
-        @Override
-        public int available() throws IOException
-        {
-            int total = 0;
-            for(int i = _currentListPos; i < _remainingBufs.size(); i++)
+            else
             {
-                total += _remainingBufs.get(i).available();
+                break;
             }
-            return total;
         }
 
-        @Override
-        public void mark(final int readlimit)
+        // if there is any data left that cannot be decoded, we store
+        // it in a buffer in the session and next time this decoder is
+        // invoked the session buffer gets appended to
+        if ( buf.hasRemaining() )
         {
-            _markPos = _currentListPos;
-            final ByteArrayInputStream stream = _remainingBufs.get(_currentListPos);
-            if(stream != null)
-            {
-                stream.mark(readlimit);
-            }
+            storeRemainingInSession( buf, session );
         }
+        else
+        {
+            removeSessionBuffer( session );
+        }
+    }
+
+    /**
+     * Releases the cumulative buffer held for the specified <tt>session</tt>, if there is one.
+     * When overriding this method, remember to call <tt>super.dispose( session )</tt>
+     * so that the buffer is still released.
+     */
+    public void dispose( IoSession session ) throws Exception
+    {
+        removeSessionBuffer( session );
+    }
 
-        @Override
-        public void reset() throws IOException
+    private void removeSessionBuffer(IoSession session)
+    {
+        ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER );
+        if( buf != null )
         {
-            _currentListPos = _markPos;
-            final int size = _remainingBufs.size();
-            if(_currentListPos < size)
-            {
-                _remainingBufs.get(_currentListPos).reset();
-            }
-            for(int i = _currentListPos+1; i<size; i++)
-            {
-                _remainingBufs.get(i).reset();
-            }
+            buf.release();
+            session.removeAttribute( BUFFER );
         }
     }
 
+    private static final SimpleByteBufferAllocator SIMPLE_BYTE_BUFFER_ALLOCATOR = new SimpleByteBufferAllocator();
+
+    private void storeRemainingInSession(ByteBuffer buf, IoSession session)
+    {
+        // copy the undecoded tail into a fresh auto-expanding buffer and keep it on the session
+        final ByteBuffer leftover = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate( buf.remaining(), false );
+        leftover.setAutoExpand( true );
+        session.setAttribute( BUFFER, leftover.put( buf ) );
+    }
 
-    public ArrayList<AMQDataBlock> decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
+    public ArrayList<AMQDataBlock> decodeBuffer(java.nio.ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException
     {
 
         // get prior remaining data from accumulator
         ArrayList<AMQDataBlock> dataBlocks = new ArrayList<AMQDataBlock>();
-        DataInputStream msg;
-
-
-        ByteArrayInputStream bais = new ByteArrayInputStream(buf.array(),buf.arrayOffset()+buf.position(), buf.remaining());
-        if(!_remainingBufs.isEmpty())
+        ByteBuffer msg;
+        // if we have a session buffer, append data to that otherwise
+        // use the buffer read from the network directly
+        if( _remainingBuf != null )
         {
-            _remainingBufs.add(bais);
-            msg = new DataInputStream(new RemainingByteArrayInputStream());
+            _remainingBuf.put(buf);
+            _remainingBuf.flip();
+            msg = _remainingBuf;
         }
         else
         {
-            msg = new DataInputStream(bais);
+            msg = ByteBuffer.wrap(buf);
         }
-
-        boolean enoughData = true;
-        while (enoughData)
+        
+        if (_expectProtocolInitiation  
+            || (firstDecode
+                && (msg.remaining() > 0)
+                && (msg.get(msg.position()) == (byte)'A')))
         {
-            if(!_expectProtocolInitiation)
+            if (_piDecoder.decodable(msg.buf()))
             {
-                enoughData = _dataBlockDecoder.decodable(msg);
-                if (enoughData)
-                {
-                    dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_bodyFactory, msg));
-                }
+                dataBlocks.add(new ProtocolInitiation(msg.buf()));
             }
-            else
+        }
+        else
+        {
+            boolean enoughData = true;
+            while (enoughData)
             {
-                enoughData = _piDecoder.decodable(msg);
-                if (enoughData)
-                {
-                    dataBlocks.add(new ProtocolInitiation(msg));
-                }
-
-            }
+                int pos = msg.position();
 
-            if(!enoughData)
-            {
-                if(!_remainingBufs.isEmpty())
+                enoughData = _dataBlockDecoder.decodable(msg);
+                msg.position(pos);
+                if (enoughData)
                 {
-                    _remainingBufs.remove(_remainingBufs.size()-1);
-                    ListIterator<ByteArrayInputStream> iterator = _remainingBufs.listIterator();
-                    while(iterator.hasNext() && iterator.next().available() == 0)
-                    {
-                        iterator.remove();
-                    }
+                    dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_bodyFactory, msg));
                 }
-                if(bais.available()!=0)
+                else
                 {
-                    byte[] remaining = new byte[bais.available()];
-                    bais.read(remaining);
-                    _remainingBufs.add(new ByteArrayInputStream(remaining));
+                    _remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate(msg.remaining(), false);
+                    _remainingBuf.setAutoExpand(true);
+                    _remainingBuf.put(msg);
                 }
             }
         }
+        if(firstDecode && dataBlocks.size() > 0)
+        {
+            firstDecode = false;
+        }
         return dataBlocks;
     }
 }

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java Fri Oct 21 01:19:00 2011
@@ -23,7 +23,7 @@ package org.apache.qpid.configuration;
  */
 public class ClientProperties
 {
-
+  
     /**
      * Currently with Qpid it is not possible to change the client ID.
      * If one is not specified upon connection construction, an id is generated automatically.
@@ -68,50 +68,67 @@ public class ClientProperties
      * by the broker in TuneOK it will be used as the heartbeat interval.
      * If not a warning will be printed and the max value specified for
      * heartbeat in TuneOK will be used
-     *
+     * 
      * The default idle timeout is set to 120 secs
      */
     public static final String IDLE_TIMEOUT_PROP_NAME = "idle_timeout";
     public static final long DEFAULT_IDLE_TIMEOUT = 120000;
-
+    
     public static final String HEARTBEAT = "qpid.heartbeat";
     public static final int HEARTBEAT_DEFAULT = 120;
-
+    
     /**
      * This value will be used to determine the default destination syntax type.
      * Currently the two types are Binding URL (java only) and the Addressing format (used by
-     * all clients).
+     * all clients). 
      */
     public static final String DEST_SYNTAX = "qpid.dest_syntax";
-
+    
     public static final String USE_LEGACY_MAP_MESSAGE_FORMAT = "qpid.use_legacy_map_message";
 
-    public static final String AMQP_VERSION = "qpid.amqp.version";
-
-    public static final String QPID_VERIFY_CLIENT_ID = "qpid.verify_client_id";
+     /**
+     * ==========================================================
+     * Those properties are used when the io size should be bounded
+     * ==========================================================
+     */
 
     /**
-     * System properties to change the default timeout used during
-     * synchronous operations.
+     * When set to true the io layer throttles down producers and consumers
+     * when written or read messages are accumulating and exceeding a certain size.
+     * This is especially useful when the producer rate is greater than the network
+     * speed.
+     * type: boolean
      */
-    public static final String QPID_SYNC_OP_TIMEOUT = "qpid.sync_op_timeout";
-    public static final String AMQJ_DEFAULT_SYNCWRITE_TIMEOUT = "amqj.default_syncwrite_timeout";
+    public static final String PROTECTIO_PROP_NAME = "protectio";
 
+    //=== The following properties are only used when the previous one is true.
     /**
-     * A default timeout value for synchronous operations
+     * Max size of read messages that can be stored within the MINA layer
+     * type: int
      */
-    public static final int DEFAULT_SYNC_OPERATION_TIMEOUT = 60000;
+    public static final String READ_BUFFER_LIMIT_PROP_NAME = "qpid.read.buffer.limit";
+    public static final String READ_BUFFER_LIMIT_DEFAULT = "262144";
+    /**
+     * Max size of written messages that can be stored within the MINA layer
+     * type: int
+     */
+    public static final String WRITE_BUFFER_LIMIT_PROP_NAME = "qpid.write.buffer.limit";
+    public static final String WRITE_BUFFER_LIMIT_DEFAULT = "262144";
 
+    public static final String AMQP_VERSION = "qpid.amqp.version";
+    
+    private static ClientProperties _instance = new ClientProperties();
+    
     /*
-    public static final QpidProperty<Boolean>  IGNORE_SET_CLIENTID_PROP_NAME =
+    public static final QpidProperty<Boolean>  IGNORE_SET_CLIENTID_PROP_NAME = 
         QpidProperty.booleanProperty(false,"qpid.ignore_set_client_id","ignore_setclientID");
-
+    
     public static final QpidProperty<Boolean> SYNC_PERSISTENT_PROP_NAME =
         QpidProperty.booleanProperty(false,"qpid.sync_persistence","sync_persistence");
-
-
+    
+    
     public static final QpidProperty<Integer> MAX_PREFETCH_PROP_NAME =
         QpidProperty.intProperty(500,"qpid.max_prefetch","max_prefetch"); */
-
-
+    
+    
 }

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java Fri Oct 21 01:19:00 2011
@@ -20,9 +20,7 @@
  */
 package org.apache.qpid.framing;
 
-import java.io.DataOutputStream;
-import java.io.IOException;
-
+import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 import org.apache.qpid.AMQException;
 
@@ -36,7 +34,7 @@ public interface AMQBody
      */
     public abstract int getSize();
     
-    public void writePayload(DataOutputStream buffer) throws IOException;
+    public void writePayload(ByteBuffer buffer);
     
-    void handle(final int channelId, final AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException;
+    void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) throws AMQException;
 }

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java Fri Oct 21 01:19:00 2011
@@ -20,10 +20,7 @@
  */
 package org.apache.qpid.framing;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
+import org.apache.mina.common.ByteBuffer;
 
 /**
  * A data block represents something that has a size in bytes and the ability to write itself to a byte
@@ -42,6 +39,25 @@ public abstract class AMQDataBlock imple
      * Writes the datablock to the specified buffer.
      * @param buffer
      */
-    public abstract void writePayload(DataOutputStream buffer) throws IOException;
+    public abstract void writePayload(ByteBuffer buffer);
+
+    public ByteBuffer toByteBuffer()
+    {
+        // size the buffer from getSize(), write the payload into it, then flip it ready for reading
+        final int encodedSize = (int) getSize();
+        final ByteBuffer encoded = ByteBuffer.allocate(encodedSize);
+        writePayload(encoded);
+        return encoded.flip();
+    }
+
+    public java.nio.ByteBuffer toNioByteBuffer()
+    {
+        final java.nio.ByteBuffer nioBuffer = java.nio.ByteBuffer.allocate((int) getSize());
+        // write the payload through a MINA wrapper around the NIO buffer, then flip
+        // the NIO buffer itself so the caller receives it ready for reading
+        writePayload(ByteBuffer.wrap(nioBuffer));
+        nioBuffer.flip();
+        return nioBuffer;
+    }
 
 }

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java Fri Oct 21 01:19:00 2011
@@ -20,14 +20,18 @@
  */
 package org.apache.qpid.framing;
 
+import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.IoSession;
+import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.DataInputStream;
-import java.io.IOException;
-
 public class AMQDataBlockDecoder
 {
+    private static final String SESSION_METHOD_BODY_FACTORY = "QPID_SESSION_METHOD_BODY_FACTORY";
 
     private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE];
 
@@ -43,32 +47,27 @@ public class AMQDataBlockDecoder
     public AMQDataBlockDecoder()
     { }
 
-    public boolean decodable(DataInputStream in) throws AMQFrameDecodingException, IOException
+    public boolean decodable(java.nio.ByteBuffer in) throws AMQFrameDecodingException
     {
-        final int remainingAfterAttributes = in.available() - (1 + 2 + 4 + 1);
+        final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1);
         // type, channel, body length and end byte
         if (remainingAfterAttributes < 0)
         {
             return false;
         }
 
-        in.mark(8);
-        in.skip(1 + 2);
-
-
+        in.position(in.position() + 1 + 2);
         // Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt() 
-        final long bodySize = in.readInt() & 0xffffffffL;
-
-        in.reset();
+        final long bodySize = in.getInt() & 0xffffffffL; 
 
         return (remainingAfterAttributes >= bodySize);
 
     }
 
-    public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, DataInputStream in)
-            throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
+    public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, ByteBuffer in)
+        throws AMQFrameDecodingException, AMQProtocolVersionException
     {
-        final byte type = in.readByte();
+        final byte type = in.get();
 
         BodyFactory bodyFactory;
         if (type == AMQMethodBody.TYPE)
@@ -85,8 +84,8 @@ public class AMQDataBlockDecoder
             throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null);
         }
 
-        final int channel = in.readUnsignedShort();
-        final long bodySize = EncodingUtils.readUnsignedInteger(in);
+        final int channel = in.getUnsignedShort();
+        final long bodySize = in.getUnsignedInt();
 
         // bodySize can be zero
         if ((channel < 0) || (bodySize < 0))
@@ -97,7 +96,7 @@ public class AMQDataBlockDecoder
 
         AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory);
 
-        byte marker = in.readByte();
+        byte marker = in.get();
         if ((marker & 0xFF) != 0xCE)
         {
             throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker + " length=" + bodySize
@@ -107,4 +106,26 @@ public class AMQDataBlockDecoder
         return frame;
     }
 
+    public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
+    {
+        // the method body factory is created on first use and cached as a session attribute
+        AMQMethodBodyFactory factory = (AMQMethodBodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY);
+        if (factory == null)
+        {
+            factory = new AMQMethodBodyFactory((AMQVersionAwareProtocolSession) session.getAttachment());
+            session.setAttribute(SESSION_METHOD_BODY_FACTORY, factory);
+        }
+        final AMQFrame frame = createAndPopulateFrame(factory, in);
+        out.write(frame);
+    }
+
+    public boolean decodable(ByteBuffer msg) throws AMQFrameDecodingException
+    {
+        return decodable(msg.buf()); // delegates to the java.nio overload, which moves the position of the buffer it is given
+    }
+
+    public AMQDataBlock createAndPopulateFrame(AMQMethodBodyFactory factory, java.nio.ByteBuffer msg) throws AMQProtocolVersionException, AMQFrameDecodingException
+    {
+        return createAndPopulateFrame(factory, ByteBuffer.wrap(msg)); // wrap the NIO buffer and reuse the MINA-buffer overload
+    }
 }

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java Fri Oct 21 01:19:00 2011
@@ -20,9 +20,7 @@
  */
 package org.apache.qpid.framing;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import org.apache.mina.common.ByteBuffer;
 
 public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
 {
@@ -38,7 +36,7 @@ public class AMQFrame extends AMQDataBlo
         _bodyFrame = bodyFrame;
     }
 
-    public AMQFrame(final DataInputStream in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException, IOException
+    public AMQFrame(final ByteBuffer in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException
     {
         this._channel = channel;
         this._bodyFrame = bodyFactory.createBody(in,bodySize);
@@ -55,13 +53,13 @@ public class AMQFrame extends AMQDataBlo
     }
 
 
-    public void writePayload(DataOutputStream buffer) throws IOException
+    public void writePayload(ByteBuffer buffer)
     {
-        buffer.writeByte(_bodyFrame.getFrameType());
+        buffer.put(_bodyFrame.getFrameType());
         EncodingUtils.writeUnsignedShort(buffer, _channel);
         EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize());
         _bodyFrame.writePayload(buffer);
-        buffer.writeByte(FRAME_END_BYTE);
+        buffer.put(FRAME_END_BYTE);
     }
 
     public final int getChannel()
@@ -79,48 +77,48 @@ public class AMQFrame extends AMQDataBlo
         return "Frame channelId: " + _channel + ", bodyFrame: " + String.valueOf(_bodyFrame);
     }
 
-    public static void writeFrame(DataOutputStream buffer, final int channel, AMQBody body) throws IOException
+    public static void writeFrame(ByteBuffer buffer, final int channel, AMQBody body)
     {
-        buffer.writeByte(body.getFrameType());
+        buffer.put(body.getFrameType());
         EncodingUtils.writeUnsignedShort(buffer, channel);
         EncodingUtils.writeUnsignedInteger(buffer, body.getSize());
         body.writePayload(buffer);
-        buffer.writeByte(FRAME_END_BYTE);
+        buffer.put(FRAME_END_BYTE);
 
     }
 
-    public static void writeFrames(DataOutputStream buffer, final int channel, AMQBody body1, AMQBody body2) throws IOException
+    public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2)
     {
-        buffer.writeByte(body1.getFrameType());
+        buffer.put(body1.getFrameType());
         EncodingUtils.writeUnsignedShort(buffer, channel);
         EncodingUtils.writeUnsignedInteger(buffer, body1.getSize());
         body1.writePayload(buffer);
-        buffer.writeByte(FRAME_END_BYTE);
-        buffer.writeByte(body2.getFrameType());
+        buffer.put(FRAME_END_BYTE);
+        buffer.put(body2.getFrameType());
         EncodingUtils.writeUnsignedShort(buffer, channel);
         EncodingUtils.writeUnsignedInteger(buffer, body2.getSize());
         body2.writePayload(buffer);
-        buffer.writeByte(FRAME_END_BYTE);
+        buffer.put(FRAME_END_BYTE);
 
     }
 
-    public static void writeFrames(DataOutputStream buffer, final int channel, AMQBody body1, AMQBody body2, AMQBody body3) throws IOException
+    public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2, AMQBody body3)
     {
-        buffer.writeByte(body1.getFrameType());
+        buffer.put(body1.getFrameType());
         EncodingUtils.writeUnsignedShort(buffer, channel);
         EncodingUtils.writeUnsignedInteger(buffer, body1.getSize());
         body1.writePayload(buffer);
-        buffer.writeByte(FRAME_END_BYTE);
-        buffer.writeByte(body2.getFrameType());
+        buffer.put(FRAME_END_BYTE);
+        buffer.put(body2.getFrameType());
         EncodingUtils.writeUnsignedShort(buffer, channel);
         EncodingUtils.writeUnsignedInteger(buffer, body2.getSize());
         body2.writePayload(buffer);
-        buffer.writeByte(FRAME_END_BYTE);
-        buffer.writeByte(body3.getFrameType());
+        buffer.put(FRAME_END_BYTE);
+        buffer.put(body3.getFrameType());
         EncodingUtils.writeUnsignedShort(buffer, channel);
         EncodingUtils.writeUnsignedInteger(buffer, body3.getSize());
         body3.writePayload(buffer);
-        buffer.writeByte(FRAME_END_BYTE);
+        buffer.put(FRAME_END_BYTE);
 
     }
 

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java Fri Oct 21 01:19:00 2011
@@ -20,14 +20,12 @@
  */
 package org.apache.qpid.framing;
 
+import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQChannelException;
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQConstant;
 
-import java.io.DataOutputStream;
-import java.io.IOException;
-
 public interface AMQMethodBody extends AMQBody
 {
     public static final byte TYPE = 1;
@@ -45,12 +43,12 @@ public interface AMQMethodBody extends A
     /** @return unsigned short */
     public int getMethod();
 
-    public void writeMethodPayload(DataOutputStream buffer) throws IOException;
+    public void writeMethodPayload(ByteBuffer buffer);
 
 
     public int getSize();
 
-    public void writePayload(DataOutputStream buffer) throws IOException;
+    public void writePayload(ByteBuffer buffer);
 
     //public abstract void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException;
 

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java Fri Oct 21 01:19:00 2011
@@ -20,14 +20,13 @@
  */
 package org.apache.qpid.framing;
 
+import org.apache.mina.common.ByteBuffer;
+
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.DataInputStream;
-import java.io.IOException;
-
 public class AMQMethodBodyFactory implements BodyFactory
 {
     private static final Logger _log = LoggerFactory.getLogger(AMQMethodBodyFactory.class);
@@ -39,7 +38,7 @@ public class AMQMethodBodyFactory implem
         _protocolSession = protocolSession;
     }
 
-    public AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException, IOException
+    public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
     {
         return _protocolSession.getMethodRegistry().convertToBody(in, bodySize);
     }

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java Fri Oct 21 01:19:00 2011
@@ -21,16 +21,13 @@ package org.apache.qpid.framing;
  *
  */
 
+import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQChannelException;
 import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.protocol.AMQConstant;
 import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
-
 public abstract class AMQMethodBodyImpl implements AMQMethodBody
 {
     public static final byte TYPE = 1;
@@ -101,7 +98,7 @@ public abstract class AMQMethodBodyImpl 
         return 2 + 2 + getBodySize();
     }
 
-    public void writePayload(DataOutputStream buffer) throws IOException
+        public void writePayload(ByteBuffer buffer)
     {
         EncodingUtils.writeUnsignedShort(buffer, getClazz());
         EncodingUtils.writeUnsignedShort(buffer, getMethod());
@@ -109,12 +106,12 @@ public abstract class AMQMethodBodyImpl 
     }
 
 
-    protected byte readByte(DataInputStream buffer) throws IOException
+    protected byte readByte(ByteBuffer buffer)
     {
-        return buffer.readByte();
+        return buffer.get();
     }
 
-    protected AMQShortString readAMQShortString(DataInputStream buffer) throws IOException
+    protected AMQShortString readAMQShortString(ByteBuffer buffer)
     {
         return EncodingUtils.readAMQShortString(buffer);
     }
@@ -124,27 +121,27 @@ public abstract class AMQMethodBodyImpl 
         return EncodingUtils.encodedShortStringLength(string);
     }
 
-    protected void writeByte(DataOutputStream buffer, byte b) throws IOException
+    protected void writeByte(ByteBuffer buffer, byte b)
     {
-        buffer.writeByte(b);
+        buffer.put(b);
     }
 
-    protected void writeAMQShortString(DataOutputStream buffer, AMQShortString string) throws IOException
+    protected void writeAMQShortString(ByteBuffer buffer, AMQShortString string)
     {
         EncodingUtils.writeShortStringBytes(buffer, string);
     }
 
-    protected int readInt(DataInputStream buffer) throws IOException
+    protected int readInt(ByteBuffer buffer)
     {
-        return buffer.readInt();
+        return buffer.getInt();
     }
 
-    protected void writeInt(DataOutputStream buffer, int i) throws IOException
+    protected void writeInt(ByteBuffer buffer, int i)
     {
-        buffer.writeInt(i);
+        buffer.putInt(i);
     }
 
-    protected FieldTable readFieldTable(DataInputStream buffer) throws AMQFrameDecodingException, IOException
+    protected FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException
     {
         return EncodingUtils.readFieldTable(buffer);
     }
@@ -154,19 +151,19 @@ public abstract class AMQMethodBodyImpl 
         return EncodingUtils.encodedFieldTableLength(table);  //To change body of created methods use File | Settings | File Templates.
     }
 
-    protected void writeFieldTable(DataOutputStream buffer, FieldTable table) throws IOException
+    protected void writeFieldTable(ByteBuffer buffer, FieldTable table)
     {
         EncodingUtils.writeFieldTableBytes(buffer, table);
     }
 
-    protected long readLong(DataInputStream buffer) throws IOException
+    protected long readLong(ByteBuffer buffer)
     {
-        return buffer.readLong();
+        return buffer.getLong();
     }
 
-    protected void writeLong(DataOutputStream buffer, long l) throws IOException
+    protected void writeLong(ByteBuffer buffer, long l)
     {
-        buffer.writeLong(l);
+        buffer.putLong(l);
     }
 
     protected int getSizeOf(byte[] response)
@@ -174,86 +171,87 @@ public abstract class AMQMethodBodyImpl 
         return (response == null) ? 4 : response.length + 4;
     }
 
-    protected void writeBytes(DataOutputStream buffer, byte[] data) throws IOException
+    protected void writeBytes(ByteBuffer buffer, byte[] data)
     {
         EncodingUtils.writeBytes(buffer,data);
     }
 
-    protected byte[] readBytes(DataInputStream buffer) throws IOException
+    protected byte[] readBytes(ByteBuffer buffer)
     {
         return EncodingUtils.readBytes(buffer);
     }
 
-    protected short readShort(DataInputStream buffer) throws IOException
+    protected short readShort(ByteBuffer buffer)
     {
         return EncodingUtils.readShort(buffer);
     }
 
-    protected void writeShort(DataOutputStream buffer, short s) throws IOException
+    protected void writeShort(ByteBuffer buffer, short s)
     {
         EncodingUtils.writeShort(buffer, s);
     }
 
-    protected Content readContent(DataInputStream buffer)
+    protected Content readContent(ByteBuffer buffer)
     {
-        return null;
+        return null;  //To change body of created methods use File | Settings | File Templates.
     }
 
     protected int getSizeOf(Content body)
     {
-        return 0;
+        return 0;  //To change body of created methods use File | Settings | File Templates.
     }
 
-    protected void writeContent(DataOutputStream buffer, Content body)
+    protected void writeContent(ByteBuffer buffer, Content body)
     {
+        //To change body of created methods use File | Settings | File Templates.
     }
 
-    protected byte readBitfield(DataInputStream buffer) throws IOException
+    protected byte readBitfield(ByteBuffer buffer)
     {
-        return readByte(buffer);
+        return readByte(buffer);  //To change body of created methods use File | Settings | File Templates.
     }
 
-    protected int readUnsignedShort(DataInputStream buffer) throws IOException
+    protected int readUnsignedShort(ByteBuffer buffer)
     {
-        return buffer.readUnsignedShort();
+        return buffer.getUnsignedShort();  //To change body of created methods use File | Settings | File Templates.
     }
 
-    protected void writeBitfield(DataOutputStream buffer, byte bitfield0) throws IOException
+    protected void writeBitfield(ByteBuffer buffer, byte bitfield0)
     {
-        buffer.writeByte(bitfield0);
+        buffer.put(bitfield0);
     }
 
-    protected void writeUnsignedShort(DataOutputStream buffer, int s) throws IOException
+    protected void writeUnsignedShort(ByteBuffer buffer, int s)
     {
         EncodingUtils.writeUnsignedShort(buffer, s);
     }
 
-    protected long readUnsignedInteger(DataInputStream buffer) throws IOException
+    protected long readUnsignedInteger(ByteBuffer buffer)
     {
-        return EncodingUtils.readUnsignedInteger(buffer);
+        return buffer.getUnsignedInt();
     }
-    protected void writeUnsignedInteger(DataOutputStream buffer, long i) throws IOException
+    protected void writeUnsignedInteger(ByteBuffer buffer, long i)
     {
         EncodingUtils.writeUnsignedInteger(buffer, i);
     }
 
 
-    protected short readUnsignedByte(DataInputStream buffer) throws IOException
+    protected short readUnsignedByte(ByteBuffer buffer)
     {
-        return (short) buffer.readUnsignedByte();
+        return buffer.getUnsigned();
     }
 
-    protected void writeUnsignedByte(DataOutputStream buffer, short unsignedByte) throws IOException
+    protected void writeUnsignedByte(ByteBuffer buffer, short unsignedByte)
     {
         EncodingUtils.writeUnsignedByte(buffer, unsignedByte);
     }
 
-    protected long readTimestamp(DataInputStream buffer) throws IOException
+    protected long readTimestamp(ByteBuffer buffer)
     {
         return EncodingUtils.readTimestamp(buffer);
     }
 
-    protected void writeTimestamp(DataOutputStream buffer, long t) throws IOException
+    protected void writeTimestamp(ByteBuffer buffer, long t)
     {
         EncodingUtils.writeTimestamp(buffer, t);
     }

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java Fri Oct 21 01:19:00 2011
@@ -21,11 +21,10 @@
 
 package org.apache.qpid.framing;
 
-import java.io.DataInputStream;
-import java.io.IOException;
+import org.apache.mina.common.ByteBuffer;
 
 
 public abstract interface AMQMethodBodyInstanceFactory
 {
-    public AMQMethodBody newInstance(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException;
+    public AMQMethodBody newInstance(ByteBuffer buffer, long size) throws AMQFrameDecodingException;    
 }

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java Fri Oct 21 01:19:00 2011
@@ -21,12 +21,11 @@
 
 package org.apache.qpid.framing;
 
+import org.apache.mina.common.ByteBuffer;
+
 import org.slf4j.Logger;
 import org.slf4j.LoggerFactory;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
 import java.util.*;
 import java.lang.ref.WeakReference;
 
@@ -38,10 +37,6 @@ import java.lang.ref.WeakReference;
  */
 public final class AMQShortString implements CharSequence, Comparable<AMQShortString>
 {
-    /**
-     * The maximum number of octets in AMQ short string as defined in AMQP specification
-     */
-    public static final int MAX_LENGTH = 255;
     private static final byte MINUS = (byte)'-';
     private static final byte ZERO = (byte) '0';
 
@@ -123,19 +118,22 @@ public final class AMQShortString implem
 
     public AMQShortString(byte[] data)
     {
-        if (data == null)
-        {
-            throw new NullPointerException("Cannot create AMQShortString with null data[]");
-        }
-        if (data.length > MAX_LENGTH)
-        {
-            throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
-        }
+
         _data = data.clone();
         _length = data.length;
         _offset = 0;
     }
 
+    public AMQShortString(byte[] data, int pos)
+    {
+        final int size = data[pos++];
+        final byte[] dataCopy = new byte[size];
+        System.arraycopy(data,pos,dataCopy,0,size);
+        _length = size;
+        _data = dataCopy;
+        _offset = 0;
+    }
+
     public AMQShortString(String data)
     {
         this((data == null) ? EMPTY_CHAR_ARRAY : data.toCharArray());
@@ -148,12 +146,7 @@ public final class AMQShortString implem
         {
             throw new NullPointerException("Cannot create AMQShortString with null char[]");
         }
-        // the current implementation of 0.8/0.9.x short string encoding
-        // supports only ASCII characters
-        if (data.length> MAX_LENGTH)
-        {
-            throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
-        }
+
         final int length = data.length;
         final byte[] stringBytes = new byte[length];
         int hash = 0;
@@ -172,17 +165,6 @@ public final class AMQShortString implem
 
     public AMQShortString(CharSequence charSequence)
     {
-        if (charSequence == null)
-        {
-            // it should be possible to create short string for null data
-            charSequence = "";
-        }
-        // the current implementation of 0.8/0.9.x short string encoding
-        // supports only ASCII characters
-        if (charSequence.length() > MAX_LENGTH)
-        {
-            throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
-        }
         final int length = charSequence.length();
         final byte[] stringBytes = new byte[length];
         int hash = 0;
@@ -200,33 +182,31 @@ public final class AMQShortString implem
 
     }
 
-    private AMQShortString(DataInputStream data, final int length) throws IOException
+    private AMQShortString(ByteBuffer data, final int length)
     {
-        if (length > MAX_LENGTH)
+        if(data.isDirect() || data.isReadOnly())
         {
-            throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
+            byte[] dataBytes = new byte[length];
+            data.get(dataBytes);
+            _data = dataBytes;
+            _offset = 0;
+        }
+        else
+        {
+
+            _data = data.array();
+            _offset = data.arrayOffset() + data.position();
+            data.skip(length);
+
         }
-        byte[] dataBytes = new byte[length];
-        data.read(dataBytes);
-        _data = dataBytes;
-        _offset = 0;
         _length = length;
 
     }
 
     private AMQShortString(final byte[] data, final int from, final int to)
     {
-        if (data == null)
-        {
-            throw new NullPointerException("Cannot create AMQShortString with null data[]");
-        }
-        int length = to - from;
-        if (length > MAX_LENGTH)
-        {
-            throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
-        }
         _offset = from;
-        _length = length;
+        _length = to - from;
         _data = data;
     }
 
@@ -265,9 +245,32 @@ public final class AMQShortString implem
         return new CharSubSequence(start, end);
     }
 
-    public static AMQShortString readFromBuffer(DataInputStream buffer) throws IOException
+    public int writeToByteArray(byte[] encoding, int pos)
+    {
+        final int size = length();
+        encoding[pos++] = (byte) size;
+        System.arraycopy(_data,_offset,encoding,pos,size);
+        return pos+size;
+    }
+
+    public static AMQShortString readFromByteArray(byte[] byteEncodedDestination, int pos)
+    {
+
+
+        final AMQShortString shortString = new AMQShortString(byteEncodedDestination, pos);
+        if(shortString.length() == 0)
+        {
+            return null;
+        }
+        else
+        {
+            return shortString;
+        }
+    }
+
+    public static AMQShortString readFromBuffer(ByteBuffer buffer)
     {
-        final int length = buffer.readUnsignedByte();
+        final short length = buffer.getUnsigned();
         if (length == 0)
         {
             return null;
@@ -293,13 +296,13 @@ public final class AMQShortString implem
         }
     }
 
-    public void writeToBuffer(DataOutputStream buffer) throws IOException
+    public void writeToBuffer(ByteBuffer buffer)
     {
 
         final int size = length();
         //buffer.setAutoExpand(true);
-        buffer.write((byte) size);
-        buffer.write(_data, _offset, size);
+        buffer.put((byte) size);
+        buffer.put(_data, _offset, size);
 
     }
 
@@ -687,10 +690,6 @@ public final class AMQShortString implem
             size += term.length();
         }
 
-        if (size > MAX_LENGTH)
-        {
-            throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
-        }
         byte[] data = new byte[size];
         int pos = 0;
         final byte[] delimData = delim._data;

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQType.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQType.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQType.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQType.java Fri Oct 21 01:19:00 2011
@@ -20,9 +20,8 @@
  */
 package org.apache.qpid.framing;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import org.apache.mina.common.ByteBuffer;
+
 import java.math.BigDecimal;
 
 /**
@@ -61,12 +60,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+        public void writeValueImpl(Object value, ByteBuffer buffer)
         {
             EncodingUtils.writeLongStringBytes(buffer, (String) value);
         }
 
-        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+        public Object readValueFromBuffer(ByteBuffer buffer)
         {
             return EncodingUtils.readLongString(buffer);
         }
@@ -107,12 +106,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+        public void writeValueImpl(Object value, ByteBuffer buffer)
         {
             EncodingUtils.writeUnsignedInteger(buffer, (Long) value);
         }
 
-        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+        public Object readValueFromBuffer(ByteBuffer buffer)
         {
             return EncodingUtils.readUnsignedInteger(buffer);
         }
@@ -138,7 +137,7 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+        public void writeValueImpl(Object value, ByteBuffer buffer)
         {
             BigDecimal bd = (BigDecimal) value;
 
@@ -151,7 +150,7 @@ public enum AMQType
             EncodingUtils.writeInteger(buffer, unscaled);
         }
 
-        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+        public Object readValueFromBuffer(ByteBuffer buffer)
         {
             byte places = EncodingUtils.readByte(buffer);
 
@@ -183,12 +182,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+        public void writeValueImpl(Object value, ByteBuffer buffer)
         {
             EncodingUtils.writeLong(buffer, (Long) value);
         }
 
-        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+        public Object readValueFromBuffer(ByteBuffer buffer)
         {
             return EncodingUtils.readLong(buffer);
         }
@@ -247,7 +246,7 @@ public enum AMQType
          * @param value  An instance of the type.
          * @param buffer The byte buffer to write it to.
          */
-        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+        public void writeValueImpl(Object value, ByteBuffer buffer)
         {
             // Ensure that the value is a FieldTable.
             if (!(value instanceof FieldTable))
@@ -268,7 +267,7 @@ public enum AMQType
          *
          * @return An instance of the type.
          */
-        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+        public Object readValueFromBuffer(ByteBuffer buffer)
         {
             try
             {
@@ -302,10 +301,10 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, DataOutputStream buffer)
+        public void writeValueImpl(Object value, ByteBuffer buffer)
         { }
 
-        public Object readValueFromBuffer(DataInputStream buffer)
+        public Object readValueFromBuffer(ByteBuffer buffer)
         {
             return null;
         }
@@ -331,12 +330,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+        public void writeValueImpl(Object value, ByteBuffer buffer)
         {
             EncodingUtils.writeLongstr(buffer, (byte[]) value);
         }
 
-        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+        public Object readValueFromBuffer(ByteBuffer buffer)
         {
             return EncodingUtils.readLongstr(buffer);
         }
@@ -361,12 +360,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+        public void writeValueImpl(Object value, ByteBuffer buffer)
         {
             EncodingUtils.writeLongStringBytes(buffer, (String) value);
         }
 
-        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+        public Object readValueFromBuffer(ByteBuffer buffer)
         {
             return EncodingUtils.readLongString(buffer);
         }
@@ -392,12 +391,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+        public void writeValueImpl(Object value, ByteBuffer buffer)
         {
             EncodingUtils.writeLongStringBytes(buffer, (String) value);
         }
 
-        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+        public Object readValueFromBuffer(ByteBuffer buffer)
         {
             return EncodingUtils.readLongString(buffer);
         }
@@ -427,12 +426,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+        public void writeValueImpl(Object value, ByteBuffer buffer)
         {
             EncodingUtils.writeBoolean(buffer, (Boolean) value);
         }
 
-        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+        public Object readValueFromBuffer(ByteBuffer buffer)
         {
             return EncodingUtils.readBoolean(buffer);
         }
@@ -462,12 +461,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+        public void writeValueImpl(Object value, ByteBuffer buffer)
         {
             EncodingUtils.writeChar(buffer, (Character) value);
         }
 
-        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+        public Object readValueFromBuffer(ByteBuffer buffer)
         {
             return EncodingUtils.readChar(buffer);
         }
@@ -497,12 +496,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+        public void writeValueImpl(Object value, ByteBuffer buffer)
         {
             EncodingUtils.writeByte(buffer, (Byte) value);
         }
 
-        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+        public Object readValueFromBuffer(ByteBuffer buffer)
         {
             return EncodingUtils.readByte(buffer);
         }
@@ -536,12 +535,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+        public void writeValueImpl(Object value, ByteBuffer buffer)
         {
             EncodingUtils.writeShort(buffer, (Short) value);
         }
 
-        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+        public Object readValueFromBuffer(ByteBuffer buffer)
         {
             return EncodingUtils.readShort(buffer);
         }
@@ -578,12 +577,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+        public void writeValueImpl(Object value, ByteBuffer buffer)
         {
             EncodingUtils.writeInteger(buffer, (Integer) value);
         }
 
-        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+        public Object readValueFromBuffer(ByteBuffer buffer)
         {
             return EncodingUtils.readInteger(buffer);
         }
@@ -625,12 +624,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+        public void writeValueImpl(Object value, ByteBuffer buffer)
         {
             EncodingUtils.writeLong(buffer, (Long) value);
         }
 
-        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+        public Object readValueFromBuffer(ByteBuffer buffer)
         {
             return EncodingUtils.readLong(buffer);
         }
@@ -660,12 +659,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+        public void writeValueImpl(Object value, ByteBuffer buffer)
         {
             EncodingUtils.writeFloat(buffer, (Float) value);
         }
 
-        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+        public Object readValueFromBuffer(ByteBuffer buffer)
         {
             return EncodingUtils.readFloat(buffer);
         }
@@ -699,12 +698,12 @@ public enum AMQType
             }
         }
 
-        public void writeValueImpl(Object value, DataOutputStream buffer) throws IOException
+        public void writeValueImpl(Object value, ByteBuffer buffer)
         {
             EncodingUtils.writeDouble(buffer, (Double) value);
         }
 
-        public Object readValueFromBuffer(DataInputStream buffer) throws IOException
+        public Object readValueFromBuffer(ByteBuffer buffer)
         {
             return EncodingUtils.readDouble(buffer);
         }
@@ -771,9 +770,9 @@ public enum AMQType
      * @param value  An instance of the type.
      * @param buffer The byte buffer to write it to.
      */
-    public void writeToBuffer(Object value, DataOutputStream buffer) throws IOException
+    public void writeToBuffer(Object value, ByteBuffer buffer)
     {
-        buffer.writeByte(identifier());
+        buffer.put(identifier());
         writeValueImpl(value, buffer);
     }
 
@@ -783,7 +782,7 @@ public enum AMQType
      * @param value  An instance of the type.
      * @param buffer The byte buffer to write it to.
      */
-    abstract void writeValueImpl(Object value, DataOutputStream buffer) throws IOException;
+    abstract void writeValueImpl(Object value, ByteBuffer buffer);
 
     /**
      * Reads an instance of the type from a specified byte buffer.
@@ -792,5 +791,5 @@ public enum AMQType
      *
      * @return An instance of the type.
      */
-    abstract Object readValueFromBuffer(DataInputStream buffer) throws IOException;
+    abstract Object readValueFromBuffer(ByteBuffer buffer);
 }

Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java?rev=1187150&r1=1187149&r2=1187150&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQTypedValue.java Fri Oct 21 01:19:00 2011
@@ -20,9 +20,8 @@
  */
 package org.apache.qpid.framing;
 
-import java.io.DataInputStream;
-import java.io.DataOutputStream;
-import java.io.IOException;
+import org.apache.mina.common.ByteBuffer;
+
 import java.util.Date;
 import java.util.Map;
 import java.math.BigDecimal;
@@ -61,7 +60,7 @@ public class AMQTypedValue
         _value = type.toNativeValue(value);
     }
 
-    private AMQTypedValue(AMQType type, DataInputStream buffer) throws IOException
+    private AMQTypedValue(AMQType type, ByteBuffer buffer)
     {
         _type = type;
         _value = type.readValueFromBuffer(buffer);
@@ -77,7 +76,7 @@ public class AMQTypedValue
         return _value;
     }
 
-    public void writeToBuffer(DataOutputStream buffer) throws IOException
+    public void writeToBuffer(ByteBuffer buffer)
     {
         _type.writeToBuffer(_value, buffer);
     }
@@ -87,9 +86,9 @@ public class AMQTypedValue
         return _type.getEncodingSize(_value);
     }
 
-    public static AMQTypedValue readFromBuffer(DataInputStream buffer) throws IOException
+    public static AMQTypedValue readFromBuffer(ByteBuffer buffer)
     {
-        AMQType type = AMQTypeMap.getType(buffer.readByte());
+        AMQType type = AMQTypeMap.getType(buffer.get());
 
         return new AMQTypedValue(type, buffer);
     }



---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project:      http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org