You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/02/20 17:20:46 UTC
svn commit: r509628 [2/2] - in /incubator/qpid/trunk/qpid:
gentools/templ.java/ java/broker/src/main/java/org/apache/qpid/server/
java/broker/src/main/java/org/apache/qpid/server/configuration/
java/broker/src/main/java/org/apache/qpid/server/exchange/...
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Feb 20 08:20:41 2007
@@ -1156,6 +1156,41 @@
protocolHandler.syncWrite(exchangeDeclare, ExchangeDeclareOkBody.class);
}
+
+ /**
+ * Builds a Queue.Declare frame for the named queue on this session's channel and passes it to
+ * the protocol handler's syncWrite, naming QueueDeclareOkBody as the expected reply class.
+ * The frame carries no arguments table and has nowait and passive both set to false.
+ *
+ * @param name the queue name placed in the declare frame
+ * @param autoDelete value of the frame's autoDelete flag
+ * @param durable value of the frame's durable flag
+ * @param exclusive value of the frame's exclusive flag
+ * @throws AMQException declared by this method; presumably raised by syncWrite on failure - confirm
+ */
+ public void createQueue(AMQShortString name, boolean autoDelete, boolean durable, boolean exclusive) throws AMQException
+ {
+ // NOTE(review): arguments are positional; the order must match the generated createAMQFrame signature
+ AMQFrame queueDeclare = QueueDeclareBody.createAMQFrame(_channelId,
+ getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
+ null, // arguments
+ autoDelete, // autoDelete
+ durable, // durable
+ exclusive, // exclusive
+ false, // nowait
+ false, // passive
+ name, // queue
+ getTicket()); // ticket
+
+ getProtocolHandler().syncWrite(queueDeclare, QueueDeclareOkBody.class);
+
+ }
+
+
+ /**
+ * Builds a Queue.Bind frame on this session's channel and passes it to the protocol handler's
+ * syncWrite, naming QueueBindOkBody as the expected reply class. The frame is sent with
+ * nowait set to false.
+ *
+ * @param queueName the queue field of the bind frame
+ * @param routingKey the routingKey field of the bind frame
+ * @param arguments the arguments table of the bind frame
+ * @param exchangeName the exchange field of the bind frame
+ * @throws AMQException declared by this method; presumably raised by syncWrite on failure - confirm
+ */
+ public void bindQueue(AMQShortString queueName, AMQShortString routingKey, FieldTable arguments, AMQShortString exchangeName) throws AMQException
+ {
+ // TODO: Be aware of possible changes to parameter order as versions change.
+ AMQFrame queueBind = QueueBindBody.createAMQFrame(_channelId,
+ getProtocolMajorVersion(), getProtocolMinorVersion(), // AMQP version (major, minor)
+ arguments, // arguments
+ exchangeName, // exchange
+ false, // nowait
+ queueName, // queue
+ routingKey, // routingKey
+ getTicket()); // ticket
+
+
+ getProtocolHandler().syncWrite(queueBind, QueueBindOkBody.class);
+ }
+
/**
* Declare the queue.
*
Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/cluster/SimpleSendable.java Tue Feb 20 08:20:41 2007
@@ -22,13 +22,19 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.MethodConverter_8_0;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
import org.apache.qpid.server.queue.AMQMessage;
import java.util.Iterator;
public class SimpleSendable implements Sendable
{
+
+ //todo fixme - remove 0-8 hard coding
+ ProtocolVersionMethodConverter _methodConverter = new MethodConverter_8_0();
+
private final AMQMessage _message;
public SimpleSendable(AMQMessage message)
@@ -38,12 +44,12 @@
public void send(int channel, Member member) throws AMQException
{
- member.send(new AMQFrame(channel, _message.getPublishBody()));
+ member.send(new AMQFrame(channel, _methodConverter.convertToBody(_message.getMessagePublishInfo())));
member.send(new AMQFrame(channel, _message.getContentHeaderBody()));
- Iterator<ContentBody> it = _message.getContentBodyIterator();
+ Iterator<ContentChunk> it = _message.getContentBodyIterator();
while (it.hasNext())
{
- member.send(new AMQFrame(channel, it.next()));
+ member.send(new AMQFrame(channel, _methodConverter.convertToBody(it.next())));
}
}
}
Modified: incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/main/java/org/apache/qpid/server/queue/RemoteQueueProxy.java Tue Feb 20 08:20:41 2007
@@ -31,8 +31,6 @@
import org.apache.qpid.server.cluster.util.LogMessage;
import org.apache.qpid.server.virtualhost.VirtualHost;
-import java.util.concurrent.Executor;
-
/**
* TODO: separate out an abstract base class from AMQQueue from which this inherits. It does
* not require all the functionality currently in AMQQueue.
@@ -81,8 +79,11 @@
void relay(AMQMessage msg) throws AMQException
{
- BasicPublishBody publish = msg.getPublishBody();
- publish.immediate = false; //can't as yet handle the immediate flag in a cluster
+ // TODO FIXME - can no longer update the publish body as it is an opaque wrapper object
+ // if cluster can handle immediate then it should wrap the wrapper...
+
+// BasicPublishBody publish = msg.getMessagePublishInfo();
+// publish.immediate = false; //can't as yet handle the immediate flag in a cluster
// send this on to the broker for which it is acting as proxy:
_groupMgr.send(_target, new SimpleSendable(msg));
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/ContentBody.java Tue Feb 20 08:20:41 2007
@@ -96,6 +96,8 @@
}
}
+
+
public static AMQFrame createAMQFrame(int channelId, ContentBody body)
{
final AMQFrame frame = new AMQFrame(channelId, body);
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/EncodingUtils.java Tue Feb 20 08:20:41 2007
@@ -641,8 +641,7 @@
public static void writeTimestamp(ByteBuffer buffer, long timestamp)
{
- writeUnsignedInteger(buffer, 0/*timestamp msb*/);
- writeUnsignedInteger(buffer, timestamp);
+ // Write the timestamp with one writeLong call instead of a zero word followed by an unsigned integer
+ writeLong(buffer, timestamp);
}
public static boolean[] readBooleans(ByteBuffer buffer)
@@ -765,8 +764,8 @@
public static long readTimestamp(ByteBuffer buffer)
{
- // Discard msb from AMQ timestamp
- buffer.getUnsignedInt();
- return buffer.getUnsignedInt();
+ // Read the whole timestamp with a single getLong(); no leading word is discarded any more
+ return buffer.getLong();
}
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java?view=auto&rev=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/MethodConverter_8_0.java Tue Feb 20 08:20:41 2007
@@ -0,0 +1,104 @@
+package org.apache.qpid.framing;
+
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
+import org.apache.qpid.framing.abstraction.ContentChunk;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.qpid.framing.abstraction.AbstractMethodConverter;
+
+import org.apache.mina.common.ByteBuffer;
+
+/**
+ * Method converter for protocol version major 8, minor 0. Translates between the
+ * version-neutral {@link ContentChunk} / {@link MessagePublishInfo} abstractions and the
+ * concrete {@link ContentBody} / {@link BasicPublishBody} framing classes.
+ *
+ * The class and method ids needed to build a {@link BasicPublishBody} are looked up by
+ * {@link #configure()}. A converter that was constructed directly and never configured
+ * resolves them on first use, so it does not emit publish bodies with ids of zero.
+ */
+public class MethodConverter_8_0 extends AbstractMethodConverter implements ProtocolVersionMethodConverter
+{
+    private int _basicPublishClassId;
+    private int _basicPublishMethodId;
+
+    // Set once configure() has looked up the two ids above.
+    private boolean _configured;
+
+    /** Creates a converter fixed to protocol version major 8, minor 0. */
+    public MethodConverter_8_0()
+    {
+        super((byte) 8, (byte) 0);
+    }
+
+    /**
+     * Wraps the chunk's data buffer in a new {@link ContentBody}.
+     *
+     * @param contentChunk the chunk whose data is wrapped
+     * @return a ContentBody built from {@code contentChunk.getData()}
+     */
+    public AMQBody convertToBody(ContentChunk contentChunk)
+    {
+        return new ContentBody(contentChunk.getData());
+    }
+
+    /**
+     * Exposes a {@link ContentBody} through the {@link ContentChunk} interface. The returned
+     * chunk is a live view: every call delegates to the wrapped body.
+     *
+     * @param body the body to wrap; must be a ContentBody, otherwise the cast fails
+     * @return a ContentChunk delegating to {@code body}
+     */
+    public ContentChunk convertToContentChunk(AMQBody body)
+    {
+        final ContentBody contentBodyChunk = (ContentBody) body;
+
+        return new ContentChunk()
+        {
+
+            public int getSize()
+            {
+                return contentBodyChunk.getSize();
+            }
+
+            public ByteBuffer getData()
+            {
+                return contentBodyChunk.payload;
+            }
+
+            public void reduceToFit()
+            {
+                contentBodyChunk.reduceBufferToFit();
+            }
+        };
+
+    }
+
+    /**
+     * Looks up the BasicPublish class and method ids for this converter's protocol version.
+     * Calling it more than once repeats the same lookup.
+     */
+    public void configure()
+    {
+
+        _basicPublishClassId = BasicPublishBody.getClazz(getProtocolMajorVersion(), getProtocolMinorVersion());
+        _basicPublishMethodId = BasicPublishBody.getMethod(getProtocolMajorVersion(), getProtocolMinorVersion());
+        _configured = true;
+
+    }
+
+    /**
+     * Exposes a {@link BasicPublishBody} through the {@link MessagePublishInfo} interface. The
+     * returned info is a live view: every getter delegates to the wrapped body.
+     *
+     * @param methodBody the body to wrap; must be a BasicPublishBody, otherwise the cast fails
+     * @return a MessagePublishInfo delegating to {@code methodBody}
+     */
+    public MessagePublishInfo convertToInfo(AMQMethodBody methodBody)
+    {
+        final BasicPublishBody body = (BasicPublishBody) methodBody;
+
+        return new MessagePublishInfo()
+        {
+
+            public AMQShortString getExchange()
+            {
+                return body.getExchange();
+            }
+
+            public boolean isImmediate()
+            {
+                return body.getImmediate();
+            }
+
+            public boolean isMandatory()
+            {
+                return body.getMandatory();
+            }
+
+            public AMQShortString getRoutingKey()
+            {
+                return body.getRoutingKey();
+            }
+        };
+
+    }
+
+    /**
+     * Builds a {@link BasicPublishBody} from the given publish info. The ticket is always 0.
+     *
+     * @param info source of the exchange, immediate flag, mandatory flag and routing key
+     * @return a new BasicPublishBody for this converter's protocol version
+     */
+    public AMQMethodBody convertToBody(MessagePublishInfo info)
+    {
+        // A converter created with "new" may never have had configure() called on it;
+        // resolve the ids here rather than building a body with class/method id 0.
+        if (!_configured)
+        {
+            configure();
+        }
+
+        return new BasicPublishBody(getProtocolMajorVersion(),
+                                    getProtocolMinorVersion(),
+                                    _basicPublishClassId,
+                                    _basicPublishMethodId,
+                                    info.getExchange(),
+                                    info.isImmediate(),
+                                    info.isMandatory(),
+                                    info.getRoutingKey(),
+                                    0); // ticket
+
+    }
+}
Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/VersionSpecificRegistry.java Tue Feb 20 08:20:41 2007
@@ -20,6 +20,8 @@
*/
package org.apache.qpid.framing;
+import org.apache.qpid.framing.abstraction.ProtocolVersionMethodConverter;
+
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
@@ -36,10 +38,53 @@
private AMQMethodBodyInstanceFactory[][] _registry = new AMQMethodBodyInstanceFactory[DEFAULT_MAX_CLASS_ID][];
+ private ProtocolVersionMethodConverter _protocolVersionConverter;
+
public VersionSpecificRegistry(byte major, byte minor)
{
_protocolMajorVersion = major;
_protocolMinorVersion = minor;
+
+ // Look up the converter for this version; the loader returns null when no converter class is found
+ _protocolVersionConverter = loadProtocolVersionConverters(major, minor);
+ }
+
+    /**
+     * Instantiates the class org.apache.qpid.framing.MethodConverter_&lt;major&gt;_&lt;minor&gt; by reflection.
+     * Each time the class is not found a warning is logged and an earlier version is tried:
+     * the minor version is counted down to 0 first, then the major version.
+     *
+     * @param protocolMajorVersion major version to start the search from
+     * @param protocolMinorVersion minor version to start the search from
+     * @return the first converter that could be loaded, or null when the search reaches 0-0
+     *         without finding a class
+     * @throws IllegalStateException if a converter class is found but cannot be instantiated
+     */
+    private static ProtocolVersionMethodConverter loadProtocolVersionConverters(byte protocolMajorVersion, byte protocolMinorVersion)
+    {
+        byte major = protocolMajorVersion;
+        byte minor = protocolMinorVersion;
+
+        while (true)
+        {
+            final String converterClassName = "org.apache.qpid.framing.MethodConverter_" + major + "_" + minor;
+            try
+            {
+                Class<ProtocolVersionMethodConverter> converterClass =
+                        (Class<ProtocolVersionMethodConverter>) Class.forName(converterClassName);
+                return converterClass.newInstance();
+            }
+            catch (ClassNotFoundException e)
+            {
+                _log.warn("Could not find protocol conversion classes for " + major + "-" + minor);
+
+                // Step back one version: minor first, then major; give up at 0-0.
+                if (minor != 0)
+                {
+                    minor--;
+                }
+                else if (major != 0)
+                {
+                    major--;
+                }
+                else
+                {
+                    return null;
+                }
+            }
+            catch (IllegalAccessException e)
+            {
+                throw new IllegalStateException("Unable to load protocol version converter: ", e);
+            }
+            catch (InstantiationException e)
+            {
+                throw new IllegalStateException("Unable to load protocol version converter: ", e);
+            }
+        }
     }
public byte getProtocolMajorVersion()
@@ -137,5 +182,15 @@
return bodyFactory.newInstance(_protocolMajorVersion, _protocolMinorVersion, classID, methodID, in, size);
+ }
+
+ /**
+ * Returns the converter loaded by the constructor for this registry's protocol version.
+ * May be null: the loader returns null when no converter class could be found.
+ */
+ public ProtocolVersionMethodConverter getProtocolVersionMethodConverter()
+ {
+ return _protocolVersionConverter;
+ }
+
+    /**
+     * Asks the protocol version converter, if one was loaded, to configure itself.
+     * Does nothing when no converter is available for this registry's version.
+     */
+    public void configure()
+    {
+        // loadProtocolVersionConverters returns null when no converter class exists for this
+        // or any earlier version, so the field must be checked before it is dereferenced.
+        if (_protocolVersionConverter != null)
+        {
+            _protocolVersionConverter.configure();
+        }
     }
}
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java?view=auto&rev=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/AbstractMethodConverter.java Tue Feb 20 08:20:41 2007
@@ -0,0 +1,26 @@
+package org.apache.qpid.framing.abstraction;
+
+/**
+ * Base class for protocol version method converters. It stores the major and minor protocol
+ * version supplied at construction and exposes them through final getters.
+ */
+public abstract class AbstractMethodConverter implements ProtocolVersionMethodConverter
+{
+    private final byte _protocolMajorVersion;
+    private final byte _protocolMinorVersion;
+
+    /**
+     * @param major the major protocol version this converter handles
+     * @param minor the minor protocol version this converter handles
+     */
+    public AbstractMethodConverter(byte major, byte minor)
+    {
+        this._protocolMajorVersion = major;
+        this._protocolMinorVersion = minor;
+    }
+
+    /** Returns the major protocol version given to the constructor. */
+    public final byte getProtocolMajorVersion()
+    {
+        return this._protocolMajorVersion;
+    }
+
+    /** Returns the minor protocol version given to the constructor. */
+    public final byte getProtocolMinorVersion()
+    {
+        return this._protocolMinorVersion;
+    }
+}
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java?view=auto&rev=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ContentChunk.java Tue Feb 20 08:20:41 2007
@@ -0,0 +1,32 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+
+package org.apache.qpid.framing.abstraction;
+
+import org.apache.mina.common.ByteBuffer;
+
+/**
+ * A chunk of message content: a data buffer, its size, and an operation to reduce it to fit.
+ * NOTE(review): appears intended to keep callers independent of version-specific body
+ * classes such as ContentBody - confirm.
+ */
+public interface ContentChunk
+{
+ /** Returns the size of this chunk. NOTE(review): presumably in bytes - confirm. */
+ int getSize();
+ /** Returns the buffer holding this chunk's data. */
+ ByteBuffer getData();
+
+ /**
+ * NOTE(review): the 0-8 converter implements this by calling ContentBody.reduceBufferToFit();
+ * presumably it trims the underlying buffer to the data it holds - confirm.
+ */
+ void reduceToFit();
+}
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java?view=auto&rev=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfo.java Tue Feb 20 08:20:41 2007
@@ -0,0 +1,36 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.framing.abstraction;
+
+import org.apache.qpid.framing.AMQShortString;
+
+/**
+ * Read-only view of the fields of a publish request: exchange, routing key and the
+ * immediate and mandatory flags.
+ */
+public interface MessagePublishInfo
+{
+
+ /** Returns the exchange of the publish request; implementations in the tests return null here. */
+ public AMQShortString getExchange();
+
+ /** Returns the value of the publish request's immediate flag. */
+ public boolean isImmediate();
+
+ /** Returns the value of the publish request's mandatory flag. */
+ public boolean isMandatory();
+
+ /** Returns the routing key of the publish request; implementations in the tests may return null. */
+ public AMQShortString getRoutingKey();
+
+}
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java?view=auto&rev=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/MessagePublishInfoConverter.java Tue Feb 20 08:20:41 2007
@@ -0,0 +1,29 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.qpid.framing.abstraction;
+
+import org.apache.qpid.framing.AMQMethodBody;
+
+
+/**
+ * Converts between a protocol method body and the MessagePublishInfo abstraction, in both directions.
+ */
+public interface MessagePublishInfoConverter
+{
+ /** Returns a MessagePublishInfo describing the given method body. */
+ public MessagePublishInfo convertToInfo(AMQMethodBody body);
+ /** Returns a method body built from the given publish info. */
+ public AMQMethodBody convertToBody(MessagePublishInfo info);
+
+}
Added: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java?view=auto&rev=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java (added)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/framing/abstraction/ProtocolVersionMethodConverter.java Tue Feb 20 08:20:41 2007
@@ -0,0 +1,29 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+
+package org.apache.qpid.framing.abstraction;
+
+import org.apache.qpid.framing.AMQBody;
+
+/**
+ * Converter for one protocol version: adds content chunk conversion and a configure step
+ * to the publish info conversions inherited from MessagePublishInfoConverter.
+ */
+public interface ProtocolVersionMethodConverter extends MessagePublishInfoConverter
+{
+ /** Returns a body built from the given content chunk. */
+ AMQBody convertToBody(ContentChunk contentBody);
+ /** Returns a content chunk describing the given body. */
+ ContentChunk convertToContentChunk(AMQBody body);
+
+ /**
+ * Performs setup that is separate from construction.
+ * NOTE(review): the 0-8 implementation looks up the BasicPublish class and method ids here;
+ * presumably this must run before publish bodies are built - confirm.
+ */
+ void configure();
+}
Modified: incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/test/java/org/apache/qpid/framing/BasicContentHeaderPropertiesTest.java Tue Feb 20 08:20:41 2007
@@ -154,7 +154,7 @@
public void testSetGetTimestamp()
{
- long timestamp = 999999999;
+ // NOTE(review): presumably switched to the current time so the value no longer fits in
+ // 32 bits and exercises the 64-bit timestamp handling - confirm
+ long timestamp = System.currentTimeMillis();
_testProperties.setTimestamp(timestamp);
assertEquals(timestamp, _testProperties.getTimestamp());
}
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/ack/TxAckTest.java Tue Feb 20 08:20:41 2007
@@ -23,6 +23,8 @@
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.store.TestableMemoryMessageStore;
@@ -103,16 +105,32 @@
for(int i = 0; i < messageCount; i++)
{
long deliveryTag = i + 1;
- // TODO: fix hardcoded protocol version data
- TestMessage message = new TestMessage(deliveryTag, i, new BasicPublishBody((byte)8,
- (byte)0,
- BasicPublishBody.getClazz((byte)8,(byte)0),
- BasicPublishBody.getMethod((byte)8,(byte)0),
- null,
- false,
- false,
- null,
- 0), txnContext);
+
+ MessagePublishInfo info = new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return null;
+ }
+ };
+
+ TestMessage message = new TestMessage(deliveryTag, i, info, txnContext);
_map.add(deliveryTag, new UnacknowledgedMessage(null, message, null, deliveryTag));
}
_acked = acked;
@@ -174,7 +192,7 @@
private final long _tag;
private int _count;
- TestMessage(long tag, long messageId, BasicPublishBody publishBody, TransactionalContext txnContext)
+ TestMessage(long tag, long messageId, MessagePublishInfo publishBody, TransactionalContext txnContext)
{
super(messageId, publishBody, txnContext);
_tag = tag;
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/AbstractHeadersExchangeTestBase.java Tue Feb 20 08:20:41 2007
@@ -23,6 +23,7 @@
import junit.framework.TestCase;
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.*;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageHandleFactory;
@@ -149,15 +150,97 @@
return headers;
}
- static BasicPublishBody getPublishRequest(String id)
+
+    /**
+     * Mutable MessagePublishInfo for tests: a plain holder for the exchange, routing key and
+     * the immediate and mandatory flags, with a setter for each so a test can adjust a
+     * message's publish info after the message has been built.
+     */
+    static final class MessagePublishInfoImpl implements MessagePublishInfo
+    {
+        private AMQShortString _exchange;
+        private AMQShortString _routingKey;
+        private boolean _immediate;
+        private boolean _mandatory;
+
+        /** Creates an info with the given routing key, no exchange and both flags false. */
+        public MessagePublishInfoImpl(AMQShortString routingKey)
+        {
+            this(null, false, false, routingKey);
+        }
+
+        /** Creates an info with every field supplied by the caller. */
+        public MessagePublishInfoImpl(AMQShortString exchange, boolean immediate, boolean mandatory, AMQShortString routingKey)
+        {
+            this._exchange = exchange;
+            this._immediate = immediate;
+            this._mandatory = mandatory;
+            this._routingKey = routingKey;
+        }
+
+        public AMQShortString getExchange()
+        {
+            return this._exchange;
+        }
+
+        public void setExchange(AMQShortString exchange)
+        {
+            this._exchange = exchange;
+        }
+
+        public boolean isImmediate()
+        {
+            return this._immediate;
+        }
+
+        public void setImmediate(boolean immediate)
+        {
+            this._immediate = immediate;
+        }
+
+        public boolean isMandatory()
+        {
+            return this._mandatory;
+        }
+
+        public void setMandatory(boolean mandatory)
+        {
+            this._mandatory = mandatory;
+        }
+
+        public AMQShortString getRoutingKey()
+        {
+            return this._routingKey;
+        }
+
+        public void setRoutingKey(AMQShortString routingKey)
+        {
+            this._routingKey = routingKey;
+        }
+    }
+
+    /**
+     * Builds the publish info for a test message: routing key {@code id}, no exchange, and
+     * the immediate and mandatory flags both false.
+     *
+     * @param id text used as the routing key
+     * @return a MessagePublishInfoImpl, so tests can cast the result and change its flags
+     */
+    static MessagePublishInfo getPublishRequest(final String id)
     {
-        // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
-        // TODO: Establish some way to determine the version for the test.
-        BasicPublishBody request = new BasicPublishBody((byte)8, (byte)0,
-                BasicPublishBody.getClazz((byte)8,(byte)0),
-                BasicPublishBody.getMethod((byte)8,(byte)0),
-                null,false,false,new AMQShortString(id),0);
-
+        // HeadersExchangeTest casts a message's publish info to MessagePublishInfoImpl in order
+        // to call setMandatory(true); an anonymous MessagePublishInfo would make that cast fail.
+        MessagePublishInfo request = new MessagePublishInfoImpl(new AMQShortString(id));
+
         return request;
     }
@@ -221,7 +304,7 @@
this(getPublishRequest(id), getContentHeader(headers), null);
}
- private Message(BasicPublishBody publish, ContentHeaderBody header, List<ContentBody> bodies) throws AMQException
+ private Message(MessagePublishInfo publish, ContentHeaderBody header, List<ContentBody> bodies) throws AMQException
{
super(_messageStore.getNewMessageId(), publish, _txnContext, header);
}
@@ -265,7 +348,7 @@
{
try
{
- return getPublishBody().routingKey;
+ return getMessagePublishInfo().getRoutingKey();
}
catch (AMQException e)
{
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/HeadersExchangeTest.java Tue Feb 20 08:20:41 2007
@@ -22,7 +22,6 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.server.registry.ApplicationRegistry;
-import org.apache.qpid.server.util.TestApplicationRegistry;
import org.apache.qpid.server.util.NullApplicationRegistry;
import org.apache.qpid.framing.BasicPublishBody;
@@ -55,13 +54,13 @@
Message m7 = new Message("Message7", "XXXXX");
- BasicPublishBody pb7 = m7.getPublishBody();
- pb7.mandatory = true;
+ MessagePublishInfoImpl pb7 = (MessagePublishInfoImpl) (m7.getMessagePublishInfo());
+ pb7.setMandatory(true);
routeAndTest(m7,true);
Message m8 = new Message("Message8", "F0000");
- BasicPublishBody pb8 = m8.getPublishBody();
- pb8.mandatory = true;
+ MessagePublishInfoImpl pb8 = (MessagePublishInfoImpl)(m8.getMessagePublishInfo());
+ pb8.setMandatory(true);
routeAndTest(m8,false,q1);
@@ -88,10 +87,10 @@
bindDefault("F0000");
Message m1 = new Message("Message1", "XXXXX");
Message m2 = new Message("Message2", "F0000");
- BasicPublishBody pb1 = m1.getPublishBody();
- pb1.mandatory = true;
- BasicPublishBody pb2 = m2.getPublishBody();
- pb2.mandatory = true;
+ MessagePublishInfoImpl pb1 = (MessagePublishInfoImpl) (m1.getMessagePublishInfo());
+ pb1.setMandatory(true);
+ MessagePublishInfoImpl pb2 = (MessagePublishInfoImpl) (m2.getMessagePublishInfo());
+ pb2.setMandatory(true);
routeAndTest(m1,true);
}
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AMQQueueMBeanTest.java Tue Feb 20 08:20:41 2007
@@ -22,6 +22,7 @@
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.virtualhost.VirtualHost;
@@ -164,20 +165,32 @@
}
}
- private AMQMessage message(boolean immediate) throws AMQException
+ private AMQMessage message(final boolean immediate) throws AMQException
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Establish some way to determine the version for the test.
- BasicPublishBody publish = new BasicPublishBody((byte)8,
- (byte)0,
- BasicPublishBody.getClazz((byte)8,(byte)0),
- BasicPublishBody.getMethod((byte)8,(byte)0),
- null,
- immediate,
- false,
- null,
- 0);
-
+ MessagePublishInfo publish = new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ public boolean isImmediate()
+ {
+ return immediate;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return null;
+ }
+ };
+
ContentHeaderBody contentHeaderBody = new ContentHeaderBody();
contentHeaderBody.bodySize = 1000; // in bytes
return new AMQMessage(_messageStore.getNewMessageId(), publish, _transactionalContext, contentHeaderBody);
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/AckTest.java Tue Feb 20 08:20:41 2007
@@ -27,6 +27,7 @@
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentHeaderBody;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.RequiredDeliveryException;
import org.apache.qpid.server.ack.UnacknowledgedMessage;
@@ -98,15 +99,29 @@
{
// AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
// TODO: Establish some way to determine the version for the test.
- BasicPublishBody publishBody = new BasicPublishBody((byte)8,
- (byte)0,
- BasicPublishBody.getClazz((byte)8,(byte)0),
- BasicPublishBody.getMethod((byte)8,(byte)0),
- new AMQShortString("someExchange"),
- false,
- false,
- new AMQShortString("rk"),
- 0);
+ MessagePublishInfo publishBody = new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return new AMQShortString("someExchange");
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return new AMQShortString("rk");
+ }
+ };
AMQMessage msg = new AMQMessage(_messageStore.getNewMessageId(), publishBody, txnContext);
if (persistent)
{
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/queue/MessageTestHelper.java Tue Feb 20 08:20:41 2007
@@ -22,6 +22,8 @@
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.store.MessageStore;
import org.apache.qpid.server.store.SkeletonMessageStore;
import org.apache.qpid.server.store.StoreContext;
@@ -57,20 +59,32 @@
return message(false);
}
- AMQMessage message(boolean immediate) throws AMQException
+ AMQMessage message(final boolean immediate) throws AMQException
{
- // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
- // TODO: Establish some way to determine the version for the test.
- BasicPublishBody publish = new BasicPublishBody((byte)8,
- (byte)0,
- BasicPublishBody.getClazz((byte)8,(byte)0),
- BasicPublishBody.getMethod((byte)8,(byte)0),
- null,
- immediate,
- false,
- null,
- 0);
-
+ MessagePublishInfo publish = new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ public boolean isImmediate()
+ {
+ return immediate;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return null;
+ }
+ };
+
return new AMQMessage(_messageStore.getNewMessageId(), publish, _txnContext,
new ContentHeaderBody());
}
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/SkeletonMessageStore.java Tue Feb 20 08:20:41 2007
@@ -24,9 +24,12 @@
import org.apache.qpid.AMQException;
import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.abstraction.ContentChunk;
import org.apache.qpid.server.queue.AMQQueue;
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.exchange.Exchange;
import java.util.List;
import java.util.concurrent.atomic.AtomicLong;
@@ -56,6 +59,26 @@
{
}
+ public void createExchange(Exchange exchange) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void removeExchange(Exchange exchange) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void bindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
+ public void unbindQueue(Exchange exchange, AMQShortString routingKey, AMQQueue queue, FieldTable args) throws AMQException
+ {
+ //To change body of implemented methods use File | Settings | File Templates.
+ }
+
public void createQueue(AMQQueue queue) throws AMQException
{
}
@@ -87,7 +110,7 @@
return _messageId.getAndIncrement();
}
- public void storeContentBodyChunk(StoreContext sc, Long messageId, int index, ContentBody contentBody, boolean lastContentBody) throws AMQException
+ public void storeContentBodyChunk(StoreContext sc, Long messageId, int index, ContentChunk contentBody, boolean lastContentBody) throws AMQException
{
}
@@ -102,7 +125,7 @@
return null;
}
- public ContentBody getContentBodyChunk(StoreContext s,Long messageId, int index) throws AMQException
+ public ContentChunk getContentBodyChunk(StoreContext s,Long messageId, int index) throws AMQException
{
return null;
}
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestReferenceCounting.java Tue Feb 20 08:20:41 2007
@@ -25,6 +25,8 @@
import org.apache.qpid.framing.BasicContentHeaderProperties;
import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
import org.apache.qpid.server.queue.AMQMessage;
import org.apache.qpid.server.queue.MessageHandleFactory;
import org.apache.qpid.server.txn.NonTransactionalContext;
@@ -50,16 +52,32 @@
public void testMessageGetsRemoved() throws AMQException
{
createPersistentContentHeader();
- // TODO: fix hardcoded protocol version data
- AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8,
- (byte)0,
- BasicPublishBody.getClazz((byte)8,(byte)0),
- BasicPublishBody.getMethod((byte)8,(byte)0),
- null,
- false,
- false,
- null,
- 0),
+
+ MessagePublishInfo info = new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return null;
+ }
+ };
+
+ AMQMessage message = new AMQMessage(_store.getNewMessageId(), info,
new NonTransactionalContext(_store, _storeContext, null, null, null),
createPersistentContentHeader());
message.incrementReference();
@@ -81,16 +99,33 @@
public void testMessageRemains() throws AMQException
{
- // TODO: fix hardcoded protocol version data
- AMQMessage message = new AMQMessage(_store.getNewMessageId(), new BasicPublishBody((byte)8,
- (byte)0,
- BasicPublishBody.getClazz((byte)8,(byte)0),
- BasicPublishBody.getMethod((byte)8,(byte)0),
- null,
- false,
- false,
- null,
- 0),
+
+ MessagePublishInfo info = new MessagePublishInfo()
+ {
+
+ public AMQShortString getExchange()
+ {
+ return null;
+ }
+
+ public boolean isImmediate()
+ {
+ return false;
+ }
+
+ public boolean isMandatory()
+ {
+ return false;
+ }
+
+ public AMQShortString getRoutingKey()
+ {
+ return null;
+ }
+ };
+
+ AMQMessage message = new AMQMessage(_store.getNewMessageId(),
+ info,
new NonTransactionalContext(_store, _storeContext, null, null, null),
createPersistentContentHeader());
message.incrementReference();
Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java?view=diff&rev=509628&r1=509627&r2=509628
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/store/TestableMemoryMessageStore.java Tue Feb 20 08:20:41 2007
@@ -22,6 +22,7 @@
import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.abstraction.ContentChunk;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -35,7 +36,7 @@
public TestableMemoryMessageStore()
{
_metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>();
- _contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>();
+ _contentBodyMap = new ConcurrentHashMap<Long, List<ContentChunk>>();
}
public ConcurrentMap<Long, MessageMetaData> getMessageMetaDataMap()
@@ -43,7 +44,7 @@
return _metaDataMap;
}
- public ConcurrentMap<Long, List<ContentBody>> getContentBodyMap()
+ public ConcurrentMap<Long, List<ContentChunk>> getContentBodyMap()
{
return _contentBodyMap;
}