You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2009/10/26 21:37:38 UTC
svn commit: r829944 [1/2] - in /qpid/trunk/qpid:
java/broker/src/main/java/org/apache/qpid/server/handler/
java/broker/src/main/java/org/apache/qpid/server/output/
java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/
java/client/src/main/...
Author: rgodfrey
Date: Mon Oct 26 20:37:36 2009
New Revision: 829944
URL: http://svn.apache.org/viewvc?rev=829944&view=rev
Log:
Added AMQP 0-9-1 support
Added:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_91.java
- copied, changed from r829757, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java (contents, props changed)
- copied, changed from r829757, qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_9.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_91.java
- copied, changed from r829757, qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/AMQMethodBody_0_91.java
- copied, changed from r829757, qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/AMQMethodBody_0_9.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java
- copied, changed from r829757, qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java
qpid/trunk/qpid/specs/amqp0-9-1.stripped.xml
Modified:
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl.java
qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
qpid/trunk/qpid/java/common/build.xml
qpid/trunk/qpid/java/common/protocol-version.xml
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/AMQMethodBody_0_9.java
qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java?rev=829944&r1=829943&r2=829944&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/AccessRequestHandler.java Mon Oct 26 20:37:36 2009
@@ -1,31 +1,34 @@
package org.apache.qpid.server.handler;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
+import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
import org.apache.qpid.server.state.StateAwareMethodListener;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
/**
* @author Apache Software Foundation
@@ -54,7 +57,20 @@
// We don't implement access control class, but to keep clients happy that expect it
// always use the "0" ticket.
- AccessRequestOkBody response = methodRegistry.createAccessRequestOkBody(0);
+ AccessRequestOkBody response;
+ if(methodRegistry instanceof MethodRegistry_0_9)
+ {
+ response = ((MethodRegistry_0_9)methodRegistry).createAccessRequestOkBody(0);
+ }
+ else if(methodRegistry instanceof MethodRegistry_8_0)
+ {
+ response = ((MethodRegistry_8_0)methodRegistry).createAccessRequestOkBody(0);
+ }
+ else
+ {
+ throw new AMQException(AMQConstant.COMMAND_INVALID, "AccessRequest not present in AMQP versions other than 0-8, 0-9");
+ }
+
session.writeFrame(response.generateFrame(channelId));
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java?rev=829944&r1=829943&r2=829944&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/BasicRecoverSyncMethodHandler.java Mon Oct 26 20:37:36 2009
@@ -1,25 +1,25 @@
package org.apache.qpid.server.handler;
-/*
- *
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- *
- */
-
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
import org.apache.log4j.Logger;
@@ -27,6 +27,7 @@
import org.apache.qpid.framing.ProtocolVersion;
import org.apache.qpid.framing.AMQMethodBody;
import org.apache.qpid.framing.BasicRecoverSyncBody;
+import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
import org.apache.qpid.server.state.StateAwareMethodListener;
@@ -70,6 +71,13 @@
session.writeFrame(recoverOk.generateFrame(channelId));
}
+ else if(session.getProtocolVersion().equals(ProtocolVersion.v0_91))
+ {
+ MethodRegistry_0_91 methodRegistry = (MethodRegistry_0_91) session.getMethodRegistry();
+ AMQMethodBody recoverOk = methodRegistry.createBasicRecoverSyncOkBody();
+ session.writeFrame(recoverOk.generateFrame(channelId));
+
+ }
}
}
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java?rev=829944&r1=829943&r2=829944&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ChannelOpenHandler.java Mon Oct 26 20:37:36 2009
@@ -27,6 +27,7 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
import org.apache.qpid.protocol.AMQConstant;
@@ -102,6 +103,30 @@
response = methodRegistry.createChannelOpenOkBody(channelName);
}
+ else if(pv.equals(ProtocolVersion.v0_91))
+ {
+ MethodRegistry_0_91 methodRegistry = (MethodRegistry_0_91) MethodRegistry.getMethodRegistry(ProtocolVersion.v0_91);
+ UUID uuid = UUID.randomUUID();
+ ByteArrayOutputStream output = new ByteArrayOutputStream();
+ DataOutputStream dataOut = new DataOutputStream(output);
+ try
+ {
+ dataOut.writeLong(uuid.getMostSignificantBits());
+ dataOut.writeLong(uuid.getLeastSignificantBits());
+ dataOut.flush();
+ dataOut.close();
+ }
+ catch (IOException e)
+ {
+ // This *really* shouldn't happen as we're not doing any I/O
+ throw new RuntimeException("I/O exception when writing to byte array", e);
+ }
+
+ // should really associate this channelId to the session
+ byte[] channelName = output.toByteArray();
+
+ response = methodRegistry.createChannelOpenOkBody(channelName);
+ }
else
{
throw new AMQException(AMQConstant.INTERNAL_ERROR, "Got channel open for protocol version not catered for: " + pv, null);
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java?rev=829944&r1=829943&r2=829944&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ConnectionOpenMethodHandler.java Mon Oct 26 20:37:36 2009
@@ -22,6 +22,9 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
+import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
+import org.apache.qpid.framing.amqp_8_0.MethodRegistry_8_0;
import org.apache.qpid.protocol.AMQConstant;
import org.apache.qpid.server.protocol.AMQProtocolSession;
import org.apache.qpid.server.state.AMQState;
@@ -86,10 +89,10 @@
if (session.getContextKey() == null)
{
session.setContextKey(generateClientID());
- }
+ }
MethodRegistry methodRegistry = session.getMethodRegistry();
- AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost());
+ AMQMethodBody responseBody = methodRegistry.createConnectionOpenOkBody(body.getVirtualHost());
stateManager.changeState(AMQState.CONNECTION_OPEN);
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl.java?rev=829944&r1=829943&r2=829944&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl.java Mon Oct 26 20:37:36 2009
@@ -59,6 +59,14 @@
return new ServerMethodDispatcherImpl_0_9(stateManager);
}
});
+ _dispatcherFactories.put(ProtocolVersion.v0_91,
+ new DispatcherFactory()
+ {
+ public MethodDispatcher createMethodDispatcher(AMQStateManager stateManager)
+ {
+ return new ServerMethodDispatcherImpl_0_91(stateManager);
+ }
+ });
}
Copied: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_91.java (from r829757, qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_91.java?p2=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_91.java&p1=qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java&r1=829757&r2=829944&rev=829944&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_9.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/handler/ServerMethodDispatcherImpl_0_91.java Mon Oct 26 20:37:36 2009
@@ -21,16 +21,15 @@
package org.apache.qpid.server.handler;
-import org.apache.qpid.framing.amqp_0_9.MethodDispatcher_0_9;
import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_91.MethodDispatcher_0_91;
import org.apache.qpid.server.state.AMQStateManager;
import org.apache.qpid.AMQException;
-
-public class ServerMethodDispatcherImpl_0_9
+public class ServerMethodDispatcherImpl_0_91
extends ServerMethodDispatcherImpl
- implements MethodDispatcher_0_9
+ implements MethodDispatcher_0_91
{
@@ -40,7 +39,7 @@
QueueUnbindHandler.getInstance();
- public ServerMethodDispatcherImpl_0_9(AMQStateManager stateManager)
+ public ServerMethodDispatcherImpl_0_91(AMQStateManager stateManager)
{
super(stateManager);
}
@@ -151,6 +150,11 @@
return false;
}
+ public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException
+ {
+ return false; //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException
{
throw new UnexpectedMethodException(body);
@@ -161,4 +165,4 @@
_queueUnbindHandler.methodReceived(getStateManager(),body,channelId);
return true;
}
-}
+}
\ No newline at end of file
Modified: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java?rev=829944&r1=829943&r2=829944&view=diff
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java (original)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/ProtocolOutputConverterRegistry.java Mon Oct 26 20:37:36 2009
@@ -44,6 +44,7 @@
{
register(ProtocolVersion.v8_0, org.apache.qpid.server.output.amqp0_8.ProtocolOutputConverterImpl.getInstanceFactory());
register(ProtocolVersion.v0_9, org.apache.qpid.server.output.amqp0_9.ProtocolOutputConverterImpl.getInstanceFactory());
+ register(ProtocolVersion.v0_91, org.apache.qpid.server.output.amqp0_9_1.ProtocolOutputConverterImpl.getInstanceFactory());
}
Added: qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java?rev=829944&view=auto
==============================================================================
--- qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java (added)
+++ qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/output/amqp0_9_1/ProtocolOutputConverterImpl.java Mon Oct 26 20:37:36 2009
@@ -0,0 +1,383 @@
+package org.apache.qpid.server.output.amqp0_9_1;
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+
+import org.apache.mina.common.ByteBuffer;
+
+import org.apache.qpid.server.output.ProtocolOutputConverter;
+import org.apache.qpid.server.output.HeaderPropertiesConverter;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.message.AMQMessage;
+import org.apache.qpid.server.queue.QueueEntry;
+import org.apache.qpid.server.message.MessageContentSource;
+import org.apache.qpid.server.message.MessageTransferMessage;
+import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_91.BasicGetBodyImpl;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.transport.DeliveryProperties;
+import org.apache.qpid.protocol.AMQVersionAwareProtocolSession;
+
+public class ProtocolOutputConverterImpl implements ProtocolOutputConverter
+{
+ private static final MethodRegistry METHOD_REGISTRY = MethodRegistry.getMethodRegistry(ProtocolVersion.v0_91);
+ private static final ProtocolVersionMethodConverter
+ PROTOCOL_CONVERTER = METHOD_REGISTRY.getProtocolVersionMethodConverter();
+
+
+ public static Factory getInstanceFactory()
+ {
+ return new Factory()
+ {
+
+ public ProtocolOutputConverter newInstance(AMQProtocolSession session)
+ {
+ return new ProtocolOutputConverterImpl(session);
+ }
+ };
+ }
+
+ private final AMQProtocolSession _protocolSession;
+
+ private ProtocolOutputConverterImpl(AMQProtocolSession session)
+ {
+ _protocolSession = session;
+ }
+
+
+ public AMQProtocolSession getProtocolSession()
+ {
+ return _protocolSession;
+ }
+
+ public void writeDeliver(QueueEntry entry, int channelId, long deliveryTag, AMQShortString consumerTag)
+ throws AMQException
+ {
+ AMQBody deliverBody = createEncodedDeliverBody(entry, deliveryTag, consumerTag);
+ writeMessageDelivery(entry, channelId, deliverBody);
+ }
+
+
+ private ContentHeaderBody getContentHeaderBody(QueueEntry entry)
+ throws AMQException
+ {
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ return ((AMQMessage)entry.getMessage()).getContentHeaderBody();
+ }
+ else
+ {
+ final MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ BasicContentHeaderProperties props = HeaderPropertiesConverter.convert(message);
+ ContentHeaderBody chb = new ContentHeaderBody(props, BasicGetBodyImpl.CLASS_ID);
+ chb.bodySize = message.getSize();
+ return chb;
+ }
+ }
+
+
+ private void writeMessageDelivery(QueueEntry entry, int channelId, AMQBody deliverBody)
+ throws AMQException
+ {
+ writeMessageDelivery(entry.getMessage(), getContentHeaderBody(entry), channelId, deliverBody);
+ }
+
+ private void writeMessageDelivery(MessageContentSource message, ContentHeaderBody contentHeaderBody, int channelId, AMQBody deliverBody)
+ throws AMQException
+ {
+
+
+ int bodySize = (int) message.getSize();
+
+ if(bodySize == 0)
+ {
+ SmallCompositeAMQBodyBlock compositeBlock = new SmallCompositeAMQBodyBlock(channelId, deliverBody,
+ contentHeaderBody);
+
+ writeFrame(compositeBlock);
+ }
+ else
+ {
+ int maxBodySize = (int) getProtocolSession().getMaxFrameSize() - AMQFrame.getFrameOverhead();
+
+
+ final int capacity = bodySize > maxBodySize ? maxBodySize : bodySize;
+ java.nio.ByteBuffer buf = java.nio.ByteBuffer.allocate(capacity);
+
+ int writtenSize = 0;
+
+
+ writtenSize += message.getContent(buf, writtenSize);
+ buf.flip();
+ AMQBody firstContentBody = PROTOCOL_CONVERTER.convertToBody(buf);
+
+ CompositeAMQBodyBlock
+ compositeBlock = new CompositeAMQBodyBlock(channelId, deliverBody, contentHeaderBody, firstContentBody);
+ writeFrame(compositeBlock);
+
+ while(writtenSize < bodySize)
+ {
+ buf = java.nio.ByteBuffer.allocate(capacity);
+
+ writtenSize += message.getContent(buf, writtenSize);
+ buf.flip();
+ writeFrame(new AMQFrame(channelId, PROTOCOL_CONVERTER.convertToBody(buf)));
+ }
+ }
+ }
+
+ private AMQDataBlock createContentHeaderBlock(final int channelId, final ContentHeaderBody contentHeaderBody)
+ {
+
+ AMQDataBlock contentHeader = ContentHeaderBody.createAMQFrame(channelId,
+ contentHeaderBody);
+ return contentHeader;
+ }
+
+
+ public void writeGetOk(QueueEntry entry, int channelId, long deliveryTag, int queueSize) throws AMQException
+ {
+ AMQBody deliver = createEncodedGetOkBody(entry, deliveryTag, queueSize);
+ writeMessageDelivery(entry, channelId, deliver);
+ }
+
+
+ private AMQBody createEncodedDeliverBody(QueueEntry entry,
+ final long deliveryTag,
+ final AMQShortString consumerTag)
+ throws AMQException
+ {
+
+ final AMQShortString exchangeName;
+ final AMQShortString routingKey;
+
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ final AMQMessage message = (AMQMessage) entry.getMessage();
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ exchangeName = pb.getExchange();
+ routingKey = pb.getRoutingKey();
+ }
+ else
+ {
+ MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
+ exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+ routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+ }
+
+ final boolean isRedelivered = entry.isRedelivered();
+
+ final AMQBody returnBlock = new AMQBody()
+ {
+
+ public AMQBody _underlyingBody;
+
+ public AMQBody createAMQBody()
+ {
+ return METHOD_REGISTRY.createBasicDeliverBody(consumerTag,
+ deliveryTag,
+ isRedelivered,
+ exchangeName,
+ routingKey);
+
+
+
+
+
+ }
+
+ public byte getFrameType()
+ {
+ return AMQMethodBody.TYPE;
+ }
+
+ public int getSize()
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ return _underlyingBody.getSize();
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ if(_underlyingBody == null)
+ {
+ _underlyingBody = createAMQBody();
+ }
+ _underlyingBody.writePayload(buffer);
+ }
+
+ public void handle(final int channelId, final AMQVersionAwareProtocolSession amqMinaProtocolSession)
+ throws AMQException
+ {
+ throw new AMQException("This block should never be dispatched!");
+ }
+ };
+ return returnBlock;
+ }
+
+ private AMQBody createEncodedGetOkBody(QueueEntry entry, long deliveryTag, int queueSize)
+ throws AMQException
+ {
+ final AMQShortString exchangeName;
+ final AMQShortString routingKey;
+
+ if(entry.getMessage() instanceof AMQMessage)
+ {
+ final AMQMessage message = (AMQMessage) entry.getMessage();
+ final MessagePublishInfo pb = message.getMessagePublishInfo();
+ exchangeName = pb.getExchange();
+ routingKey = pb.getRoutingKey();
+ }
+ else
+ {
+ MessageTransferMessage message = (MessageTransferMessage) entry.getMessage();
+ DeliveryProperties delvProps = message.getHeader().get(DeliveryProperties.class);
+ exchangeName = (delvProps == null || delvProps.getExchange() == null) ? null : new AMQShortString(delvProps.getExchange());
+ routingKey = (delvProps == null || delvProps.getRoutingKey() == null) ? null : new AMQShortString(delvProps.getRoutingKey());
+ }
+
+ final boolean isRedelivered = entry.isRedelivered();
+
+ BasicGetOkBody getOkBody =
+ METHOD_REGISTRY.createBasicGetOkBody(deliveryTag,
+ isRedelivered,
+ exchangeName,
+ routingKey,
+ queueSize);
+
+ return getOkBody;
+ }
+
+ public byte getProtocolMinorVersion()
+ {
+ return getProtocolSession().getProtocolMinorVersion();
+ }
+
+ public byte getProtocolMajorVersion()
+ {
+ return getProtocolSession().getProtocolMajorVersion();
+ }
+
+ private AMQBody createEncodedReturnFrame(MessagePublishInfo messagePublishInfo,
+ int replyCode,
+ AMQShortString replyText) throws AMQException
+ {
+
+ BasicReturnBody basicReturnBody =
+ METHOD_REGISTRY.createBasicReturnBody(replyCode,
+ replyText,
+ messagePublishInfo.getExchange(),
+ messagePublishInfo.getRoutingKey());
+
+
+ return basicReturnBody;
+ }
+
+ public void writeReturn(MessagePublishInfo messagePublishInfo, ContentHeaderBody header, MessageContentSource message, int channelId, int replyCode, AMQShortString replyText)
+ throws AMQException
+ {
+
+ AMQBody returnFrame = createEncodedReturnFrame(messagePublishInfo, replyCode, replyText);
+
+ writeMessageDelivery(message, header, channelId, returnFrame);
+ }
+
+
+ public void writeFrame(AMQDataBlock block)
+ {
+ getProtocolSession().writeFrame(block);
+ }
+
+
+ public void confirmConsumerAutoClose(int channelId, AMQShortString consumerTag)
+ {
+
+ BasicCancelOkBody basicCancelOkBody = METHOD_REGISTRY.createBasicCancelOkBody(consumerTag);
+ writeFrame(basicCancelOkBody.generateFrame(channelId));
+
+ }
+
+
+ public static final class CompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 3 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final AMQBody _contentBody;
+ private final int _channel;
+
+
+ public CompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody, AMQBody contentBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+ _contentBody = contentBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() + _contentBody.getSize();
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody, _contentBody);
+ }
+ }
+
+ public static final class SmallCompositeAMQBodyBlock extends AMQDataBlock
+ {
+ public static final int OVERHEAD = 2 * AMQFrame.getFrameOverhead();
+
+ private final AMQBody _methodBody;
+ private final AMQBody _headerBody;
+ private final int _channel;
+
+
+ public SmallCompositeAMQBodyBlock(int channel, AMQBody methodBody, AMQBody headerBody)
+ {
+ _channel = channel;
+ _methodBody = methodBody;
+ _headerBody = headerBody;
+
+ }
+
+ public long getSize()
+ {
+ return OVERHEAD + _methodBody.getSize() + _headerBody.getSize() ;
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ AMQFrame.writeFrames(buffer, _channel, _methodBody, _headerBody);
+ }
+ }
+
+}
\ No newline at end of file
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=829944&r1=829943&r2=829944&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Mon Oct 26 20:37:36 2009
@@ -308,7 +308,7 @@
/** Thread Pool for executing connection level processes. Such as returning bounced messages. */
private final ExecutorService _taskPool = Executors.newCachedThreadPool();
private static final long DEFAULT_TIMEOUT = 1000 * 30;
- private ProtocolVersion _protocolVersion = ProtocolVersion.v0_9; // FIXME TGM, shouldn't need this
+ private ProtocolVersion _protocolVersion = ProtocolVersion.v0_91; // FIXME TGM, shouldn't need this
protected AMQConnectionDelegate _delegate;
Copied: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java (from r829757, qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_9.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java?p2=qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java&p1=qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_9.java&r1=829757&r2=829944&rev=829944&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_9.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java Mon Oct 26 20:37:36 2009
@@ -21,12 +21,12 @@
package org.apache.qpid.client;
-public class AMQConnectionDelegate_0_9 extends AMQConnectionDelegate_8_0
+public class AMQConnectionDelegate_9_1 extends AMQConnectionDelegate_8_0
{
- public AMQConnectionDelegate_0_9(AMQConnection conn)
+ public AMQConnectionDelegate_9_1(AMQConnection conn)
{
super(conn);
}
-
-}
+
+}
\ No newline at end of file
Propchange: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_9_1.java
------------------------------------------------------------------------------
svn:executable = *
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java?rev=829944&r1=829943&r2=829944&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java Mon Oct 26 20:37:36 2009
@@ -36,6 +36,7 @@
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.common.AMQPFilterTypes;
import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.amqp_0_91.MethodRegistry_0_91;
import org.apache.qpid.framing.amqp_0_9.MethodRegistry_0_9;
import org.apache.qpid.jms.Session;
import org.apache.qpid.protocol.AMQConstant;
@@ -186,6 +187,11 @@
BasicRecoverSyncBody body = ((MethodRegistry_0_9)getMethodRegistry()).createBasicRecoverSyncBody(false);
_connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverSyncOkBody.class);
}
+ else if(getProtocolVersion().equals(ProtocolVersion.v0_91))
+ {
+ BasicRecoverSyncBody body = ((MethodRegistry_0_91)getMethodRegistry()).createBasicRecoverSyncBody(false);
+ _connection.getProtocolHandler().syncWrite(body.generateFrame(_channelId), BasicRecoverSyncOkBody.class);
+ }
else
{
throw new RuntimeException("Unsupported version of the AMQP Protocol: " + getProtocolVersion());
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java?rev=829944&r1=829943&r2=829944&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl.java Mon Oct 26 20:37:36 2009
@@ -79,6 +79,16 @@
}
});
+
+ _dispatcherFactories.put(ProtocolVersion.v0_91,
+ new DispatcherFactory()
+ {
+ public ClientMethodDispatcherImpl createMethodDispatcher(AMQProtocolSession session)
+ {
+ return new ClientMethodDispatcherImpl_0_91(session);
+ }
+ });
+
}
public static ClientMethodDispatcherImpl newMethodDispatcher(ProtocolVersion version, AMQProtocolSession session)
Copied: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_91.java (from r829757, qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_91.java?p2=qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_91.java&p1=qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java&r1=829757&r2=829944&rev=829944&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_9.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/ClientMethodDispatcherImpl_0_91.java Mon Oct 26 20:37:36 2009
@@ -21,16 +21,16 @@
package org.apache.qpid.client.handler;
import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_0_9.MethodDispatcher_0_9;
+import org.apache.qpid.framing.amqp_0_91.MethodDispatcher_0_91;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.state.AMQStateManager;
import org.apache.qpid.client.state.AMQMethodNotImplementedException;
import org.apache.qpid.client.protocol.AMQProtocolSession;
-public class ClientMethodDispatcherImpl_0_9 extends ClientMethodDispatcherImpl implements MethodDispatcher_0_9
+public class ClientMethodDispatcherImpl_0_91 extends ClientMethodDispatcherImpl implements MethodDispatcher_0_91
{
- public ClientMethodDispatcherImpl_0_9(AMQProtocolSession session)
+ public ClientMethodDispatcherImpl_0_91(AMQProtocolSession session)
{
super(session);
}
@@ -145,9 +145,14 @@
throw new AMQMethodNotImplementedException(body);
}
+ public boolean dispatchBasicRecoverOk(BasicRecoverOkBody body, int channelId) throws AMQException
+ {
+ return false;
+ }
+
public boolean dispatchQueueUnbindOk(QueueUnbindOkBody body, int channelId) throws AMQException
{
return false;
}
-}
+}
\ No newline at end of file
Modified: qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=829944&r1=829943&r2=829944&view=diff
==============================================================================
--- qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Mon Oct 26 20:37:36 2009
@@ -28,6 +28,7 @@
import java.util.Set;
import java.util.concurrent.CopyOnWriteArraySet;
import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
import org.apache.mina.filter.codec.ProtocolCodecException;
import org.apache.qpid.AMQConnectionClosedException;
@@ -124,6 +125,8 @@
private static final Logger _protocolLogger = LoggerFactory.getLogger("qpid.protocol");
private static final boolean PROTOCOL_DEBUG = (System.getProperty("amqj.protocol.logging.level") != null);
+ private static final long MAXIMUM_STATE_WAIT_TIME = Long.parseLong(System.getProperty("amqj.MaximumStateWait", "30000"));
+
/**
* The connection that this protocol handler is associated with. There is a 1-1 mapping between connection
* instances and protocol handler instances.
@@ -736,7 +739,10 @@
{
if (_failoverLatch != null)
{
- _failoverLatch.await();
+ if(!_failoverLatch.await(MAXIMUM_STATE_WAIT_TIME, TimeUnit.MILLISECONDS))
+ {
+
+ }
}
}
}
Modified: qpid/trunk/qpid/java/common/build.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/build.xml?rev=829944&r1=829943&r2=829944&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/build.xml (original)
+++ qpid/trunk/qpid/java/common/build.xml Mon Oct 26 20:37:36 2009
@@ -28,8 +28,8 @@
<property name="generated.package" value="org/apache/qpid/framing" />
<property name="generated.dir" location="${module.precompiled}/${generated.package}" />
<property name="xml.spec.dir" location="${project.root}/../specs" />
- <property name="xml.spec.deps" value="amqp.0-8.xml amqp.0-9.xml" />
- <property name="xml.spec.list" value="${xml.spec.dir}/amqp.0-8.xml ${xml.spec.dir}/amqp.0-9.xml" />
+ <property name="xml.spec.deps" value="amqp.0-8.xml amqp.0-9.xml amqp0-9-1.stripped.xml" />
+ <property name="xml.spec.list" value="${xml.spec.dir}/amqp.0-8.xml ${xml.spec.dir}/amqp.0-9.xml ${xml.spec.dir}/amqp0-9-1.stripped.xml" />
<property name="gentools.timestamp" location="${generated.dir}/gentools.timestamp" />
<property name="jython.timestamp" location="${generated.dir}/jython.timestamp" />
Modified: qpid/trunk/qpid/java/common/protocol-version.xml
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/protocol-version.xml?rev=829944&r1=829943&r2=829944&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/protocol-version.xml (original)
+++ qpid/trunk/qpid/java/common/protocol-version.xml Mon Oct 26 20:37:36 2009
@@ -27,8 +27,8 @@
<property name="generated.dir" location="${generated.path}/${generated.package}" />
<property name="generated.timestamp" location="${generated.dir}/timestamp" />
<property name="xml.spec.dir" location="${topDirectoryLocation}/../specs" />
- <property name="xml.spec.deps" value="amqp.0-8.xml amqp.0-9.xml" />
- <property name="xml.spec.list" value="${xml.spec.dir}/amqp.0-8.xml ${xml.spec.dir}/amqp.0-9.xml" />
+ <property name="xml.spec.deps" value="amqp.0-8.xml amqp.0-9.xml amqp0-9-1.stripped.xml" />
+ <property name="xml.spec.list" value="${xml.spec.dir}/amqp.0-8.xml ${xml.spec.dir}/amqp.0-9.xml ${xml.spec.dir}/amqp0-9-1.stripped.xml" />
<property name="template.dir" value="${topDirectoryLocation}/common/templates" />
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java?rev=829944&r1=829943&r2=829944&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/AMQMethodBodyImpl.java Mon Oct 26 20:37:36 2009
@@ -93,4 +93,166 @@
session.methodFrameReceived(channelId, this);
}
+ public int getSize()
+ {
+ return 2 + 2 + getBodySize();
+ }
+
+ public void writePayload(ByteBuffer buffer)
+ {
+ EncodingUtils.writeUnsignedShort(buffer, getClazz());
+ EncodingUtils.writeUnsignedShort(buffer, getMethod());
+ writeMethodPayload(buffer);
+ }
+
+
+ protected byte readByte(ByteBuffer buffer)
+ {
+ return buffer.get();
+ }
+
+ protected AMQShortString readAMQShortString(ByteBuffer buffer)
+ {
+ return EncodingUtils.readAMQShortString(buffer);
+ }
+
+ protected int getSizeOf(AMQShortString string)
+ {
+ return EncodingUtils.encodedShortStringLength(string);
+ }
+
+ protected void writeByte(ByteBuffer buffer, byte b)
+ {
+ buffer.put(b);
+ }
+
+ protected void writeAMQShortString(ByteBuffer buffer, AMQShortString string)
+ {
+ EncodingUtils.writeShortStringBytes(buffer, string);
+ }
+
+ protected int readInt(ByteBuffer buffer)
+ {
+ return buffer.getInt();
+ }
+
+ protected void writeInt(ByteBuffer buffer, int i)
+ {
+ buffer.putInt(i);
+ }
+
+ protected FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException
+ {
+ return EncodingUtils.readFieldTable(buffer);
+ }
+
+ protected int getSizeOf(FieldTable table)
+ {
+ return EncodingUtils.encodedFieldTableLength(table); //To change body of created methods use File | Settings | File Templates.
+ }
+
+ protected void writeFieldTable(ByteBuffer buffer, FieldTable table)
+ {
+ EncodingUtils.writeFieldTableBytes(buffer, table);
+ }
+
+ protected long readLong(ByteBuffer buffer)
+ {
+ return buffer.getLong();
+ }
+
+ protected void writeLong(ByteBuffer buffer, long l)
+ {
+ buffer.putLong(l);
+ }
+
+ protected int getSizeOf(byte[] response)
+ {
+ return (response == null) ? 4 : response.length + 4;
+ }
+
+ protected void writeBytes(ByteBuffer buffer, byte[] data)
+ {
+ EncodingUtils.writeBytes(buffer,data);
+ }
+
+ protected byte[] readBytes(ByteBuffer buffer)
+ {
+ return EncodingUtils.readBytes(buffer);
+ }
+
+ protected short readShort(ByteBuffer buffer)
+ {
+ return EncodingUtils.readShort(buffer);
+ }
+
+ protected void writeShort(ByteBuffer buffer, short s)
+ {
+ EncodingUtils.writeShort(buffer, s);
+ }
+
+ protected Content readContent(ByteBuffer buffer)
+ {
+ return null; //To change body of created methods use File | Settings | File Templates.
+ }
+
+ protected int getSizeOf(Content body)
+ {
+ return 0; //To change body of created methods use File | Settings | File Templates.
+ }
+
+ protected void writeContent(ByteBuffer buffer, Content body)
+ {
+ //To change body of created methods use File | Settings | File Templates.
+ }
+
+ protected byte readBitfield(ByteBuffer buffer)
+ {
+ return readByte(buffer); //To change body of created methods use File | Settings | File Templates.
+ }
+
+ protected int readUnsignedShort(ByteBuffer buffer)
+ {
+ return buffer.getUnsignedShort(); //To change body of created methods use File | Settings | File Templates.
+ }
+
+ protected void writeBitfield(ByteBuffer buffer, byte bitfield0)
+ {
+ buffer.put(bitfield0);
+ }
+
+ protected void writeUnsignedShort(ByteBuffer buffer, int s)
+ {
+ EncodingUtils.writeUnsignedShort(buffer, s);
+ }
+
+ protected long readUnsignedInteger(ByteBuffer buffer)
+ {
+ return buffer.getUnsignedInt();
+ }
+ protected void writeUnsignedInteger(ByteBuffer buffer, long i)
+ {
+ EncodingUtils.writeUnsignedInteger(buffer, i);
+ }
+
+
+ protected short readUnsignedByte(ByteBuffer buffer)
+ {
+ return buffer.getUnsigned();
+ }
+
+ protected void writeUnsignedByte(ByteBuffer buffer, short unsignedByte)
+ {
+ EncodingUtils.writeUnsignedByte(buffer, unsignedByte);
+ }
+
+ protected long readTimestamp(ByteBuffer buffer)
+ {
+ return EncodingUtils.readTimestamp(buffer);
+ }
+
+ protected void writeTimestamp(ByteBuffer buffer, long t)
+ {
+ EncodingUtils.writeTimestamp(buffer, t);
+ }
}
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java?rev=829944&r1=829943&r2=829944&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ProtocolInitiation.java Mon Oct 26 20:37:36 2009
@@ -54,7 +54,10 @@
public ProtocolInitiation(ProtocolVersion pv)
{
- this(AMQP_HEADER, CURRENT_PROTOCOL_CLASS, TCP_PROTOCOL_INSTANCE, pv.getMajorVersion(), pv.getMinorVersion());
+ this(AMQP_HEADER, CURRENT_PROTOCOL_CLASS,
+ pv.equals(ProtocolVersion.v0_91) ? 0 : TCP_PROTOCOL_INSTANCE,
+ pv.equals(ProtocolVersion.v0_91) ? 9 : pv.getMajorVersion(),
+ pv.equals(ProtocolVersion.v0_91) ? 1 : pv.getMinorVersion());
}
public ProtocolInitiation(ByteBuffer in)
@@ -124,7 +127,6 @@
{
/**
*
- * @param session the session
* @param in input buffer
* @return true if we have enough data to decode the PI frame fully, false if more
* data is required
@@ -162,13 +164,24 @@
throw new AMQProtocolClassException("Protocol class " + CURRENT_PROTOCOL_CLASS + " was expected; received " +
_protocolClass, null);
}
- if (_protocolInstance != TCP_PROTOCOL_INSTANCE)
+
+ ProtocolVersion pv;
+
+ // Hack for 0-9-1 which changed how the header was defined
+ if(_protocolInstance == 0 && _protocolMajor == 9 && _protocolMinor == 1)
+ {
+ pv = ProtocolVersion.v0_91;
+
+ }
+ else if (_protocolInstance != TCP_PROTOCOL_INSTANCE)
{
throw new AMQProtocolInstanceException("Protocol instance " + TCP_PROTOCOL_INSTANCE + " was expected; received " +
_protocolInstance, null);
}
-
- ProtocolVersion pv = new ProtocolVersion(_protocolMajor, _protocolMinor);
+ else
+ {
+ pv = new ProtocolVersion(_protocolMajor, _protocolMinor);
+ }
if (!pv.isSupported())
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/AMQMethodBody_0_9.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/AMQMethodBody_0_9.java?rev=829944&r1=829943&r2=829944&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/AMQMethodBody_0_9.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/AMQMethodBody_0_9.java Mon Oct 26 20:37:36 2009
@@ -21,14 +21,6 @@
package org.apache.qpid.framing.amqp_0_9;
-import org.apache.qpid.framing.EncodingUtils;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.AMQFrameDecodingException;
-import org.apache.qpid.framing.Content;
-
-import org.apache.mina.common.ByteBuffer;
-
public abstract class AMQMethodBody_0_9 extends org.apache.qpid.framing.AMQMethodBodyImpl
{
@@ -40,170 +32,6 @@
public byte getMinor()
{
return 9;
- }
-
- public int getSize()
- {
- return 2 + 2 + getBodySize();
- }
-
- public void writePayload(ByteBuffer buffer)
- {
- EncodingUtils.writeUnsignedShort(buffer, getClazz());
- EncodingUtils.writeUnsignedShort(buffer, getMethod());
- writeMethodPayload(buffer);
- }
-
-
- protected byte readByte(ByteBuffer buffer)
- {
- return buffer.get();
- }
-
- protected AMQShortString readAMQShortString(ByteBuffer buffer)
- {
- return EncodingUtils.readAMQShortString(buffer);
- }
-
- protected int getSizeOf(AMQShortString string)
- {
- return EncodingUtils.encodedShortStringLength(string);
- }
-
- protected void writeByte(ByteBuffer buffer, byte b)
- {
- buffer.put(b);
- }
-
- protected void writeAMQShortString(ByteBuffer buffer, AMQShortString string)
- {
- EncodingUtils.writeShortStringBytes(buffer, string);
- }
-
- protected int readInt(ByteBuffer buffer)
- {
- return buffer.getInt();
- }
-
- protected void writeInt(ByteBuffer buffer, int i)
- {
- buffer.putInt(i);
- }
-
- protected FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException
- {
- return EncodingUtils.readFieldTable(buffer);
- }
-
- protected int getSizeOf(FieldTable table)
- {
- return EncodingUtils.encodedFieldTableLength(table); //To change body of created methods use File | Settings | File Templates.
- }
-
- protected void writeFieldTable(ByteBuffer buffer, FieldTable table)
- {
- EncodingUtils.writeFieldTableBytes(buffer, table);
- }
-
- protected long readLong(ByteBuffer buffer)
- {
- return buffer.getLong();
- }
-
- protected void writeLong(ByteBuffer buffer, long l)
- {
- buffer.putLong(l);
- }
-
- protected int getSizeOf(byte[] response)
- {
- return (response == null) ? 4 :response.length + 4;
- }
-
- protected void writeBytes(ByteBuffer buffer, byte[] data)
- {
- EncodingUtils.writeBytes(buffer,data);
- }
-
- protected byte[] readBytes(ByteBuffer buffer)
- {
- return EncodingUtils.readBytes(buffer);
- }
-
- protected short readShort(ByteBuffer buffer)
- {
- return EncodingUtils.readShort(buffer);
- }
-
- protected void writeShort(ByteBuffer buffer, short s)
- {
- EncodingUtils.writeShort(buffer, s);
- }
-
- protected Content readContent(ByteBuffer buffer)
- {
- return null; //To change body of created methods use File | Settings | File Templates.
- }
-
- protected int getSizeOf(Content body)
- {
- return 0; //To change body of created methods use File | Settings | File Templates.
- }
-
- protected void writeContent(ByteBuffer buffer, Content body)
- {
- //To change body of created methods use File | Settings | File Templates.
- }
-
- protected byte readBitfield(ByteBuffer buffer)
- {
- return readByte(buffer); //To change body of created methods use File | Settings | File Templates.
- }
-
- protected int readUnsignedShort(ByteBuffer buffer)
- {
- return buffer.getUnsignedShort(); //To change body of created methods use File | Settings | File Templates.
- }
-
- protected void writeBitfield(ByteBuffer buffer, byte bitfield0)
- {
- buffer.put(bitfield0);
- }
-
- protected void writeUnsignedShort(ByteBuffer buffer, int s)
- {
- EncodingUtils.writeUnsignedShort(buffer, s);
- }
-
- protected long readUnsignedInteger(ByteBuffer buffer)
- {
- return buffer.getUnsignedInt();
- }
- protected void writeUnsignedInteger(ByteBuffer buffer, long i)
- {
- EncodingUtils.writeUnsignedInteger(buffer, i);
- }
-
-
- protected short readUnsignedByte(ByteBuffer buffer)
- {
- return buffer.getUnsigned();
- }
-
- protected void writeUnsignedByte(ByteBuffer buffer, short unsignedByte)
- {
- EncodingUtils.writeUnsignedByte(buffer, unsignedByte);
- }
-
- protected long readTimestamp(ByteBuffer buffer)
- {
- return EncodingUtils.readTimestamp(buffer);
- }
-
- protected void writeTimestamp(ByteBuffer buffer, long t)
- {
- EncodingUtils.writeTimestamp(buffer, t);
- }
-
+ }
}
Copied: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/AMQMethodBody_0_91.java (from r829757, qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/AMQMethodBody_0_9.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/AMQMethodBody_0_91.java?p2=qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/AMQMethodBody_0_91.java&p1=qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/AMQMethodBody_0_9.java&r1=829757&r2=829944&rev=829944&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/AMQMethodBody_0_9.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/AMQMethodBody_0_91.java Mon Oct 26 20:37:36 2009
@@ -19,17 +19,9 @@
*
*/
-package org.apache.qpid.framing.amqp_0_9;
+package org.apache.qpid.framing.amqp_0_91;
-import org.apache.qpid.framing.EncodingUtils;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.AMQFrameDecodingException;
-import org.apache.qpid.framing.Content;
-
-import org.apache.mina.common.ByteBuffer;
-
-public abstract class AMQMethodBody_0_9 extends org.apache.qpid.framing.AMQMethodBodyImpl
+public abstract class AMQMethodBody_0_91 extends org.apache.qpid.framing.AMQMethodBodyImpl
{
public byte getMajor()
@@ -39,171 +31,7 @@
public byte getMinor()
{
- return 9;
- }
-
- public int getSize()
- {
- return 2 + 2 + getBodySize();
- }
-
- public void writePayload(ByteBuffer buffer)
- {
- EncodingUtils.writeUnsignedShort(buffer, getClazz());
- EncodingUtils.writeUnsignedShort(buffer, getMethod());
- writeMethodPayload(buffer);
- }
-
-
- protected byte readByte(ByteBuffer buffer)
- {
- return buffer.get();
- }
-
- protected AMQShortString readAMQShortString(ByteBuffer buffer)
- {
- return EncodingUtils.readAMQShortString(buffer);
- }
-
- protected int getSizeOf(AMQShortString string)
- {
- return EncodingUtils.encodedShortStringLength(string);
- }
-
- protected void writeByte(ByteBuffer buffer, byte b)
- {
- buffer.put(b);
- }
-
- protected void writeAMQShortString(ByteBuffer buffer, AMQShortString string)
- {
- EncodingUtils.writeShortStringBytes(buffer, string);
- }
-
- protected int readInt(ByteBuffer buffer)
- {
- return buffer.getInt();
- }
-
- protected void writeInt(ByteBuffer buffer, int i)
- {
- buffer.putInt(i);
- }
-
- protected FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException
- {
- return EncodingUtils.readFieldTable(buffer);
- }
-
- protected int getSizeOf(FieldTable table)
- {
- return EncodingUtils.encodedFieldTableLength(table); //To change body of created methods use File | Settings | File Templates.
- }
-
- protected void writeFieldTable(ByteBuffer buffer, FieldTable table)
- {
- EncodingUtils.writeFieldTableBytes(buffer, table);
- }
-
- protected long readLong(ByteBuffer buffer)
- {
- return buffer.getLong();
- }
-
- protected void writeLong(ByteBuffer buffer, long l)
- {
- buffer.putLong(l);
- }
-
- protected int getSizeOf(byte[] response)
- {
- return (response == null) ? 4 :response.length + 4;
- }
-
- protected void writeBytes(ByteBuffer buffer, byte[] data)
- {
- EncodingUtils.writeBytes(buffer,data);
- }
-
- protected byte[] readBytes(ByteBuffer buffer)
- {
- return EncodingUtils.readBytes(buffer);
- }
-
- protected short readShort(ByteBuffer buffer)
- {
- return EncodingUtils.readShort(buffer);
- }
-
- protected void writeShort(ByteBuffer buffer, short s)
- {
- EncodingUtils.writeShort(buffer, s);
- }
-
- protected Content readContent(ByteBuffer buffer)
- {
- return null; //To change body of created methods use File | Settings | File Templates.
- }
-
- protected int getSizeOf(Content body)
- {
- return 0; //To change body of created methods use File | Settings | File Templates.
- }
-
- protected void writeContent(ByteBuffer buffer, Content body)
- {
- //To change body of created methods use File | Settings | File Templates.
- }
-
- protected byte readBitfield(ByteBuffer buffer)
- {
- return readByte(buffer); //To change body of created methods use File | Settings | File Templates.
- }
-
- protected int readUnsignedShort(ByteBuffer buffer)
- {
- return buffer.getUnsignedShort(); //To change body of created methods use File | Settings | File Templates.
- }
-
- protected void writeBitfield(ByteBuffer buffer, byte bitfield0)
- {
- buffer.put(bitfield0);
- }
-
- protected void writeUnsignedShort(ByteBuffer buffer, int s)
- {
- EncodingUtils.writeUnsignedShort(buffer, s);
- }
-
- protected long readUnsignedInteger(ByteBuffer buffer)
- {
- return buffer.getUnsignedInt();
- }
- protected void writeUnsignedInteger(ByteBuffer buffer, long i)
- {
- EncodingUtils.writeUnsignedInteger(buffer, i);
- }
-
-
- protected short readUnsignedByte(ByteBuffer buffer)
- {
- return buffer.getUnsigned();
- }
-
- protected void writeUnsignedByte(ByteBuffer buffer, short unsignedByte)
- {
- EncodingUtils.writeUnsignedByte(buffer, unsignedByte);
- }
-
- protected long readTimestamp(ByteBuffer buffer)
- {
- return EncodingUtils.readTimestamp(buffer);
- }
-
- protected void writeTimestamp(ByteBuffer buffer, long t)
- {
- EncodingUtils.writeTimestamp(buffer, t);
+ return 91;
}
-
-}
+}
\ No newline at end of file
Copied: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java (from r829757, qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java)
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java?p2=qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java&p1=qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java&r1=829757&r2=829944&rev=829944&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_9/MethodConverter_0_9.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_0_91/MethodConverter_0_91.java Mon Oct 26 20:37:36 2009
@@ -19,7 +19,7 @@
*
*/
-package org.apache.qpid.framing.amqp_0_9;
+package org.apache.qpid.framing.amqp_0_91;
import org.apache.mina.common.ByteBuffer;
@@ -29,15 +29,13 @@
import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.framing.abstraction.MessagePublishInfoImpl;
import org.apache.qpid.framing.*;
-import org.apache.qpid.framing.amqp_0_9.*;
-import org.apache.qpid.framing.amqp_0_9.BasicPublishBodyImpl;
-public class MethodConverter_0_9 extends AbstractMethodConverter implements ProtocolVersionMethodConverter
+public class MethodConverter_0_91 extends AbstractMethodConverter implements ProtocolVersionMethodConverter
{
private int _basicPublishClassId;
private int _basicPublishMethodId;
- public MethodConverter_0_9()
+ public MethodConverter_0_91()
{
super((byte)0,(byte)9);
@@ -67,7 +65,7 @@
public void configure()
{
- _basicPublishClassId = org.apache.qpid.framing.amqp_0_9.BasicPublishBodyImpl.CLASS_ID;
+ _basicPublishClassId = BasicPublishBodyImpl.CLASS_ID;
_basicPublishMethodId = BasicPublishBodyImpl.METHOD_ID;
}
@@ -131,4 +129,4 @@
return _contentBodyChunk;
}
}
-}
+}
\ No newline at end of file
Modified: qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java
URL: http://svn.apache.org/viewvc/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java?rev=829944&r1=829943&r2=829944&view=diff
==============================================================================
--- qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java (original)
+++ qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/amqp_8_0/AMQMethodBody_8_0.java Mon Oct 26 20:37:36 2009
@@ -21,14 +21,6 @@
package org.apache.qpid.framing.amqp_8_0;
-import org.apache.qpid.framing.EncodingUtils;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.AMQFrameDecodingException;
-import org.apache.qpid.framing.Content;
-
-import org.apache.mina.common.ByteBuffer;
-
public abstract class AMQMethodBody_8_0 extends org.apache.qpid.framing.AMQMethodBodyImpl
{
@@ -42,168 +34,7 @@
return 0;
}
- public int getSize()
- {
- return 2 + 2 + getBodySize();
- }
-
- public void writePayload(ByteBuffer buffer)
- {
- EncodingUtils.writeUnsignedShort(buffer, getClazz());
- EncodingUtils.writeUnsignedShort(buffer, getMethod());
- writeMethodPayload(buffer);
- }
-
- protected byte readByte(ByteBuffer buffer)
- {
- return buffer.get();
- }
-
- protected AMQShortString readAMQShortString(ByteBuffer buffer)
- {
- return EncodingUtils.readAMQShortString(buffer);
- }
-
- protected int getSizeOf(AMQShortString string)
- {
- return EncodingUtils.encodedShortStringLength(string);
- }
-
- protected void writeByte(ByteBuffer buffer, byte b)
- {
- buffer.put(b);
- }
-
- protected void writeAMQShortString(ByteBuffer buffer, AMQShortString string)
- {
- EncodingUtils.writeShortStringBytes(buffer, string);
- }
-
- protected int readInt(ByteBuffer buffer)
- {
- return buffer.getInt();
- }
-
- protected void writeInt(ByteBuffer buffer, int i)
- {
- buffer.putInt(i);
- }
-
- protected FieldTable readFieldTable(ByteBuffer buffer) throws AMQFrameDecodingException
- {
- return EncodingUtils.readFieldTable(buffer);
- }
-
- protected int getSizeOf(FieldTable table)
- {
- return EncodingUtils.encodedFieldTableLength(table); //To change body of created methods use File | Settings | File Templates.
- }
-
- protected void writeFieldTable(ByteBuffer buffer, FieldTable table)
- {
- EncodingUtils.writeFieldTableBytes(buffer, table);
- }
-
- protected long readLong(ByteBuffer buffer)
- {
- return buffer.getLong();
- }
-
- protected void writeLong(ByteBuffer buffer, long l)
- {
- buffer.putLong(l);
- }
-
- protected int getSizeOf(byte[] response)
- {
- return (response == null) ? 4 : response.length + 4;
- }
-
- protected void writeBytes(ByteBuffer buffer, byte[] data)
- {
- EncodingUtils.writeBytes(buffer,data);
- }
-
- protected byte[] readBytes(ByteBuffer buffer)
- {
- return EncodingUtils.readBytes(buffer);
- }
-
- protected short readShort(ByteBuffer buffer)
- {
- return EncodingUtils.readShort(buffer);
- }
-
- protected void writeShort(ByteBuffer buffer, short s)
- {
- EncodingUtils.writeShort(buffer, s);
- }
-
- protected Content readContent(ByteBuffer buffer)
- {
- return null; //To change body of created methods use File | Settings | File Templates.
- }
-
- protected int getSizeOf(Content body)
- {
- return 0; //To change body of created methods use File | Settings | File Templates.
- }
-
- protected void writeContent(ByteBuffer buffer, Content body)
- {
- //To change body of created methods use File | Settings | File Templates.
- }
-
- protected byte readBitfield(ByteBuffer buffer)
- {
- return readByte(buffer); //To change body of created methods use File | Settings | File Templates.
- }
-
- protected int readUnsignedShort(ByteBuffer buffer)
- {
- return buffer.getUnsignedShort(); //To change body of created methods use File | Settings | File Templates.
- }
-
- protected void writeBitfield(ByteBuffer buffer, byte bitfield0)
- {
- buffer.put(bitfield0);
- }
-
- protected void writeUnsignedShort(ByteBuffer buffer, int s)
- {
- EncodingUtils.writeUnsignedShort(buffer, s);
- }
-
- protected long readUnsignedInteger(ByteBuffer buffer)
- {
- return buffer.getUnsignedInt();
- }
- protected void writeUnsignedInteger(ByteBuffer buffer, long i)
- {
- EncodingUtils.writeUnsignedInteger(buffer, i);
- }
-
-
- protected short readUnsignedByte(ByteBuffer buffer)
- {
- return buffer.getUnsigned();
- }
-
- protected void writeUnsignedByte(ByteBuffer buffer, short unsignedByte)
- {
- EncodingUtils.writeUnsignedByte(buffer, unsignedByte);
- }
-
- protected long readTimestamp(ByteBuffer buffer)
- {
- return EncodingUtils.readTimestamp(buffer);
- }
-
- protected void writeTimestamp(ByteBuffer buffer, long t)
- {
- EncodingUtils.writeTimestamp(buffer, t);
- }
}
---------------------------------------------------------------------
Apache Qpid - AMQP Messaging Implementation
Project: http://qpid.apache.org
Use/Interact: mailto:commits-subscribe@qpid.apache.org