You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2007/01/08 00:11:57 UTC
svn commit: r493872 [3/4] - in /incubator/qpid/trunk/qpid/java: broker/bin/
broker/src/main/java/org/apache/qpid/server/
broker/src/main/java/org/apache/qpid/server/ack/
broker/src/main/java/org/apache/qpid/server/exchange/
broker/src/main/java/org/apa...
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java?view=auto&rev=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java Sun Jan 7 15:11:53 2007
@@ -0,0 +1,70 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+
+/**
+ * Encapsulates a publish body and a content header. In the context of the message store these are treated as a
+ * single unit.
+ */
+public class MessageMetaData
+{
+ private BasicPublishBody _publishBody;
+
+ private ContentHeaderBody _contentHeaderBody;
+
+ private int _contentChunkCount;
+
+ public MessageMetaData(BasicPublishBody publishBody, ContentHeaderBody contentHeaderBody, int contentChunkCount)
+ {
+ _contentHeaderBody = contentHeaderBody;
+ _publishBody = publishBody;
+ _contentChunkCount = contentChunkCount;
+ }
+
+ public int getContentChunkCount()
+ {
+ return _contentChunkCount;
+ }
+
+ public void setContentChunkCount(int contentChunkCount)
+ {
+ _contentChunkCount = contentChunkCount;
+ }
+
+ public ContentHeaderBody getContentHeaderBody()
+ {
+ return _contentHeaderBody;
+ }
+
+ public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
+ {
+ _contentHeaderBody = contentHeaderBody;
+ }
+
+ public BasicPublishBody getPublishBody()
+ {
+ return _publishBody;
+ }
+
+ public void setPublishBody(BasicPublishBody publishBody)
+ {
+ _publishBody = publishBody;
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/MessageMetaData.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/NoConsumersException.java Sun Jan 7 15:11:53 2007
@@ -20,13 +20,8 @@
*/
package org.apache.qpid.server.queue;
-import org.apache.qpid.server.RequiredDeliveryException;
-import org.apache.qpid.framing.ContentHeaderBody;
-import org.apache.qpid.framing.ContentBody;
-import org.apache.qpid.framing.BasicPublishBody;
import org.apache.qpid.protocol.AMQConstant;
-
-import java.util.List;
+import org.apache.qpid.server.RequiredDeliveryException;
/**
* Signals that no consumers exist for a message at a given point in time.
@@ -35,19 +30,9 @@
*/
public class NoConsumersException extends RequiredDeliveryException
{
- public NoConsumersException(String queue,
- BasicPublishBody publishBody,
- ContentHeaderBody contentHeaderBody,
- List<ContentBody> contentBodies)
- {
- super("Immediate delivery to " + queue + " is not possible.", publishBody, contentHeaderBody, contentBodies);
- }
-
- public NoConsumersException(BasicPublishBody publishBody,
- ContentHeaderBody contentHeaderBody,
- List<ContentBody> contentBodies)
+ public NoConsumersException(AMQMessage message)
{
- super("Immediate delivery is not possible.", publishBody, contentHeaderBody, contentBodies);
+ super("Immediate delivery is not possible.", message);
}
public int getReplyCode()
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/Subscription.java Sun Jan 7 15:11:53 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -26,11 +26,11 @@
public interface Subscription
{
- void send(AMQMessage msg, AMQQueue queue) throws FailedDequeueException;
+ void send(AMQMessage msg, AMQQueue queue) throws AMQException;
boolean isSuspended();
- void queueDeleted(AMQQueue queue);
+ void queueDeleted(AMQQueue queue) throws AMQException;
boolean hasFilters();
@@ -44,5 +44,5 @@
void close();
- boolean isBrowser();
+ boolean isBrowser();
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionImpl.java Sun Jan 7 15:11:53 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,18 +23,18 @@
import org.apache.log4j.Logger;
import org.apache.mina.common.ByteBuffer;
import org.apache.qpid.AMQException;
-import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.common.AMQPFilterTypes;
-import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
-import org.apache.qpid.framing.AMQDataBlock;
+import org.apache.qpid.common.ClientProperties;
import org.apache.qpid.framing.AMQFrame;
+import org.apache.qpid.framing.BasicCancelOkBody;
import org.apache.qpid.framing.BasicDeliverBody;
import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.BasicCancelOkBody;
import org.apache.qpid.server.AMQChannel;
import org.apache.qpid.server.filter.FilterManager;
import org.apache.qpid.server.filter.FilterManagerFactory;
import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.store.StoreContext;
+import org.apache.qpid.util.ConcurrentLinkedQueueAtomicSize;
import java.util.Queue;
@@ -201,7 +201,7 @@
* @param queue
* @throws AMQException
*/
- public void send(AMQMessage msg, AMQQueue queue) throws FailedDequeueException
+ public void send(AMQMessage msg, AMQQueue queue) throws AMQException
{
if (msg != null)
{
@@ -211,7 +211,7 @@
}
else
{
- sendToConsumer(msg, queue);
+ sendToConsumer(channel.getStoreContext(), msg, queue);
}
}
else
@@ -220,7 +220,7 @@
}
}
- private void sendToBrowser(AMQMessage msg, AMQQueue queue) throws FailedDequeueException
+ private void sendToBrowser(AMQMessage msg, AMQQueue queue) throws AMQException
{
// We don't decrement the reference here as we don't want to consume the message
// but we do want to send it to the client.
@@ -235,14 +235,12 @@
{
channel.addUnacknowledgedBrowsedMessage(msg, deliveryTag, consumerTag, queue);
}
- ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
- AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
-
- protocolSession.writeFrame(frame);
+ msg.writeDeliver(protocolSession, channel.getChannelId(), deliveryTag, consumerTag);
}
}
- private void sendToConsumer(AMQMessage msg, AMQQueue queue) throws FailedDequeueException
+ private void sendToConsumer(StoreContext storeContext, AMQMessage msg, AMQQueue queue)
+ throws AMQException
{
try
{
@@ -257,7 +255,11 @@
// the message is unacked, it will be lost.
if (!_acks)
{
- queue.dequeue(msg);
+ if (_logger.isDebugEnabled())
+ {
+ _logger.debug("No ack mode so dequeuing message immediately: " + msg.getMessageId());
+ }
+ queue.dequeue(storeContext, msg);
}
synchronized(channel)
{
@@ -268,10 +270,7 @@
channel.addUnacknowledgedMessage(msg, deliveryTag, consumerTag, queue);
}
- ByteBuffer deliver = createEncodedDeliverFrame(deliveryTag, msg.getRoutingKey(), msg.getExchangeName());
- AMQDataBlock frame = msg.getDataBlock(deliver, channel.getChannelId());
-
- protocolSession.writeFrame(frame);
+ msg.writeDeliver(protocolSession, channel.getChannelId(), deliveryTag, consumerTag);
}
}
finally
@@ -290,7 +289,7 @@
*
* @param queue
*/
- public void queueDeleted(AMQQueue queue)
+ public void queueDeleted(AMQQueue queue) throws AMQException
{
channel.queueDeleted(queue);
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SubscriptionSet.java Sun Jan 7 15:11:53 2007
@@ -204,7 +204,7 @@
*
* @param queue
*/
- public void queueDeleted(AMQQueue queue)
+ public void queueDeleted(AMQQueue queue) throws AMQException
{
for (Subscription s : _subscriptions)
{
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/SynchronizedDeliveryManager.java Sun Jan 7 15:11:53 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,6 +21,7 @@
package org.apache.qpid.server.queue;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.StoreContext;
import org.apache.log4j.Logger;
import java.util.LinkedList;
@@ -41,11 +42,13 @@
* Holds any queued messages
*/
private final Queue<AMQMessage> _messages = new LinkedList<AMQMessage>();
+
/**
* Ensures that only one asynchronous task is running for this manager at
* any time.
*/
private final AtomicBoolean _processing = new AtomicBoolean();
+
/**
* The subscriptions on the queue to whom messages are delivered
*/
@@ -70,9 +73,9 @@
_queue = queue;
}
- private synchronized boolean enqueue(AMQMessage msg)
+ private synchronized boolean enqueue(AMQMessage msg) throws AMQException
{
- if (msg.isImmediate())
+ if (msg.getPublishBody().immediate)
{
return false;
}
@@ -90,7 +93,7 @@
}
}
- private synchronized void startQueueing(AMQMessage msg)
+ private synchronized void startQueueing(AMQMessage msg) throws AMQException
{
_queueing = true;
enqueue(msg);
@@ -127,21 +130,21 @@
//no-op . This DM has no PreDeliveryQueues
}
- public synchronized void removeAMessageFromTop() throws AMQException
+ public synchronized void removeAMessageFromTop(StoreContext storeContext) throws AMQException
{
AMQMessage msg = poll();
if (msg != null)
{
- msg.dequeue(_queue);
+ msg.dequeue(storeContext, _queue);
}
}
- public synchronized void clearAllMessages() throws AMQException
+ public synchronized void clearAllMessages(StoreContext storeContext) throws AMQException
{
AMQMessage msg = poll();
while (msg != null)
{
- msg.dequeue(_queue);
+ msg.dequeue(storeContext, _queue);
msg = poll();
}
}
@@ -231,7 +234,7 @@
* @throws NoConsumersException if there are no active subscribers to deliver
* the message to
*/
- public void deliver(String name, AMQMessage msg) throws FailedDequeueException
+ public void deliver(StoreContext storeContext, String name, AMQMessage msg) throws FailedDequeueException, AMQException
{
// first check whether we are queueing, and enqueue if we are
if (!enqueue(msg))
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java?view=auto&rev=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java Sun Jan 7 15:11:53 2007
@@ -0,0 +1,118 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.AMQException;
+
+import java.util.List;
+import java.util.LinkedList;
+
+/**
+ * Contains data that is only used in AMQMessage transiently, e.g. while the content
+ * body fragments are arriving.
+ *
+ * Having this data stored in a separate class means that the AMQMessage class avoids
+ * the small overhead of numerous guaranteed-null references.
+ *
+ * @author Apache Software Foundation
+ */
+public class TransientMessageData
+{
+ /**
+ * Stored temporarily until the header has been received at which point it is used when
+ * constructing the handle
+ */
+ private BasicPublishBody _publishBody;
+
+ /**
+ * Also stored temporarily.
+ */
+ private ContentHeaderBody _contentHeaderBody;
+
+ /**
+ * Keeps a track of how many bytes we have received in body frames
+ */
+ private long _bodyLengthReceived = 0;
+
+ /**
+ * This is stored during routing, to know the queues to which this message should immediately be
+ * delivered. It is <b>cleared after delivery has been attempted</b>. Any persistent record of destinations is done
+ * by the message handle.
+ */
+ private List<AMQQueue> _destinationQueues = new LinkedList<AMQQueue>();
+
+ public BasicPublishBody getPublishBody()
+ {
+ return _publishBody;
+ }
+
+ public void setPublishBody(BasicPublishBody publishBody)
+ {
+ _publishBody = publishBody;
+ }
+
+ public List<AMQQueue> getDestinationQueues()
+ {
+ return _destinationQueues;
+ }
+
+ public void setDestinationQueues(List<AMQQueue> destinationQueues)
+ {
+ _destinationQueues = destinationQueues;
+ }
+
+ public ContentHeaderBody getContentHeaderBody()
+ {
+ return _contentHeaderBody;
+ }
+
+ public void setContentHeaderBody(ContentHeaderBody contentHeaderBody)
+ {
+ _contentHeaderBody = contentHeaderBody;
+ }
+
+ public long getBodyLengthReceived()
+ {
+ return _bodyLengthReceived;
+ }
+
+ public void addBodyLength(int value)
+ {
+ _bodyLengthReceived += value;
+ }
+
+ public boolean isAllContentReceived() throws AMQException
+ {
+ return _bodyLengthReceived == _contentHeaderBody.bodySize;
+ }
+
+ public void addDestinationQueue(AMQQueue queue)
+ {
+ _destinationQueues.add(queue);
+ }
+
+ public boolean isPersistent()
+ {
+ //todo remove literal values to a constant file such as AMQConstants in common
+ return _contentHeaderBody.properties instanceof BasicContentHeaderProperties &&
+ ((BasicContentHeaderProperties) _contentHeaderBody.properties).getDeliveryMode() == 2;
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/TransientMessageData.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java?view=auto&rev=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java Sun Jan 7 15:11:53 2007
@@ -0,0 +1,190 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.queue;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.framing.BasicContentHeaderProperties;
+import org.apache.qpid.framing.BasicPublishBody;
+import org.apache.qpid.framing.ContentBody;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
+
+import java.lang.ref.WeakReference;
+import java.util.ArrayList;
+import java.util.List;
+import java.util.LinkedList;
+
+/**
+ * @author Robert Greig (robert.j.greig@jpmorgan.com)
+ */
+public class WeakReferenceMessageHandle implements AMQMessageHandle
+{
+ private WeakReference<ContentHeaderBody> _contentHeaderBody;
+
+ private WeakReference<BasicPublishBody> _publishBody;
+
+ private List<WeakReference<ContentBody>> _contentBodies;
+
+ private boolean _redelivered;
+
+ private final MessageStore _messageStore;
+
+ public WeakReferenceMessageHandle(MessageStore messageStore)
+ {
+ _messageStore = messageStore;
+ }
+
+ public ContentHeaderBody getContentHeaderBody(long messageId) throws AMQException
+ {
+ ContentHeaderBody chb = (_contentHeaderBody != null?_contentHeaderBody.get():null);
+ if (chb == null)
+ {
+ MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
+ chb = mmd.getContentHeaderBody();
+ _contentHeaderBody = new WeakReference<ContentHeaderBody>(chb);
+ _publishBody = new WeakReference<BasicPublishBody>(mmd.getPublishBody());
+ }
+ return chb;
+ }
+
+ public int getBodyCount(long messageId) throws AMQException
+ {
+ if (_contentBodies == null)
+ {
+ MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
+ int chunkCount = mmd.getContentChunkCount();
+ _contentBodies = new ArrayList<WeakReference<ContentBody>>(chunkCount);
+ for (int i = 0; i < chunkCount; i++)
+ {
+ _contentBodies.add(new WeakReference<ContentBody>(null));
+ }
+ }
+ return _contentBodies.size();
+ }
+
+ public long getBodySize(long messageId) throws AMQException
+ {
+ return getContentHeaderBody(messageId).bodySize;
+ }
+
+ public ContentBody getContentBody(long messageId, int index) throws AMQException, IllegalArgumentException
+ {
+ if (index > _contentBodies.size() - 1)
+ {
+ throw new IllegalArgumentException("Index " + index + " out of valid range 0 to " +
+ (_contentBodies.size() - 1));
+ }
+ WeakReference<ContentBody> wr = _contentBodies.get(index);
+ ContentBody cb = wr.get();
+ if (cb == null)
+ {
+ cb = _messageStore.getContentBodyChunk(messageId, index);
+ _contentBodies.set(index, new WeakReference<ContentBody>(cb));
+ }
+ return cb;
+ }
+
+ /**
+ * Content bodies are set <i>before</i> the publish and header frames
+ * @param storeContext
+ * @param messageId
+ * @param contentBody
+ * @throws AMQException
+ */
+ public void addContentBodyFrame(StoreContext storeContext, long messageId, ContentBody contentBody) throws AMQException
+ {
+ if (_contentBodies == null)
+ {
+ _contentBodies = new LinkedList<WeakReference<ContentBody>>();
+ }
+ _contentBodies.add(new WeakReference<ContentBody>(contentBody));
+ _messageStore.storeContentBodyChunk(storeContext, messageId, _contentBodies.size() - 1, contentBody);
+ }
+
+ public BasicPublishBody getPublishBody(long messageId) throws AMQException
+ {
+ BasicPublishBody bpb = (_publishBody != null?_publishBody.get():null);
+ if (bpb == null)
+ {
+ MessageMetaData mmd = _messageStore.getMessageMetaData(messageId);
+ bpb = mmd.getPublishBody();
+ _publishBody = new WeakReference<BasicPublishBody>(bpb);
+ _contentHeaderBody = new WeakReference<ContentHeaderBody>(mmd.getContentHeaderBody());
+ }
+ return bpb;
+ }
+
+ public boolean isRedelivered()
+ {
+ return _redelivered;
+ }
+
+ public void setRedelivered(boolean redelivered)
+ {
+ _redelivered = redelivered;
+ }
+
+ public boolean isPersistent(long messageId) throws AMQException
+ {
+ //todo remove literal values to a constant file such as AMQConstants in common
+ ContentHeaderBody chb = getContentHeaderBody(messageId);
+ return chb.properties instanceof BasicContentHeaderProperties &&
+ ((BasicContentHeaderProperties) chb.properties).getDeliveryMode() == 2;
+ }
+
+ /**
+ * This is called when all the content has been received.
+ * @param publishBody
+ * @param contentHeaderBody
+ * @throws AMQException
+ */
+ public void setPublishAndContentHeaderBody(StoreContext storeContext, long messageId, BasicPublishBody publishBody,
+ ContentHeaderBody contentHeaderBody)
+ throws AMQException
+ {
+ // if there are no content bodies the list will be null so we must
+ // create en empty list here
+ if (contentHeaderBody.bodySize == 0)
+ {
+ _contentBodies = new LinkedList<WeakReference<ContentBody>>();
+ }
+ _messageStore.storeMessageMetaData(storeContext, messageId, new MessageMetaData(publishBody, contentHeaderBody,
+ _contentBodies.size()));
+ _publishBody = new WeakReference<BasicPublishBody>(publishBody);
+ _contentHeaderBody = new WeakReference<ContentHeaderBody>(contentHeaderBody);
+ }
+
+ public void removeMessage(StoreContext storeContext, long messageId) throws AMQException
+ {
+ _messageStore.removeMessage(storeContext, messageId);
+ }
+
+ public void enqueue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException
+ {
+ _messageStore.enqueueMessage(storeContext, queue.getName(), messageId);
+ }
+
+ public void dequeue(StoreContext storeContext, long messageId, AMQQueue queue) throws AMQException
+ {
+ _messageStore.dequeueMessage(storeContext, queue.getName(), messageId);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/WeakReferenceMessageHandle.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MemoryMessageStore.java Sun Jan 7 15:11:53 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -23,10 +23,12 @@
import org.apache.commons.configuration.Configuration;
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.QueueRegistry;
+import java.util.ArrayList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -43,21 +45,25 @@
private static final String HASHTABLE_CAPACITY_CONFIG = "hashtable-capacity";
- protected ConcurrentMap<Long, AMQMessage> _messageMap;
+ protected ConcurrentMap<Long, MessageMetaData> _metaDataMap;
+
+ protected ConcurrentMap<Long, List<ContentBody>> _contentBodyMap;
private final AtomicLong _messageId = new AtomicLong(1);
public void configure()
{
- _log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash table");
- _messageMap = new ConcurrentHashMap<Long, AMQMessage>(DEFAULT_HASHTABLE_CAPACITY);
+ _log.info("Using capacity " + DEFAULT_HASHTABLE_CAPACITY + " for hash tables");
+ _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(DEFAULT_HASHTABLE_CAPACITY);
+ _contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>(DEFAULT_HASHTABLE_CAPACITY);
}
public void configure(String base, Configuration config)
{
int hashtableCapacity = config.getInt(base + "." + HASHTABLE_CAPACITY_CONFIG, DEFAULT_HASHTABLE_CAPACITY);
- _log.info("Using capacity " + hashtableCapacity + " for hash table");
- _messageMap = new ConcurrentHashMap<Long, AMQMessage>(hashtableCapacity);
+ _log.info("Using capacity " + hashtableCapacity + " for hash tables");
+ _metaDataMap = new ConcurrentHashMap<Long, MessageMetaData>(hashtableCapacity);
+ _contentBodyMap = new ConcurrentHashMap<Long, List<ContentBody>>(hashtableCapacity);
}
public void configure(QueueRegistry queueRegistry, String base, Configuration config) throws Exception
@@ -67,70 +73,71 @@
public void close() throws Exception
{
- if (_messageMap != null)
+ if (_metaDataMap != null)
{
- _messageMap.clear();
- _messageMap = null;
+ _metaDataMap.clear();
+ _metaDataMap = null;
+ }
+ if (_contentBodyMap != null)
+ {
+ _contentBodyMap.clear();
+ _contentBodyMap = null;
}
}
- public void put(AMQMessage msg)
- {
- _messageMap.put(msg.getMessageId(), msg);
- }
-
- public void removeMessage(long messageId)
+ public void removeMessage(StoreContext context, long messageId)
{
if (_log.isDebugEnabled())
{
_log.debug("Removing message with id " + messageId);
}
- _messageMap.remove(messageId);
+ _metaDataMap.remove(messageId);
+ _contentBodyMap.remove(messageId);
}
public void createQueue(AMQQueue queue) throws AMQException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // Not required to do anything
}
public void removeQueue(String name) throws AMQException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // Not required to do anything
}
- public void enqueueMessage(String name, long messageId) throws AMQException
+ public void enqueueMessage(StoreContext context, String name, long messageId) throws AMQException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // Not required to do anything
}
- public void dequeueMessage(String name, long messageId) throws AMQException
+ public void dequeueMessage(StoreContext context, String name, long messageId) throws AMQException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // Not required to do anything
}
- public void beginTran() throws AMQException
+ public void beginTran(StoreContext context) throws AMQException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // Not required to do anything
}
- public void commitTran() throws AMQException
+ public void commitTran(StoreContext context) throws AMQException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // Not required to do anything
}
- public void abortTran() throws AMQException
+ public void abortTran(StoreContext context) throws AMQException
{
- //To change body of implemented methods use File | Settings | File Templates.
+ // Not required to do anything
}
- public boolean inTran()
+ public boolean inTran(StoreContext context)
{
return false;
}
public List<AMQQueue> createQueues() throws AMQException
{
- return null; //To change body of implemented methods use File | Settings | File Templates.
+ return null;
}
public long getNewMessageId()
@@ -138,8 +145,33 @@
return _messageId.getAndIncrement();
}
- public AMQMessage getMessage(long messageId)
+ public void storeContentBodyChunk(StoreContext context, long messageId, int index, ContentBody contentBody)
+ throws AMQException
+ {
+ List<ContentBody> bodyList = _contentBodyMap.get(messageId);
+ if (bodyList == null)
+ {
+ bodyList = new ArrayList<ContentBody>();
+ _contentBodyMap.put(messageId, bodyList);
+ }
+
+ bodyList.add(index, contentBody);
+ }
+
+ public void storeMessageMetaData(StoreContext context, long messageId, MessageMetaData messageMetaData)
+ throws AMQException
+ {
+ _metaDataMap.put(messageId, messageMetaData);
+ }
+
+ public MessageMetaData getMessageMetaData(long messageId) throws AMQException
+ {
+ return _metaDataMap.get(messageId);
+ }
+
+ public ContentBody getContentBodyChunk(long messageId, int index) throws AMQException
{
- return _messageMap.get(messageId);
+ List<ContentBody> bodyList = _contentBodyMap.get(messageId);
+ return bodyList.get(index);
}
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/MessageStore.java Sun Jan 7 15:11:53 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,8 +22,9 @@
import org.apache.commons.configuration.Configuration;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.framing.ContentBody;
import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.MessageMetaData;
import org.apache.qpid.server.queue.QueueRegistry;
import java.util.List;
@@ -37,34 +38,33 @@
* @param base the base element identifier from which all configuration items are relative. For example, if the base
* element is "store", the all elements used by concrete classes will be "store.foo" etc.
* @param config the apache commons configuration object
+ * @throws Exception if an error occurs that means the store is unable to configure itself
*/
void configure(QueueRegistry queueRegistry, String base, Configuration config) throws Exception;
/**
* Called to close and cleanup any resources used by the message store.
- * @throws Exception
+ * @throws Exception if close fails
*/
void close() throws Exception;
- void put(AMQMessage msg) throws AMQException;
-
- void removeMessage(long messageId) throws AMQException;
+ void removeMessage(StoreContext storeContext, long messageId) throws AMQException;
void createQueue(AMQQueue queue) throws AMQException;
void removeQueue(String name) throws AMQException;
- void enqueueMessage(String name, long messageId) throws AMQException;
+ void enqueueMessage(StoreContext context, String name, long messageId) throws AMQException;
- void dequeueMessage(String name, long messageId) throws AMQException;
+ void dequeueMessage(StoreContext context, String name, long messageId) throws AMQException;
- void beginTran() throws AMQException;
+ void beginTran(StoreContext context) throws AMQException;
- void commitTran() throws AMQException;
+ void commitTran(StoreContext context) throws AMQException;
- void abortTran() throws AMQException;
+ void abortTran(StoreContext context) throws AMQException;
- boolean inTran();
+ boolean inTran(StoreContext context);
/**
* Recreate all queues that were persisted, including re-enqueuing of existing messages
@@ -78,6 +78,13 @@
* @return a message id
*/
long getNewMessageId();
-}
+ void storeContentBodyChunk(StoreContext context, long messageId, int index, ContentBody contentBody) throws AMQException;
+ void storeMessageMetaData(StoreContext context, long messageId, MessageMetaData messageMetaData) throws AMQException;
+
+ MessageMetaData getMessageMetaData(long messageId) throws AMQException;
+
+ ContentBody getContentBodyChunk(long messageId, int index) throws AMQException;
+
+}
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java?view=auto&rev=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java Sun Jan 7 15:11:53 2007
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.store;
+
+/**
+ * A context that the store can use to associate with a transactional context. For example, it could store
+ * some kind of txn id.
+ *
+ * @author Apache Software Foundation
+ */
+public class StoreContext
+{
+ private Object _payload;
+
+ public Object getPayload()
+ {
+ return _payload;
+ }
+
+ public void setPayload(Object payload)
+ {
+ _payload = payload;
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/store/StoreContext.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java?view=auto&rev=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java Sun Jan 7 15:11:53 2007
@@ -0,0 +1,91 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.store.StoreContext;
+
+import java.util.List;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class CleanupMessageOperation implements TxnOp
+{
+ private static final Logger _log = Logger.getLogger(CleanupMessageOperation.class);
+
+ private final AMQMessage _msg;
+
+ private final List<RequiredDeliveryException> _returns;
+
+ public CleanupMessageOperation(AMQMessage msg, List<RequiredDeliveryException> returns)
+ {
+ _msg = msg;
+ _returns = returns;
+ }
+
+ public void prepare(StoreContext context) throws AMQException
+ {
+ }
+
+ public void undoPrepare()
+ {
+ //don't need to do anything here, if the store's txn failed
+ //when processing prepare then the message was not stored
+ //or enqueued on any queues and can be discarded
+ }
+
+ public void commit(StoreContext context)
+ {
+ //The routers reference can now be released. This is done
+ //here to ensure that it happens after the queues that
+ //enqueue it have incremented their counts (which as a
+ //memory only operation is done in the commit phase).
+ try
+ {
+ _msg.decrementReference(context);
+ }
+ catch (AMQException e)
+ {
+ _log.error("On commiting transaction, failed to cleanup unused message: " + e, e);
+ }
+ try
+ {
+ _msg.checkDeliveredToConsumer();
+ }
+ catch (NoConsumersException e)
+ {
+ //TODO: store this for delivery after the commit-ok
+ _returns.add(e);
+ }
+ catch (AMQException e)
+ {
+ _log.error("On commiting transaction, unable to determine whether delivered to a consumer immediately: " +
+ e, e);
+ }
+ }
+
+ public void rollback(StoreContext context)
+ {
+ // NO OP
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/CleanupMessageOperation.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java?view=auto&rev=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java Sun Jan 7 15:11:53 2007
@@ -0,0 +1,74 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.store.StoreContext;
+
+/**
+ * @author Robert Greig (robert.j.greig@jpmorgan.com)
+ */
+public class DeliverMessageOperation implements TxnOp
+{
+ private static final Logger _logger = Logger.getLogger(DeliverMessageOperation.class);
+
+ private final AMQMessage _msg;
+
+ private final AMQQueue _queue;
+
+ public DeliverMessageOperation(AMQMessage msg, AMQQueue queue)
+ {
+ _msg = msg;
+ _queue = queue;
+ _msg.incrementReference();
+ }
+
+ public void prepare(StoreContext context) throws AMQException
+ {
+ }
+
+ public void undoPrepare()
+ {
+ }
+
+ public void commit(StoreContext context)
+ {
+ //do the memeory part of the record()
+ _msg.incrementReference();
+ //then process the message
+ try
+ {
+ _queue.process(context, _msg);
+ }
+ catch (AMQException e)
+ {
+ //TODO: is there anything else we can do here? I think not...
+ _logger.error("Error during commit of a queue delivery: " + e, e);
+ }
+ }
+
+ public void rollback(StoreContext storeContext)
+ {
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/DeliverMessageOperation.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java?view=auto&rev=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java Sun Jan 7 15:11:53 2007
@@ -0,0 +1,148 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.ack.TxAck;
+import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
+
+import java.util.List;
+
+/**
+ * A transactional context that only supports local transactions.
+ */
+public class LocalTransactionalContext implements TransactionalContext
+{
+ private final TxnBuffer _txnBuffer;
+
+ /**
+ * We keep hold of the ack operation so that we can consolidate acks, i.e. multiple acks within a txn are
+ * consolidated into a single operation
+ */
+ private TxAck _ackOp;
+
+ private List<RequiredDeliveryException> _returnMessages;
+
+ private final MessageStore _messageStore;
+
+ private final StoreContext _storeContext;
+
+ private boolean _inTran = false;
+
+ public LocalTransactionalContext(MessageStore messageStore, StoreContext storeContext,
+ TxnBuffer txnBuffer, List<RequiredDeliveryException> returnMessages)
+ {
+ _messageStore = messageStore;
+ _storeContext = storeContext;
+ _txnBuffer = txnBuffer;
+ _returnMessages = returnMessages;
+ _txnBuffer.enlist(new StoreMessageOperation(messageStore));
+ }
+
+ public void rollback() throws AMQException
+ {
+ _txnBuffer.rollback(_storeContext);
+ }
+
+ public void deliver(AMQMessage message, AMQQueue queue) throws AMQException
+ {
+ // A publication will result in the enlisting of several
+ // TxnOps. The first is an op that will store the message.
+ // Following that (and ordering is important), an op will
+ // be added for every queue onto which the message is
+ // enqueued. Finally a cleanup op will be added to decrement
+ // the reference associated with the routing.
+
+ _txnBuffer.enlist(new DeliverMessageOperation(message, queue));
+ _txnBuffer.enlist(new CleanupMessageOperation(message, _returnMessages));
+ }
+
+ private void checkAck(long deliveryTag, UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException
+ {
+ if (!unacknowledgedMessageMap.contains(deliveryTag))
+ {
+ throw new AMQException("Ack with delivery tag " + deliveryTag + " not known for channel");
+ }
+ }
+
+ public void acknowledgeMessage(long deliveryTag, long lastDeliveryTag, boolean multiple,
+ UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException
+ {
+ //check that the tag exists to give early failure
+ if (!multiple || deliveryTag > 0)
+ {
+ checkAck(deliveryTag, unacknowledgedMessageMap);
+ }
+ //we use a single txn op for all acks and update this op
+ //as new acks come in. If this is the first ack in the txn
+ //we will need to create and enlist the op.
+ if (_ackOp == null)
+ {
+ _ackOp = new TxAck(unacknowledgedMessageMap);
+ _txnBuffer.enlist(_ackOp);
+ }
+ // update the op to include this ack request
+ if (multiple && deliveryTag == 0)
+ {
+ // if have signalled to ack all, that refers only
+ // to all at this time
+ _ackOp.update(lastDeliveryTag, multiple);
+ }
+ else
+ {
+ _ackOp.update(deliveryTag, multiple);
+ }
+ }
+
+ public void messageFullyReceived(boolean persistent) throws AMQException
+ {
+ // Not required in this transactional context
+ }
+
+ public void messageProcessed(AMQProtocolSession protocolSession) throws AMQException
+ {
+ // Not required in this transactional context
+ }
+
+ public void beginTranIfNecessary() throws AMQException
+ {
+ if (!_inTran)
+ {
+ _messageStore.beginTran(_storeContext);
+ _inTran = true;
+ }
+ }
+
+ public void commit() throws AMQException
+ {
+ if (_ackOp != null)
+ {
+ _ackOp.consolidate();
+ //already enlisted, after commit will reset regardless of outcome
+ _ackOp = null;
+ }
+
+ _txnBuffer.commit(_storeContext);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/LocalTransactionalContext.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?view=auto&rev=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java Sun Jan 7 15:11:53 2007
@@ -0,0 +1,208 @@
+/*
+ *
+ * Copyright (c) 2006 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.AMQChannel;
+import org.apache.qpid.server.RequiredDeliveryException;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+import org.apache.qpid.server.ack.UnacknowledgedMessage;
+import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.queue.NoConsumersException;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Set;
+
+/**
+ * @author Apache Software Foundation
+ */
+public class NonTransactionalContext implements TransactionalContext
+{
+ private static final Logger _log = Logger.getLogger(NonTransactionalContext.class);
+
+ /**
+ * Channel is useful for logging
+ */
+ private final AMQChannel _channel;
+
+ /**
+ * Where to put undeliverable messages
+ */
+ private final List<RequiredDeliveryException> _returnMessages;
+
+ private Set<Long> _browsedAcks;
+
+ private final MessageStore _messageStore;
+
+ private StoreContext _storeContext;
+
+ /**
+ * Whether we are in a transaction
+ */
+ private boolean _inTran;
+
+ public NonTransactionalContext(MessageStore messageStore, StoreContext storeContext, AMQChannel channel,
+ List<RequiredDeliveryException> returnMessages, Set<Long> browsedAcks)
+ {
+ _channel = channel;
+ _storeContext = storeContext;
+ _returnMessages = returnMessages;
+ _messageStore = messageStore;
+ _browsedAcks = browsedAcks;
+ }
+
+ public void beginTranIfNecessary() throws AMQException
+ {
+ if (!_inTran)
+ {
+ _messageStore.beginTran(_storeContext);
+ _inTran = true;
+ }
+ }
+
+ public void commit() throws AMQException
+ {
+ // Does not apply to this context
+ }
+
+ public void rollback() throws AMQException
+ {
+ // Does not apply to this context
+ }
+
+ public void deliver(AMQMessage message, AMQQueue queue) throws AMQException
+ {
+ try
+ {
+ message.incrementReference();
+ queue.process(_storeContext, message);
+ //following check implements the functionality
+ //required by the 'immediate' flag:
+ message.checkDeliveredToConsumer();
+ }
+ catch (NoConsumersException e)
+ {
+ _returnMessages.add(e);
+ }
+ }
+
+ public void acknowledgeMessage(final long deliveryTag, long lastDeliveryTag,
+ boolean multiple, final UnacknowledgedMessageMap unacknowledgedMessageMap)
+ throws AMQException
+ {
+ if (multiple)
+ {
+ if (deliveryTag == 0)
+ {
+
+ //Spec 2.1.6.11 ... If the multiple field is 1, and the delivery tag is zero,
+ // tells the server to acknowledge all outstanding mesages.
+ _log.info("Multiple ack on delivery tag 0. ACKing all messages. Current count:" +
+ unacknowledgedMessageMap.size());
+ unacknowledgedMessageMap.visit(new UnacknowledgedMessageMap.Visitor()
+ {
+ public boolean callback(UnacknowledgedMessage message) throws AMQException
+ {
+ if (!_browsedAcks.contains(deliveryTag))
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Discarding message: " + message.message.getMessageId());
+ }
+ message.discard(_storeContext);
+ }
+ else
+ {
+ _browsedAcks.remove(deliveryTag);
+ }
+ return false;
+ }
+
+ public void visitComplete()
+ {
+ unacknowledgedMessageMap.clear();
+ }
+ });
+ }
+ else
+ {
+ if (!unacknowledgedMessageMap.contains(deliveryTag))
+ {
+ throw new AMQException("Multiple ack on delivery tag " + deliveryTag + " not known for channel");
+ }
+
+ LinkedList<UnacknowledgedMessage> acked = new LinkedList<UnacknowledgedMessage>();
+ unacknowledgedMessageMap.drainTo(acked, deliveryTag);
+ for (UnacknowledgedMessage msg : acked)
+ {
+ if (!_browsedAcks.contains(deliveryTag))
+ {
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Discarding message: " + msg.message.getMessageId());
+ }
+ msg.discard(_storeContext);
+ }
+ else
+ {
+ _browsedAcks.remove(deliveryTag);
+ }
+ }
+ }
+ }
+ else
+ {
+ UnacknowledgedMessage msg;
+ msg = unacknowledgedMessageMap.remove(deliveryTag);
+
+ if (msg == null)
+ {
+ _log.info("Single ack on delivery tag " + deliveryTag + " not known for channel:" +
+ _channel.getChannelId());
+ throw new AMQException("Single ack on delivery tag " + deliveryTag + " not known for channel:" +
+ _channel.getChannelId());
+ }
+ msg.discard(_storeContext);
+ if (_log.isDebugEnabled())
+ {
+ _log.debug("Received non-multiple ack for messaging with delivery tag " + deliveryTag + " msg id " +
+ msg.message.getMessageId());
+ }
+ }
+ }
+
+ public void messageFullyReceived(boolean persistent) throws AMQException
+ {
+ if (persistent)
+ {
+ _messageStore.commitTran(_storeContext);
+ _inTran = false;
+ }
+ }
+
+ public void messageProcessed(AMQProtocolSession protocolSession) throws AMQException
+ {
+ _channel.processReturns(protocolSession);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java?view=auto&rev=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java Sun Jan 7 15:11:53 2007
@@ -0,0 +1,58 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
+
+/**
+ * A transactional operation to store messages in an underlying persistent store. When this operation
+ * commits it will do everything to ensure that all messages are safely committed to persistent
+ * storage.
+ */
+public class StoreMessageOperation implements TxnOp
+{
+ private final MessageStore _messsageStore;
+
+ public StoreMessageOperation(MessageStore messageStore)
+ {
+ _messsageStore = messageStore;
+ }
+
+ public void prepare(StoreContext context) throws AMQException
+ {
+ }
+
+ public void undoPrepare()
+ {
+ }
+
+ public void commit(StoreContext context) throws AMQException
+ {
+ _messsageStore.commitTran(context);
+ }
+
+ public void rollback(StoreContext context) throws AMQException
+ {
+ _messsageStore.abortTran(context);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/StoreMessageOperation.java
------------------------------------------------------------------------------
svn:eol-style = native
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java?view=auto&rev=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java Sun Jan 7 15:11:53 2007
@@ -0,0 +1,48 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.server.ack.UnacknowledgedMessageMap;
+import org.apache.qpid.server.queue.AMQMessage;
+import org.apache.qpid.server.queue.AMQQueue;
+import org.apache.qpid.server.protocol.AMQProtocolSession;
+
+/**
+ * @author Robert Greig (robert.j.greig@jpmorgan.com)
+ */
+public interface TransactionalContext
+{
+ void beginTranIfNecessary() throws AMQException;
+
+ void commit() throws AMQException;
+
+ void rollback() throws AMQException;
+
+ void deliver(AMQMessage message, AMQQueue queue) throws AMQException;
+
+ void acknowledgeMessage(long deliveryTag, long lastDeliveryTag, boolean multiple,
+ UnacknowledgedMessageMap unacknowledgedMessageMap) throws AMQException;
+
+ void messageFullyReceived(boolean persistent) throws AMQException;
+
+ void messageProcessed(AMQProtocolSession protocolSession) throws AMQException;
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TransactionalContext.java
------------------------------------------------------------------------------
svn:eol-style = native
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TxnBuffer.java Sun Jan 7 15:11:53 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -22,91 +22,63 @@
import org.apache.log4j.Logger;
import org.apache.qpid.AMQException;
-import org.apache.qpid.server.store.MessageStore;
+import org.apache.qpid.server.store.StoreContext;
import java.util.ArrayList;
import java.util.List;
/**
* Holds a list of TxnOp instance representing transactional
- * operations.
+ * operations.
*/
public class TxnBuffer
{
- private boolean _containsPersistentChanges = false;
- private final MessageStore _store;
private final List<TxnOp> _ops = new ArrayList<TxnOp>();
private static final Logger _log = Logger.getLogger(TxnBuffer.class);
- public TxnBuffer(MessageStore store)
- {
- _store = store;
- }
-
- public void containsPersistentChanges()
+ public TxnBuffer()
{
- _containsPersistentChanges = true;
}
- public void commit() throws AMQException
+ public void commit(StoreContext context) throws AMQException
{
- if (_containsPersistentChanges)
+ if (prepare(context))
{
- _log.debug("Begin Transaction.");
- _store.beginTran();
- if(prepare())
+ for (TxnOp op : _ops)
{
- _log.debug("Transaction Succeeded");
- _store.commitTran();
- for (TxnOp op : _ops)
- {
- op.commit();
- }
+ op.commit(context);
}
- else
- {
- _log.debug("Transaction Failed");
- _store.abortTran();
- }
- }else{
- if(prepare())
- {
- for (TxnOp op : _ops)
- {
- op.commit();
- }
- }
}
_ops.clear();
}
- private boolean prepare()
- {
+ private boolean prepare(StoreContext context)
+ {
for (int i = 0; i < _ops.size(); i++)
{
TxnOp op = _ops.get(i);
try
{
- op.prepare();
+ op.prepare(context);
}
- catch(Exception e)
+ catch (Exception e)
{
//compensate previously prepared ops
for(int j = 0; j < i; j++)
{
_ops.get(j).undoPrepare();
- }
+ }
return false;
}
}
return true;
- }
+ }
- public void rollback() throws AMQException
+ public void rollback(StoreContext context) throws AMQException
{
for (TxnOp op : _ops)
{
- op.rollback();
+ op.rollback(context);
}
_ops.clear();
}
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TxnOp.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TxnOp.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TxnOp.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/TxnOp.java Sun Jan 7 15:11:53 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -21,6 +21,7 @@
package org.apache.qpid.server.txn;
import org.apache.qpid.AMQException;
+import org.apache.qpid.server.store.StoreContext;
/**
* This provides the abstraction of an individual operation within a
@@ -29,14 +30,14 @@
public interface TxnOp
{
/**
- * Do the part of the operation that updates persistent state
+ * Do the part of the operation that updates persistent state
*/
- public void prepare() throws AMQException;
+ public void prepare(StoreContext context) throws AMQException;
/**
* Complete the operation started by prepare. Can now update in
* memory state or make netork transfers.
*/
- public void commit();
+ public void commit(StoreContext context) throws AMQException;
/**
* This is not the same as rollback. Unfortunately the use of an
* in memory reference count as a locking mechanism and a test for
@@ -50,5 +51,5 @@
/**
* Rolls back the operation.
*/
- public void rollback();
+ public void rollback(StoreContext context) throws AMQException;
}
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java Sun Jan 7 15:11:53 2007
@@ -56,7 +56,7 @@
}
catch (FailoverException e)
{
- _log.info("Failover exception caught during operation");
+ _log.info("Failover exception caught during operation: " + e, e);
}
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/codec/BasicDeliverTest.java Sun Jan 7 15:11:53 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -254,7 +254,7 @@
static BasicDeliverBody createBasicDeliverBody()
{
- BasicDeliverBody body = new BasicDeliverBody();
+ BasicDeliverBody body = new BasicDeliverBody((byte)8, (byte)0);
body.consumerTag = "myConsumerTag";
body.deliveryTag = 1;
body.exchange = "myExchange";
Modified: incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/framing/FieldTableTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/framing/FieldTableTest.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/framing/FieldTableTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/framing/FieldTableTest.java Sun Jan 7 15:11:53 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,6 +20,7 @@
*/
package org.apache.qpid.framing;
+import junit.framework.TestCase;
import org.apache.mina.common.ByteBuffer;
import java.io.BufferedReader;
@@ -29,8 +30,6 @@
import java.util.Enumeration;
import java.util.Properties;
-import junit.framework.TestCase;
-
public class FieldTableTest extends TestCase
{
@@ -47,7 +46,7 @@
EncodingUtils.encodedLongStringLength(value);
assertEquals(table.getEncodedSize(), size);
-
+
key = "Integer";
Integer number = new Integer(60);
table.put(key, number);
@@ -87,7 +86,7 @@
doTestEncoding(load("FieldTableTest2.properties"));
}
*/
- void doTestEncoding(FieldTable table) throws AMQFrameDecodingException
+ void doTestEncoding(FieldTable table) throws AMQFrameDecodingException, AMQProtocolVersionException
{
assertEquivalent(table, encodeThenDecode(table));
}
@@ -102,7 +101,7 @@
}
}
- FieldTable encodeThenDecode(FieldTable table) throws AMQFrameDecodingException
+ FieldTable encodeThenDecode(FieldTable table) throws AMQFrameDecodingException, AMQProtocolVersionException
{
ContentHeaderBody header = new ContentHeaderBody();
header.classId = 6;
Modified: incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/requestreply1/VmRequestReply.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/requestreply1/VmRequestReply.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/requestreply1/VmRequestReply.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/requestreply1/VmRequestReply.java Sun Jan 7 15:11:53 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,12 +20,9 @@
*/
package org.apache.qpid.requestreply1;
-import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.test.VMBrokerSetup;
-import org.apache.log4j.Logger;
-
import junit.framework.TestCase;
+import org.apache.log4j.Logger;
+import org.apache.qpid.testutil.VMBrokerSetup;
public class VmRequestReply extends TestCase
{
Modified: incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/JNDIReferenceableTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/JNDIReferenceableTest.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/JNDIReferenceableTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/test/unit/jndi/referenceabletest/JNDIReferenceableTest.java Sun Jan 7 15:11:53 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,14 +20,11 @@
*/
package org.apache.qpid.test.unit.jndi.referenceabletest;
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
-import org.apache.qpid.test.VMBrokerSetup;
+import junit.framework.TestCase;
+import org.apache.qpid.testutil.VMBrokerSetup;
import javax.naming.NameAlreadyBoundException;
import javax.naming.NoInitialContextException;
-
-import junit.framework.TestCase;
/**
* Usage: To run these you need to have the sun JNDI SPI for the FileSystem.
Modified: incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceRequestingClient.java Sun Jan 7 15:11:53 2007
@@ -7,9 +7,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -20,20 +20,13 @@
*/
package org.apache.qpid.weblogic;
-import org.apache.qpid.jms.*;
import org.apache.log4j.Logger;
-import javax.naming.NamingException;
-import javax.naming.InitialContext;
-import javax.naming.Context;
import javax.jms.*;
-import javax.jms.MessageConsumer;
-import javax.jms.Session;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
import java.util.Hashtable;
-import java.io.File;
-import java.io.FilenameFilter;
-import java.io.Reader;
-import java.io.FileReader;
/**
* Created by IntelliJ IDEA.
Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/AMQConnectionTest.java Sun Jan 7 15:11:53 2007
@@ -24,7 +24,7 @@
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
import org.apache.qpid.client.AMQSession;
-import org.apache.qpid.testutil.VMBrokerSetup;
+import org.apache.qpid.client.transport.TransportConnection;
import javax.jms.*;
@@ -41,6 +41,7 @@
protected void setUp() throws Exception
{
super.setUp();
+ TransportConnection.createVMBroker(1);
_connection = new AMQConnection("vm://:1", "guest", "guest", "fred", "/test");
_topic = new AMQTopic("mytopic");
_queue = new AMQQueue("myqueue");
@@ -48,6 +49,7 @@
protected void tearDown() throws Exception
{
+ super.tearDown();
try
{
_connection.close();
@@ -55,8 +57,8 @@
catch (JMSException e)
{
//ignore
- }
- super.tearDown();
+ }
+ TransportConnection.killAllVMBrokers();
}
/**
@@ -195,6 +197,6 @@
public static junit.framework.Test suite()
{
- return new VMBrokerSetup(new junit.framework.TestSuite(AMQConnectionTest.class));
+ return new junit.framework.TestSuite(AMQConnectionTest.class);
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseOkTest.java Sun Jan 7 15:11:53 2007
@@ -20,17 +20,17 @@
*/
package org.apache.qpid.test.unit.client.channelclose;
+import junit.framework.TestCase;
+import junit.textui.TestRunner;
+import org.apache.log4j.Logger;
import org.apache.qpid.client.AMQConnection;
import org.apache.qpid.client.AMQQueue;
-import org.apache.qpid.testutil.VMBrokerSetup;
-import org.apache.log4j.Logger;
+import org.apache.qpid.client.transport.TransportConnection;
import javax.jms.*;
import java.util.ArrayList;
import java.util.List;
-import junit.framework.TestCase;
-import junit.textui.TestRunner;
/**
* Due to bizarre exception handling all sessions are closed if you get
@@ -64,6 +64,7 @@
{
super.setUp();
+ TransportConnection.createVMBroker(1);
_connection = new AMQConnection(_connectionString, "guest", "guest", randomize("Client"), "/test_path");
_destination1 = new AMQQueue("q1", true);
@@ -192,7 +193,15 @@
{
while (received.size() < count)
{
- received.wait();
+ try
+ {
+ received.wait();
+ }
+ catch (InterruptedException e)
+ {
+ _log.info("Interrupted: " + e);
+ throw e;
+ }
}
}
}
@@ -209,6 +218,6 @@
public static junit.framework.Test suite()
{
- return new VMBrokerSetup(new junit.framework.TestSuite(ChannelCloseOkTest.class));
+ return new junit.framework.TestSuite(ChannelCloseOkTest.class);
}
}
Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java?view=diff&rev=493872&r1=493871&r2=493872
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java Sun Jan 7 15:11:53 2007
@@ -1,11 +1,23 @@
-/**
- * User: Robert Greig
- * Date: 12-Dec-2006
- ******************************************************************************
- * (c) Copyright JP Morgan Chase Ltd 2006. All rights reserved. No part of
- * this program may be photocopied reproduced or translated to another
- * program language without prior written consent of JP Morgan Chase Ltd
- ******************************************************************************/
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
package org.apache.qpid.test.unit.message;
import junit.framework.TestCase;