You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2006/10/17 22:46:03 UTC

svn commit: r465038 [2/2] - in /incubator/qpid/branches/new_persistence: cpp/ cpp/broker/ cpp/client/ cpp/common/ cpp/etc/ cpp/etc/stylesheets/ cpp/qpidd/ cpp/src/ cpp/src/qpid/ cpp/src/qpid/broker/ cpp/src/qpid/client/ cpp/src/qpid/concurrent/ cpp/src...

Modified: incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java?view=diff&rev=465038&r1=465037&r2=465038
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TxnBuffer.java Tue Oct 17 13:45:52 2006
@@ -24,9 +24,13 @@
 import java.util.ArrayList;
 import java.util.List;
 
+/**
+ * Holds a list of TxnOp instance representing transactional
+ * operations. 
+ */
 public class TxnBuffer
 {
-    private boolean _persistentMessageRecevied = false;
+    private boolean _containsPersistentChanges = false;
     private final MessageStore _store;
     private final List<TxnOp> _ops = new ArrayList<TxnOp>();
     private static final Logger _log = Logger.getLogger(TxnBuffer.class);
@@ -36,45 +40,64 @@
         _store = store;
     }
 
-    public void setPersistentMessageRecevied()
+    public void containsPersistentChanges()
     {
-        _persistentMessageRecevied = true;
+        _containsPersistentChanges = true;
     }
 
     public void commit() throws AMQException
     {
-        if (_persistentMessageRecevied)
+        if (_containsPersistentChanges)
         {
             _log.debug("Begin Transaction.");
             _store.beginTran();
-        }
-        boolean failed = true;
-        try
-        {
-            for (TxnOp op : _ops)
+            if(prepare())
             {
-                op.commit();
+                _log.debug("Transaction Succeeded");
+                _store.commitTran();
+                for (TxnOp op : _ops)
+                {
+                    op.commit();
+                }
             }
-            _ops.clear();
-            failed = false;
-        }
-        finally
-        {
-            if (_persistentMessageRecevied)
+            else
             {
-                if (failed)
+                _log.debug("Transaction Failed");
+                _store.abortTran();
+            }
+        }else{
+            if(prepare())
+            {
+                for (TxnOp op : _ops)
                 {
-                    _log.debug("Transaction Failed");
-                    _store.abortTran();
+                    op.commit();
                 }
-                else
+            }            
+        }
+        _ops.clear();
+    }
+
+    private boolean prepare() 
+    {        
+        for (int i = 0; i < _ops.size(); i++)
+        {
+            TxnOp op = _ops.get(i);
+            try
+            {
+                op.prepare();
+            }
+            catch(Exception e)
+            {
+                //compensate previously prepared ops
+                for(int j = 0; j < i; j++)
                 {
-                    _log.debug("Transaction Succeeded");
-                    _store.commitTran();
-                }
+                    _ops.get(j).undoPrepare();
+                }    
+                return false;
             }
         }
-    }
+        return true;
+    }   
 
     public void rollback() throws AMQException
     {
@@ -88,5 +111,10 @@
     public void enlist(TxnOp op)
     {
         _ops.add(op);
+    }
+
+    public void cancel(TxnOp op)
+    {
+        _ops.remove(op);
     }
 }

Modified: incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TxnOp.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TxnOp.java?view=diff&rev=465038&r1=465037&r2=465038
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TxnOp.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/src/org/apache/qpid/server/txn/TxnOp.java Tue Oct 17 13:45:52 2006
@@ -19,8 +19,33 @@
 
 import org.apache.qpid.AMQException;
 
+/**
+ * This provides the abstraction of an individual operation within a
+ * transaction. It is used by the TxnBuffer class.
+ */
 public interface TxnOp
 {
-    public void commit() throws AMQException;
+    /**
+     * Do the part of the operation that updates persistent state 
+     */
+    public void prepare() throws AMQException;
+    /**
+     * Complete the operation started by prepare. Can now update in
+     * memory state or make netork transfers.
+     */
+    public void commit();
+    /**
+     * This is not the same as rollback. Unfortunately the use of an
+     * in memory reference count as a locking mechanism and a test for
+     * whether a message should be deleted means that as things are,
+     * handling an acknowledgement unavoidably alters both memory and
+     * persistent state on prepare. This is needed to 'compensate' or
+     * undo the in-memory change if the peristent update of later ops
+     * fails.
+     */
+    public void undoPrepare();
+    /**
+     * Rolls back the operation.
+     */
     public void rollback();
 }

Modified: incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/UnitTests.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/UnitTests.java?view=diff&rev=465038&r1=465037&r2=465038
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/UnitTests.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/UnitTests.java Tue Oct 17 13:45:52 2006
@@ -28,6 +28,7 @@
         org.apache.qpid.server.protocol.UnitTests.class,
         org.apache.qpid.server.queue.UnitTests.class,
         org.apache.qpid.server.store.UnitTests.class,
+        org.apache.qpid.server.txn.UnitTests.class,
         org.apache.qpid.server.util.UnitTests.class
         })
 public class UnitTests

Modified: incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java?view=diff&rev=465038&r1=465037&r2=465038
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/queue/DeliveryManagerTest.java Tue Oct 17 13:45:52 2006
@@ -138,7 +138,9 @@
     @Test (expected=NoConsumersException.class)
     public void noConsumers() throws AMQException
     {
-        _mgr.deliver("Me", message(true));
+        AMQMessage msg = message(true);
+        _mgr.deliver("Me", msg);
+        msg.checkDeliveredToConsumer();        
     }
 
     @Test (expected=NoConsumersException.class)
@@ -147,7 +149,9 @@
         TestSubscription s = new TestSubscription("A");
         _subscriptions.addSubscriber(s);
         s.setSuspended(true);
-        _mgr.deliver("Me", message(true));
+        AMQMessage msg = message(true);
+        _mgr.deliver("Me", msg);
+        msg.checkDeliveredToConsumer();
     }
 
     public static junit.framework.Test suite()

Modified: incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/util/UnitTests.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/util/UnitTests.java?view=diff&rev=465038&r1=465037&r2=465038
==============================================================================
--- incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/util/UnitTests.java (original)
+++ incubator/qpid/branches/new_persistence/java/broker/test/src/org/apache/qpid/server/util/UnitTests.java Tue Oct 17 13:45:52 2006
@@ -22,7 +22,7 @@
 import org.junit.runners.Suite;
 
 @RunWith(Suite.class)
-@Suite.SuiteClasses({LoggingProxyTest.class})
+@Suite.SuiteClasses({LoggingProxyTest.class, OrderedMapHelperTest.class})
 public class UnitTests
 {
     public static junit.framework.Test suite()

Modified: incubator/qpid/branches/new_persistence/java/client/src/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/org/apache/qpid/client/AMQConnection.java?view=diff&rev=465038&r1=465037&r2=465038
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/org/apache/qpid/client/AMQConnection.java Tue Oct 17 13:45:52 2006
@@ -17,34 +17,44 @@
  */
 package org.apache.qpid.client;
 
+import org.apache.log4j.Logger;
+import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQUndeliveredException;
-import org.apache.qpid.AMQConnectionException;
 import org.apache.qpid.AMQUnresolvedAddressException;
-import org.apache.qpid.url.URLSyntaxException;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.failover.FailoverSupport;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.state.AMQState;
 import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.framing.*;
-import org.apache.qpid.jms.*;
+import org.apache.qpid.framing.BasicQosBody;
+import org.apache.qpid.framing.BasicQosOkBody;
+import org.apache.qpid.framing.ChannelOpenBody;
+import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.framing.TxSelectBody;
+import org.apache.qpid.framing.TxSelectOkBody;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.ChannelLimitReachedException;
 import org.apache.qpid.jms.Connection;
-
-import org.apache.log4j.Logger;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.jms.FailoverPolicy;
+import org.apache.qpid.url.URLSyntaxException;
 
 import javax.jms.*;
-import javax.jms.Queue;
-import javax.jms.Session;
-import javax.naming.Reference;
 import javax.naming.NamingException;
-import javax.naming.StringRefAddr;
+import javax.naming.Reference;
 import javax.naming.Referenceable;
+import javax.naming.StringRefAddr;
 import java.io.IOException;
-import java.util.*;
-import java.util.concurrent.atomic.AtomicInteger;
 import java.net.ConnectException;
 import java.nio.channels.UnresolvedAddressException;
 import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.atomic.AtomicInteger;
 
 public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
 {
@@ -129,8 +139,8 @@
                          String clientName, String virtualHost) throws AMQException, URLSyntaxException
     {
         this(new AMQConnectionURL(ConnectionURL.AMQ_PROTOCOL + "://" +
-                username + ":" + password + "@" + clientName +
-                virtualHost + "?brokerlist='" + broker + "'"));
+                                  username + ":" + password + "@" + clientName +
+                                  virtualHost + "?brokerlist='" + broker + "'"));
     }
 
     public AMQConnection(String host, int port, String username, String password,
@@ -143,14 +153,14 @@
                          String clientName, String virtualHost) throws AMQException, URLSyntaxException
     {
         this(new AMQConnectionURL(useSSL ?
-                ConnectionURL.AMQ_PROTOCOL + "://" +
-                        username + ":" + password + "@" + clientName +
-                        virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
-                        + "," + ConnectionURL.OPTIONS_SSL + "='true'" :
-                ConnectionURL.AMQ_PROTOCOL + "://" +
-                        username + ":" + password + "@" + clientName +
-                        virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
-                        + "," + ConnectionURL.OPTIONS_SSL + "='false'"
+                                  ConnectionURL.AMQ_PROTOCOL + "://" +
+                                  username + ":" + password + "@" + clientName +
+                                  virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
+                                  + "," + ConnectionURL.OPTIONS_SSL + "='true'" :
+                                                                                ConnectionURL.AMQ_PROTOCOL + "://" +
+                                                                                username + ":" + password + "@" + clientName +
+                                                                                virtualHost + "?brokerlist='tcp://" + host + ":" + port + "'"
+                                                                                + "," + ConnectionURL.OPTIONS_SSL + "='false'"
         ));
     }
 
@@ -369,12 +379,18 @@
 
     public Session createSession(final boolean transacted, final int acknowledgeMode) throws JMSException
     {
-        return createSession(transacted, acknowledgeMode, AMQSession.DEFAULT_PREFETCH);
+        return createSession(transacted, acknowledgeMode, AMQSession.DEFAULT_PREFETCH_HIGH_MARK);
     }
 
     public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
                                                      final int prefetch) throws JMSException
     {
+        return createSession(transacted, acknowledgeMode, prefetch, prefetch);
+    }
+
+    public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
+                                                     final int prefetchHigh, final int prefetchLow) throws JMSException
+    {
         checkNotClosed();
         if (channelLimitReached())
         {
@@ -397,14 +413,14 @@
                     // open it, so that there is no window where we could receive data on the channel and not be set
                     // up to handle it appropriately.
                     AMQSession session = new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode,
-                                                        prefetch);
+                                                        prefetchHigh, prefetchLow);
                     _protocolHandler.addSessionByChannel(channelId, session);
                     registerSession(channelId, session);
 
                     boolean success = false;
                     try
                     {
-                        createChannelOverWire(channelId, prefetch, transacted);
+                        createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
                         success = true;
                     }
                     catch (AMQException e)
@@ -432,13 +448,15 @@
         }
     }
 
-    private void createChannelOverWire(int channelId, int prefetch, boolean transacted)
+    private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
             throws AMQException
     {
         _protocolHandler.syncWrite(
                 ChannelOpenBody.createAMQFrame(channelId, null), ChannelOpenOkBody.class);
+
+        //todo send low water mark when protocol allows.
         _protocolHandler.syncWrite(
-                BasicQosBody.createAMQFrame(channelId, 0, prefetch, false),
+                BasicQosBody.createAMQFrame(channelId, 0, prefetchHigh, false),
                 BasicQosOkBody.class);
 
         if (transacted)
@@ -451,11 +469,11 @@
         }
     }
 
-    private void reopenChannel(int channelId, int prefetch, boolean transacted) throws AMQException
+    private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) throws AMQException
     {
         try
         {
-            createChannelOverWire(channelId, prefetch, transacted);
+            createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
         }
         catch (AMQException e)
         {
@@ -559,7 +577,7 @@
 
     public void close() throws JMSException
     {
-        synchronized (getFailoverMutex())
+        synchronized(getFailoverMutex())
         {
             if (!_closed.getAndSet(true))
             {
@@ -897,7 +915,7 @@
         {
             AMQSession s = (AMQSession) it.next();
             _protocolHandler.addSessionByChannel(s.getChannelId(), s);
-            reopenChannel(s.getChannelId(), s.getDefaultPrefetch(), s.getTransacted());
+            reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted());
             s.resubscribe();
         }
     }

Modified: incubator/qpid/branches/new_persistence/java/client/src/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/org/apache/qpid/client/AMQSession.java?view=diff&rev=465038&r1=465037&r2=465038
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/org/apache/qpid/client/AMQSession.java Tue Oct 17 13:45:52 2006
@@ -20,11 +20,11 @@
 import org.apache.log4j.Logger;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQUndeliveredException;
+import org.apache.qpid.client.failover.FailoverSupport;
 import org.apache.qpid.client.message.AbstractJMSMessage;
 import org.apache.qpid.client.message.MessageFactoryRegistry;
 import org.apache.qpid.client.message.UnprocessedMessage;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.failover.FailoverSupport;
 import org.apache.qpid.client.util.FlowControllingBlockingQueue;
 import org.apache.qpid.framing.*;
 import org.apache.qpid.jms.Session;
@@ -46,7 +46,8 @@
 {
     private static final Logger _logger = Logger.getLogger(AMQSession.class);
 
-    public static final int DEFAULT_PREFETCH = 5000;
+    public static final int DEFAULT_PREFETCH_HIGH_MARK = 5000;
+    public static final int DEFAULT_PREFETCH_LOW_MARK = 2500;
 
     private AMQConnection _connection;
 
@@ -56,7 +57,8 @@
 
     private int _channelId;
 
-    private int _defaultPrefetch = DEFAULT_PREFETCH;
+    private int _defaultPrefetchHighMark = DEFAULT_PREFETCH_HIGH_MARK;
+    private int _defaultPrefetchLowMark = DEFAULT_PREFETCH_LOW_MARK;
 
     /**
      * Used in the consume method. We generate the consume tag on the client so that we can use the nowait
@@ -98,7 +100,7 @@
     /**
      * The counter of the next producer id. This id is generated by the session and used only to allow the
      * producer to identify itself to the session when deregistering itself.
-     *
+     * <p/>
      * Access to this id does not require to be synchronized since according to the JMS specification only one
      * thread of control is allowed to create producers for any given session instance.
      */
@@ -125,12 +127,12 @@
             _stopped.set(false);
             try
             {
-                while (!_stopped.get() && (message = (UnprocessedMessage)_queue.take()) != null)
+                while (!_stopped.get() && (message = (UnprocessedMessage) _queue.take()) != null)
                 {
                     dispatchMessage(message);
                 }
             }
-            catch(InterruptedException e)
+            catch (InterruptedException e)
             {
                 ;
             }
@@ -201,12 +203,18 @@
     AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
                MessageFactoryRegistry messageFactoryRegistry)
     {
-        this(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, DEFAULT_PREFETCH);
+        this(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, DEFAULT_PREFETCH_HIGH_MARK, DEFAULT_PREFETCH_LOW_MARK);
     }
 
     AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
                MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetch)
     {
+        this(con, channelId, transacted, acknowledgeMode, messageFactoryRegistry, defaultPrefetch, defaultPrefetch);
+    }
+
+    AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode,
+               MessageFactoryRegistry messageFactoryRegistry, int defaultPrefetchHighMark, int defaultPrefetchLowMark)
+    {
         _connection = con;
         _transacted = transacted;
         if (transacted)
@@ -219,34 +227,36 @@
         }
         _channelId = channelId;
         _messageFactoryRegistry = messageFactoryRegistry;
-        _defaultPrefetch = defaultPrefetch;
+        _defaultPrefetchHighMark = defaultPrefetchHighMark;
+        _defaultPrefetchLowMark = defaultPrefetchLowMark;
+
         if (_acknowledgeMode == NO_ACKNOWLEDGE)
         {
-        _queue = new FlowControllingBlockingQueue(_defaultPrefetch,
-            new FlowControllingBlockingQueue.ThresholdListener()
-            {
-                public void aboveThreshold(int currentValue)
-                {
-                    if(_acknowledgeMode == NO_ACKNOWLEDGE)
-                    {
-                        _logger.warn("Above threshold so suspending channel. Current value is " + currentValue);
-                        suspendChannel();
-                    }
-                }
-
-                public void underThreshold(int currentValue)
-                {
-                    if(_acknowledgeMode == NO_ACKNOWLEDGE)
-                    {
-                        _logger.warn("Below threshold so unsuspending channel. Current value is " + currentValue);
-                        unsuspendChannel();
-                    }
-                }
-            });
+            _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, _defaultPrefetchLowMark,
+                                                      new FlowControllingBlockingQueue.ThresholdListener()
+                                                      {
+                                                          public void aboveThreshold(int currentValue)
+                                                          {
+                                                              if (_acknowledgeMode == NO_ACKNOWLEDGE)
+                                                              {
+                                                                  _logger.warn("Above threshold(" + _defaultPrefetchHighMark + ") so suspending channel. Current value is " + currentValue);
+                                                                  suspendChannel();
+                                                              }
+                                                          }
+
+                                                          public void underThreshold(int currentValue)
+                                                          {
+                                                              if (_acknowledgeMode == NO_ACKNOWLEDGE)
+                                                              {
+                                                                  _logger.warn("Below threshold(" + _defaultPrefetchLowMark + ") so unsuspending channel. Current value is " + currentValue);
+                                                                  unsuspendChannel();
+                                                              }
+                                                          }
+                                                      });
         }
         else
         {
-             _queue = new FlowControllingBlockingQueue(_defaultPrefetch,null);
+            _queue = new FlowControllingBlockingQueue(_defaultPrefetchHighMark, null);
         }
     }
 
@@ -260,6 +270,11 @@
         this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetch);
     }
 
+    AMQSession(AMQConnection con, int channelId, boolean transacted, int acknowledgeMode, int defaultPrefetchHigh, int defaultPrefetchLow)
+    {
+        this(con, channelId, transacted, acknowledgeMode, MessageFactoryRegistry.newDefaultRegistry(), defaultPrefetchHigh, defaultPrefetchLow);
+    }
+
     AMQConnection getAMQConnection()
     {
         return _connection;
@@ -267,7 +282,7 @@
 
     public BytesMessage createBytesMessage() throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             checkNotClosed();
             try
@@ -283,7 +298,7 @@
 
     public MapMessage createMapMessage() throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             checkNotClosed();
             try
@@ -299,7 +314,7 @@
 
     public javax.jms.Message createMessage() throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             checkNotClosed();
             try
@@ -315,7 +330,7 @@
 
     public ObjectMessage createObjectMessage() throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             checkNotClosed();
             try
@@ -331,7 +346,7 @@
 
     public ObjectMessage createObjectMessage(Serializable object) throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             checkNotClosed();
             try
@@ -355,7 +370,7 @@
 
     public TextMessage createTextMessage() throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             checkNotClosed();
 
@@ -372,7 +387,7 @@
 
     public TextMessage createTextMessage(String text) throws JMSException
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             checkNotClosed();
             try
@@ -388,11 +403,11 @@
         }
     }
 
-   public boolean getTransacted() throws JMSException
-   {
-       checkNotClosed();
-       return _transacted;
-   }
+    public boolean getTransacted() throws JMSException
+    {
+        checkNotClosed();
+        return _transacted;
+    }
 
     public int getAcknowledgeMode() throws JMSException
     {
@@ -407,7 +422,7 @@
         {
             // Acknowledge up to message last delivered (if any) for each consumer.
             //need to send ack for messages delivered to consumers so far
-            for(Iterator i = _consumers.values().iterator(); i.hasNext();)
+            for (Iterator i = _consumers.values().iterator(); i.hasNext();)
             {
                 //Sends acknowledgement to server
                 ((BasicMessageConsumer) i.next()).acknowledgeLastDelivered();
@@ -434,7 +449,7 @@
         }
         catch (AMQException e)
         {
-            throw (JMSException) (new JMSException("Failed to rollback: " + e).initCause(e));
+            throw(JMSException) (new JMSException("Failed to rollback: " + e).initCause(e));
         }
     }
 
@@ -442,7 +457,7 @@
     {
         // We must close down all producers and consumers in an orderly fashion. This is the only method
         // that can be called from a different thread of control from the one controlling the session
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             _closed.set(true);
 
@@ -472,6 +487,7 @@
 
     /**
      * Close all producers or consumers. This is called either in the error case or when closing the session normally.
+     *
      * @param amqe the exception, may be null to indicate no error has occurred
      */
     private void closeProducersAndConsumers(AMQException amqe)
@@ -497,11 +513,12 @@
     /**
      * Called when the server initiates the closure of the session
      * unilaterally.
+     *
      * @param e the exception that caused this session to be closed. Null causes the
      */
     public void closed(Throwable e)
     {
-        synchronized (_connection.getFailoverMutex())
+        synchronized(_connection.getFailoverMutex())
         {
             // An AMQException has an error code and message already and will be passed in when closure occurs as a
             // result of a channel close request
@@ -523,7 +540,7 @@
     /**
      * Called to mark the session as being closed. Useful when the session needs to be made invalid, e.g. after
      * failover when the client has veoted resubscription.
-     *
+     * <p/>
      * The caller of this method must already hold the failover mutex.
      */
     void markClosed()
@@ -575,7 +592,8 @@
 
     /**
      * Called to close message consumers cleanly. This may or may <b>not</b> be as a result of an error.
-      * @param error not null if this is a result of an error occurring at the connection level
+     *
+     * @param error not null if this is a result of an error occurring at the connection level
      */
     private void closeConsumers(Throwable error) throws JMSException
     {
@@ -624,6 +642,7 @@
 
     /**
      * Asks the broker to resend all unacknowledged messages for the session.
+     *
      * @throws JMSException
      */
     public void recover() throws JMSException
@@ -692,27 +711,27 @@
             {
                 checkNotClosed();
 
-                return new BasicMessageProducer(_connection, (AMQDestination)destination, _transacted, _channelId,
-                                                    AMQSession.this, _connection.getProtocolHandler(),
-                                                    getNextProducerId(), immediate, mandatory, waitUntilSent);
+                return new BasicMessageProducer(_connection, (AMQDestination) destination, _transacted, _channelId,
+                                                AMQSession.this, _connection.getProtocolHandler(),
+                                                getNextProducerId(), immediate, mandatory, waitUntilSent);
             }
         }.execute(_connection);
     }
 
     public MessageConsumer createConsumer(Destination destination) throws JMSException
     {
-        return createConsumer(destination, _defaultPrefetch, false, false, null);
+        return createConsumer(destination, _defaultPrefetchHighMark, false, false, null);
     }
 
     public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException
     {
-        return createConsumer(destination, _defaultPrefetch, false, false, messageSelector);
+        return createConsumer(destination, _defaultPrefetchHighMark, false, false, messageSelector);
     }
 
     public MessageConsumer createConsumer(Destination destination, String messageSelector, boolean noLocal)
             throws JMSException
     {
-        return createConsumer(destination, _defaultPrefetch, noLocal, false, messageSelector);
+        return createConsumer(destination, _defaultPrefetchHighMark, noLocal, false, messageSelector);
     }
 
     public MessageConsumer createConsumer(Destination destination,
@@ -748,7 +767,7 @@
             {
                 checkNotClosed();
 
-                AMQDestination amqd = (AMQDestination)destination;
+                AMQDestination amqd = (AMQDestination) destination;
 
                 final AMQProtocolHandler protocolHandler = _connection.getProtocolHandler();
                 // TODO: construct the rawSelector from the selector string if rawSelector == null
@@ -804,6 +823,7 @@
 
     /**
      * Declare the queue.
+     *
      * @param amqd
      * @param protocolHandler
      * @return the queue name. This is useful where the broker is generating a queue name on behalf of the client.
@@ -814,7 +834,7 @@
         // For queues (but not topics) we generate the name in the client rather than the
         // server. This allows the name to be reused on failover if required. In general,
         // the destination indicates whether it wants a name generated or not.
-        if(amqd.isNameRequired())
+        if (amqd.isNameRequired())
         {
             amqd.setQueueName(protocolHandler.generateQueueName());
         }
@@ -838,6 +858,7 @@
 
     /**
      * Register to consume from the queue.
+     *
      * @param queueName
      * @return the consumer tag generated by the broker
      */
@@ -864,9 +885,11 @@
         }
         else
         {
-            try{
+            try
+            {
                 return new AMQQueue(new AMQBindingURL(queueName));
-            }catch(URLSyntaxException urlse)
+            }
+            catch (URLSyntaxException urlse)
             {
                 JMSException jmse = new JMSException(urlse.getReason());
                 jmse.setLinkedException(urlse);
@@ -893,13 +916,14 @@
 
     public Topic createTopic(String topicName) throws JMSException
     {
-         if (topicName.indexOf('/') == -1)
+        if (topicName.indexOf('/') == -1)
         {
-           return new AMQTopic(topicName);
+            return new AMQTopic(topicName);
         }
         else
         {
-            try{
+            try
+            {
                 return new AMQTopic(new AMQBindingURL(topicName));
             }
             catch (URLSyntaxException urlse)
@@ -1015,9 +1039,10 @@
      * Acknowledge a message or several messages. This method can be called via AbstractJMSMessage or from
      * a BasicConsumer. The former where the mode is CLIENT_ACK and the latter where the mode is
      * AUTO_ACK or similar.
+     *
      * @param deliveryTag the tag of the last message to be acknowledged
-     * @param multiple if true will acknowledge all messages up to and including the one specified by the
-     * delivery tag
+     * @param multiple    if true will acknowledge all messages up to and including the one specified by the
+     *                    delivery tag
      */
     public void acknowledgeMessage(long deliveryTag, boolean multiple)
     {
@@ -1031,7 +1056,17 @@
 
     public int getDefaultPrefetch()
     {
-        return _defaultPrefetch;
+        return _defaultPrefetchHighMark;
+    }
+
+    public int getDefaultPrefetchHigh()
+    {
+        return _defaultPrefetchHighMark;
+    }
+
+    public int getDefaultPrefetchLow()
+    {
+        return _defaultPrefetchLowMark;
     }
 
     public int getChannelId()
@@ -1041,7 +1076,7 @@
 
     void start()
     {
-        if(_dispatcher != null)
+        if (_dispatcher != null)
         {
             //then we stopped this and are restarting, so signal server to resume delivery
             unsuspendChannel();
@@ -1056,7 +1091,7 @@
         //stop the server delivering messages to this session
         suspendChannel();
 
-        //stop the dispatcher thread
+//stop the dispatcher thread
         _stopped.set(true);
     }
 
@@ -1067,6 +1102,7 @@
 
     /**
      * Callers must hold the failover mutex before calling this method.
+     *
      * @param consumer
      * @throws AMQException
      */
@@ -1083,7 +1119,7 @@
         bindQueue(amqd, queueName, protocolHandler, consumer.getRawSelectorFieldTable());
 
         String consumerTag = consumeFromQueue(queueName, protocolHandler, consumer.getPrefetch(), consumer.isNoLocal(),
-                                       consumer.isExclusive(), consumer.getAcknowledgeMode());
+                                              consumer.isExclusive(), consumer.getAcknowledgeMode());
 
         consumer.setConsumerTag(consumerTag);
         _consumers.put(consumerTag, consumer);
@@ -1092,6 +1128,7 @@
     /**
      * Called by the MessageConsumer when closing, to deregister the consumer from the
      * map from consumerTag to consumer instance.
+     *
      * @param consumerTag the consumer tag, that was broker-generated
      */
     void deregisterConsumer(String consumerTag)
@@ -1116,6 +1153,7 @@
 
     /**
      * Resubscribes all producers and consumers. This is called when performing failover.
+     *
      * @throws AMQException
      */
     void resubscribe() throws AMQException

Modified: incubator/qpid/branches/new_persistence/java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java?view=diff&rev=465038&r1=465037&r2=465038
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/org/apache/qpid/client/util/FlowControllingBlockingQueue.java Tue Oct 17 13:45:52 2006
@@ -28,7 +28,6 @@
  * <p/>
  * This implementation is <b>only</b> safe where we have a single thread adding
  * items and a single (different) thread removing items.
- *
  */
 public class FlowControllingBlockingQueue
 {
@@ -37,7 +36,8 @@
      */
     private final BlockingQueue _queue = new LinkedBlockingQueue();
 
-    private final int _flowControlThreshold;
+    private final int _flowControlHighThreshold;
+    private final int _flowControlLowThreshold;
 
     private final ThresholdListener _listener;
 
@@ -56,7 +56,13 @@
 
     public FlowControllingBlockingQueue(int threshold, ThresholdListener listener)
     {
-        _flowControlThreshold = threshold;
+        this(threshold, threshold, listener);
+    }
+
+    public FlowControllingBlockingQueue(int highThreshold, int lowThreshold, ThresholdListener listener)
+    {
+        _flowControlHighThreshold = highThreshold;
+        _flowControlLowThreshold = lowThreshold;
         _listener = listener;
     }
 
@@ -67,7 +73,7 @@
         {
             synchronized(_listener)
             {
-                if (--_count == (_flowControlThreshold - 1))
+                if (_count-- == _flowControlLowThreshold)
                 {
                     _listener.underThreshold(_count);
                 }
@@ -83,7 +89,7 @@
         {
             synchronized(_listener)
             {
-                if (++_count == _flowControlThreshold)
+                if (++_count == _flowControlHighThreshold)
                 {
                     _listener.aboveThreshold(_count);
                 }

Modified: incubator/qpid/branches/new_persistence/java/client/src/org/apache/qpid/jms/Connection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/org/apache/qpid/jms/Connection.java?view=diff&rev=465038&r1=465037&r2=465038
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/org/apache/qpid/jms/Connection.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/org/apache/qpid/jms/Connection.java Tue Oct 17 13:45:52 2006
@@ -30,19 +30,37 @@
 
     /**
      * Get the connection listener that has been registered with this connection, if any
+     *
      * @return the listener or null if none has been set
      */
     ConnectionListener getConnectionListener();
 
     /**
      * Create a session specifying the prefetch limit of messages.
+     *
      * @param transacted
      * @param acknowledgeMode
-     * @param prefetch the maximum number of messages to buffer in the client. This
-     * applies as a total across all consumers
+     * @param prefetch        the maximum number of messages to buffer in the client. This
+     *                        applies as a total across all consumers
      * @return
      * @throws JMSException
      */
     org.apache.qpid.jms.Session createSession(boolean transacted, int acknowledgeMode,
-                                          int prefetch) throws JMSException;
+                                              int prefetch) throws JMSException;
+
+
+    /**
+     * Create a session specifying the prefetch limit of messages.
+     *
+     * @param transacted
+     * @param acknowledgeMode
+     * @param prefetchHigh    the maximum number of messages to buffer in the client.
+     *                        This applies as a total across all consumers
+     * @param prefetchLow     the number of messages that must be in the buffer in the client to renable message flow.
+     *                        This applies as a total across all consumers
+     * @return
+     * @throws JMSException
+     */
+    org.apache.qpid.jms.Session createSession(boolean transacted, int acknowledgeMode,
+                                              int prefetchHigh, int prefetchLow) throws JMSException;
 }

Modified: incubator/qpid/branches/new_persistence/java/client/src/org/apache/qpid/jms/Session.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/org/apache/qpid/jms/Session.java?view=diff&rev=465038&r1=465037&r2=465038
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/org/apache/qpid/jms/Session.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/org/apache/qpid/jms/Session.java Tue Oct 17 13:45:52 2006
@@ -48,6 +48,16 @@
     int getDefaultPrefetch();
 
     /**
+     * @return the High water prefetch value used by default for consumers created on this session.
+     */
+    int getDefaultPrefetchHigh();
+
+    /**
+     * @return the Low water prefetch value used by default for consumers created on this session.
+     */
+    int getDefaultPrefetchLow();
+
+    /**
      * Create a producer
      * @param destination
      * @param mandatory the value of the mandatory flag used by default on the producer

Modified: incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/client/AllClientUnitTests.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/client/AllClientUnitTests.java?view=diff&rev=465038&r1=465037&r2=465038
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/client/AllClientUnitTests.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/client/AllClientUnitTests.java Tue Oct 17 13:45:52 2006
@@ -32,7 +32,8 @@
         org.apache.qpid.client.message.UnitTests.class,
         org.apache.qpid.forwardall.UnitTests.class,
         org.apache.qpid.destinationurl.UnitTests.class,
-        org.apache.qpid.jndi.referenceabletest.UnitTests.class
+        org.apache.qpid.jndi.referenceabletest.UnitTests.class,
+        org.apache.qpid.transacted.UnitTests.class
         })
 public class AllClientUnitTests
 {

Modified: incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/flow/ChannelFlowTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/flow/ChannelFlowTest.java?view=diff&rev=465038&r1=465037&r2=465038
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/flow/ChannelFlowTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/flow/ChannelFlowTest.java Tue Oct 17 13:45:52 2006
@@ -43,7 +43,7 @@
 
     ChannelFlowTest(AMQConnection connection, AMQDestination destination) throws Exception
     {
-        AMQSession session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE, 50);
+        AMQSession session = (AMQSession) connection.createSession(false, AMQSession.NO_ACKNOWLEDGE, 50,25);
 
         //set up a slow consumer
         session.createConsumer(destination).setMessageListener(this);

Modified: incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/jndi/referenceable/Bind.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/jndi/referenceable/Bind.java?view=diff&rev=465038&r1=465037&r2=465038
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/jndi/referenceable/Bind.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/jndi/referenceable/Bind.java Tue Oct 17 13:45:52 2006
@@ -33,7 +33,7 @@
  * Binds a reference from a JNDI source.
  * Given a properties file with the JNDI information and a binding string.
  */
-class Bind
+public class Bind
 {
     private static final String USAGE="USAGE: java bind <JNDI Properties file> -cf <url> <binding> | -c <url> <binding> [-t <topic Name> <binding>] [-q <queue Name> <binding>]";
     public Bind(String propertiesFile, String bindingURL, Referenceable reference) throws NameAlreadyBoundException, NoInitialContextException

Modified: incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/jndi/referenceable/Unbind.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/jndi/referenceable/Unbind.java?view=diff&rev=465038&r1=465037&r2=465038
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/jndi/referenceable/Unbind.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/jndi/referenceable/Unbind.java Tue Oct 17 13:45:52 2006
@@ -28,11 +28,11 @@
  * Unbinds a reference from a JNDI source.
  * Given a properties file with the JNDI information and a binding string.
  */
-class UnBind
+public class Unbind
 {
     private static final String USAGE = "USAGE: java unbind <JNDI Properties file> -b <binding>";
 
-    public UnBind(String propertiesFile, String bindingValue) throws NamingException
+    public Unbind(String propertiesFile, String bindingValue) throws NamingException
     {
         // Set up the environment for creating the initial context
         String qpid_home = System.getProperty("QPID_HOME");
@@ -143,7 +143,7 @@
                     System.out.print("UnBinding:" + binding);
                     try
                     {
-                        new UnBind(args[0], binding);
+                        new Unbind(args[0], binding);
                         System.out.println(" ..Successful");
                     }
                     catch (NamingException nabe)

Modified: incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/jndi/referenceabletest/Bind.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/jndi/referenceabletest/Bind.java?view=diff&rev=465038&r1=465037&r2=465038
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/jndi/referenceabletest/Bind.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/jndi/referenceabletest/Bind.java Tue Oct 17 13:45:52 2006
@@ -18,15 +18,22 @@
 package org.apache.qpid.jndi.referenceabletest;
 
 import org.apache.qpid.client.AMQConnection;
-import org.apache.qpid.client.AMQTopic;
 import org.apache.qpid.client.AMQConnectionFactory;
-import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQTopic;
 import org.apache.qpid.url.URLSyntaxException;
 import org.junit.Assert;
 
-import javax.jms.*;
-import javax.naming.*;
-
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
+import javax.jms.JMSException;
+import javax.jms.Session;
+import javax.jms.Topic;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NameAlreadyBoundException;
+import javax.naming.NamingException;
+import javax.naming.NoInitialContextException;
+import java.io.File;
 import java.util.Hashtable;
 
 /**
@@ -41,11 +48,12 @@
  */
 class Bind
 {
+    public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + "/JNDITest";
+    public static final String PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH;
 
     String _connectionFactoryString = "";
 
-    String _connectionString = "amqp://guest:guest@clientid/testpath?brokerlist='tcp://localhost:5672'";
-
+    String _connectionString = "amqp://guest:guest@clientid/testpath?brokerlist='vm://:1'";
     Topic _topic = null;
 
     boolean _bound = false;
@@ -54,13 +62,35 @@
     {
         this(false);
     }
+
     public Bind(boolean output) throws NameAlreadyBoundException, NoInitialContextException
     {
         // Set up the environment for creating the initial context
         Hashtable env = new Hashtable(11);
         env.put(Context.INITIAL_CONTEXT_FACTORY,
                 "com.sun.jndi.fscontext.RefFSContextFactory");
-        env.put(Context.PROVIDER_URL, "file:/temp/qpid-jndi-test");
+        env.put(Context.PROVIDER_URL, PROVIDER_URL);
+
+
+        File file = new File(PROVIDER_URL.substring(PROVIDER_URL.indexOf("://") + 3));
+
+        if (file.exists() && !file.isDirectory())
+        {
+            System.out.println("Couldn't make directory file already exists");
+            return;
+        }
+        else
+        {
+            if (!file.exists())
+            {
+                if (!file.mkdirs())
+                {
+                    System.out.println("Couldn't make directory");
+                    return;
+                }
+            }
+        }
+
 
         try
         {
@@ -93,7 +123,6 @@
                 Session session = connection.createSession(false, Session.AUTO_ACKNOWLEDGE);
 
                 _topic = session.createTopic("Fruity");
-
             }
             catch (JMSException jmse)
             {
@@ -129,15 +158,14 @@
             System.out.println("Operation failed: " + e);
             if (e instanceof NameAlreadyBoundException)
             {
-                throw (NameAlreadyBoundException) e;
+                throw(NameAlreadyBoundException) e;
             }
 
             if (e instanceof NoInitialContextException)
             {
-                throw (NoInitialContextException) e;
+                throw(NoInitialContextException) e;
             }
         }
-
     }
 
     public String connectionFactoryValue()

Modified: incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/jndi/referenceabletest/JNDIReferenceableTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/jndi/referenceabletest/JNDIReferenceableTest.java?view=diff&rev=465038&r1=465037&r2=465038
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/jndi/referenceabletest/JNDIReferenceableTest.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/jndi/referenceabletest/JNDIReferenceableTest.java Tue Oct 17 13:45:52 2006
@@ -45,12 +45,9 @@
         Bind b = null;
         try
         {
-
-
             try
             {
                 b = new Bind();
-
             }
             catch (NameAlreadyBoundException e)
             {
@@ -69,7 +66,6 @@
                 {
                     Assert.fail("Unable to clear bound objects for test.");
                 }
-
             }
         }
         catch (NoInitialContextException e)
@@ -99,10 +95,8 @@
 
     }
 
-    public static junit.framework.Test suite
-            ()
+    public static junit.framework.Test suite()
     {
         return new JUnit4TestAdapter(JNDIReferenceableTest.class);
     }
-
 }

Modified: incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/jndi/referenceabletest/Lookup.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/jndi/referenceabletest/Lookup.java?view=diff&rev=465038&r1=465037&r2=465038
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/jndi/referenceabletest/Lookup.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/jndi/referenceabletest/Lookup.java Tue Oct 17 13:45:52 2006
@@ -17,11 +17,14 @@
  */
 package org.apache.qpid.jndi.referenceabletest;
 
-import org.apache.qpid.client.AMQTopic;
 import org.apache.qpid.client.AMQConnection;
 import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQTopic;
 
-import javax.naming.*;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import java.io.File;
 import java.util.Hashtable;
 
 
@@ -37,6 +40,9 @@
  */
 class Lookup
 {
+    public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + "/JNDITest";
+    public static final String PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH;
+
     AMQTopic _topic = null;
     AMQConnection _connection = null;
     AMQConnectionFactory _connectionFactory = null;
@@ -47,7 +53,26 @@
         Hashtable env = new Hashtable(11);
         env.put(Context.INITIAL_CONTEXT_FACTORY,
                 "com.sun.jndi.fscontext.RefFSContextFactory");
-        env.put(Context.PROVIDER_URL, "file:/temp/qpid-jndi-test");
+        env.put(Context.PROVIDER_URL, PROVIDER_URL);
+
+        File file = new File(PROVIDER_URL.substring(PROVIDER_URL.indexOf("://") + 3));
+
+        if (file.exists() && !file.isDirectory())
+        {
+            System.out.println("Couldn't make directory file already exists");
+            return;
+        }
+        else
+        {
+            if (!file.exists())
+            {
+                if (!file.mkdirs())
+                {
+                    System.out.println("Couldn't make directory");
+                    return;
+                }
+            }
+        }
 
         try
         {
@@ -70,10 +95,9 @@
         }
     }
 
-
     public String connectionFactoryValue()
     {
-        return ((AMQConnection) _connectionFactory.getConnectionURL()).toURL();
+        return _connectionFactory.getConnectionURL().toString();
     }
 
     public String connectionValue()
@@ -85,7 +109,6 @@
     {
         return _topic.toURL();
     }
-
 
     public static void main(String[] args)
     {

Modified: incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/jndi/referenceabletest/Unbind.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/jndi/referenceabletest/Unbind.java?view=diff&rev=465038&r1=465037&r2=465038
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/jndi/referenceabletest/Unbind.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/test/src/org/apache/qpid/jndi/referenceabletest/Unbind.java Tue Oct 17 13:45:52 2006
@@ -17,7 +17,10 @@
  */
 package org.apache.qpid.jndi.referenceabletest;
 
-import javax.naming.*;
+import javax.naming.Context;
+import javax.naming.InitialContext;
+import javax.naming.NameNotFoundException;
+import javax.naming.NamingException;
 import java.io.File;
 import java.util.Hashtable;
 
@@ -33,7 +36,8 @@
  */
 class Unbind
 {
-
+    public static final String DEFAULT_PROVIDER_FILE_PATH = System.getProperty("java.io.tmpdir") + "/JNDITest";
+    public static final String PROVIDER_URL = "file://" + DEFAULT_PROVIDER_FILE_PATH;
 
     boolean _unbound = false;
 
@@ -49,7 +53,26 @@
         Hashtable env = new Hashtable(11);
         env.put(Context.INITIAL_CONTEXT_FACTORY,
                 "com.sun.jndi.fscontext.RefFSContextFactory");
-        env.put(Context.PROVIDER_URL, "file:/temp/qpid-jndi-test");
+        env.put(Context.PROVIDER_URL, PROVIDER_URL);
+
+        File file = new File(PROVIDER_URL.substring(PROVIDER_URL.indexOf("://") + 3));
+
+        if (file.exists() && !file.isDirectory())
+        {
+            System.out.println("Couldn't make directory file already exists");
+            return;
+        }
+        else
+        {
+            if (!file.exists())
+            {
+                if (!file.mkdirs())
+                {
+                    System.out.println("Couldn't make directory");
+                    return;
+                }
+            }
+        }
 
         try
         {

Modified: incubator/qpid/branches/new_persistence/java/cluster/test/org/apache/qpid/server/cluster/TestSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/cluster/test/org/apache/qpid/server/cluster/TestSession.java?view=diff&rev=465038&r1=465037&r2=465038
==============================================================================
--- incubator/qpid/branches/new_persistence/java/cluster/test/org/apache/qpid/server/cluster/TestSession.java (original)
+++ incubator/qpid/branches/new_persistence/java/cluster/test/org/apache/qpid/server/cluster/TestSession.java Tue Oct 17 13:45:52 2006
@@ -29,6 +29,11 @@
         return null;  //TODO
     }
 
+    public IoServiceConfig getServiceConfig()
+    {
+        return null;  //TODO        
+    }
+
     public IoHandler getHandler()
     {
         return null;  //TODO

Modified: incubator/qpid/branches/new_persistence/python/cpp_failing.txt
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/python/cpp_failing.txt?view=diff&rev=465038&r1=465037&r2=465038
==============================================================================
--- incubator/qpid/branches/new_persistence/python/cpp_failing.txt (original)
+++ incubator/qpid/branches/new_persistence/python/cpp_failing.txt Tue Oct 17 13:45:52 2006
@@ -0,0 +1,2 @@
+tests.tx.TxTests.test_commit
+tests.tx.TxTests.test_rollback

Modified: incubator/qpid/branches/new_persistence/python/java_failing.txt
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/python/java_failing.txt?view=diff&rev=465038&r1=465037&r2=465038
==============================================================================
--- incubator/qpid/branches/new_persistence/python/java_failing.txt (original)
+++ incubator/qpid/branches/new_persistence/python/java_failing.txt Tue Oct 17 13:45:52 2006
@@ -1,9 +1,23 @@
+tests.basic.BasicTests.test_cancel
 tests.basic.BasicTests.test_consume_exclusive
 tests.basic.BasicTests.test_consume_no_local
 tests.basic.BasicTests.test_consume_queue_errors
 tests.basic.BasicTests.test_consume_unique_consumers
+tests.basic.BasicTests.test_get
+tests.basic.BasicTests.test_qos_prefetch_size
+tests.basic.BasicTests.test_recover_requeue
+tests.exchange.ExchangeTests
+tests.exchange.DefaultExchangeRuleTests.testDefaultExchange
+tests.exchange.HeadersExchangeTests.testMatchAll
+tests.exchange.HeadersExchangeTests.testMatchAny
+tests.exchange.RecommendedTypesRuleTests.testDirect
 tests.exchange.RecommendedTypesRuleTests.testFanout
+tests.exchange.RecommendedTypesRuleTests.testHeaders
+tests.exchange.RecommendedTypesRuleTests.testTopic
+tests.exchange.RequiredInstancesRuleTests.testAmqDirect
 tests.exchange.RequiredInstancesRuleTests.testAmqFanOut
+tests.exchange.RequiredInstancesRuleTests.testAmqMatch
+tests.exchange.RequiredInstancesRuleTests.testAmqTopic
 tests.queue.QueueTests.test_declare_exclusive
 tests.queue.QueueTests.test_declare_passive
 tests.queue.QueueTests.test_delete_ifempty
@@ -11,3 +25,5 @@
 tests.queue.QueueTests.test_delete_simple
 tests.queue.QueueTests.test_purge
 tests.queue.QueueTests.test_bind
+tests.testlib.TestBaseTest.testMessageProperties
+tests.broker.BrokerTests.test_invalid_channel

Modified: incubator/qpid/branches/new_persistence/python/qpid/spec.py
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/python/qpid/spec.py?view=diff&rev=465038&r1=465037&r2=465038
==============================================================================
--- incubator/qpid/branches/new_persistence/python/qpid/spec.py (original)
+++ incubator/qpid/branches/new_persistence/python/qpid/spec.py Tue Oct 17 13:45:52 2006
@@ -204,6 +204,8 @@
     code += "  return self.invoke(%s" % Method.METHOD
     if argnames:
       code += ", (%s,)" % argnames
+    else:
+      code += ", ()" 
     if self.content:
       code += ", content"
     code += ")"