You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by sh...@apache.org on 2011/10/20 20:43:26 UTC
svn commit: r1186990 [33/43] - in /qpid/branches/QPID-2519: ./ bin/ cpp/
cpp/bindings/ cpp/bindings/qmf/python/ cpp/bindings/qmf/ruby/
cpp/bindings/qmf/tests/ cpp/bindings/qmf2/ cpp/bindings/qmf2/examples/cpp/
cpp/bindings/qmf2/python/ cpp/bindings/qmf...
Modified: qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java (original)
+++ qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/client/message/TestMessageHelper.java Thu Oct 20 18:42:46 2011
@@ -43,4 +43,9 @@ public class TestMessageHelper
{
return new JMSStreamMessage(AMQMessageDelegateFactory.FACTORY_0_8);
}
+
+ public static JMSObjectMessage newJMSObjectMessage()
+ {
+ return new JMSObjectMessage(AMQMessageDelegateFactory.FACTORY_0_8);
+ }
}
Modified: qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java (original)
+++ qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java Thu Oct 20 18:42:46 2011
@@ -20,23 +20,24 @@
*/
package org.apache.qpid.client.protocol;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+
import junit.framework.TestCase;
-import org.apache.qpid.framing.AMQFrame;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQAuthenticationException;
+import org.apache.qpid.client.MockAMQConnection;
+import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.framing.AMQBody;
+import org.apache.qpid.framing.AMQFrame;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.amqp_8_0.BasicRecoverOkBodyImpl;
-import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.transport.TestNetworkDriver;
-import org.apache.qpid.client.MockAMQConnection;
-import org.apache.qpid.client.AMQAuthenticationException;
-import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.transport.TestNetworkConnection;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
-import java.util.concurrent.CountDownLatch;
-import java.util.concurrent.TimeUnit;
-
/**
* This is a test address QPID-1431 where frame listeners would fail to be notified of an incomming exception.
*
@@ -72,8 +73,8 @@ public class AMQProtocolHandlerTest exte
public void setUp() throws Exception
{
//Create a new ProtocolHandler with a fake connection.
- _handler = new AMQProtocolHandler(new MockAMQConnection("amqp://guest:guest@client/test?brokerlist='vm://:1'"));
- _handler.setNetworkDriver(new TestNetworkDriver());
+ _handler = new AMQProtocolHandler(new MockAMQConnection("amqp://guest:guest@client/test?brokerlist='tcp://localhost:1'"));
+ _handler.setNetworkConnection(new TestNetworkConnection());
AMQBody body = BasicRecoverOkBodyImpl.getFactory().newInstance(null, 1);
_blockFrame = new AMQFrame(0, body);
Modified: qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java (original)
+++ qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/client/BrokerDetails/BrokerDetailsTest.java Thu Oct 20 18:42:46 2011
@@ -43,15 +43,6 @@ public class BrokerDetailsTest extends T
assertTrue(broker.getProperty("immediatedelivery").equals("true"));
}
- public void testVMBroker() throws URLSyntaxException
- {
- String url = "vm://:2";
-
- AMQBrokerDetails broker = new AMQBrokerDetails(url);
- assertTrue(broker.getTransport().equals("vm"));
- assertEquals(broker.getPort(), 2);
- }
-
public void testTransportsDefaultToTCP() throws URLSyntaxException
{
String url = "localhost:5672";
Modified: qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java (original)
+++ qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseMethodHandlerNoCloseOk.java Thu Oct 20 18:42:46 2011
@@ -73,7 +73,7 @@ public class ChannelCloseMethodHandlerNo
{
throw new AMQNoRouteException("Error: " + reason, null, null);
}
- else if (errorCode == AMQConstant.INVALID_ARGUMENT)
+ else if (errorCode == AMQConstant.ARGUMENT_INVALID)
{
_logger.debug("Broker responded with Invalid Argument.");
Modified: qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java (original)
+++ qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/client/connectionurl/ConnectionURLTest.java Thu Oct 20 18:42:46 2011
@@ -300,53 +300,6 @@ public class ConnectionURLTest extends T
assertTrue(connectionurl.getOption("immediatedelivery").equals("true"));
}
- public void testSinglevmURL() throws URLSyntaxException
- {
- String url = "amqp://guest:guest@/test?brokerlist='vm://:2'";
-
- ConnectionURL connectionurl = new AMQConnectionURL(url);
-
- assertTrue(connectionurl.getFailoverMethod() == null);
- assertTrue(connectionurl.getUsername().equals("guest"));
- assertTrue(connectionurl.getPassword().equals("guest"));
- assertTrue(connectionurl.getVirtualHost().equals("/test"));
-
- assertTrue(connectionurl.getBrokerCount() == 1);
-
- BrokerDetails service = connectionurl.getBrokerDetails(0);
-
- assertTrue(service.getTransport().equals("vm"));
- assertTrue(service.getHost().equals(""));
- assertTrue(service.getPort() == 2);
-
- }
-
- public void testFailoverVMURL() throws URLSyntaxException
- {
- String url = "amqp://ritchiem:bob@/test?brokerlist='vm://:2;vm://:3',failover='roundrobin'";
-
- ConnectionURL connectionurl = new AMQConnectionURL(url);
-
- assertTrue(connectionurl.getFailoverMethod().equals("roundrobin"));
- assertTrue(connectionurl.getUsername().equals("ritchiem"));
- assertTrue(connectionurl.getPassword().equals("bob"));
- assertTrue(connectionurl.getVirtualHost().equals("/test"));
-
- assertTrue(connectionurl.getBrokerCount() == 2);
-
- BrokerDetails service = connectionurl.getBrokerDetails(0);
-
- assertTrue(service.getTransport().equals("vm"));
- assertTrue(service.getHost().equals(""));
- assertTrue(service.getPort() == 2);
-
- service = connectionurl.getBrokerDetails(1);
- assertTrue(service.getTransport().equals("vm"));
- assertTrue(service.getHost().equals(""));
- assertTrue(service.getPort() == 3);
- }
-
-
public void testNoVirtualHostURL()
{
String url = "amqp://user@?brokerlist='tcp://localhost:5672'";
@@ -487,27 +440,6 @@ public class ConnectionURLTest extends T
}
- public void testSocketProtocol() throws URLSyntaxException
- {
- String url = "amqp://guest:guest@id/test" + "?brokerlist='socket://VM-Unique-socketID'";
-
- try
- {
- AMQConnectionURL curl = new AMQConnectionURL(url);
- assertNotNull(curl);
- assertEquals(1, curl.getBrokerCount());
- assertNotNull(curl.getBrokerDetails(0));
- assertEquals(BrokerDetails.SOCKET, curl.getBrokerDetails(0).getTransport());
- assertEquals("VM-Unique-socketID", curl.getBrokerDetails(0).getHost());
- assertEquals("URL does not toString as expected",
- url.replace(":guest", ":********"), curl.toString());
- }
- catch (URLSyntaxException e)
- {
- fail(e.getMessage());
- }
- }
-
public void testSingleTransportMultiOptionOnBrokerURL() throws URLSyntaxException
{
String url = "amqp://guest:guest@/test?brokerlist='tcp://localhost:5672?foo='jim'&bar='bob'&fred='jimmy'',routingkey='jim',timeout='200',immediatedelivery='true'";
@@ -549,6 +481,37 @@ public class ConnectionURLTest extends T
assertTrue("String representation should contain options and values", url.toString().contains("maxprefetch='12345'"));
}
+ public void testHostNamesWithUnderScore() throws URLSyntaxException
+ {
+ String url = "amqp://guest:guest@clientid/test?brokerlist='tcp://under_score:6672'";
+
+ ConnectionURL connectionurl = new AMQConnectionURL(url);
+
+ assertTrue(connectionurl.getUsername().equals("guest"));
+ assertTrue(connectionurl.getPassword().equals("guest"));
+ assertTrue(connectionurl.getVirtualHost().equals("/test"));
+
+ assertTrue(connectionurl.getBrokerCount() == 1);
+ BrokerDetails service = connectionurl.getBrokerDetails(0);
+ assertTrue(service.getTransport().equals("tcp"));
+ assertTrue(service.getHost().equals("under_score"));
+ assertTrue(service.getPort() == 6672);
+
+ url = "amqp://guest:guest@clientid/test?brokerlist='tcp://under_score'";
+
+ connectionurl = new AMQConnectionURL(url);
+
+ assertTrue(connectionurl.getUsername().equals("guest"));
+ assertTrue(connectionurl.getPassword().equals("guest"));
+ assertTrue(connectionurl.getVirtualHost().equals("/test"));
+
+ assertTrue(connectionurl.getBrokerCount() == 1);
+ service = connectionurl.getBrokerDetails(0);
+ assertTrue(service.getTransport().equals("tcp"));
+ assertTrue(service.getHost().equals("under_score"));
+ assertTrue(service.getPort() == 5672);
+ }
+
public static junit.framework.Test suite()
{
return new junit.framework.TestSuite(ConnectionURLTest.class);
Modified: qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java (original)
+++ qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/jndi/ConnectionFactoryTest.java Thu Oct 20 18:42:46 2011
@@ -21,10 +21,10 @@
package org.apache.qpid.test.unit.jndi;
import junit.framework.TestCase;
+
import org.apache.qpid.client.AMQConnectionFactory;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.url.URLSyntaxException;
public class ConnectionFactoryTest extends TestCase
{
@@ -34,21 +34,9 @@ public class ConnectionFactoryTest exten
public static final String URL = "amqp://guest:guest@clientID/test?brokerlist='tcp://localhost:5672'";
public static final String URL_STAR_PWD = "amqp://guest:********@clientID/test?brokerlist='tcp://localhost:5672'";
- public void testConnectionURLString()
+ public void testConnectionURLStringMasksPassword() throws Exception
{
- AMQConnectionFactory factory = new AMQConnectionFactory();
-
- assertNull("ConnectionURL should have no value at start",
- factory.getConnectionURL());
-
- try
- {
- factory.setConnectionURLString(URL);
- }
- catch (URLSyntaxException e)
- {
- fail(e.getMessage());
- }
+ AMQConnectionFactory factory = new AMQConnectionFactory(URL);
//URL will be returned with the password field swapped for '********'
assertEquals("Connection URL not correctly set", URL_STAR_PWD, factory.getConnectionURLString());
Modified: qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java (original)
+++ qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/jndi/JNDIPropertyFileTest.java Thu Oct 20 18:42:46 2011
@@ -24,6 +24,7 @@ import java.util.Properties;
import javax.jms.Queue;
import javax.jms.Topic;
+import javax.naming.ConfigurationException;
import javax.naming.Context;
import javax.naming.InitialContext;
@@ -67,4 +68,22 @@ public class JNDIPropertyFileTest extend
assertEquals("Topic" + i + "WithSpace",bindingKey.asString());
}
}
+
+ public void testConfigurationErrors() throws Exception
+ {
+ Properties properties = new Properties();
+ properties.put("java.naming.factory.initial", "org.apache.qpid.jndi.PropertiesFileInitialContextFactory");
+ properties.put("destination.my-queue","amq.topic/test;create:always}");
+
+ try
+ {
+ ctx = new InitialContext(properties);
+ fail("A configuration exception should be thrown with details about the address syntax error");
+ }
+ catch(ConfigurationException e)
+ {
+ assertTrue("Incorrect exception", e.getMessage().contains("Failed to parse entry: amq.topic/test;create:always}"));
+ }
+
+ }
}
Modified: qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java (original)
+++ qpid/branches/QPID-2519/java/client/src/test/java/org/apache/qpid/test/unit/message/TestAMQSession.java Thu Oct 20 18:42:46 2011
@@ -20,17 +20,24 @@
*/
package org.apache.qpid.test.unit.message;
-import org.apache.qpid.client.*;
+import java.util.Map;
+
+import javax.jms.Destination;
+import javax.jms.JMSException;
+import javax.jms.TemporaryQueue;
+import javax.jms.Topic;
+import javax.jms.TopicSubscriber;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQDestination;
+import org.apache.qpid.client.AMQSession;
+import org.apache.qpid.client.BasicMessageConsumer_0_8;
+import org.apache.qpid.client.BasicMessageProducer_0_8;
+import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.message.AMQMessageDelegateFactory;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.AMQException;
-
-import javax.jms.*;
-
-import java.util.Map;
public class TestAMQSession extends AMQSession<BasicMessageConsumer_0_8, BasicMessageProducer_0_8>
{
@@ -57,7 +64,12 @@ public class TestAMQSession extends AMQS
}
- public void sendCommit() throws AMQException, FailoverException
+ public void commitImpl() throws AMQException, FailoverException
+ {
+
+ }
+
+ public void acknowledgeImpl()
{
}
@@ -117,7 +129,7 @@ public class TestAMQSession extends AMQS
}
- public BasicMessageProducer_0_8 createMessageProducer(Destination destination, boolean mandatory, boolean immediate, boolean waitUntilSent, long producerId)
+ public BasicMessageProducer_0_8 createMessageProducer(Destination destination, boolean mandatory, boolean immediate, long producerId)
{
return null;
}
@@ -195,4 +207,10 @@ public class TestAMQSession extends AMQS
{
return false;
}
+
+ @Override
+ public AMQException getLastException()
+ {
+ return null;
+ }
}
Modified: qpid/branches/QPID-2519/java/common.xml
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common.xml?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common.xml (original)
+++ qpid/branches/QPID-2519/java/common.xml Thu Oct 20 18:42:46 2011
@@ -23,7 +23,9 @@
<dirname property="project.root" file="${ant.file.common}"/>
<property name="project.name" value="qpid"/>
- <property name="project.version" value="0.9"/>
+ <property name="project.version" value="0.13"/>
+ <property name="project.url" value="http://qpid.apache.org"/>
+ <property name="project.groupid" value="org.apache.qpid"/>
<property name="project.namever" value="${project.name}-${project.version}"/>
<property name="resources" location="${project.root}/resources"/>
@@ -40,7 +42,6 @@
<property name="build.report" location="${build}/report"/>
<property name="build.release" location="${build}/release"/>
<property name="build.release.prepare" location="${build.release}/prepare"/>
- <property name="build.data" location="${build.scratch}/data"/>
<property name="build.plugins" location="${build}/lib/plugins"/>
<property name="build.coveragereport" location="${build}/coverage"/>
<property name="build.findbugs" location="${build}/findbugs"/>
@@ -63,6 +64,11 @@
<property name="mllib.dir" value="${project.root}/../python" />
<property name="findbugs.dir" value="${project.root}/lib/findbugs" />
+ <!-- properties used to control Ant Eclipse for Eclipse classpath/project files etc -->
+ <property name="eclipse.updatealways" value="false"/>
+ <property name="eclipse.compilercompliance" value="5.0"/>
+ <property name="eclipse.container" value="JVM 1.5"/>
+
<path id="cobertura.classpath">
<fileset dir="${cobertura.dir}">
<include name="cobertura.jar" />
@@ -71,6 +77,7 @@
</path>
<property name="maven.local.repo" value="${build.scratch}/maven-local-repo"/>
+ <property name="maven.settings.xml" value="${project.root}/maven-settings.xml"/>
<property name="maven.unique.version" value="false"/>
<property name="maven.snapshot" value="true"/>
<condition property="maven.version.suffix" value="" else="-SNAPSHOT">
@@ -124,8 +131,6 @@
</sequential>
</macrodef>
-
-
<macrodef name="jython">
<attribute name="path"/>
<element name="args"/>
@@ -322,6 +327,20 @@
results directory:
${build.results}
+
+ ant eclipse
+
+ Generates project and classpath files for the Eclipse IDE. Requires that
+ the Ant Eclipse task (http://ant-eclipse.sourceforge.net/) has been installed
+ in $ANT_HOME/lib.
+
+ The following system properties will be passed to the task. These can be usefully
+ overridden from the command line.
+
+ eclipse.updatealways - forces Eclipse files to be regenerated even if they are newer than the build.xml (default ${eclipse.updatealways}).
+ eclipse.container - controls the Eclipse container (default ${eclipse.container}).
+ eclipse.compilercompliance - controls the Eclipse compiler compliance (default ${eclipse.compilercompliance}).
+
</echo>
</target>
Modified: qpid/branches/QPID-2519/java/common/bin/qpid-run
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/bin/qpid-run?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/bin/qpid-run (original)
+++ qpid/branches/QPID-2519/java/common/bin/qpid-run Thu Oct 20 18:42:46 2011
@@ -77,7 +77,10 @@ fi
#Set the default system properties that we'll use now that they have
#all been initialised
-SYSTEM_PROPS="-Damqj.logging.level=$AMQJ_LOGGING_LEVEL -DQPID_HOME=$QPID_HOME -DQPID_WORK=$QPID_WORK"
+declare -a SYSTEM_PROPS
+SYSTEM_PROPS[${#SYSTEM_PROPS[@]}]="-Damqj.logging.level=$AMQJ_LOGGING_LEVEL"
+SYSTEM_PROPS[${#SYSTEM_PROPS[@]}]="-DQPID_HOME=$QPID_HOME"
+SYSTEM_PROPS[${#SYSTEM_PROPS[@]}]="-DQPID_WORK=$QPID_WORK"
#If logprefix or logsuffix set to use PID make that happen
#Otherwise just pass the value through for these props
@@ -90,7 +93,7 @@ if [ -n "$QPID_LOG_PREFIX" ]; then
log $INFO Using qpid logprefix property
LOG_PREFIX=" -Dlogprefix=$QPID_LOG_PREFIX"
fi
- SYSTEM_PROPS="${SYSTEM_PROPS} ${LOG_PREFIX}"
+ SYSTEM_PROPS[${#SYSTEM_PROPS[@]}]="${LOG_PREFIX}"
fi
if [ -n "$QPID_LOG_SUFFIX" ]; then
@@ -101,10 +104,10 @@ if [ -n "$QPID_LOG_SUFFIX" ]; then
log $INFO Using qpig logsuffix property
LOG_SUFFIX=" -Dlogsuffix=$QPID_LOG_SUFFIX"
fi
- SYSTEM_PROPS="${SYSTEM_PROPS} ${LOG_SUFFIX}"
+ SYSTEM_PROPS[${#SYSTEM_PROPS[@]}]="${LOG_SUFFIX}"
fi
-log $INFO System Properties set to $SYSTEM_PROPS
+log $INFO System Properties set to ${SYSTEM_PROPS[@]}
log $INFO QPID_OPTS set to $QPID_OPTS
program=$(basename $0)
@@ -254,6 +257,6 @@ if $cygwin; then
JAVA=$(cygpath -u $JAVA)
fi
-COMMAND=($JAVA $JAVA_VM $QPID_PNAME $JAVA_GC $JAVA_MEM $SYSTEM_PROPS $JAVA_OPTS $QPID_OPTS "${JAVA_ARGS[@]}")
+COMMAND=($JAVA $JAVA_VM $QPID_PNAME $JAVA_GC $JAVA_MEM "${SYSTEM_PROPS[@]}" $JAVA_OPTS $QPID_OPTS "${JAVA_ARGS[@]}")
DISPATCH
Modified: qpid/branches/QPID-2519/java/common/src/main/java/common.bnd
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/common.bnd?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/common.bnd (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/common.bnd Thu Oct 20 18:42:46 2011
@@ -17,7 +17,7 @@
# under the License.
#
-ver: 0.9.0
+ver: 0.13.0
Bundle-SymbolicName: qpid-common
Bundle-Version: ${ver}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQChannelException.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQChannelException.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQChannelException.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQChannelException.java Thu Oct 20 18:42:46 2011
@@ -54,6 +54,7 @@ public class AMQChannelException extends
public AMQFrame getCloseFrame(int channel)
{
MethodRegistry reg = MethodRegistry.getMethodRegistry(new ProtocolVersion(major,minor));
- return new AMQFrame(channel, reg.createChannelCloseBody(getErrorCode() == null ? AMQConstant.INTERNAL_ERROR.getCode() : getErrorCode().getCode(), new AMQShortString(getMessage()),_classId,_methodId));
+ return new AMQFrame(channel, reg.createChannelCloseBody(getErrorCode() == null ? AMQConstant.INTERNAL_ERROR.getCode() : getErrorCode().getCode(), getMessageAsShortString(),_classId,_methodId));
}
+
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQConnectionException.java Thu Oct 20 18:42:46 2011
@@ -62,9 +62,10 @@ public class AMQConnectionException exte
MethodRegistry reg = MethodRegistry.getMethodRegistry(new ProtocolVersion(major,minor));
return new AMQFrame(0,
reg.createConnectionCloseBody(getErrorCode().getCode(),
- new AMQShortString(getMessage()),
+ getMessageAsShortString(),
_classId,
_methodId));
}
+
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQException.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQException.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQException.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQException.java Thu Oct 20 18:42:46 2011
@@ -20,6 +20,7 @@
*/
package org.apache.qpid;
+import org.apache.qpid.framing.AMQShortString;
import org.apache.qpid.protocol.AMQConstant;
/**
@@ -121,4 +122,19 @@ public class AMQException extends Except
return newAMQE;
}
+
+ /**
+ * Truncates the exception message to 255 characters if its length exceeds 255.
+ *
+ * @return exception message
+ */
+ public AMQShortString getMessageAsShortString()
+ {
+ String message = getMessage();
+ if (message != null && message.length() > AMQShortString.MAX_LENGTH)
+ {
+ message = message.substring(0, AMQShortString.MAX_LENGTH - 3) + "...";
+ }
+ return new AMQShortString(message);
+ }
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQInvalidArgumentException.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQInvalidArgumentException.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQInvalidArgumentException.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/AMQInvalidArgumentException.java Thu Oct 20 18:42:46 2011
@@ -34,7 +34,7 @@ public class AMQInvalidArgumentException
{
public AMQInvalidArgumentException(String message, Throwable cause)
{
- super(AMQConstant.INVALID_ARGUMENT, message, cause);
+ super(AMQConstant.ARGUMENT_INVALID, message, cause);
}
public boolean isHardError()
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/codec/AMQCodecFactory.java Thu Oct 20 18:42:46 2011
@@ -20,9 +20,6 @@
*/
package org.apache.qpid.codec;
-import org.apache.mina.filter.codec.ProtocolCodecFactory;
-import org.apache.mina.filter.codec.ProtocolDecoder;
-import org.apache.mina.filter.codec.ProtocolEncoder;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
/**
@@ -31,14 +28,11 @@ import org.apache.qpid.protocol.AMQVersi
*
* <p/><table id="crc"><caption>CRC Card</caption>
* <tr><th> Responsibilities <th> Collaborations.
- * <tr><td> Supply the protocol encoder. <td> {@link AMQEncoder}
* <tr><td> Supply the protocol decoder. <td> {@link AMQDecoder}
* </table>
*/
-public class AMQCodecFactory implements ProtocolCodecFactory
+public class AMQCodecFactory
{
- /** Holds the protocol encoder. */
- private final AMQEncoder _encoder = new AMQEncoder();
/** Holds the protocol decoder. */
private final AMQDecoder _frameDecoder;
@@ -56,15 +50,6 @@ public class AMQCodecFactory implements
_frameDecoder = new AMQDecoder(expectProtocolInitiation, session);
}
- /**
- * Gets the AMQP encoder.
- *
- * @return The AMQP encoder.
- */
- public ProtocolEncoder getEncoder()
- {
- return _encoder;
- }
/**
* Gets the AMQP decoder.
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/codec/AMQDecoder.java Thu Oct 20 18:42:46 2011
@@ -20,13 +20,9 @@
*/
package org.apache.qpid.codec;
-import java.util.ArrayList;
-
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.SimpleByteBufferAllocator;
-import org.apache.mina.filter.codec.CumulativeProtocolDecoder;
-import org.apache.mina.filter.codec.ProtocolDecoderOutput;
+import java.io.*;
+import java.nio.ByteBuffer;
+import java.util.*;
import org.apache.qpid.framing.AMQDataBlock;
import org.apache.qpid.framing.AMQDataBlockDecoder;
@@ -54,11 +50,8 @@ import org.apache.qpid.protocol.AMQVersi
* @todo If protocol initiation decoder not needed, then don't create it. Probably not a big deal, but it adds to the
* per-session overhead.
*/
-public class AMQDecoder extends CumulativeProtocolDecoder
+public class AMQDecoder
{
-
- private static final String BUFFER = AMQDecoder.class.getName() + ".Buffer";
-
/** Holds the 'normal' AMQP data decoder. */
private AMQDataBlockDecoder _dataBlockDecoder = new AMQDataBlockDecoder();
@@ -67,12 +60,11 @@ public class AMQDecoder extends Cumulati
/** Flag to indicate whether this decoder needs to handle protocol initiation. */
private boolean _expectProtocolInitiation;
- private boolean firstDecode = true;
private AMQMethodBodyFactory _bodyFactory;
- private ByteBuffer _remainingBuf;
-
+ private List<ByteArrayInputStream> _remainingBufs = new ArrayList<ByteArrayInputStream>();
+
/**
* Creates a new AMQP decoder.
*
@@ -84,98 +76,7 @@ public class AMQDecoder extends Cumulati
_bodyFactory = new AMQMethodBodyFactory(session);
}
- /**
- * Delegates decoding AMQP from the data buffer that Mina has retrieved from the wire, to the data or protocol
- * intiation decoders.
- *
- * @param session The Mina session.
- * @param in The raw byte buffer.
- * @param out The Mina object output gatherer to write decoded objects to.
- *
- * @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate.
- *
- * @throws Exception If the data cannot be decoded for any reason.
- */
- protected boolean doDecode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
- {
-
- boolean decoded;
- if (_expectProtocolInitiation
- || (firstDecode
- && (in.remaining() > 0)
- && (in.get(in.position()) == (byte)'A')))
- {
- decoded = doDecodePI(session, in, out);
- }
- else
- {
- decoded = doDecodeDataBlock(session, in, out);
- }
- if(firstDecode && decoded)
- {
- firstDecode = false;
- }
- return decoded;
- }
-
- /**
- * Decodes AMQP data, delegating the decoding to an {@link AMQDataBlockDecoder}.
- *
- * @param session The Mina session.
- * @param in The raw byte buffer.
- * @param out The Mina object output gatherer to write decoded objects to.
- *
- * @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate.
- *
- * @throws Exception If the data cannot be decoded for any reason.
- */
- protected boolean doDecodeDataBlock(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
- {
- int pos = in.position();
- boolean enoughData = _dataBlockDecoder.decodable(in.buf());
- in.position(pos);
- if (!enoughData)
- {
- // returning false means it will leave the contents in the buffer and
- // call us again when more data has been read
- return false;
- }
- else
- {
- _dataBlockDecoder.decode(session, in, out);
-
- return true;
- }
- }
-
- /**
- * Decodes an AMQP initiation, delegating the decoding to a {@link ProtocolInitiation.Decoder}.
- *
- * @param session The Mina session.
- * @param in The raw byte buffer.
- * @param out The Mina object output gatherer to write decoded objects to.
- *
- * @return <tt>true</tt> if the data was decoded, <tt>false<tt> if more is needed and the data should accumulate.
- *
- * @throws Exception If the data cannot be decoded for any reason.
- */
- private boolean doDecodePI(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
- {
- boolean enoughData = _piDecoder.decodable(in.buf());
- if (!enoughData)
- {
- // returning false means it will leave the contents in the buffer and
- // call us again when more data has been read
- return false;
- }
- else
- {
- ProtocolInitiation pi = new ProtocolInitiation(in.buf());
- out.write(pi);
- return true;
- }
- }
/**
* Sets the protocol initation flag, that determines whether decoding is handled by the data decoder of the protocol
@@ -189,151 +90,168 @@ public class AMQDecoder extends Cumulati
_expectProtocolInitiation = expectProtocolInitiation;
}
-
- /**
- * Cumulates content of <tt>in</tt> into internal buffer and forwards
- * decoding request to {@link #doDecode(IoSession, ByteBuffer, ProtocolDecoderOutput)}.
- * <tt>doDecode()</tt> is invoked repeatedly until it returns <tt>false</tt>
- * and the cumulative buffer is compacted after decoding ends.
- *
- * @throws IllegalStateException if your <tt>doDecode()</tt> returned
- * <tt>true</tt> not consuming the cumulative buffer.
- */
- public void decode( IoSession session, ByteBuffer in,
- ProtocolDecoderOutput out ) throws Exception
+ private class RemainingByteArrayInputStream extends InputStream
{
- ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER );
- // if we have a session buffer, append data to that otherwise
- // use the buffer read from the network directly
- if( buf != null )
- {
- buf.put( in );
- buf.flip();
- }
- else
+ private int _currentListPos;
+ private int _markPos;
+
+
+ @Override
+ public int read() throws IOException
{
- buf = in;
+ ByteArrayInputStream currentStream = _remainingBufs.get(_currentListPos);
+ if(currentStream.available() > 0)
+ {
+ return currentStream.read();
+ }
+ else if((_currentListPos == _remainingBufs.size())
+ || (++_currentListPos == _remainingBufs.size()))
+ {
+ return -1;
+ }
+ else
+ {
+
+ ByteArrayInputStream stream = _remainingBufs.get(_currentListPos);
+ stream.mark(0);
+ return stream.read();
+ }
}
- for( ;; )
+ @Override
+ public int read(final byte[] b, final int off, final int len) throws IOException
{
- int oldPos = buf.position();
- boolean decoded = doDecode( session, buf, out );
- if( decoded )
+
+ if(_currentListPos == _remainingBufs.size())
{
- if( buf.position() == oldPos )
+ return -1;
+ }
+ else
+ {
+ ByteArrayInputStream currentStream = _remainingBufs.get(_currentListPos);
+ final int available = currentStream.available();
+ int read = currentStream.read(b, off, len > available ? available : len);
+ if(read < len)
{
- throw new IllegalStateException(
- "doDecode() can't return true when buffer is not consumed." );
+ if(_currentListPos++ != _remainingBufs.size())
+ {
+ _remainingBufs.get(_currentListPos).mark(0);
+ }
+ int correctRead = read == -1 ? 0 : read;
+ int subRead = read(b, off+correctRead, len-correctRead);
+ if(subRead == -1)
+ {
+ return read;
+ }
+ else
+ {
+ return correctRead+subRead;
+ }
}
-
- if( !buf.hasRemaining() )
+ else
{
- break;
+ return len;
}
}
- else
- {
- break;
- }
}
- // if there is any data left that cannot be decoded, we store
- // it in a buffer in the session and next time this decoder is
- // invoked the session buffer gets appended to
- if ( buf.hasRemaining() )
+ @Override
+ public int available() throws IOException
{
- storeRemainingInSession( buf, session );
+ int total = 0;
+ for(int i = _currentListPos; i < _remainingBufs.size(); i++)
+ {
+ total += _remainingBufs.get(i).available();
+ }
+ return total;
}
- else
+
+ @Override
+ public void mark(final int readlimit)
{
- removeSessionBuffer( session );
+ _markPos = _currentListPos;
+ final ByteArrayInputStream stream = _remainingBufs.get(_currentListPos);
+ if(stream != null)
+ {
+ stream.mark(readlimit);
+ }
}
- }
-
- /**
- * Releases the cumulative buffer used by the specified <tt>session</tt>.
- * Please don't forget to call <tt>super.dispose( session )</tt> when
- * you override this method.
- */
- public void dispose( IoSession session ) throws Exception
- {
- removeSessionBuffer( session );
- }
- private void removeSessionBuffer(IoSession session)
- {
- ByteBuffer buf = ( ByteBuffer ) session.getAttribute( BUFFER );
- if( buf != null )
+ @Override
+ public void reset() throws IOException
{
- buf.release();
- session.removeAttribute( BUFFER );
+ _currentListPos = _markPos;
+ final int size = _remainingBufs.size();
+ if(_currentListPos < size)
+ {
+ _remainingBufs.get(_currentListPos).reset();
+ }
+ for(int i = _currentListPos+1; i<size; i++)
+ {
+ _remainingBufs.get(i).reset();
+ }
}
}
- private static final SimpleByteBufferAllocator SIMPLE_BYTE_BUFFER_ALLOCATOR = new SimpleByteBufferAllocator();
-
- private void storeRemainingInSession(ByteBuffer buf, IoSession session)
- {
- ByteBuffer remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate( buf.remaining(), false );
- remainingBuf.setAutoExpand( true );
- remainingBuf.put( buf );
- session.setAttribute( BUFFER, remainingBuf );
- }
- public ArrayList<AMQDataBlock> decodeBuffer(java.nio.ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException
+ public ArrayList<AMQDataBlock> decodeBuffer(ByteBuffer buf) throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
{
// get prior remaining data from accumulator
ArrayList<AMQDataBlock> dataBlocks = new ArrayList<AMQDataBlock>();
- ByteBuffer msg;
- // if we have a session buffer, append data to that otherwise
- // use the buffer read from the network directly
- if( _remainingBuf != null )
+ DataInputStream msg;
+
+
+ ByteArrayInputStream bais = new ByteArrayInputStream(buf.array(),buf.arrayOffset()+buf.position(), buf.remaining());
+ if(!_remainingBufs.isEmpty())
{
- _remainingBuf.put(buf);
- _remainingBuf.flip();
- msg = _remainingBuf;
+ _remainingBufs.add(bais);
+ msg = new DataInputStream(new RemainingByteArrayInputStream());
}
else
{
- msg = ByteBuffer.wrap(buf);
+ msg = new DataInputStream(bais);
}
-
- if (_expectProtocolInitiation
- || (firstDecode
- && (msg.remaining() > 0)
- && (msg.get(msg.position()) == (byte)'A')))
- {
- if (_piDecoder.decodable(msg.buf()))
- {
- dataBlocks.add(new ProtocolInitiation(msg.buf()));
- }
- }
- else
+
+ boolean enoughData = true;
+ while (enoughData)
{
- boolean enoughData = true;
- while (enoughData)
+ if(!_expectProtocolInitiation)
{
- int pos = msg.position();
-
enoughData = _dataBlockDecoder.decodable(msg);
- msg.position(pos);
if (enoughData)
{
dataBlocks.add(_dataBlockDecoder.createAndPopulateFrame(_bodyFactory, msg));
}
- else
+ }
+ else
+ {
+ enoughData = _piDecoder.decodable(msg);
+ if (enoughData)
{
- _remainingBuf = SIMPLE_BYTE_BUFFER_ALLOCATOR.allocate(msg.remaining(), false);
- _remainingBuf.setAutoExpand(true);
- _remainingBuf.put(msg);
+ dataBlocks.add(new ProtocolInitiation(msg));
+ }
+
+ }
+
+ if(!enoughData)
+ {
+ if(!_remainingBufs.isEmpty())
+ {
+ _remainingBufs.remove(_remainingBufs.size()-1);
+ ListIterator<ByteArrayInputStream> iterator = _remainingBufs.listIterator();
+ while(iterator.hasNext() && iterator.next().available() == 0)
+ {
+ iterator.remove();
+ }
+ }
+ if(bais.available()!=0)
+ {
+ byte[] remaining = new byte[bais.available()];
+ bais.read(remaining);
+ _remainingBufs.add(new ByteArrayInputStream(remaining));
}
}
- }
- if(firstDecode && dataBlocks.size() > 0)
- {
- firstDecode = false;
}
return dataBlocks;
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/configuration/ClientProperties.java Thu Oct 20 18:42:46 2011
@@ -23,7 +23,7 @@ package org.apache.qpid.configuration;
*/
public class ClientProperties
{
-
+
/**
* Currently with Qpid it is not possible to change the client ID.
* If one is not specified upon connection construction, an id is generated automatically.
@@ -68,67 +68,50 @@ public class ClientProperties
* by the broker in TuneOK it will be used as the heartbeat interval.
* If not a warning will be printed and the max value specified for
* heartbeat in TuneOK will be used
- *
+ *
* The default idle timeout is set to 120 secs
*/
public static final String IDLE_TIMEOUT_PROP_NAME = "idle_timeout";
public static final long DEFAULT_IDLE_TIMEOUT = 120000;
-
+
public static final String HEARTBEAT = "qpid.heartbeat";
public static final int HEARTBEAT_DEFAULT = 120;
-
+
/**
* This value will be used to determine the default destination syntax type.
* Currently the two types are Binding URL (java only) and the Addressing format (used by
- * all clients).
+ * all clients).
*/
public static final String DEST_SYNTAX = "qpid.dest_syntax";
-
+
public static final String USE_LEGACY_MAP_MESSAGE_FORMAT = "qpid.use_legacy_map_message";
- /**
- * ==========================================================
- * Those properties are used when the io size should be bounded
- * ==========================================================
- */
+ public static final String AMQP_VERSION = "qpid.amqp.version";
- /**
- * When set to true the io layer throttles down producers and consumers
- * when written or read messages are accumulating and exceeding a certain size.
- * This is especially useful when the producer rate is greater than the network
- * speed.
- * type: boolean
- */
- public static final String PROTECTIO_PROP_NAME = "protectio";
+ public static final String QPID_VERIFY_CLIENT_ID = "qpid.verify_client_id";
- //=== The following properties are only used when the previous one is true.
/**
- * Max size of read messages that can be stored within the MINA layer
- * type: int
+ * System properties to change the default timeout used during
+ * synchronous operations.
*/
- public static final String READ_BUFFER_LIMIT_PROP_NAME = "qpid.read.buffer.limit";
- public static final String READ_BUFFER_LIMIT_DEFAULT = "262144";
+ public static final String QPID_SYNC_OP_TIMEOUT = "qpid.sync_op_timeout";
+ public static final String AMQJ_DEFAULT_SYNCWRITE_TIMEOUT = "amqj.default_syncwrite_timeout";
+
/**
- * Max size of written messages that can be stored within the MINA layer
- * type: int
+ * A default timeout value for synchronous operations
*/
- public static final String WRITE_BUFFER_LIMIT_PROP_NAME = "qpid.read.buffer.limit";
- public static final String WRITE_BUFFER_LIMIT_DEFAULT = "262144";
+ public static final int DEFAULT_SYNC_OPERATION_TIMEOUT = 60000;
- public static final String AMQP_VERSION = "qpid.amqp.version";
-
- private static ClientProperties _instance = new ClientProperties();
-
/*
- public static final QpidProperty<Boolean> IGNORE_SET_CLIENTID_PROP_NAME =
+ public static final QpidProperty<Boolean> IGNORE_SET_CLIENTID_PROP_NAME =
QpidProperty.booleanProperty(false,"qpid.ignore_set_client_id","ignore_setclientID");
-
+
public static final QpidProperty<Boolean> SYNC_PERSISTENT_PROP_NAME =
QpidProperty.booleanProperty(false,"qpid.sync_persistence","sync_persistence");
-
-
+
+
public static final QpidProperty<Integer> MAX_PREFETCH_PROP_NAME =
QpidProperty.intProperty(500,"qpid.max_prefetch","max_prefetch"); */
-
-
+
+
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQBody.java Thu Oct 20 18:42:46 2011
@@ -20,7 +20,9 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.apache.qpid.AMQException;
@@ -34,7 +36,7 @@ public interface AMQBody
*/
public abstract int getSize();
- public void writePayload(ByteBuffer buffer);
+ public void writePayload(DataOutputStream buffer) throws IOException;
- void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession) throws AMQException;
+ void handle(final int channelId, final AMQVersionAwareProtocolSession amqProtocolSession) throws AMQException;
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlock.java Thu Oct 20 18:42:46 2011
@@ -20,7 +20,10 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
/**
* A data block represents something that has a size in bytes and the ability to write itself to a byte
@@ -39,25 +42,6 @@ public abstract class AMQDataBlock imple
* Writes the datablock to the specified buffer.
* @param buffer
*/
- public abstract void writePayload(ByteBuffer buffer);
-
- public ByteBuffer toByteBuffer()
- {
- final ByteBuffer buffer = ByteBuffer.allocate((int)getSize());
-
- writePayload(buffer);
- buffer.flip();
- return buffer;
- }
-
- public java.nio.ByteBuffer toNioByteBuffer()
- {
- final java.nio.ByteBuffer buffer = java.nio.ByteBuffer.allocate((int) getSize());
-
- ByteBuffer buf = ByteBuffer.wrap(buffer);
- writePayload(buf);
- buffer.flip();
- return buffer;
- }
+ public abstract void writePayload(DataOutputStream buffer) throws IOException;
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQDataBlockDecoder.java Thu Oct 20 18:42:46 2011
@@ -20,18 +20,14 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.filter.codec.ProtocolDecoderOutput;
-
-import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.DataInputStream;
+import java.io.IOException;
+
public class AMQDataBlockDecoder
{
- private static final String SESSION_METHOD_BODY_FACTORY = "QPID_SESSION_METHOD_BODY_FACTORY";
private static final BodyFactory[] _bodiesSupported = new BodyFactory[Byte.MAX_VALUE];
@@ -47,27 +43,32 @@ public class AMQDataBlockDecoder
public AMQDataBlockDecoder()
{ }
- public boolean decodable(java.nio.ByteBuffer in) throws AMQFrameDecodingException
+ public boolean decodable(DataInputStream in) throws AMQFrameDecodingException, IOException
{
- final int remainingAfterAttributes = in.remaining() - (1 + 2 + 4 + 1);
+ final int remainingAfterAttributes = in.available() - (1 + 2 + 4 + 1);
// type, channel, body length and end byte
if (remainingAfterAttributes < 0)
{
return false;
}
- in.position(in.position() + 1 + 2);
+ in.mark(8);
+ in.skip(1 + 2);
+
+
// Get an unsigned int, lifted from MINA ByteBuffer getUnsignedInt()
- final long bodySize = in.getInt() & 0xffffffffL;
+ final long bodySize = in.readInt() & 0xffffffffL;
+
+ in.reset();
return (remainingAfterAttributes >= bodySize);
}
- public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, ByteBuffer in)
- throws AMQFrameDecodingException, AMQProtocolVersionException
+ public AMQFrame createAndPopulateFrame(AMQMethodBodyFactory methodBodyFactory, DataInputStream in)
+ throws AMQFrameDecodingException, AMQProtocolVersionException, IOException
{
- final byte type = in.get();
+ final byte type = in.readByte();
BodyFactory bodyFactory;
if (type == AMQMethodBody.TYPE)
@@ -84,8 +85,8 @@ public class AMQDataBlockDecoder
throw new AMQFrameDecodingException(null, "Unsupported frame type: " + type, null);
}
- final int channel = in.getUnsignedShort();
- final long bodySize = in.getUnsignedInt();
+ final int channel = in.readUnsignedShort();
+ final long bodySize = EncodingUtils.readUnsignedInteger(in);
// bodySize can be zero
if ((channel < 0) || (bodySize < 0))
@@ -96,7 +97,7 @@ public class AMQDataBlockDecoder
AMQFrame frame = new AMQFrame(in, channel, bodySize, bodyFactory);
- byte marker = in.get();
+ byte marker = in.readByte();
if ((marker & 0xFF) != 0xCE)
{
throw new AMQFrameDecodingException(null, "End of frame marker not found. Read " + marker + " length=" + bodySize
@@ -106,26 +107,4 @@ public class AMQDataBlockDecoder
return frame;
}
- public void decode(IoSession session, ByteBuffer in, ProtocolDecoderOutput out) throws Exception
- {
- AMQMethodBodyFactory bodyFactory = (AMQMethodBodyFactory) session.getAttribute(SESSION_METHOD_BODY_FACTORY);
- if (bodyFactory == null)
- {
- AMQVersionAwareProtocolSession protocolSession = (AMQVersionAwareProtocolSession) session.getAttachment();
- bodyFactory = new AMQMethodBodyFactory(protocolSession);
- session.setAttribute(SESSION_METHOD_BODY_FACTORY, bodyFactory);
- }
-
- out.write(createAndPopulateFrame(bodyFactory, in));
- }
-
- public boolean decodable(ByteBuffer msg) throws AMQFrameDecodingException
- {
- return decodable(msg.buf());
- }
-
- public AMQDataBlock createAndPopulateFrame(AMQMethodBodyFactory factory, java.nio.ByteBuffer msg) throws AMQProtocolVersionException, AMQFrameDecodingException
- {
- return createAndPopulateFrame(factory, ByteBuffer.wrap(msg));
- }
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQFrame.java Thu Oct 20 18:42:46 2011
@@ -20,7 +20,9 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
public class AMQFrame extends AMQDataBlock implements EncodableAMQDataBlock
{
@@ -36,7 +38,7 @@ public class AMQFrame extends AMQDataBlo
_bodyFrame = bodyFrame;
}
- public AMQFrame(final ByteBuffer in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException
+ public AMQFrame(final DataInputStream in, final int channel, final long bodySize, final BodyFactory bodyFactory) throws AMQFrameDecodingException, IOException
{
this._channel = channel;
this._bodyFrame = bodyFactory.createBody(in,bodySize);
@@ -53,13 +55,13 @@ public class AMQFrame extends AMQDataBlo
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
- buffer.put(_bodyFrame.getFrameType());
+ buffer.writeByte(_bodyFrame.getFrameType());
EncodingUtils.writeUnsignedShort(buffer, _channel);
EncodingUtils.writeUnsignedInteger(buffer, _bodyFrame.getSize());
_bodyFrame.writePayload(buffer);
- buffer.put(FRAME_END_BYTE);
+ buffer.writeByte(FRAME_END_BYTE);
}
public final int getChannel()
@@ -77,48 +79,48 @@ public class AMQFrame extends AMQDataBlo
return "Frame channelId: " + _channel + ", bodyFrame: " + String.valueOf(_bodyFrame);
}
- public static void writeFrame(ByteBuffer buffer, final int channel, AMQBody body)
+ public static void writeFrame(DataOutputStream buffer, final int channel, AMQBody body) throws IOException
{
- buffer.put(body.getFrameType());
+ buffer.writeByte(body.getFrameType());
EncodingUtils.writeUnsignedShort(buffer, channel);
EncodingUtils.writeUnsignedInteger(buffer, body.getSize());
body.writePayload(buffer);
- buffer.put(FRAME_END_BYTE);
+ buffer.writeByte(FRAME_END_BYTE);
}
- public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2)
+ public static void writeFrames(DataOutputStream buffer, final int channel, AMQBody body1, AMQBody body2) throws IOException
{
- buffer.put(body1.getFrameType());
+ buffer.writeByte(body1.getFrameType());
EncodingUtils.writeUnsignedShort(buffer, channel);
EncodingUtils.writeUnsignedInteger(buffer, body1.getSize());
body1.writePayload(buffer);
- buffer.put(FRAME_END_BYTE);
- buffer.put(body2.getFrameType());
+ buffer.writeByte(FRAME_END_BYTE);
+ buffer.writeByte(body2.getFrameType());
EncodingUtils.writeUnsignedShort(buffer, channel);
EncodingUtils.writeUnsignedInteger(buffer, body2.getSize());
body2.writePayload(buffer);
- buffer.put(FRAME_END_BYTE);
+ buffer.writeByte(FRAME_END_BYTE);
}
- public static void writeFrames(ByteBuffer buffer, final int channel, AMQBody body1, AMQBody body2, AMQBody body3)
+ public static void writeFrames(DataOutputStream buffer, final int channel, AMQBody body1, AMQBody body2, AMQBody body3) throws IOException
{
- buffer.put(body1.getFrameType());
+ buffer.writeByte(body1.getFrameType());
EncodingUtils.writeUnsignedShort(buffer, channel);
EncodingUtils.writeUnsignedInteger(buffer, body1.getSize());
body1.writePayload(buffer);
- buffer.put(FRAME_END_BYTE);
- buffer.put(body2.getFrameType());
+ buffer.writeByte(FRAME_END_BYTE);
+ buffer.writeByte(body2.getFrameType());
EncodingUtils.writeUnsignedShort(buffer, channel);
EncodingUtils.writeUnsignedInteger(buffer, body2.getSize());
body2.writePayload(buffer);
- buffer.put(FRAME_END_BYTE);
- buffer.put(body3.getFrameType());
+ buffer.writeByte(FRAME_END_BYTE);
+ buffer.writeByte(body3.getFrameType());
EncodingUtils.writeUnsignedShort(buffer, channel);
EncodingUtils.writeUnsignedInteger(buffer, body3.getSize());
body3.writePayload(buffer);
- buffer.put(FRAME_END_BYTE);
+ buffer.writeByte(FRAME_END_BYTE);
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBody.java Thu Oct 20 18:42:46 2011
@@ -20,12 +20,14 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
public interface AMQMethodBody extends AMQBody
{
public static final byte TYPE = 1;
@@ -43,12 +45,12 @@ public interface AMQMethodBody extends A
/** @return unsigned short */
public int getMethod();
- public void writeMethodPayload(ByteBuffer buffer);
+ public void writeMethodPayload(DataOutputStream buffer) throws IOException;
public int getSize();
- public void writePayload(ByteBuffer buffer);
+ public void writePayload(DataOutputStream buffer) throws IOException;
//public abstract void populateMethodBodyFromBuffer(ByteBuffer buffer) throws AMQFrameDecodingException;
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyFactory.java Thu Oct 20 18:42:46 2011
@@ -20,13 +20,14 @@
*/
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
-
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.DataInputStream;
+import java.io.IOException;
+
public class AMQMethodBodyFactory implements BodyFactory
{
private static final Logger _log = LoggerFactory.getLogger(AMQMethodBodyFactory.class);
@@ -38,7 +39,7 @@ public class AMQMethodBodyFactory implem
_protocolSession = protocolSession;
}
- public AMQBody createBody(ByteBuffer in, long bodySize) throws AMQFrameDecodingException
+ public AMQBody createBody(DataInputStream in, long bodySize) throws AMQFrameDecodingException, IOException
{
return _protocolSession.getMethodRegistry().convertToBody(in, bodySize);
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java Thu Oct 20 18:42:46 2011
@@ -21,13 +21,16 @@ package org.apache.qpid.framing;
*
*/
-import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQChannelException;
import org.apache.qpid.AMQConnectionException;
import org.apache.qpid.AMQException;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
+
public abstract class AMQMethodBodyImpl implements AMQMethodBody
{
public static final byte TYPE = 1;
@@ -98,7 +101,7 @@ public abstract class AMQMethodBodyImpl
return 2 + 2 + getBodySize();
}
- public void writePayload(ByteBuffer buffer)
+ public void writePayload(DataOutputStream buffer) throws IOException
{
EncodingUtils.writeUnsignedShort(buffer, getClazz());
EncodingUtils.writeUnsignedShort(buffer, getMethod());
@@ -106,12 +109,12 @@ public abstract class AMQMethodBodyImpl
}
- protected byte readByte(ByteBuffer buffer)
+ protected byte readByte(DataInputStream buffer) throws IOException
{
- return buffer.get();
+ return buffer.readByte();
}
- protected AMQShortString readAMQShortString(ByteBuffer buffer)
+ protected AMQShortString readAMQShortString(DataInputStream buffer) throws IOException
{
return EncodingUtils.readAMQShortString(buffer);
}
@@ -121,27 +124,27 @@ public abstract class AMQMethodBodyImpl
return EncodingUtils.encodedShortStringLength(string);
}
- protected void writeByte(ByteBuffer buffer, byte b)
+ protected void writeByte(DataOutputStream buffer, byte b) throws IOException
{
- buffer.put(b);
+ buffer.writeByte(b);
}
- protected void writeAMQShortString(ByteBuffer buffer, AMQShortString string)
+ protected void writeAMQShortString(DataOutputStream buffer, AMQShortString string) throws IOException
{
EncodingUtils.writeShortStringBytes(buffer, string);
}
- protected int readInt(ByteBuffer buffer)
+ protected int readInt(DataInputStream buffer) throws IOException
{
- return buffer.getInt();
+ return buffer.readInt();
}
- protected void writeInt(ByteBuffer buffer, int i)
+ protected void writeInt(DataOutputStream buffer, int i) throws IOException
{
- buffer.putInt(i);
+ buffer.writeInt(i);
}
- protected FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException
+ protected FieldTable readFieldTable(DataInputStream buffer) throws AMQFrameDecodingException, IOException
{
return EncodingUtils.readFieldTable(buffer);
}
@@ -151,19 +154,19 @@ public abstract class AMQMethodBodyImpl
return EncodingUtils.encodedFieldTableLength(table); //To change body of created methods use File | Settings | File Templates.
}
- protected void writeFieldTable(ByteBuffer buffer, FieldTable table)
+ protected void writeFieldTable(DataOutputStream buffer, FieldTable table) throws IOException
{
EncodingUtils.writeFieldTableBytes(buffer, table);
}
- protected long readLong(ByteBuffer buffer)
+ protected long readLong(DataInputStream buffer) throws IOException
{
- return buffer.getLong();
+ return buffer.readLong();
}
- protected void writeLong(ByteBuffer buffer, long l)
+ protected void writeLong(DataOutputStream buffer, long l) throws IOException
{
- buffer.putLong(l);
+ buffer.writeLong(l);
}
protected int getSizeOf(byte[] response)
@@ -171,87 +174,86 @@ public abstract class AMQMethodBodyImpl
return (response == null) ? 4 : response.length + 4;
}
- protected void writeBytes(ByteBuffer buffer, byte[] data)
+ protected void writeBytes(DataOutputStream buffer, byte[] data) throws IOException
{
EncodingUtils.writeBytes(buffer,data);
}
- protected byte[] readBytes(ByteBuffer buffer)
+ protected byte[] readBytes(DataInputStream buffer) throws IOException
{
return EncodingUtils.readBytes(buffer);
}
- protected short readShort(ByteBuffer buffer)
+ protected short readShort(DataInputStream buffer) throws IOException
{
return EncodingUtils.readShort(buffer);
}
- protected void writeShort(ByteBuffer buffer, short s)
+ protected void writeShort(DataOutputStream buffer, short s) throws IOException
{
EncodingUtils.writeShort(buffer, s);
}
- protected Content readContent(ByteBuffer buffer)
+ protected Content readContent(DataInputStream buffer)
{
- return null; //To change body of created methods use File | Settings | File Templates.
+ return null;
}
protected int getSizeOf(Content body)
{
- return 0; //To change body of created methods use File | Settings | File Templates.
+ return 0;
}
- protected void writeContent(ByteBuffer buffer, Content body)
+ protected void writeContent(DataOutputStream buffer, Content body)
{
- //To change body of created methods use File | Settings | File Templates.
}
- protected byte readBitfield(ByteBuffer buffer)
+ protected byte readBitfield(DataInputStream buffer) throws IOException
{
- return readByte(buffer); //To change body of created methods use File | Settings | File Templates.
+ return readByte(buffer);
}
- protected int readUnsignedShort(ByteBuffer buffer)
+ protected int readUnsignedShort(DataInputStream buffer) throws IOException
{
- return buffer.getUnsignedShort(); //To change body of created methods use File | Settings | File Templates.
+ return buffer.readUnsignedShort();
}
- protected void writeBitfield(ByteBuffer buffer, byte bitfield0)
+ protected void writeBitfield(DataOutputStream buffer, byte bitfield0) throws IOException
{
- buffer.put(bitfield0);
+ buffer.writeByte(bitfield0);
}
- protected void writeUnsignedShort(ByteBuffer buffer, int s)
+ protected void writeUnsignedShort(DataOutputStream buffer, int s) throws IOException
{
EncodingUtils.writeUnsignedShort(buffer, s);
}
- protected long readUnsignedInteger(ByteBuffer buffer)
+ protected long readUnsignedInteger(DataInputStream buffer) throws IOException
{
- return buffer.getUnsignedInt();
+ return EncodingUtils.readUnsignedInteger(buffer);
}
- protected void writeUnsignedInteger(ByteBuffer buffer, long i)
+ protected void writeUnsignedInteger(DataOutputStream buffer, long i) throws IOException
{
EncodingUtils.writeUnsignedInteger(buffer, i);
}
- protected short readUnsignedByte(ByteBuffer buffer)
+ protected short readUnsignedByte(DataInputStream buffer) throws IOException
{
- return buffer.getUnsigned();
+ return (short) buffer.readUnsignedByte();
}
- protected void writeUnsignedByte(ByteBuffer buffer, short unsignedByte)
+ protected void writeUnsignedByte(DataOutputStream buffer, short unsignedByte) throws IOException
{
EncodingUtils.writeUnsignedByte(buffer, unsignedByte);
}
- protected long readTimestamp(ByteBuffer buffer)
+ protected long readTimestamp(DataInputStream buffer) throws IOException
{
return EncodingUtils.readTimestamp(buffer);
}
- protected void writeTimestamp(ByteBuffer buffer, long t)
+ protected void writeTimestamp(DataOutputStream buffer, long t) throws IOException
{
EncodingUtils.writeTimestamp(buffer, t);
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyInstanceFactory.java Thu Oct 20 18:42:46 2011
@@ -21,10 +21,11 @@
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
+import java.io.DataInputStream;
+import java.io.IOException;
public abstract interface AMQMethodBodyInstanceFactory
{
- public AMQMethodBody newInstance(ByteBuffer buffer, long size) throws AMQFrameDecodingException;
+ public AMQMethodBody newInstance(DataInputStream buffer, long size) throws AMQFrameDecodingException, IOException;
}
Modified: qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java
URL: http://svn.apache.org/viewvc/qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java?rev=1186990&r1=1186989&r2=1186990&view=diff
==============================================================================
--- qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java (original)
+++ qpid/branches/QPID-2519/java/common/src/main/java/org/apache/qpid/framing/AMQShortString.java Thu Oct 20 18:42:46 2011
@@ -21,11 +21,12 @@
package org.apache.qpid.framing;
-import org.apache.mina.common.ByteBuffer;
-
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import java.io.DataInputStream;
+import java.io.DataOutputStream;
+import java.io.IOException;
import java.util.*;
import java.lang.ref.WeakReference;
@@ -37,6 +38,10 @@ import java.lang.ref.WeakReference;
*/
public final class AMQShortString implements CharSequence, Comparable<AMQShortString>
{
+ /**
+ * The maximum number of octets in an AMQ short string, as defined in the AMQP specification.
+ */
+ public static final int MAX_LENGTH = 255;
private static final byte MINUS = (byte)'-';
private static final byte ZERO = (byte) '0';
@@ -118,22 +123,19 @@ public final class AMQShortString implem
public AMQShortString(byte[] data)
{
-
+ if (data == null)
+ {
+ throw new NullPointerException("Cannot create AMQShortString with null data[]");
+ }
+ if (data.length > MAX_LENGTH)
+ {
+ throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
+ }
_data = data.clone();
_length = data.length;
_offset = 0;
}
- public AMQShortString(byte[] data, int pos)
- {
- final int size = data[pos++];
- final byte[] dataCopy = new byte[size];
- System.arraycopy(data,pos,dataCopy,0,size);
- _length = size;
- _data = dataCopy;
- _offset = 0;
- }
-
public AMQShortString(String data)
{
this((data == null) ? EMPTY_CHAR_ARRAY : data.toCharArray());
@@ -146,7 +148,12 @@ public final class AMQShortString implem
{
throw new NullPointerException("Cannot create AMQShortString with null char[]");
}
-
+ // the current implementation of 0.8/0.9.x short string encoding
+ // supports only ASCII characters
+ if (data.length> MAX_LENGTH)
+ {
+ throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
+ }
final int length = data.length;
final byte[] stringBytes = new byte[length];
int hash = 0;
@@ -165,6 +172,17 @@ public final class AMQShortString implem
public AMQShortString(CharSequence charSequence)
{
+ if (charSequence == null)
+ {
+ // it should be possible to create short string for null data
+ charSequence = "";
+ }
+ // the current implementation of 0.8/0.9.x short string encoding
+ // supports only ASCII characters
+ if (charSequence.length() > MAX_LENGTH)
+ {
+ throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
+ }
final int length = charSequence.length();
final byte[] stringBytes = new byte[length];
int hash = 0;
@@ -182,31 +200,33 @@ public final class AMQShortString implem
}
- private AMQShortString(ByteBuffer data, final int length)
+ /**
+ * Creates a short string by consuming exactly <code>length</code> octets from the given stream.
+ *
+ * @param data stream to read the string octets from
+ * @param length number of octets to read; must not exceed MAX_LENGTH
+ * @throws IllegalArgumentException if length is greater than MAX_LENGTH
+ * @throws IOException if the stream fails, or ends before length octets could be read
+ */
+ private AMQShortString(DataInputStream data, final int length) throws IOException
{
- if(data.isDirect() || data.isReadOnly())
- {
- byte[] dataBytes = new byte[length];
- data.get(dataBytes);
- _data = dataBytes;
- _offset = 0;
- }
- else
+ if (length > MAX_LENGTH)
{
-
- _data = data.array();
- _offset = data.arrayOffset() + data.position();
- data.skip(length);
-
+ throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
}
+ byte[] dataBytes = new byte[length];
+ // read(byte[]) may return after a partial read; readFully fills the whole array
+ // or throws EOFException, so a truncated stream cannot yield a zero-padded string
+ data.readFully(dataBytes);
+ _data = dataBytes;
+ _offset = 0;
_length = length;
}
private AMQShortString(final byte[] data, final int from, final int to)
{
+ if (data == null)
+ {
+ throw new NullPointerException("Cannot create AMQShortString with null data[]");
+ }
+ int length = to - from;
+ if (length > MAX_LENGTH)
+ {
+ throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
+ }
_offset = from;
- _length = to - from;
+ _length = length;
_data = data;
}
@@ -245,32 +265,9 @@ public final class AMQShortString implem
return new CharSubSequence(start, end);
}
- public int writeToByteArray(byte[] encoding, int pos)
- {
- final int size = length();
- encoding[pos++] = (byte) size;
- System.arraycopy(_data,_offset,encoding,pos,size);
- return pos+size;
- }
-
- public static AMQShortString readFromByteArray(byte[] byteEncodedDestination, int pos)
- {
-
-
- final AMQShortString shortString = new AMQShortString(byteEncodedDestination, pos);
- if(shortString.length() == 0)
- {
- return null;
- }
- else
- {
- return shortString;
- }
- }
-
- public static AMQShortString readFromBuffer(ByteBuffer buffer)
+ public static AMQShortString readFromBuffer(DataInputStream buffer) throws IOException
{
- final short length = buffer.getUnsigned();
+ final int length = buffer.readUnsignedByte();
if (length == 0)
{
return null;
@@ -296,13 +293,13 @@ public final class AMQShortString implem
}
}
- public void writeToBuffer(ByteBuffer buffer)
+ public void writeToBuffer(DataOutputStream buffer) throws IOException
{
final int size = length();
//buffer.setAutoExpand(true);
- buffer.put((byte) size);
- buffer.put(_data, _offset, size);
+ buffer.write((byte) size);
+ buffer.write(_data, _offset, size);
}
@@ -690,6 +687,10 @@ public final class AMQShortString implem
size += term.length();
}
+ if (size > MAX_LENGTH)
+ {
+ throw new IllegalArgumentException("Cannot create AMQShortString with number of octets over 255!");
+ }
byte[] data = new byte[size];
int pos = 0;
final byte[] delimData = delim._data;
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org