You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by gs...@apache.org on 2007/05/22 10:49:56 UTC
svn commit: r540493 [2/2] - in
/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server:
./ messageStore/ queue/ txn/ util/
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java?view=diff&rev=540493&r1=540492&r2=540493
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/MessageStore.java Tue May 22 01:49:53 2007
@@ -1,270 +1,300 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing,
- * software distributed under the License is distributed on an
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
- * KIND, either express or implied. See the License for the
- * specific language governing permissions and limitations
- * under the License.
- */
-package org.apache.qpid.server.messageStore;
-
-import org.apache.qpid.server.exception.*;
-import org.apache.qpid.server.virtualhost.VirtualHost;
-import org.apache.qpid.server.txn.TransactionManager;
-import org.apache.qpid.server.exchange.Exchange;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.commons.configuration.Configuration;
-
-import javax.transaction.xa.Xid;
-import java.util.Collection;
-
-/**
- * Created by Arnaud Simon
- * Date: 29-Mar-2007
- * Time: 17:34:02
- */
-public interface MessageStore
-{
- /**
- * Create a new exchange
- *
- * @param exchange the exchange to be persisted
- * @throws InternalErrorException If an error occurs
- */
- public void createExchange(Exchange exchange)
- throws
- InternalErrorException;
-
- /**
- * Remove an exchange
- * @param exchange The exchange to be removed
- * @throws InternalErrorException If an error occurs
- */
- public void removeExchange(Exchange exchange) throws
- InternalErrorException;
-
- /**
- * Bind a queue with an exchange given a routing key
- *
- * @param exchange The exchange to bind the queue with
- * @param routingKey The routing key
- * @param queue The queue to be bound
- * @param args Args
- * @throws InternalErrorException If an error occurs
- */
- public void bindQueue(Exchange exchange,
- AMQShortString routingKey,
- StorableQueue queue, FieldTable args)
- throws
- InternalErrorException;
-
- /**
- * Unbind a queue from an exchange
- *
- * @param exchange The exchange the queue was bound to
- * @param routingKey The routing queue
- * @param queue The queue to unbind
- * @param args args
- * @throws InternalErrorException If an error occurs
- */
- public void unbindQueue(Exchange exchange,
- AMQShortString routingKey,
- StorableQueue queue, FieldTable args)
- throws
- InternalErrorException;
-
- /**
- * Called after instantiation in order to configure the message store. A particular implementation can define
- * whatever parameters it wants.
- *
- * @param virtualHost The virtual host using by this store
- * @param tm The transaction manager implementation
- * @param base The base element identifier from which all configuration items are relative. For example, if the base
- * element is "store", the all elements used by concrete classes will be "store.foo" etc.
- * @param config The apache commons configuration object
- * @throws InternalErrorException If an error occurs that means the store is unable to configure itself
- * @throws IllegalArgumentException If the configuration arguments are illegal
- */
- void configure(VirtualHost virtualHost, TransactionManager tm, String base, Configuration config)
- throws
- InternalErrorException,
- IllegalArgumentException;
-
- /**
- * Called to close and cleanup any resources used by the message store.
- *
- * @throws InternalErrorException if close fails
- */
- void close()
- throws
- InternalErrorException;
-
- /**
- * Create a queue
- *
- * @param queue the queue to be created
- * @throws InternalErrorException In case of internal message store problem
- * @throws QueueAlreadyExistsException If the queue already exists in the store
- */
- public void createQueue(StorableQueue queue)
- throws
- InternalErrorException,
- QueueAlreadyExistsException;
-
- /**
- * Destroy a queue
- *
- * @param queue The queue to be destroyed
- * @throws InternalErrorException In case of internal message store problem
- * @throws QueueDoesntExistException If the queue does not exist in the store
- */
- public void destroyQueue(StorableQueue queue)
- throws
- InternalErrorException,
- QueueDoesntExistException;
-
- /**
- * Stage the message before effective enqueue
- *
- * @param m The message to stage
- * @throws InternalErrorException In case of internal message store problem
- * @throws MessageAlreadyStagedException If the message is already staged
- */
- public void stage(StorableMessage m)
- throws
- InternalErrorException,
- MessageAlreadyStagedException;
-
-
- /**
- * Append more data with a previously staged message
- *
- * @param m The message to which data must be appended
- * @param data Data to happen to the message
- * @param offset The number of bytes from the beginning of the payload
- * @param size The number of bytes to be written
- * @throws InternalErrorException In case of internal message store problem
- * @throws MessageDoesntExistException If the message has not been staged
- */
- public void appendContent(StorableMessage m, byte[] data, int offset, int size)
- throws
- InternalErrorException,
- MessageDoesntExistException;
-
- /**
- * Get the content of previously staged or enqueued message.
- * The message headers are also set.
- *
- * @param m The message for which the content must be loaded
- * @param offset The number of bytes from the beginning of the payload
- * @param size The number of bytes to be loaded
- * @return The message content
- * @throws InternalErrorException In case of internal message store problem
- * @throws MessageDoesntExistException If the message does not exist
- */
- public byte[] loadContent(StorableMessage m, int offset, int size)
- throws
- InternalErrorException,
- MessageDoesntExistException;
-
- /**
- * Destroy a previously staged message
- *
- * @param m the message to be destroyed
- * @throws InternalErrorException In case of internal message store problem
- * @throws MessageDoesntExistException If the message does not exist in the store
- */
- public void destroy(StorableMessage m)
- throws
- InternalErrorException,
- MessageDoesntExistException;
-
- /**
- * Enqueue a message under the scope of the transaction branch
- * identified by xid when specified.
- * <p> This operation is propagated to the queue and the message.
- * <p> A message that has been previously staged is assumed to have had
- * its payload already added (see appendContent)
- *
- * @param xid The xid of the transaction branch under which the message must be enqueued.
- * <p> It he xid is null then the message is enqueued outside the scope of any transaction.
- * @param m The message to be enqueued
- * @param queue The queue into which the message must be enqueued
- * @throws InternalErrorException In case of internal message store problem
- * @throws QueueDoesntExistException If the queue does not exist in the store
- * @throws InvalidXidException The transaction branch is invalid
- * @throws UnknownXidException The transaction branch is unknown
- * @throws MessageDoesntExistException If the Message does not exist
- */
- public void enqueue(Xid xid, StorableMessage m, StorableQueue queue)
- throws
- InternalErrorException,
- QueueDoesntExistException,
- InvalidXidException,
- UnknownXidException,
- MessageDoesntExistException;
-
- /**
- * Dequeue a message under the scope of the transaction branch identified by xid
- * if specified.
- * <p> This operation is propagated to the queue and the message.
- *
- * @param xid The xid of the transaction branch under which the message must be dequeued.
- * <p> It he xid is null then the message is dequeued outside the scope of any transaction.
- * @param m The message to be dequeued
- * @param queue The queue from which the message must be dequeued
- * @throws InternalErrorException In case of internal message store problem
- * @throws QueueDoesntExistException If the queue does not exist in the store
- * @throws InvalidXidException The transaction branch is invalid
- * @throws UnknownXidException The transaction branch is unknown
- */
- public void dequeue(Xid xid, StorableMessage m, StorableQueue queue)
- throws
- InternalErrorException,
- QueueDoesntExistException,
- InvalidXidException,
- UnknownXidException;
-
- //=========================================================
- // Recovery specific methods
- //=========================================================
-
- /**
- * List all the persistent queues
- *
- * @return All the persistent queues
- * @throws InternalErrorException In case of internal message store problem
- */
- public Collection<StorableQueue> getAllQueues()
- throws
- InternalErrorException;
-
- /**
- * All enqueued messages of a given queue
- *
- * @param queue The queue where the message are retrieved from
- * @return The list all enqueued messages of a given queue
- * @throws InternalErrorException In case of internal message store problem
- */
- public Collection<StorableMessage> getAllMessages(StorableQueue queue)
- throws
- InternalErrorException;
-
- /**
- * Get a new message ID
- *
- * @return A new message ID
- */
- public long getNewMessageId();
-}
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.messageStore;
+
+import org.apache.qpid.server.exception.*;
+import org.apache.qpid.server.virtualhost.VirtualHost;
+import org.apache.qpid.server.txn.TransactionManager;
+import org.apache.qpid.server.exchange.Exchange;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.ContentHeaderBody;
+import org.apache.qpid.framing.abstraction.MessagePublishInfo;
+import org.apache.commons.configuration.Configuration;
+
+import javax.transaction.xa.Xid;
+import java.util.Collection;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 29-Mar-2007
+ * Time: 17:34:02
+ */
+public interface MessageStore
+{
+ /**
+ * Create a new exchange
+ *
+ * @param exchange the exchange to be persisted
+ * @throws InternalErrorException If an error occurs
+ */
+ public void createExchange(Exchange exchange)
+ throws
+ InternalErrorException;
+
+ /**
+ * Remove an exchange
+ *
+ * @param exchange The exchange to be removed
+ * @throws InternalErrorException If an error occurs
+ */
+ public void removeExchange(Exchange exchange)
+ throws
+ InternalErrorException;
+
+ /**
+ * Bind a queue with an exchange given a routing key
+ *
+ * @param exchange The exchange to bind the queue with
+ * @param routingKey The routing key
+ * @param queue The queue to be bound
+ * @param args Args
+ * @throws InternalErrorException If an error occurs
+ */
+ public void bindQueue(Exchange exchange,
+ AMQShortString routingKey,
+ StorableQueue queue, FieldTable args)
+ throws
+ InternalErrorException;
+
+ /**
+ * Unbind a queue from an exchange
+ *
+ * @param exchange The exchange the queue was bound to
+ * @param routingKey The routing queue
+ * @param queue The queue to unbind
+ * @param args args
+ * @throws InternalErrorException If an error occurs
+ */
+ public void unbindQueue(Exchange exchange,
+ AMQShortString routingKey,
+ StorableQueue queue, FieldTable args)
+ throws
+ InternalErrorException;
+
+ /**
+ * Called after instantiation in order to configure the message store. A particular implementation can define
+ * whatever parameters it wants.
+ *
+ * @param virtualHost The virtual host using by this store
+ * @param tm The transaction manager implementation
+ * @param base The base element identifier from which all configuration items are relative. For example, if the base
+ * element is "store", the all elements used by concrete classes will be "store.foo" etc.
+ * @param config The apache commons configuration object
+ * @throws InternalErrorException If an error occurs that means the store is unable to configure itself
+ * @throws IllegalArgumentException If the configuration arguments are illegal
+ */
+ void configure(VirtualHost virtualHost, TransactionManager tm, String base, Configuration config)
+ throws
+ InternalErrorException,
+ IllegalArgumentException;
+
+ /**
+ * Called to close and cleanup any resources used by the message store.
+ *
+ * @throws InternalErrorException if close fails
+ */
+ void close()
+ throws
+ InternalErrorException;
+
+ /**
+ * Create a queue
+ *
+ * @param queue the queue to be created
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws QueueAlreadyExistsException If the queue already exists in the store
+ */
+ public void createQueue(StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueAlreadyExistsException;
+
+ /**
+ * Destroy a queue
+ *
+ * @param queue The queue to be destroyed
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws QueueDoesntExistException If the queue does not exist in the store
+ */
+ public void destroyQueue(StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException;
+
+ /**
+ * Stage the message before effective enqueue
+ *
+ * @param m The message to stage
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws MessageAlreadyStagedException If the message is already staged
+ */
+ public void stage(StorableMessage m)
+ throws
+ InternalErrorException,
+ MessageAlreadyStagedException;
+
+
+ /**
+ * Append more data with a previously staged message
+ *
+ * @param m The message to which data must be appended
+ * @param data Data to happen to the message
+ * @param offset The number of bytes from the beginning of the payload
+ * @param size The number of bytes to be written
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws MessageDoesntExistException If the message has not been staged
+ */
+ public void appendContent(StorableMessage m, byte[] data, int offset, int size)
+ throws
+ InternalErrorException,
+ MessageDoesntExistException;
+
+ /**
+ * Get the content of previously staged or enqueued message.
+ * The message headers are also set.
+ *
+ * @param m The message for which the content must be loaded
+ * @param offset The number of bytes from the beginning of the payload
+ * @param size The number of bytes to be loaded
+ * @return The message content
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws MessageDoesntExistException If the message does not exist
+ */
+ public byte[] loadContent(StorableMessage m, int offset, int size)
+ throws
+ InternalErrorException,
+ MessageDoesntExistException;
+
+ /**
+ * Get the content header of this message
+ *
+ * @param m The message
+ * @return The message content
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws MessageDoesntExistException If the message does not exist
+ */
+ public ContentHeaderBody getContentHeaderBody(StorableMessage m)
+ throws
+ InternalErrorException,
+ MessageDoesntExistException;
+
+ /**
+ * Get the MessagePublishInfo of this message
+ *
+ * @param m The message
+ * @return The message content
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws MessageDoesntExistException If the message does not exist
+ */
+ public MessagePublishInfo getMessagePublishInfo(StorableMessage m)
+ throws
+ InternalErrorException,
+ MessageDoesntExistException;
+
+ /**
+ * Destroy a previously staged message
+ *
+ * @param m the message to be destroyed
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws MessageDoesntExistException If the message does not exist in the store
+ */
+ public void destroy(StorableMessage m)
+ throws
+ InternalErrorException,
+ MessageDoesntExistException;
+
+ /**
+ * Enqueue a message under the scope of the transaction branch
+ * identified by xid when specified.
+ * <p> This operation is propagated to the queue and the message.
+ * <p> A message that has been previously staged is assumed to have had
+ * its payload already added (see appendContent)
+ *
+ * @param xid The xid of the transaction branch under which the message must be enqueued.
+ * <p> It he xid is null then the message is enqueued outside the scope of any transaction.
+ * @param m The message to be enqueued
+ * @param queue The queue into which the message must be enqueued
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws QueueDoesntExistException If the queue does not exist in the store
+ * @throws InvalidXidException The transaction branch is invalid
+ * @throws UnknownXidException The transaction branch is unknown
+ * @throws MessageDoesntExistException If the Message does not exist
+ */
+ public void enqueue(Xid xid, StorableMessage m, StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException,
+ InvalidXidException,
+ UnknownXidException,
+ MessageDoesntExistException;
+
+ /**
+ * Dequeue a message under the scope of the transaction branch identified by xid
+ * if specified.
+ * <p> This operation is propagated to the queue and the message.
+ *
+ * @param xid The xid of the transaction branch under which the message must be dequeued.
+ * <p> It he xid is null then the message is dequeued outside the scope of any transaction.
+ * @param m The message to be dequeued
+ * @param queue The queue from which the message must be dequeued
+ * @throws InternalErrorException In case of internal message store problem
+ * @throws QueueDoesntExistException If the queue does not exist in the store
+ * @throws InvalidXidException The transaction branch is invalid
+ * @throws UnknownXidException The transaction branch is unknown
+ */
+ public void dequeue(Xid xid, StorableMessage m, StorableQueue queue)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException,
+ InvalidXidException,
+ UnknownXidException;
+
+ //=========================================================
+ // Recovery specific methods
+ //=========================================================
+
+ /**
+ * List all the persistent queues
+ *
+ * @return All the persistent queues
+ * @throws InternalErrorException In case of internal message store problem
+ */
+ public Collection<StorableQueue> getAllQueues()
+ throws
+ InternalErrorException;
+
+ /**
+ * All enqueued messages of a given queue
+ *
+ * @param queue The queue where the message are retrieved from
+ * @return The list all enqueued messages of a given queue
+ * @throws InternalErrorException In case of internal message store problem
+ */
+ public Collection<StorableMessage> getAllMessages(StorableQueue queue)
+ throws
+ InternalErrorException;
+
+ /**
+ * Get a new message ID
+ *
+ * @return A new message ID
+ */
+ public long getNewMessageId();
+}
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/Pool.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/Pool.java?view=auto&rev=540493
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/Pool.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/Pool.java Tue May 22 01:49:53 2007
@@ -0,0 +1,135 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.messageStore;
+
+import java.util.ArrayList;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 15-May-2007
+ * Time: 10:11:49
+ */
+public abstract class Pool
+{
+ // The maximum size of the pool.
+ private int _maxPoolSize = -1;
+ // The current size of the pool.
+ private int _currentPoolSize = 0;
+ // The pool objects.
+ private volatile ArrayList<Object> _poolObjects = new ArrayList<Object>();
+ //The current number of created instances.
+ private int _instanceCount = 0;
+
+ /**
+ * Create a pool of specified size. Negative or null pool sizes are
+ * disallowed.
+ *
+ * @param poolSize The size of the pool to create. Should be 1 or
+ * greater.
+ * @throws Exception If the pool size is less than 1.
+ */
+ public Pool(int poolSize) throws Exception
+ {
+ if (poolSize <= 0)
+ {
+ throw new Exception("pool size is less than 1: " + poolSize);
+ }
+ _maxPoolSize = poolSize;
+ }
+
+ /**
+ * Return the maximum size of this pool.
+ *
+ * @return The maximum size of this pool.
+ */
+ public final int maxSize()
+ {
+ return _maxPoolSize;
+ }
+
+ /**
+ * Return the current number of created instances.
+ *
+ * @return The current number of created instances in this pool.
+ */
+ public final int numberOfInstances()
+ {
+ return _instanceCount;
+ }
+
+ /**
+ * Extending classes MUST define how to create an instance of the object
+ * that they pool.
+ *
+ * @return An instance of the pooled object.
+ * @throws Exception In case of internal error.
+ */
+ abstract protected Object createInstance() throws Exception;
+
+ /**
+ * Remove the next available object from the pool or wait for one to become
+ * available.
+ *
+ * @return The next available instance.
+ * @throws Exception If the call is interrupted
+ */
+ public final synchronized Object acquireInstance() throws Exception
+ {
+ while (_currentPoolSize == _maxPoolSize)
+ {
+ try
+ {
+ this.wait();
+ }
+ catch (InterruptedException e)
+ {
+ throw new Exception("pool wait threw interrupted exception", e);
+ }
+ }
+ if (_poolObjects.size() == 0)
+ {
+ _poolObjects.add(createInstance());
+ _instanceCount++;
+ }
+ _currentPoolSize++;
+ return _poolObjects.remove(0);
+ }
+
+ /**
+ * Return an object back into this pool.
+ *
+ * @param object The returning object.
+ */
+ public synchronized void releaseInstance(Object object)
+ {
+ _poolObjects.add(object);
+ _currentPoolSize--;
+ this.notify();
+ }
+
+ /**
+ * Return a dead object back into this pool.
+ *
+ */
+ public synchronized void releaseDeadInstance()
+ {
+ _instanceCount--;
+ _currentPoolSize--;
+ this.notify();
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/Pool.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/messageStore/Pool.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java?view=diff&rev=540493&r1=540492&r2=540493
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/queue/StorableMessageHandle.java Tue May 22 01:49:53 2007
@@ -5,9 +5,9 @@
* to you under the Apache License, Version 2.0 (the
* "License"); you may not use this file except in compliance
* with the License. You may obtain a copy of the License at
- *
+ *
* http://www.apache.org/licenses/LICENSE-2.0
- *
+ *
* Unless required by applicable law or agreed to in writing,
* software distributed under the License is distributed on an
* "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -24,7 +24,11 @@
import org.apache.qpid.server.store.StoreContext;
import org.apache.qpid.server.messageStore.MessageStore;
import org.apache.qpid.server.messageStore.StorableMessage;
+import org.apache.qpid.server.messageStore.JDBCStore;
+import org.apache.qpid.server.exception.InternalErrorException;
+import org.apache.qpid.server.exception.MessageDoesntExistException;
import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
import org.apache.log4j.Logger;
import javax.transaction.xa.Xid;
@@ -65,7 +69,7 @@
// MessagePublishInfo
private MessagePublishInfo _messagePublishInfo;
// list of chunks
- private List<ContentChunk> _chunks = new LinkedList<ContentChunk>();
+ private List<ContentChunk> _chunks;
//========================================================================
// Constructors
@@ -84,6 +88,17 @@
throws
AMQException
{
+ if (_contentHeaderBody == null)
+ {
+ // load it from the store
+ try
+ {
+ _contentHeaderBody = _messageStore.getContentHeaderBody(_message);
+ } catch (Exception e)
+ {
+ throw new AMQException(AMQConstant.INTERNAL_ERROR, e.getMessage(), e);
+ }
+ }
return _contentHeaderBody;
}
@@ -91,6 +106,17 @@
throws
AMQException
{
+ if (_chunks == null )
+ {
+ if(_message.isStaged() )
+ {
+ loadChunks();
+ }
+ else
+ {
+ return 0;
+ }
+ }
return _chunks.size();
}
@@ -106,13 +132,57 @@
IllegalArgumentException,
AMQException
{
+ if (_chunks == null)
+ {
+ loadChunks();
+ }
return _chunks.get(index);
}
+ private void loadChunks()
+ throws
+ AMQException
+ {
+ try
+ {
+ _chunks = new LinkedList<ContentChunk>();
+ byte[] underlying = _messageStore.loadContent(_message, 1, 0);
+ final int size = underlying.length;
+ final org.apache.mina.common.ByteBuffer data =
+ org.apache.mina.common.ByteBuffer.wrap(underlying);
+ ContentChunk cb = new ContentChunk()
+ {
+
+ public int getSize()
+ {
+ return size;
+ }
+
+ public org.apache.mina.common.ByteBuffer getData()
+ {
+ return data;
+ }
+
+ public void reduceToFit()
+ {
+
+ }
+ };
+ _chunks.add(cb);
+ } catch (Exception e)
+ {
+ throw new AMQException(AMQConstant.INTERNAL_ERROR, e.getMessage(), e);
+ }
+ }
+
public void addContentBodyFrame(StoreContext storeContext, Long messageId, ContentChunk contentBody, boolean isLastContentBody)
throws
AMQException
{
+ if (_chunks == null)
+ {
+ _chunks = new LinkedList<ContentChunk>();
+ }
_chunks.add(contentBody);
// if rquired this message can be added to the store
//_messageStore.appendContent(_message, _payload, 0, 10);
@@ -123,6 +193,17 @@
throws
AMQException
{
+ if (_messagePublishInfo == null)
+ {
+ // read it from the store
+ try
+ {
+ _messagePublishInfo = _messageStore.getMessagePublishInfo(_message);
+ } catch (Exception e)
+ {
+ throw new AMQException(AMQConstant.INTERNAL_ERROR, e.getMessage(), e);
+ }
+ }
return _messagePublishInfo;
}
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCAbstractRecord.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCAbstractRecord.java?view=auto&rev=540493
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCAbstractRecord.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCAbstractRecord.java Tue May 22 01:49:53 2007
@@ -0,0 +1,93 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.messageStore.StorableQueue;
+import org.apache.qpid.server.messageStore.StorableMessage;
+import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.qpid.server.exception.InternalErrorException;
+import org.apache.qpid.server.exception.UnknownXidException;
+
+import javax.transaction.xa.Xid;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 16-May-2007
+ * Time: 15:15:18
+ */
+public abstract class JDBCAbstractRecord implements TransactionRecord
+{
+ //========================================================================
+ // Static Constants
+ //========================================================================
+ // The logger for this class
+ private static final Logger _log = Logger.getLogger(JDBCEnqueueRecord.class);
+ // The record types
+ static public final int TYPE_DEQUEUE = 1;
+ static public final int TYPE_ENQUEUE = 2;
+
+ // the queue
+ StorableQueue _queue;
+ // the message
+ StorableMessage _message;
+
+ //========================================================================
+ // Constructor
+ //========================================================================
+ public JDBCAbstractRecord(StorableMessage m, StorableQueue queue)
+ {
+ _queue = queue;
+ _message = m;
+ }
+
+ public abstract int getType();
+ public long getMessageID()
+ {
+ return _message.getMessageId();
+ }
+
+ public int getQueueID()
+ {
+ return _queue.getQueueID();
+ }
+
+ public void rollback(MessageStore store)
+ throws
+ InternalErrorException
+ {
+
+ }
+
+ public void prepare(MessageStore store)
+ throws
+ InternalErrorException
+ {
+ }
+
+
+ public abstract void rollback(MessageStore store, Xid xid)
+ throws
+ InternalErrorException,
+ UnknownXidException;
+
+ public abstract void prepare(MessageStore store, Xid xid)
+ throws
+ InternalErrorException,
+ UnknownXidException;
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCAbstractRecord.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCAbstractRecord.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCDequeueRecord.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCDequeueRecord.java?view=auto&rev=540493
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCDequeueRecord.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCDequeueRecord.java Tue May 22 01:49:53 2007
@@ -0,0 +1,85 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.messageStore.StorableQueue;
+import org.apache.qpid.server.messageStore.StorableMessage;
+import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.qpid.server.messageStore.JDBCStore;
+import org.apache.qpid.server.exception.*;
+
+import javax.transaction.xa.Xid;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 16-May-2007
+ * Time: 14:50:34
+ */
+public class JDBCDequeueRecord extends JDBCAbstractRecord
+{
+ //========================================================================
+ // Static Constants
+ //========================================================================
+ // The logger for this class
+ private static final Logger _log = Logger.getLogger(JDBCDequeueRecord.class);
+
+ //========================================================================
+ // Constructor
+ //========================================================================
+ public JDBCDequeueRecord( StorableMessage m, StorableQueue queue)
+ {
+ super(m, queue);
+ }
+
+ //========================================================================
+ // Interface TransactionRecord
+ //========================================================================
+
+ public void commit(MessageStore store, Xid xid)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException,
+ InvalidXidException,
+ UnknownXidException,
+ MessageDoesntExistException
+ {
+ store.dequeue(xid, _message, _queue);
+ }
+
+ public void rollback(MessageStore store, Xid xid)
+ throws
+ InternalErrorException,
+ UnknownXidException
+ {
+ ((JDBCStore) store).rollbackDequeu(xid, _message, _queue);
+ }
+
+ public void prepare(MessageStore store, Xid xid)
+ throws
+ InternalErrorException,
+ UnknownXidException
+ {
+ ((JDBCStore) store).prepareDequeu(xid, _message, _queue);
+ }
+
+ public int getType()
+ {
+ return TYPE_DEQUEUE;
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCDequeueRecord.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCDequeueRecord.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCEnqueueRecord.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCEnqueueRecord.java?view=auto&rev=540493
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCEnqueueRecord.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCEnqueueRecord.java Tue May 22 01:49:53 2007
@@ -0,0 +1,106 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.messageStore.StorableQueue;
+import org.apache.qpid.server.messageStore.StorableMessage;
+import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.qpid.server.messageStore.JDBCStore;
+import org.apache.qpid.server.exception.*;
+
+import javax.transaction.xa.Xid;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 16-May-2007
+ * Time: 14:50:20
+ */
+public class JDBCEnqueueRecord extends JDBCAbstractRecord
+{
+ //========================================================================
+ // Static Constants
+ //========================================================================
+ // The logger for this class
+ private static final Logger _log = Logger.getLogger(JDBCEnqueueRecord.class);
+
+ //========================================================================
+ // Constructor
+ //========================================================================
+ public JDBCEnqueueRecord(StorableMessage m, StorableQueue queue)
+ {
+ super(m, queue);
+ }
+
+ //========================================================================
+ // Interface TransactionRecord
+ //========================================================================
+
+ public void commit(MessageStore store, Xid xid)
+ throws
+ InternalErrorException,
+ QueueDoesntExistException,
+ InvalidXidException,
+ UnknownXidException,
+ MessageDoesntExistException
+ {
+ store.enqueue(xid, _message, _queue);
+ }
+
+ public void rollback(MessageStore store, Xid xid)
+ throws
+ InternalErrorException,
+ UnknownXidException
+ {
+ if (!_message.isEnqueued())
+ {
+ // try to delete the message
+ try
+ {
+ store.destroy(_message);
+ } catch (Exception e)
+ {
+ throw new InternalErrorException("Problem when destoying message ", e);
+ }
+ }
+ }
+
+ public void prepare(MessageStore store, Xid xid)
+ throws
+ InternalErrorException,
+ UnknownXidException
+ {
+ try
+ {
+ if (!_message.isEnqueued() && !_message.isStaged())
+ {
+ store.stage(_message);
+ store.appendContent(_message, _message.getData(), 0, _message.getData().length);
+ }
+ }
+ catch (Exception e)
+ {
+ throw new InternalErrorException("Problem when persisting message ", e);
+ }
+ }
+
+ public int getType()
+ {
+ return TYPE_ENQUEUE;
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCEnqueueRecord.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCEnqueueRecord.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransaction.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransaction.java?view=auto&rev=540493
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransaction.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransaction.java Tue May 22 01:49:53 2007
@@ -0,0 +1,196 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.messageStore.JDBCStore;
+
+import java.util.List;
+import java.util.LinkedList;
+
+/**
+ * Created by Arnaud Simon
+ * Date: 16-May-2007
+ * Time: 14:09:35
+ */
+public class JDBCTransaction implements Transaction
+{
+ //========================================================================
+ // Static Constants
+ //========================================================================
+ // The logger for this class
+ private static final Logger _log = Logger.getLogger(JDBCTransaction.class);
+ public static long _xidId = 0;
+ //========================================================================
+ // Instance Fields
+ //========================================================================
+ // the associated connection
+ private JDBCStore.MyConnection _connection;
+ // Indicates whether this transaction is prepared
+ private boolean _prepared = false;
+ // Indicates that this transaction has heuristically rolled back
+ private boolean _heurRollBack = false;
+ // The list of records associated with this tx
+ private List<TransactionRecord> _records = new LinkedList<TransactionRecord>();
+ // The date when this tx has been created.
+ private long _dateCreated;
+ // The timeout in seconds
+ private long _timeout;
+ // this instance xid id used as primary key
+ private long _thisxidId;
+
+ //=========================================================
+ // Constructors
+ //=========================================================
+
+ /**
+ * Create a transaction
+ *
+ */
+ public JDBCTransaction()
+ {
+ _dateCreated = System.currentTimeMillis();
+ _thisxidId = _xidId++;
+ }
+
+ //=========================================================
+ // Getter and Setter methods
+ //=========================================================
+
+ /**
+ * Notify that this tx has been prepared
+ */
+ public void prepare()
+ {
+ _prepared = true;
+ }
+
+ /**
+ * Specify whether this transaction is prepared
+ *
+ * @return true if this transaction is prepared, false otherwise
+ */
+ public boolean isPrepared()
+ {
+ return _prepared;
+ }
+
+ /**
+ * Notify that this tx has been heuristically rolled back
+ */
+ public void heurRollback()
+ {
+ _heurRollBack = true;
+ }
+
+ /**
+ * Specify whether this transaction has been heuristically rolled back
+ *
+ * @return true if this transaction has been heuristically rolled back , false otherwise
+ */
+ public boolean isHeurRollback()
+ {
+ return _heurRollBack;
+ }
+
+ /**
+ * Add an abstract record to this tx.
+ *
+ * @param record The record to be added
+ */
+ public void addRecord(TransactionRecord record)
+ {
+ _records.add(record);
+ }
+
+ /**
+ * Get the list of records associated with this tx.
+ *
+ * @return The list of records associated with this tx.
+ */
+ public List<TransactionRecord> getrecords()
+ {
+ return _records;
+ }
+
+ /**
+ * Set this tx timeout
+ *
+ * @param timeout This tx timeout in seconds
+ */
+ public void setTimeout(long timeout)
+ {
+ _timeout = timeout;
+ }
+
+ /**
+ * Get this tx timeout
+ *
+ * @return This tx timeout in seconds
+ */
+ public long getTimeout()
+ {
+ return _timeout;
+ }
+
+ /**
+ * Specify whether this tx has expired
+ *
+ * @return true if this tx has expired, false otherwise
+ */
+ public boolean hasExpired()
+ {
+ long currentDate = System.currentTimeMillis();
+ boolean result = currentDate - _dateCreated > _timeout * 1000;
+ if (_log.isDebugEnabled() && result)
+ {
+ _log.debug("transaction has expired");
+ }
+ return result;
+ }
+
+ /**
+ * Get the JDBC connection
+ * @return The JDBC connection
+ */
+ public JDBCStore.MyConnection getConnection()
+ {
+ return _connection;
+ }
+
+ /**
+ * Set the JDBC connection
+ *
+ * @param connection The new JDBC connection
+ */
+ public void setConnection(JDBCStore.MyConnection connection)
+ {
+ _connection = connection;
+ }
+
+ /**
+ * This tx xid id used as primary key
+ *
+ * @return this tx xid id
+ */
+ public long getXidID()
+ {
+ return _thisxidId;
+ }
+
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransaction.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransaction.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Added: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransactionManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransactionManager.java?view=auto&rev=540493
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransactionManager.java (added)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransactionManager.java Tue May 22 01:49:53 2007
@@ -0,0 +1,554 @@
+/* Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ */
+package org.apache.qpid.server.txn;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.server.messageStore.MessageStore;
+import org.apache.qpid.server.messageStore.JDBCStore;
+import org.apache.qpid.server.exception.*;
+import org.apache.commons.configuration.Configuration;
+
+import javax.transaction.xa.Xid;
+import java.util.HashMap;
+import java.util.Set;
+
+
+/**
+ * Created by Arnaud Simon
+ * Date: 16-May-2007
+ * Time: 14:05:45
+ */
+public class JDBCTransactionManager implements TransactionManager
+{
+ //========================================================================
+ // Static Constants
+ //========================================================================
+ // The logger for this class
+ private static final Logger _log = Logger.getLogger(JDBCTransactionManager.class);
+
+ private static final String ENVIRONMENT_TX_TIMEOUT = "environment-tx-timeout";
+
+ //========================================================================
+ // Instance Fields
+ //========================================================================
+ // The underlying jdbc message store
+ private JDBCStore _messagStore;
+
+ // A map of XID/x
+ private HashMap<Xid, Transaction> _xidMap;
+
+ // A map of in-doubt txs
+ private HashMap<Xid, Transaction> _indoubtXidMap;
+
+ // A default tx timeout in sec
+ private int _defaultTimeout; // set to 10s if not specified in the config
+
+ //===================================
+ //=== Configuartion
+ //===================================
+
+ /**
+ * Configure this TM with the Message store implementation
+ *
+ * @param base The base element identifier from which all configuration items are relative. For example, if the base
+ * element is "store", the all elements used by concrete classes will be "store.foo" etc.
+ * @param config The apache commons configuration object
+ * @param messageStroe the message store associated with the TM
+ */
+ public void configure(MessageStore messageStroe, String base, Configuration config)
+ {
+ _messagStore = (JDBCStore) messageStroe;
+ if (config != null)
+ {
+ _defaultTimeout = config.getInt(base + "." + ENVIRONMENT_TX_TIMEOUT, 120);
+ } else
+ {
+ _defaultTimeout = 120;
+ }
+ _log.info("Using transaction timeout of " + _defaultTimeout + " s");
+ // get the list of in-doubt transactions
+ try
+ {
+ _indoubtXidMap = _messagStore.getAllInddoubt();
+ _xidMap = _indoubtXidMap;
+ } catch (Exception e)
+ {
+ _log.fatal("Cannot recover in-doubt transactions", e);
+ }
+ }
+
+ //===================================
+ //=== TransactionManager interface
+ //===================================
+
+ /**
+ * Begin a transaction branch identified by Xid
+ *
+ * @param xid The xid of the branch to begin
+ * @return <ul>
+ * <li> <code>XAFlag.ok</code>: Normal execution.
+ * </ul>
+ * @throws InternalErrorException In case of internal problem
+ * @throws InvalidXidException The Xid is invalid
+ */
+ public synchronized XAFlag begin(Xid xid)
+ throws
+ InternalErrorException,
+ InvalidXidException
+ {
+ if (xid == null)
+ {
+ throw new InvalidXidException(xid, "null xid");
+ }
+ if (_xidMap.containsKey(xid))
+ {
+ throw new InvalidXidException(xid, "Xid already exist");
+ }
+ Transaction tx = new JDBCTransaction();
+ tx.setTimeout(_defaultTimeout);
+ _xidMap.put(xid, tx);
+ return XAFlag.ok;
+ }
+
+ /**
+ * Prepare the transaction branch identified by Xid
+ *
+ * @param xid The xid of the branch to prepare
+ * @return <ul>
+ * <li> <code>XAFlag.ok</code>: Normal execution.
+ * <li> <code>XAFlag.rdonly</code>: The transaction branch was read-only and has been committed.
+ * <li> <code>XAFlag.rbrollback</code>: The transaction branch was marked rollback-only for an unspecied reason.
+ * <li> <code>XAFlag.rbtimeout</code>: The work represented by this transaction branch took too long.
+ * </ul>
+ * @throws InternalErrorException In case of internal problem
+ * @throws CommandInvalidException Prepare has been call in an improper context
+ * @throws UnknownXidException The Xid is unknown
+ */
+ public synchronized XAFlag prepare(Xid xid)
+ throws
+ InternalErrorException,
+ CommandInvalidException,
+ UnknownXidException
+ {
+ // get the transaction
+ JDBCTransaction tx = getTransaction(xid);
+ XAFlag result = XAFlag.ok;
+ if (tx.isHeurRollback())
+ {
+ result = XAFlag.rdonly;
+ } else if (tx.hasExpired())
+ {
+ result = XAFlag.rbtimeout;
+ // rollback this tx branch
+ rollback(xid);
+ } else
+ {
+ if (tx.isPrepared())
+ {
+ throw new CommandInvalidException("TransactionImpl is already prepared");
+ }
+ if (tx.getrecords().size() == 0)
+ {
+ // the tx was read only (no work has been done)
+ _xidMap.remove(xid);
+ result = XAFlag.rdonly;
+ } else
+ {
+ try
+ {
+ JDBCStore.MyConnection con = _messagStore.getConnection();
+ tx.setConnection(con);
+ // save the xid
+ _messagStore.saveXID(con, tx, xid);
+ for (TransactionRecord record : tx.getrecords())
+ {
+ if (record instanceof JDBCAbstractRecord)
+ {
+ ((JDBCAbstractRecord) record).prepare(_messagStore, xid);
+ _messagStore.saveRecord(con, tx, (JDBCAbstractRecord) record);
+ } else
+ {
+ record.prepare(_messagStore);
+ }
+ }
+ _messagStore.commitConnection(con);
+ tx.setConnection(null);
+ } catch (Exception e)
+ {
+ _log.error("Cannot prepare tx: " + xid);
+ throw new InternalErrorException("Cannot prepare tx: " + xid);
+ }
+ tx.prepare();
+ }
+ }
+ return result;
+ }
+
+ /**
+ * Rollback the transaction branch identified by Xid
+ *
+ * @param xid The xid of the branch to rollback
+ * @return <ul>
+ * <li> <code>XAFlag.ok</code>: Normal execution,
+ * <li> <code>NOT SUPPORTED XAFlag.heurhaz</code>: Due to some failure, the work done on behalf of the specified transaction branch may have been heuristically completed.
+ * <li> <code>NOT SUPPORTED XAFlag.heurcom</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was committed.
+ * <li> <code>XAFlag.heurrb</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was rolled back.
+ * <li> <code>NOT SUPPORTED XAFlag.heurmix</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was partially committed and partially rolled back.
+ * <li> <code>NOT SUPPORTED XAFlag.rbrollback</code>: The broker marked the transaction branch rollback-only for an unspeci?ed reason.
+ * <li> <code>XAFlag.rbtimeout</code>: The work represented by this transaction branch took too long.
+ * </ul>
+ * @throws InternalErrorException In case of internal problem
+ * @throws CommandInvalidException Rollback has been call in an improper context
+ * @throws UnknownXidException The Xid is unknown
+ */
+ public synchronized XAFlag rollback(Xid xid)
+ throws
+ InternalErrorException,
+ CommandInvalidException,
+ UnknownXidException
+ {
+ // get the transaction
+ JDBCTransaction tx = getTransaction(xid);
+ XAFlag flag = XAFlag.ok;
+ if (tx.isHeurRollback())
+ {
+ flag = XAFlag.heurrb;
+ } else
+ {
+ try
+ {
+ JDBCStore.MyConnection con = _messagStore.getConnection();
+ tx.setConnection(con);
+ for (TransactionRecord record : tx.getrecords())
+ {
+ if (record instanceof JDBCAbstractRecord)
+ {
+ ((JDBCAbstractRecord) record).rollback(_messagStore, xid);
+ } else
+ {
+ record.rollback(_messagStore);
+ }
+ }
+ if (tx.isPrepared())
+ {
+ _messagStore.deleteRecords(con, tx);
+ _messagStore.deleteXID(con, tx);
+ _messagStore.commitConnection(con);
+ }
+ _messagStore.commitConnection(con);
+ tx.setConnection(null);
+ }
+ catch (Exception e)
+ {
+ // this should not happen
+ _log.error("Error when rolling back distributed transaction: " + xid);
+ throw new InternalErrorException("Error when rolling back distributed transaction: " + xid, e);
+ }
+ removeTransaction(xid);
+ }
+ if (tx.hasExpired())
+ {
+ flag = XAFlag.rbtimeout;
+ }
+ return flag;
+ }
+
+ /**
+ * Commit the transaction branch identified by Xid
+ *
+ * @param xid The xid of the branch to commit
+ * @return <ul>
+ * <li> <code>XAFlag.ok</code>: Normal execution,
+ * <li> <code>NOT SUPPORTED XAFlag.heurhaz</code>: Due to some failure, the work done on behalf of the specified transaction branch may have been heuristically completed.
+ * <li> <code>NOT SUPPORTED XAFlag.heurcom</code>: Due to a heuristic decision, the work done on behalf of the specied transaction branch was committed.
+ * <li> <code>XAFlag.heurrb</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was rolled back.
+ * <li> <code>XAFlag.heurmix</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was partially committed and partially rolled back.
+ * </ul>
+ * @throws InternalErrorException In case of internal problem
+ * @throws CommandInvalidException Commit has been call in an improper context
+ * @throws UnknownXidException The Xid is unknown
+ * @throws org.apache.qpid.server.exception.NotPreparedException
+ * The branch was not prepared prior to commit
+ */
+ public synchronized XAFlag commit(Xid xid)
+ throws
+ InternalErrorException,
+ CommandInvalidException,
+ UnknownXidException,
+ NotPreparedException
+ {
+ // get the transaction
+ JDBCTransaction tx = getTransaction(xid);
+ XAFlag flag = XAFlag.ok;
+ if (tx.isHeurRollback())
+ {
+ flag = XAFlag.heurrb;
+ } else if (tx.hasExpired())
+ {
+ flag = XAFlag.rbtimeout;
+ // rollback this tx branch
+ rollback(xid);
+ } else
+ {
+ if (!tx.isPrepared())
+ {
+ throw new NotPreparedException("TransactionImpl is not prepared");
+ }
+ try
+ {
+ JDBCStore.MyConnection con = _messagStore.getConnection();
+ tx.setConnection(con);
+ for (TransactionRecord record : tx.getrecords())
+ {
+ try
+ {
+ record.commit(_messagStore, xid);
+ } catch (InvalidXidException e)
+ {
+ throw new UnknownXidException(xid, e);
+ } catch (Exception e)
+ {
+ // this should not happen as the queue and the message must exist
+ _log.error("Error when committing distributed transaction heurmix mode returned: " + xid);
+ flag = XAFlag.heurmix;
+ }
+ }
+ _messagStore.deleteRecords(con, tx);
+ _messagStore.deleteXID(con, tx);
+ _messagStore.commitConnection(con);
+ tx.setConnection(null);
+ } catch (Exception e)
+ {
+ // this should not happen
+ _log.error("Error when committing distributed transaction heurrb mode returned: " + xid);
+ throw new InternalErrorException("Error when committing distributed transaction: " + xid, e);
+ }
+ removeTransaction(xid);
+ }
+ return flag;
+ }
+
+ /**
+ * One phase commit the transaction branch identified by Xid
+ *
+ * @param xid The xid of the branch to one phase commit
+ * @return <ul>
+ * <li> <code>XAFlag.ok</code>: Normal execution,
+ * <li> <code>NOT SUPPORTED XAFlag.heurhaz</code>: Due to some failure, the work done on behalf of the specified transaction branch may have been heuristically completed.
+ * <li> <code>NOT SUPPORTED XAFlag.heurcom</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was committed.
+ * <li> <code>XAFlag.heurrb</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was rolled back.
+ * <li> <code>NOT SUPPORTED XAFlag.heurmix</code>: Due to a heuristic decision, the work done on behalf of the speci?ed transaction branch was partially committed and partially rolled back.
+ * <li> <code>NOT SUPPORTED XAFlag.rbrollback</code>: The broker marked the transaction branch rollback-only for an unspeci?ed reason.
+ * <li> <code>XAFlag.rbtimeout</code>: The work represented by this transaction branch took too long.
+ * </ul>
+ * @throws InternalErrorException In case of internal problem
+ * @throws CommandInvalidException Commit has been call in an improper context
+ * @throws UnknownXidException The Xid is unknown
+ */
+ public synchronized XAFlag commit_one_phase(Xid xid)
+ throws
+ InternalErrorException,
+ CommandInvalidException,
+ UnknownXidException
+ {
+ XAFlag flag = XAFlag.ok;
+ JDBCTransaction tx = getTransaction(xid);
+ if (tx.isHeurRollback())
+ {
+ flag = XAFlag.heurrb;
+ } else if (tx.hasExpired())
+ {
+ flag = XAFlag.rbtimeout;
+ // rollback this tx branch
+ rollback(xid);
+ } else
+ {
+ try
+ {
+ // we do not need to prepare the tx
+ tx.prepare();
+ JDBCStore.MyConnection con = _messagStore.getConnection();
+ tx.setConnection(con);
+ for (TransactionRecord record : tx.getrecords())
+ {
+ try
+ {
+ record.commit(_messagStore, xid);
+ } catch (InvalidXidException e)
+ {
+ throw new UnknownXidException(xid, e);
+ } catch (Exception e)
+ {
+ // this should not happen as the queue and the message must exist
+ _log.error("Error when committing transaction heurmix mode returned: " + xid);
+ flag = XAFlag.heurmix;
+ }
+ }
+ _messagStore.commitConnection(con);
+ tx.setConnection(null);
+ } catch (Exception e)
+ {
+ e.printStackTrace();
+ throw new InternalErrorException("cannot commit transaxtion with xid " + xid + " " + e, e);
+ }
+ finally
+ {
+ removeTransaction(xid);
+ }
+ }
+ return flag;
+ }
+
+ /**
+ * Forget about the transaction branch identified by Xid
+ *
+ * @param xid The xid of the branch to forget
+ * @throws InternalErrorException In case of internal problem
+ * @throws CommandInvalidException Forget has been call in an improper context
+ * @throws UnknownXidException The Xid is unknown
+ */
+ public void forget(Xid xid)
+ throws
+ InternalErrorException,
+ CommandInvalidException,
+ UnknownXidException
+ {
+ synchronized (xid)
+ {
+ getTransaction(xid);
+ removeTransaction(xid);
+ }
+ }
+
+ /**
+ * Set the transaction branch timeout value in seconds
+ *
+ * @param xid The xid of the branch to set timeout
+ * @param timeout Timeout value in seconds
+ * @throws InternalErrorException In case of internal problem
+ * @throws UnknownXidException The Xid is unknown
+ */
+ public void setTimeout(Xid xid, long timeout)
+ throws
+ InternalErrorException,
+ UnknownXidException
+ {
+ JDBCTransaction tx = getTransaction(xid);
+ tx.setTimeout(timeout);
+ }
+
+ /**
+ * Get the transaction branch timeout
+ *
+ * @param xid The xid of the branch to get the timeout from
+ * @return The timeout associated with the branch identified with xid
+ * @throws InternalErrorException In case of internal problem
+ * @throws UnknownXidException The Xid is unknown
+ */
+ public long getTimeout(Xid xid)
+ throws
+ InternalErrorException,
+ UnknownXidException
+ {
+ JDBCTransaction tx = getTransaction(xid);
+ return tx.getTimeout();
+ }
+
+ /**
+ * Get a set of Xids the RM has prepared or heuristically completed
+ *
+ * @param startscan Indicates that recovery scan should start
+ * @param endscan Indicates that the recovery scan should end after returning the Xids
+ * @return Set of Xids the RM has prepared or heuristically completed
+ * @throws InternalErrorException In case of internal problem
+ * @throws CommandInvalidException Recover has been call in an improper context
+ */
+ public Set<Xid> recover(boolean startscan, boolean endscan)
+ throws
+ InternalErrorException,
+ CommandInvalidException
+ {
+ return _indoubtXidMap.keySet();
+ }
+
+ /**
+ * An error happened (for example the channel has been abruptly closed)
+ * with this Xid, TM must make a heuristical decision.
+ *
+ * @param xid The Xid of the transaction branch to be heuristically completed
+ * @throws UnknownXidException The Xid is unknown
+ * @throws InternalErrorException In case of internal problem
+ */
+ public void HeuristicOutcome(Xid xid)
+ throws
+ UnknownXidException,
+ InternalErrorException
+ {
+ synchronized (xid)
+ {
+ JDBCTransaction tx = getTransaction(xid);
+ if (!tx.isPrepared())
+ {
+ // heuristically rollback this tx
+ for (TransactionRecord record : tx.getrecords())
+ {
+ record.rollback(_messagStore);
+ }
+ tx.heurRollback();
+ }
+ // add this branch in the list of indoubt tx
+ _indoubtXidMap.put(xid, tx);
+ }
+ }
+
+
+ public JDBCTransaction getTransaction(Xid xid)
+ throws
+ UnknownXidException
+ {
+ Transaction tx = _xidMap.get(xid);
+ if (tx == null)
+ {
+ throw new UnknownXidException(xid);
+ }
+ return (JDBCTransaction) tx;
+ }
+
+ //==========================================================================
+ //== Methods for Message Store
+ //==========================================================================
+
+ /**
+ * Get the default tx timeout in seconds
+ *
+ * @return the default tx timeout in seconds
+ */
+ public int getDefaultTimeout()
+ {
+ return _defaultTimeout;
+ }
+ //==========================================================================
+ //== Private Methods
+ //==========================================================================
+
+ private void removeTransaction(Xid xid)
+ {
+ _xidMap.remove(xid);
+ _indoubtXidMap.remove(xid);
+ }
+}
Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransactionManager.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/JDBCTransactionManager.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java?view=diff&rev=540493&r1=540492&r2=540493
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/txn/NonTransactionalContext.java Tue May 22 01:49:53 2007
@@ -93,7 +93,10 @@
{
try
{
- message.getMessageHandle().enqueue(_storeContext, message.getMessageId(), queue);
+ if( ! deliverFirst )
+ {
+ message.getMessageHandle().enqueue(_storeContext, message.getMessageId(), queue);
+ }
queue.process(_storeContext, message, deliverFirst);
//following check implements the functionality
//required by the 'immediate' flag:
Modified: incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java?view=diff&rev=540493&r1=540492&r2=540493
==============================================================================
--- incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java (original)
+++ incubator/qpid/trunk/qpid/java/broker/src/main/java/org/apache/qpid/server/util/NullApplicationRegistry.java Tue May 22 01:49:53 2007
@@ -57,10 +57,14 @@
super(new MapConfiguration(new HashMap()));
}
- public void initialise() throws Exception
+ public void initialise()
+ throws
+ Exception
{
_configuration.addProperty("store.class", "org.apache.qpid.server.messageStore.MemoryMessageStore");
_configuration.addProperty("txn.class", "org.apache.qpid.server.txn.MemoryTransactionManager");
+ // _configuration.addProperty("store.class", "org.apache.qpid.server.messageStore.JDBCStore");
+ // _configuration.addProperty("txn.class", "org.apache.qpid.server.txn.JDBCTransactionManager");
Properties users = new Properties();
@@ -119,5 +123,6 @@
return _accessManager;
}
}
+