You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2006/12/21 22:08:41 UTC

svn commit: r489461 [2/5] - in /incubator/qpid/branches/new_persistence/java: broker/src/main/java/org/apache/qpid/server/exchange/ broker/src/main/java/org/apache/qpid/server/handler/ broker/src/main/java/org/apache/qpid/server/protocol/ broker/src/ma...

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java?view=diff&rev=489461&r1=489460&r2=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryQueue.java Thu Dec 21 13:08:38 2006
@@ -26,25 +26,45 @@
 /**
  * AMQ implementation of a TemporaryQueue.
  */
-final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue
+final class AMQTemporaryQueue extends AMQQueue implements TemporaryQueue, TemporaryDestination
 {
 
 
+    private final AMQSession _session;
+    private boolean _deleted;
+
     /**
      * Create a new instance of an AMQTemporaryQueue
      */
-    public AMQTemporaryQueue()
+    public AMQTemporaryQueue(AMQSession session)
     {
         super("TempQueue" + Long.toString(System.currentTimeMillis()), true);
+        _session = session;
     }
 
     /**
      * @see javax.jms.TemporaryQueue#delete()
      */
-    public void delete() throws JMSException
+    public synchronized void delete() throws JMSException
     {
-        throw new UnsupportedOperationException("Delete not supported, " +
-                                                "will auto-delete when connection closed");
+        if(_session.hasConsumer(this))
+        {
+            throw new JMSException("Temporary Queue has consumers so cannot be deleted");
+        }
+        _deleted = true;
+
+        // Currently TemporaryQueue is set to be auto-delete which means that the queue will be deleted
+        // by the server when there are no more subscriptions to that queue.  This is probably not
+        // quite right for JMSCompliance.
     }
 
+    public AMQSession getSession()
+    {
+        return _session;
+    }
+
+    public boolean isDeleted()
+    {
+        return _deleted;
+    }
 }

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java?view=diff&rev=489461&r1=489460&r2=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQTemporaryTopic.java Thu Dec 21 13:08:38 2006
@@ -26,15 +26,18 @@
 /**
  * AMQ implementation of TemporaryTopic.
  */
-class AMQTemporaryTopic extends AMQTopic implements TemporaryTopic
+class AMQTemporaryTopic extends AMQTopic implements TemporaryTopic, TemporaryDestination
 {
 
+    private final AMQSession _session;
+    private boolean _deleted;
     /**
      * Create new temporary topic.
      */
-    public AMQTemporaryTopic()
+    public AMQTemporaryTopic(AMQSession session)
     {
         super("TempQueue" + Long.toString(System.currentTimeMillis()));
+        _session = session;
     }
 
     /**
@@ -42,8 +45,25 @@
      */
     public void delete() throws JMSException
     {
-        throw new UnsupportedOperationException("Delete not supported, " +
-                "will auto-delete when connection closed");
+        if(_session.hasConsumer(this))
+        {
+            throw new JMSException("Temporary Topic has consumers so cannot be deleted");
+        }
+
+        _deleted = true;
+        // Currently TemporaryQueue is set to be auto-delete which means that the queue will be deleted
+        // by the server when there are no more subscriptions to that queue.  This is probably not
+        // quite right for JMSCompliance.
+    }
+
+    public AMQSession getSession()
+    {
+        return _session;
+    }
+
+    public boolean isDeleted()
+    {
+        return _deleted;
     }
 
 }

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java?view=diff&rev=489461&r1=489460&r2=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/AMQTopic.java Thu Dec 21 13:08:38 2006
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -40,20 +40,25 @@
 
     public AMQTopic(String name)
     {
-        super(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, true, null);
-        _isDurable = false;
+        this(name, true, null, false);
     }
 
-    /**
-     * Constructor for use in creating a topic to represent a durable subscription
-     * @param topic
-     * @param clientId
-     * @param subscriptionName
-     */
-    public AMQTopic(AMQTopic topic, String clientId, String subscriptionName)
+    public AMQTopic(String name, boolean isAutoDelete, String queueName, boolean isDurable)
+    {
+        super(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, name, true, isAutoDelete,
+              queueName, isDurable);
+    }
+
+    public static AMQTopic createDurableTopic(AMQTopic topic, String subscriptionName, AMQConnection connection)
+            throws JMSException
+    {
+        return new AMQTopic(topic.getDestinationName(), false, getDurableTopicQueueName(subscriptionName, connection),
+                            true);
+    }
+
+    public static String getDurableTopicQueueName(String subscriptionName, AMQConnection connection) throws JMSException
     {
-        super(ExchangeDefaults.TOPIC_EXCHANGE_NAME, ExchangeDefaults.TOPIC_EXCHANGE_CLASS, topic.getDestinationName(), true, false, clientId + ":" + subscriptionName);
-        _isDurable = true;
+        return connection.getClientID() + ":" + subscriptionName;
     }
 
     public String getTopicName() throws JMSException

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=489461&r1=489460&r2=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Thu Dec 21 13:08:38 2006
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -214,10 +214,10 @@
             {
                 //handle case where connection has already been started, and the dispatcher is blocked
                 //doing a put on the _synchronousQueue
-                Object msg = _synchronousQueue.poll();
-                if (msg != null)
+                AbstractJMSMessage jmsMsg = (AbstractJMSMessage)_synchronousQueue.poll();
+                if (jmsMsg != null)
                 {
-                    AbstractJMSMessage jmsMsg = (AbstractJMSMessage) msg;
+                    _session.setLastDeliveredMessage(jmsMsg);
                     messageListener.onMessage(jmsMsg);
                     postDeliver(jmsMsg);
                 }
@@ -280,7 +280,7 @@
     public Message receive(long l) throws JMSException
     {
     	checkPreConditions();
-
+        
         acquireReceiving();
 
         try
@@ -297,12 +297,15 @@
             final AbstractJMSMessage m = returnMessageOrThrow(o);
             if (m != null)
             {
+                _session.setLastDeliveredMessage(m);
                 postDeliver(m);
             }
+            
             return m;
         }
         catch (InterruptedException e)
         {
+            _logger.warn("Interrupted: " + e, e);
             return null;
         }
         finally
@@ -323,8 +326,10 @@
             final AbstractJMSMessage m = returnMessageOrThrow(o);
             if (m != null)
             {
+                _session.setLastDeliveredMessage(m);
                 postDeliver(m);
             }
+
             return m;
         }
         finally
@@ -423,6 +428,7 @@
             {
                 //we do not need a lock around the test above, and the dispatch below as it is invalid
                 //for an application to alter an installed listener while the session is started
+                _session.setLastDeliveredMessage(jmsMessage);
                 getMessageListener().onMessage(jmsMessage);
                 postDeliver(jmsMessage);
             }
@@ -459,8 +465,9 @@
         }
     }
 
-    private void postDeliver(AbstractJMSMessage msg)
+    private void postDeliver(AbstractJMSMessage msg) throws JMSException
     {
+    	msg.setJMSDestination(_destination);
         switch (_acknowledgeMode)
         {
             case Session.DUPS_OK_ACKNOWLEDGE:
@@ -522,7 +529,7 @@
      */
     private void deregisterConsumer()
     {
-    	_session.deregisterConsumer(_consumerTag);
+    	_session.deregisterConsumer(this);
     }
 
     public String getConsumerTag()
@@ -531,18 +538,18 @@
     }
 
     public void setConsumerTag(String consumerTag)
-    {    	
+    {
         _consumerTag = consumerTag;
     }
 
 	public AMQSession getSession() {
 		return _session;
 	}
-	
+
 	private void checkPreConditions() throws JMSException{
-    	
+
 		this.checkNotClosed();
-		
+
 		if(_session == null || _session.isClosed()){
 			throw new javax.jms.IllegalStateException("Invalid Session");
 		}

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?view=diff&rev=489461&r1=489460&r2=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Thu Dec 21 13:08:38 2006
@@ -24,11 +24,13 @@
 import org.apache.mina.common.ByteBuffer;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.message.JMSBytesMessage;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.framing.*;
 
 import javax.jms.*;
 import java.io.UnsupportedEncodingException;
+import java.util.Enumeration;
 
 public class BasicMessageProducer extends Closeable implements org.apache.qpid.jms.MessageProducer
 {
@@ -140,7 +142,7 @@
 
     public void setDisableMessageID(boolean b) throws JMSException
     {
-    	checkPreConditions();
+        checkPreConditions();
         checkNotClosed();
         // IGNORED
     }
@@ -154,7 +156,7 @@
 
     public void setDisableMessageTimestamp(boolean b) throws JMSException
     {
-    	checkPreConditions();
+        checkPreConditions();
         _disableTimestamps = b;
     }
 
@@ -166,11 +168,11 @@
 
     public void setDeliveryMode(int i) throws JMSException
     {
-    	checkPreConditions();
+        checkPreConditions();
         if (i != DeliveryMode.NON_PERSISTENT && i != DeliveryMode.PERSISTENT)
         {
             throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i +
-                                   " is illegal");
+                    " is illegal");
         }
         _deliveryMode = i;
     }
@@ -183,7 +185,7 @@
 
     public void setPriority(int i) throws JMSException
     {
-    	checkPreConditions();
+        checkPreConditions();
         if (i < 0 || i > 9)
         {
             throw new IllegalArgumentException("Priority of " + i + " is illegal. Value must be in range 0 to 9");
@@ -199,7 +201,7 @@
 
     public void setTimeToLive(long l) throws JMSException
     {
-    	checkPreConditions();
+        checkPreConditions();
         if (l < 0)
         {
             throw new IllegalArgumentException("Time to live must be non-negative - supplied value was " + l);
@@ -227,33 +229,36 @@
 
     public void send(Message message) throws JMSException
     {
-    	checkPreConditions();
-    	checkInitialDestination();
+        checkPreConditions();
+        checkInitialDestination();
+
+
         synchronized (_connection.getFailoverMutex())
         {
-            sendImpl(_destination, (AbstractJMSMessage) message, _deliveryMode, _messagePriority, _timeToLive,
+            sendImpl(_destination, message, _deliveryMode, _messagePriority, _timeToLive,
                      _mandatory, _immediate);
         }
     }
 
     public void send(Message message, int deliveryMode) throws JMSException
     {
-    	checkPreConditions();
-    	checkInitialDestination();
+        checkPreConditions();
+        checkInitialDestination();
+
         synchronized (_connection.getFailoverMutex())
         {
-            sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive,
+            sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive,
                      _mandatory, _immediate);
         }
     }
 
     public void send(Message message, int deliveryMode, boolean immediate) throws JMSException
     {
-    	checkPreConditions();
-    	checkInitialDestination();
+        checkPreConditions();
+        checkInitialDestination();
         synchronized (_connection.getFailoverMutex())
         {
-            sendImpl(_destination, (AbstractJMSMessage) message, deliveryMode, _messagePriority, _timeToLive,
+            sendImpl(_destination, message, deliveryMode, _messagePriority, _timeToLive,
                      _mandatory, immediate);
         }
     }
@@ -261,23 +266,23 @@
     public void send(Message message, int deliveryMode, int priority,
                      long timeToLive) throws JMSException
     {
-    	checkPreConditions();
-    	checkInitialDestination();
+        checkPreConditions();
+        checkInitialDestination();
         synchronized (_connection.getFailoverMutex())
         {
-            sendImpl(_destination, (AbstractJMSMessage)message, deliveryMode, priority, timeToLive, _mandatory,
+            sendImpl(_destination, message, deliveryMode, priority, timeToLive, _mandatory,
                      _immediate);
         }
     }
 
     public void send(Destination destination, Message message) throws JMSException
     {
-    	checkPreConditions();
-    	checkDestination(destination);
+        checkPreConditions();
+        checkDestination(destination);
         synchronized (_connection.getFailoverMutex())
         {
             validateDestination(destination);
-            sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, _deliveryMode, _messagePriority, _timeToLive,
+            sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive,
                      _mandatory, _immediate);
         }
     }
@@ -286,12 +291,12 @@
                      int priority, long timeToLive)
             throws JMSException
     {
-    	checkPreConditions();
-    	checkDestination(destination);
+        checkPreConditions();
+        checkDestination(destination);
         synchronized (_connection.getFailoverMutex())
         {
             validateDestination(destination);
-            sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive,
+            sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
                      _mandatory, _immediate);
         }
     }
@@ -305,7 +310,7 @@
         synchronized (_connection.getFailoverMutex())
         {
             validateDestination(destination);
-            sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive,
+            sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
                      mandatory, _immediate);
         }
     }
@@ -314,12 +319,12 @@
                      int priority, long timeToLive, boolean mandatory, boolean immediate)
             throws JMSException
     {
-    	checkPreConditions();
-    	checkDestination(destination);
+        checkPreConditions();
+        checkDestination(destination);
         synchronized (_connection.getFailoverMutex())
         {
             validateDestination(destination);
-            sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive,
+            sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
                      mandatory, immediate);
         }
     }
@@ -329,27 +334,158 @@
                      boolean immediate, boolean waitUntilSent)
             throws JMSException
     {
-    	checkPreConditions();
-    	checkDestination(destination);
+        checkPreConditions();
+        checkDestination(destination);
         synchronized (_connection.getFailoverMutex())
         {
             validateDestination(destination);
-            sendImpl((AMQDestination) destination, (AbstractJMSMessage) message, deliveryMode, priority, timeToLive,
+            sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive,
                      mandatory, immediate, waitUntilSent);
         }
     }
 
+
+    private AbstractJMSMessage convertToNativeMessage(Message message) throws JMSException
+    {
+        if (message instanceof AbstractJMSMessage)
+        {
+            return (AbstractJMSMessage) message;
+        }
+        else
+        {
+            AbstractJMSMessage newMessage;
+
+            if (message instanceof BytesMessage)
+            {
+                BytesMessage bytesMessage = (BytesMessage) message;
+                bytesMessage.reset();
+
+                JMSBytesMessage nativeMsg = (JMSBytesMessage) _session.createBytesMessage();
+
+
+                byte[] buf = new byte[1024];
+
+                int len;
+
+                while ((len = bytesMessage.readBytes(buf)) != -1)
+                {
+                    nativeMsg.writeBytes(buf, 0, len);
+                }
+
+                newMessage = nativeMsg;
+            }
+            else if (message instanceof MapMessage)
+            {
+                MapMessage origMessage = (MapMessage) message;
+                MapMessage nativeMessage = _session.createMapMessage();
+
+                Enumeration mapNames = origMessage.getMapNames();
+                while (mapNames.hasMoreElements())
+                {
+                    String name = (String) mapNames.nextElement();
+                    nativeMessage.setObject(name, origMessage.getObject(name));
+                }
+                newMessage = (AbstractJMSMessage) nativeMessage;
+            }
+            else if (message instanceof ObjectMessage)
+            {
+                ObjectMessage origMessage = (ObjectMessage) message;
+                ObjectMessage nativeMessage = _session.createObjectMessage();
+
+                nativeMessage.setObject(origMessage.getObject());
+
+                newMessage = (AbstractJMSMessage) nativeMessage;
+            }
+            else if (message instanceof TextMessage)
+            {
+                TextMessage origMessage = (TextMessage) message;
+                TextMessage nativeMessage = _session.createTextMessage();
+
+                nativeMessage.setText(origMessage.getText());
+
+                newMessage = (AbstractJMSMessage) nativeMessage;
+            }
+            else if (message instanceof StreamMessage)
+            {
+                StreamMessage origMessage = (StreamMessage) message;
+                StreamMessage nativeMessage = _session.createStreamMessage();
+
+
+                try
+                {
+                    origMessage.reset();
+                    while (true)
+                    {
+                        nativeMessage.writeObject(origMessage.readObject());
+                    }
+                }
+                catch (MessageEOFException e)
+                {
+                    ;//
+                }
+                newMessage = (AbstractJMSMessage) nativeMessage;
+            }
+            else
+            {
+                newMessage = (AbstractJMSMessage) _session.createMessage();
+
+            }
+
+            Enumeration propertyNames = message.getPropertyNames();
+            while (propertyNames.hasMoreElements())
+            {
+                String propertyName = String.valueOf(propertyNames.nextElement());
+                if (!propertyName.startsWith("JMSX_"))
+                {
+                    Object value = message.getObjectProperty(propertyName);
+                    newMessage.setObjectProperty(propertyName, value);
+                }
+            }
+
+            newMessage.setJMSDeliveryMode(message.getJMSDeliveryMode());
+
+
+            int priority = message.getJMSPriority();
+            if (priority < 0)
+            {
+                priority = 0;
+            }
+            else if (priority > 9)
+            {
+                priority = 9;
+            }
+
+            newMessage.setJMSPriority(priority);
+            if (message.getJMSReplyTo() != null)
+            {
+                newMessage.setJMSReplyTo(message.getJMSReplyTo());
+            }
+            newMessage.setJMSType(message.getJMSType());
+
+
+            if (newMessage != null)
+            {
+                return newMessage;
+            }
+            else
+            {
+                throw new JMSException("Unable to send message, due to class conversion error: " + message.getClass().getName());
+            }
+        }
+    }
+
+
     private void validateDestination(Destination destination) throws JMSException
     {
         if (!(destination instanceof AMQDestination))
         {
             throw new JMSException("Unsupported destination class: " +
-                                   (destination != null ? destination.getClass() : null));
+                    (destination != null ? destination.getClass() : null));
         }
-        declareDestination((AMQDestination)destination);
+        declareDestination((AMQDestination) destination);
     }
 
-    protected void sendImpl(AMQDestination destination, AbstractJMSMessage message, int deliveryMode, int priority,
+    protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority,
                             long timeToLive, boolean mandatory, boolean immediate) throws JMSException
     {
         sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent);
@@ -357,8 +493,9 @@
 
     /**
      * The caller of this method must hold the failover mutex.
+     *
      * @param destination
-     * @param message
+     * @param origMessage
      * @param deliveryMode
      * @param priority
      * @param timeToLive
@@ -366,9 +503,12 @@
      * @param immediate
      * @throws JMSException
      */
-    protected void sendImpl(AMQDestination destination, AbstractJMSMessage message, int deliveryMode, int priority,
+    protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority,
                             long timeToLive, boolean mandatory, boolean immediate, boolean wait) throws JMSException
     {
+        checkTemporaryDestination(destination);
+
+        AbstractJMSMessage message = convertToNativeMessage(origMessage);
         AMQFrame publishFrame = BasicPublishBody.createAMQFrame(_channelId, 0, destination.getExchangeName(),
                                                                 destination.getRoutingKey(), mandatory, immediate);
 
@@ -424,11 +564,42 @@
         frames[1] = contentHeaderFrame;
         CompositeAMQDataBlock compositeFrame = new CompositeAMQDataBlock(frames);
         _protocolHandler.writeFrame(compositeFrame, wait);
+
+
+        if (message != origMessage)
+        {
+            _logger.warn("Updating original message");
+            origMessage.setJMSPriority(message.getJMSPriority());
+            origMessage.setJMSTimestamp(message.getJMSTimestamp());
+            _logger.warn("Setting JMSExpiration:" + message.getJMSExpiration());
+            origMessage.setJMSExpiration(message.getJMSExpiration());
+            origMessage.setJMSMessageID(message.getJMSMessageID());
+        }
+    }
+
+    private void checkTemporaryDestination(AMQDestination destination) throws JMSException
+    {
+        if(destination instanceof TemporaryDestination)
+        {
+            _logger.debug("destination is temporary destination");
+            TemporaryDestination tempDest = (TemporaryDestination) destination;
+            if(tempDest.getSession().isClosed())
+            {
+                _logger.debug("session is closed");
+                throw new JMSException("Session for temporary destination has been closed");
+            }
+            if(tempDest.isDeleted())
+            {
+                _logger.debug("destination is deleted");
+                throw new JMSException("Cannot send to a deleted temporary destination");
+            }
+        }
     }
 
     /**
      * Create content bodies. This will split a large message into numerous bodies depending on the negotiated
      * maximum frame size.
+     *
      * @param payload
      * @return the array of content bodies
      */
@@ -458,8 +629,8 @@
             for (int i = 0; i < bodies.length; i++)
             {
                 bodies[i] = new ContentBody();
-                payload.position((int)framePayloadMax * i);
-                int length = (remaining >= framePayloadMax) ? (int)framePayloadMax : (int)remaining;
+                payload.position((int) framePayloadMax * i);
+                int length = (remaining >= framePayloadMax) ? (int) framePayloadMax : (int) remaining;
                 payload.limit(payload.position() + length);
                 bodies[i].payload = payload.slice();
                 remaining -= length;
@@ -480,32 +651,42 @@
         _encoding = encoding;
     }
 
-	private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException {
-		checkNotClosed();
+    private void checkPreConditions() throws javax.jms.IllegalStateException, JMSException
+    {
+        checkNotClosed();
+
+        if (_session == null || _session.isClosed())
+        {
+            throw new javax.jms.IllegalStateException("Invalid Session");
+        }
+    }
+
+    private void checkInitialDestination()
+    {
+        if (_destination == null)
+        {
+            throw new UnsupportedOperationException("Destination is null");
+        }
+    }
+
+    private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException
+    {
+        if (_destination != null && suppliedDestination != null)
+        {
+            throw new UnsupportedOperationException("This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
+        }
+
+        if (suppliedDestination == null)
+        {
+            throw new InvalidDestinationException("Supplied Destination was invalid");
+        }
+
+
+    }
+
 
-		if(_session == null || _session.isClosed()){
-			throw new javax.jms.IllegalStateException("Invalid Session");
-		}
-	}
-
-	private void checkInitialDestination(){
-		if(_destination == null){
-			throw new UnsupportedOperationException("Destination is null");
-		}
-	}
-
-	private void checkDestination(Destination suppliedDestination) throws InvalidDestinationException{
-		if (_destination != null && suppliedDestination != null){
-			throw new UnsupportedOperationException("This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
-		}
-
-		if (suppliedDestination == null){
-			throw new InvalidDestinationException("Supplied Destination was invalid");
-		}
-	}
-
-
-	public AMQSession getSession() {
-		return _session;
-	}
+    public AMQSession getSession()
+    {
+        return _session;
+    }
 }

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java?view=diff&rev=489461&r1=489460&r2=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/QueueSenderAdapter.java Thu Dec 21 13:08:38 2006
@@ -10,119 +10,124 @@
 
 public class QueueSenderAdapter implements QueueSender {
 
-	private MessageProducer delegate;
-	private Queue queue;
+	private MessageProducer _delegate;
+	private Queue _queue;
 	private boolean closed = false;
 	
 	public QueueSenderAdapter(MessageProducer msgProducer, Queue queue){
-		delegate = msgProducer;
-		this.queue = queue;
+		_delegate = msgProducer;
+		_queue = queue;
 	}
 	
 	public Queue getQueue() throws JMSException {
 		checkPreConditions();
-		return queue;
+		return _queue;
 	}
 
 	public void send(Message msg) throws JMSException {
 		checkPreConditions();
-		delegate.send(msg);
+		_delegate.send(msg);
 	}
 
 	public void send(Queue queue, Message msg) throws JMSException {
-		checkPreConditions();
-		delegate.send(queue, msg);
+		checkPreConditions(queue);
+		_delegate.send(queue, msg);
 	}
 
 	public void publish(Message msg, int deliveryMode, int priority, long timeToLive)
 	throws JMSException {
 		checkPreConditions();
-		delegate.send(msg, deliveryMode,priority,timeToLive);
+		_delegate.send(msg, deliveryMode,priority,timeToLive);
 	}
 
 	public void send(Queue queue,Message msg, int deliveryMode, int priority, long timeToLive)
 			throws JMSException {
-		checkPreConditions();
-		delegate.send(queue,msg, deliveryMode,priority,timeToLive);
+		checkPreConditions(queue);
+		_delegate.send(queue,msg, deliveryMode,priority,timeToLive);
 	}
 	
 	public void close() throws JMSException {
-		delegate.close();
+		_delegate.close();
 		closed = true;
 	}
 
 	public int getDeliveryMode() throws JMSException {
 		checkPreConditions();
-		return delegate.getDeliveryMode();
+		return _delegate.getDeliveryMode();
 	}
 
 	public Destination getDestination() throws JMSException {
 		checkPreConditions();
-		return delegate.getDestination();
+		return _delegate.getDestination();
 	}
 
 	public boolean getDisableMessageID() throws JMSException {
 		checkPreConditions();
-		return delegate.getDisableMessageID();
+		return _delegate.getDisableMessageID();
 	}
 
 	public boolean getDisableMessageTimestamp() throws JMSException {
 		checkPreConditions();
-		return delegate.getDisableMessageTimestamp();
+		return _delegate.getDisableMessageTimestamp();
 	}
 
 	public int getPriority() throws JMSException {
 		checkPreConditions();
-		return delegate.getPriority();
+		return _delegate.getPriority();
 	}
 
 	public long getTimeToLive() throws JMSException {
 		checkPreConditions();
-		return delegate.getTimeToLive();
+		return _delegate.getTimeToLive();
 	}
 
 	public void send(Destination dest, Message msg) throws JMSException {
-		checkPreConditions();
-		delegate.send(dest,msg);
+		checkPreConditions((Queue)dest);
+		_delegate.send(dest,msg);
 	}
 
 	public void send(Message msg, int deliveryMode, int priority, long timeToLive)
 			throws JMSException {
 		checkPreConditions();
-		delegate.send(msg, deliveryMode,priority,timeToLive);
+		_delegate.send(msg, deliveryMode,priority,timeToLive);
 	}
 
 	public void send(Destination dest, Message msg, int deliveryMode, int priority, long timeToLive) throws JMSException {
-		checkPreConditions();
-		delegate.send(dest,msg, deliveryMode,priority,timeToLive);
+		checkPreConditions((Queue)dest);
+		_delegate.send(dest,msg, deliveryMode,priority,timeToLive);
 	}
 
 	public void setDeliveryMode(int deliveryMode) throws JMSException {
 		checkPreConditions();
-		delegate.setDeliveryMode(deliveryMode);
+		_delegate.setDeliveryMode(deliveryMode);
 	}
 
 	public void setDisableMessageID(boolean disableMessageID) throws JMSException {
 		checkPreConditions();
-		delegate.setDisableMessageID(disableMessageID);
+		_delegate.setDisableMessageID(disableMessageID);
 	}
 
 	public void setDisableMessageTimestamp(boolean disableMessageTimestamp) throws JMSException {
 		checkPreConditions();
-		delegate.setDisableMessageTimestamp(disableMessageTimestamp);
+		_delegate.setDisableMessageTimestamp(disableMessageTimestamp);
 	}
 
 	public void setPriority(int priority) throws JMSException {
 		checkPreConditions();
-		delegate.setPriority(priority);
+		_delegate.setPriority(priority);
 	}
 
 	public void setTimeToLive(long timeToLive) throws JMSException {
 		checkPreConditions();
-		delegate.setTimeToLive(timeToLive);
+		_delegate.setTimeToLive(timeToLive);
 	}
-	
-	private void checkPreConditions() throws IllegalStateException, IllegalStateException {
+
+    private void checkPreConditions() throws IllegalStateException, IllegalStateException
+    {
+        checkPreConditions(_queue);
+    }
+
+    private void checkPreConditions(Queue queue) throws IllegalStateException, IllegalStateException {
 		if (closed){
 			throw new javax.jms.IllegalStateException("Publisher is closed");
 		}
@@ -131,7 +136,7 @@
 			throw new UnsupportedOperationException("Queue is null");
 		}
 		
-		AMQSession session = ((BasicMessageProducer)delegate).getSession();
+		AMQSession session = ((BasicMessageProducer) _delegate).getSession();
 		
 		if(session == null || session.isClosed()){
 			throw new javax.jms.IllegalStateException("Invalid Session");

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java?view=diff&rev=489461&r1=489460&r2=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/TopicSubscriberAdaptor.java Thu Dec 21 13:08:38 2006
@@ -35,10 +35,10 @@
 class TopicSubscriberAdaptor implements TopicSubscriber
 {
     private final Topic _topic;
-    private final MessageConsumer _consumer;
+    private final BasicMessageConsumer _consumer;
     private final boolean _noLocal;
 
-    TopicSubscriberAdaptor(Topic topic, MessageConsumer consumer, boolean noLocal)
+    TopicSubscriberAdaptor(Topic topic, BasicMessageConsumer consumer, boolean noLocal)
     {
         _topic = topic;
         _consumer = consumer;
@@ -119,4 +119,10 @@
 			throw new javax.jms.IllegalStateException("Invalid Session");
 		}
 	}
+
+    BasicMessageConsumer getMessageConsumer()
+    {
+        return _consumer;
+    }
+
 }

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java?view=diff&rev=489461&r1=489460&r2=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/AbstractJMSMessage.java Thu Dec 21 13:08:38 2006
@@ -32,16 +32,18 @@
 import org.apache.qpid.client.JmsNotImplementedException;
 import org.apache.qpid.framing.BasicContentHeaderProperties;
 import org.apache.qpid.framing.FieldTable;
+import org.apache.qpid.framing.JMSPropertyFieldTable;
 
 import javax.jms.Destination;
 import javax.jms.JMSException;
 import javax.jms.MessageNotReadableException;
 import javax.jms.MessageNotWriteableException;
+import javax.jms.MessageFormatException;
 import java.util.Collections;
 import java.util.Enumeration;
 import java.util.Map;
 
-public abstract class AbstractJMSMessage extends AMQMessage implements javax.jms.Message
+public abstract class AbstractJMSMessage extends AMQMessage implements org.apache.qpid.jms.Message
 {
     private static final Map _destinationCache = Collections.synchronizedMap(new ReferenceMap());
 
@@ -50,7 +52,8 @@
     protected ByteBuffer _data;
     private boolean _readableProperties = false;
     private boolean _readableMessage = false;
-
+    private Destination _destination;
+    
     protected AbstractJMSMessage(ByteBuffer data)
     {
         super(new BasicContentHeaderProperties());
@@ -174,12 +177,12 @@
     public Destination getJMSDestination() throws JMSException
     {
         // TODO: implement this once we have sorted out how to figure out the exchange class
-        throw new JmsNotImplementedException();
+    	return _destination;
     }
 
     public void setJMSDestination(Destination destination) throws JMSException
     {
-        throw new JmsNotImplementedException();
+    	_destination = destination;
     }
 
     public int getJMSDeliveryMode() throws JMSException
@@ -234,7 +237,7 @@
 
     public void clearProperties() throws JMSException
     {
-        getJmsContentHeaderProperties().getHeaders().clear();
+        getJmsContentHeaderProperties().getJMSHeaders().clear();
 
         _readableProperties = false;
     }
@@ -249,138 +252,139 @@
     public boolean propertyExists(String propertyName) throws JMSException
     {
         checkPropertyName(propertyName);
-        return getJmsContentHeaderProperties().getHeaders().propertyExists(propertyName);
+        return getJmsContentHeaderProperties().getJMSHeaders().propertyExists(propertyName);
     }
 
     public boolean getBooleanProperty(String propertyName) throws JMSException
     {
         checkPropertyName(propertyName);
-        return getJmsContentHeaderProperties().getHeaders().getBoolean(propertyName);
+
+        return getJmsContentHeaderProperties().getJMSHeaders().getBoolean(propertyName);
     }
 
     public byte getByteProperty(String propertyName) throws JMSException
     {
         checkPropertyName(propertyName);
-        return getJmsContentHeaderProperties().getHeaders().getByte(propertyName);
+        return getJmsContentHeaderProperties().getJMSHeaders().getByte(propertyName);
     }
 
     public short getShortProperty(String propertyName) throws JMSException
     {
         checkPropertyName(propertyName);
-        return getJmsContentHeaderProperties().getHeaders().getShort(propertyName);
+        return getJmsContentHeaderProperties().getJMSHeaders().getShort(propertyName);
     }
 
     public int getIntProperty(String propertyName) throws JMSException
     {
         checkPropertyName(propertyName);
-        return getJmsContentHeaderProperties().getHeaders().getInteger(propertyName);
+        return getJmsContentHeaderProperties().getJMSHeaders().getInteger(propertyName);
     }
 
     public long getLongProperty(String propertyName) throws JMSException
     {
         checkPropertyName(propertyName);
-        return getJmsContentHeaderProperties().getHeaders().getLong(propertyName);
+        return getJmsContentHeaderProperties().getJMSHeaders().getLong(propertyName);
     }
 
     public float getFloatProperty(String propertyName) throws JMSException
     {
         checkPropertyName(propertyName);
-        return getJmsContentHeaderProperties().getHeaders().getFloat(propertyName);
+        return getJmsContentHeaderProperties().getJMSHeaders().getFloat(propertyName);
     }
 
     public double getDoubleProperty(String propertyName) throws JMSException
     {
         checkPropertyName(propertyName);
-        return getJmsContentHeaderProperties().getHeaders().getDouble(propertyName);
+        return getJmsContentHeaderProperties().getJMSHeaders().getDouble(propertyName);
     }
 
     public String getStringProperty(String propertyName) throws JMSException
     {
         checkPropertyName(propertyName);
-        return getJmsContentHeaderProperties().getHeaders().getString(propertyName);
+        return getJmsContentHeaderProperties().getJMSHeaders().getString(propertyName);
     }
 
     public Object getObjectProperty(String propertyName) throws JMSException
     {
         checkPropertyName(propertyName);
-        return getJmsContentHeaderProperties().getHeaders().getObject(propertyName);
+        return getJmsContentHeaderProperties().getJMSHeaders().getObject(propertyName);
     }
 
     public Enumeration getPropertyNames() throws JMSException
     {
-        return getJmsContentHeaderProperties().getHeaders().getPropertyNames();
+        return getJmsContentHeaderProperties().getJMSHeaders().getPropertyNames();
     }
 
     public void setBooleanProperty(String propertyName, boolean b) throws JMSException
     {
         checkWritableProperties();
         checkPropertyName(propertyName);
-        getJmsContentHeaderProperties().getHeaders().setBoolean(propertyName, b);
+        getJmsContentHeaderProperties().getJMSHeaders().setBoolean(propertyName, b);
     }
 
     public void setByteProperty(String propertyName, byte b) throws JMSException
     {
         checkWritableProperties();
         checkPropertyName(propertyName);
-        getJmsContentHeaderProperties().getHeaders().setByte(propertyName, new Byte(b));
+        getJmsContentHeaderProperties().getJMSHeaders().setByte(propertyName, new Byte(b));
     }
 
     public void setShortProperty(String propertyName, short i) throws JMSException
     {
         checkWritableProperties();
         checkPropertyName(propertyName);
-        getJmsContentHeaderProperties().getHeaders().setShort(propertyName, new Short(i));
+        getJmsContentHeaderProperties().getJMSHeaders().setShort(propertyName, new Short(i));
     }
 
     public void setIntProperty(String propertyName, int i) throws JMSException
     {
         checkWritableProperties();
         checkPropertyName(propertyName);
-        getJmsContentHeaderProperties().getHeaders().setInteger(propertyName, new Integer(i));
+        getJmsContentHeaderProperties().getJMSHeaders().setInteger(propertyName, new Integer(i));
     }
 
     public void setLongProperty(String propertyName, long l) throws JMSException
     {
         checkWritableProperties();
         checkPropertyName(propertyName);
-        getJmsContentHeaderProperties().getHeaders().setLong(propertyName, new Long(l));
+        getJmsContentHeaderProperties().getJMSHeaders().setLong(propertyName, new Long(l));
     }
 
     public void setFloatProperty(String propertyName, float f) throws JMSException
     {
         checkWritableProperties();
         checkPropertyName(propertyName);
-        getJmsContentHeaderProperties().getHeaders().setFloat(propertyName, new Float(f));
+        getJmsContentHeaderProperties().getJMSHeaders().setFloat(propertyName, new Float(f));
     }
 
     public void setDoubleProperty(String propertyName, double v) throws JMSException
     {
         checkWritableProperties();
         checkPropertyName(propertyName);
-        getJmsContentHeaderProperties().getHeaders().setDouble(propertyName, new Double(v));
+        getJmsContentHeaderProperties().getJMSHeaders().setDouble(propertyName, new Double(v));
     }
 
     public void setStringProperty(String propertyName, String value) throws JMSException
     {
         checkWritableProperties();
         checkPropertyName(propertyName);
-        getJmsContentHeaderProperties().getHeaders().setString(propertyName, value);
+        getJmsContentHeaderProperties().getJMSHeaders().setString(propertyName, value);
     }
 
     public void setObjectProperty(String propertyName, Object object) throws JMSException
     {
         checkWritableProperties();
         checkPropertyName(propertyName);
-        getJmsContentHeaderProperties().getHeaders().setObject(propertyName, object);
+        getJmsContentHeaderProperties().getJMSHeaders().setObject(propertyName, object);
     }
 
     protected void removeProperty(String propertyName) throws JMSException
     {
         checkPropertyName(propertyName);
-        getJmsContentHeaderProperties().getHeaders().remove(propertyName);
+        getJmsContentHeaderProperties().getJMSHeaders().remove(propertyName);
     }
 
-    public void acknowledge() throws JMSException
+    public void acknowledgeThis() throws JMSException
     {
         // the JMS 1.1 spec says in section 3.6 that calls to acknowledge are ignored when client acknowledge
         // is not specified. In our case, we only set the session field where client acknowledge mode is specified.
@@ -397,6 +401,14 @@
         }
     }
 
+    public void acknowledge() throws JMSException
+    {
+        if(_session != null)
+        {
+            _session.acknowledge();
+        }
+    }
+
 
     /**
      * This forces concrete classes to implement clearBody()
@@ -426,13 +438,13 @@
             buf.append("\nJMS reply to: ").append(String.valueOf(getJMSReplyTo()));
             buf.append("\nAMQ message number: ").append(_deliveryTag);
             buf.append("\nProperties:");
-            if (getJmsContentHeaderProperties().getHeaders().isEmpty())
+            if (getJmsContentHeaderProperties().getJMSHeaders().isEmpty())
             {
                 buf.append("<NONE>");
             }
             else
             {
-                buf.append('\n').append(getJmsContentHeaderProperties().getHeaders());
+                buf.append('\n').append(getJmsContentHeaderProperties().getJMSHeaders());
             }
             return buf.toString();
         }
@@ -462,9 +474,6 @@
         {
             throw new IllegalArgumentException("Property name must not be the empty string");
         }
-
-        // Call to ensure that the it has been set.
-        getJmsContentHeaderProperties().getHeaders();
     }
 
     public BasicContentHeaderProperties getJmsContentHeaderProperties()
@@ -478,14 +487,7 @@
         // position beyond the start
         if (_data != null)
         {
-            if (!_readableMessage)
-            {
-                _data.flip();
-            }
-            else
-            {
-                _data.rewind();
-            }
+            reset();
         }
         return _data;
     }
@@ -524,7 +526,7 @@
         return !_readableMessage;
     }
 
-    public void reset() throws JMSException
+    public void reset() 
     {
         if (_readableMessage)
         {

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java?view=diff&rev=489461&r1=489460&r2=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSBytesMessage.java Thu Dec 21 13:08:38 2006
@@ -30,6 +30,9 @@
 import javax.jms.MessageEOFException;
 import java.nio.charset.CharacterCodingException;
 import java.nio.charset.Charset;
+import java.nio.charset.CharsetEncoder;
+import java.nio.charset.CharsetDecoder;
+import java.nio.CharBuffer;
 
 public class JMSBytesMessage extends AbstractBytesMessage implements BytesMessage
 {
@@ -149,10 +152,27 @@
         checkReadable();
         // we check only for one byte since theoretically the string could be only a
         // single byte when using UTF-8 encoding
-        checkAvailable(1);
+
         try
         {
-            return _data.getString(Charset.forName("UTF-8").newDecoder());
+            short length = readShort();
+            if(length == 0)
+            {
+                return "";
+            }
+            else
+            {
+                CharsetDecoder decoder = Charset.forName("UTF-8").newDecoder();
+                ByteBuffer encodedString = _data.slice();
+                encodedString.limit(length);
+                _data.position(_data.position()+length);
+                CharBuffer string = decoder.decode(encodedString.buf());
+                
+                return string.toString();
+            }
+
+
+            
         }
         catch (CharacterCodingException e)
         {
@@ -257,9 +277,15 @@
         checkWritable();
         try
         {
-            _data.putString(string, Charset.forName("UTF-8").newEncoder());
+            CharsetEncoder encoder = Charset.forName("UTF-8").newEncoder();
+            java.nio.ByteBuffer encodedString = encoder.encode(CharBuffer.wrap(string));
+            
+            _data.putShort((short)encodedString.limit());
+            _data.put(encodedString);
+
+            //_data.putString(string, Charset.forName("UTF-8").newEncoder());
             // we must add the null terminator manually
-            _data.put((byte)0);
+            //_data.put((byte)0);
         }
         catch (CharacterCodingException e)
         {

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java?view=diff&rev=489461&r1=489460&r2=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSMapMessage.java Thu Dec 21 13:08:38 2006
@@ -25,6 +25,8 @@
 import org.apache.qpid.framing.FieldTableFactory;
 import org.apache.qpid.framing.ContentHeaderBody;
 import org.apache.qpid.framing.EncodingUtils;
+import org.apache.qpid.framing.JMSPropertyFieldTable;
+import org.apache.qpid.framing.AMQFrameDecodingException;
 import org.apache.qpid.AMQException;
 import org.apache.log4j.Logger;
 
@@ -39,7 +41,7 @@
 
     public static final String MIME_TYPE = "jms/map-message";
 
-    private PropertyFieldTable _map;
+    private JMSPropertyFieldTable _properties;
 
     JMSMapMessage() throws JMSException
     {
@@ -49,10 +51,9 @@
     JMSMapMessage(ByteBuffer data) throws JMSException
     {
         super(data); // this instantiates a content header
-        _map = new PropertyFieldTable();
+        _properties = new JMSPropertyFieldTable();
     }
 
-
     JMSMapMessage(long messageNbr, ContentHeaderBody contentHeader, ByteBuffer data)
             throws AMQException
     {
@@ -62,19 +63,33 @@
         {
 
             long tableSize = EncodingUtils.readInteger(_data);
-            _map = (PropertyFieldTable) FieldTableFactory.newFieldTable(_data, tableSize);
-
+            try
+            {
+                _properties = new JMSPropertyFieldTable(_data, tableSize);
+            }
+            catch (JMSException e)
+            {
+                Exception error = e.getLinkedException();
+                if (error instanceof AMQFrameDecodingException)
+                {
+                    throw(AMQFrameDecodingException) error;
+                }
+                else
+                {
+                    throw new AMQException(e.getMessage(), e);
+                }
+            }
         }
         else
         {
-            _map = (PropertyFieldTable) FieldTableFactory.newFieldTable();
+            _properties = new JMSPropertyFieldTable();
         }
     }
 
 
     public String toBodyString() throws JMSException
     {
-        return "MapSize:" + _map.getEncodedSize() + "\nMapData:\n" + _map.toString();
+        return _properties.toString();
     }
 
     public String getMimeType()
@@ -82,85 +97,43 @@
         return MIME_TYPE;
     }
 
-    // MapMessage  Interface
 
-    public boolean getBoolean(String string) throws JMSException
+    public ByteBuffer getData()
     {
-        Boolean b = _map.getBoolean(string);
-
-        if (b == null)
-        {
-            if (_map.containsKey(string))
-            {
-                Object str = _map.getObject(string);
+        //What if _data is null?
+        _properties.writeToBuffer(_data);
+        return super.getData();
+    }
 
-                if (str == null || !(str instanceof String))
-                {
-                    throw new MessageFormatException("getBoolean can't use " + string + " item.");
-                }
-                else
-                {
-                    return Boolean.valueOf((String) str);
-                }
-            }
-            else
-            {
-                b = Boolean.valueOf(null);
-            }
-        }
+    @Override
+    public void clearBodyImpl() throws JMSException
+    {
+        super.clearBodyImpl();
+        _properties.clear();
+    }
 
-        return b;
+    public boolean getBoolean(String string) throws JMSException
+    {
+        return _properties.getBoolean(string);
     }
 
     public byte getByte(String string) throws JMSException
     {
-        Byte b = _map.getByte(string);
-        if (b == null)
-        {
-            if (_map.containsKey(string))
-            {
-                Object str = _map.getObject(string);
-
-                if (str == null || !(str instanceof String))
-                {
-                    throw new MessageFormatException("getByte can't use " + string + " item.");
-                }
-                else
-                {
-                    return Byte.valueOf((String) str);
-                }
-            }
-            else
-            {
-                b = Byte.valueOf(null);
-            }
-        }
-
-        return b;
+        return _properties.getByte(string);
     }
 
     public short getShort(String string) throws JMSException
     {
-        {
-            Short s = _map.getShort(string);
-
-            if (s == null)
-            {
-                s = Short.valueOf(getByte(string));
-            }
-
-            return s;
-        }
+        return _properties.getShort(string);
     }
 
     public char getChar(String string) throws JMSException
     {
-
-        Character result = _map.getCharacter(string);
+    	Character result = _properties.getCharacter(string);
 
         if (result == null)
         {
-            throw new MessageFormatException("getChar couldn't find " + string + " item.");
+            throw new NullPointerException("getChar couldn't find " + string + " item.");
         }
         else
         {
@@ -170,179 +143,97 @@
 
     public int getInt(String string) throws JMSException
     {
-        Integer i = _map.getInteger(string);
-
-        if (i == null)
-        {
-            i = Integer.valueOf(getShort(string));
-        }
-
-        return i;
+        return _properties.getInteger(string);
     }
 
     public long getLong(String string) throws JMSException
     {
-
-        Long l = _map.getLong(string);
-
-        if (l == null)
-        {
-            l = Long.valueOf(getInt(string));
-        }
-
-        return l;
-
+        return _properties.getLong(string);
     }
 
     public float getFloat(String string) throws JMSException
     {
-
-        Float f = _map.getFloat(string);
-
-        if (f == null)
-        {
-            if (_map.containsKey(string))
-            {
-                Object str = _map.getObject(string);
-
-                if (str == null || !(str instanceof String))
-                {
-                    throw new MessageFormatException("getFloat can't use " + string + " item.");
-                }
-                else
-                {
-                    return Float.valueOf((String) str);
-                }
-            }
-            else
-            {
-                f = Float.valueOf(null);
-            }
-
-        }
-
-        return f;
-
+        return _properties.getFloat(string);
     }
 
     public double getDouble(String string) throws JMSException
     {
-        Double d = _map.getDouble(string);
-
-        if (d == null)
-        {
-            d = Double.valueOf(getFloat(string));
-        }
-
-        return d;
+        return _properties.getDouble(string);
     }
 
     public String getString(String string) throws JMSException
     {
-        String s = _map.getString(string);
-
-        if (s == null)
-        {
-            if (_map.containsKey(string))
-            {
-                Object o = _map.getObject(string);
-                if (o instanceof byte[])
-                {
-                    throw new MessageFormatException("getObject couldn't find " + string + " item.");
-                }
-                else
-                {
-                    if (o == null)
-                    {
-                        return null;
-                    }
-                    else
-                    {
-                        s = String.valueOf(o);
-                    }
-                }
-            }
-        }
-
-        return s;
+        return _properties.getString(string);
     }
 
     public byte[] getBytes(String string) throws JMSException
     {
-
-        byte[] result = _map.getBytes(string);
-
-        if (result == null)
-        {
-            throw new MessageFormatException("getBytes couldn't find " + string + " item.");
-        }
-
-        return result;
-
+        return _properties.getBytes(string);
     }
 
     public Object getObject(String string) throws JMSException
     {
-        return _map.getObject(string);
+        return _properties.getObject(string);
     }
 
     public Enumeration getMapNames() throws JMSException
     {
-        return _map.getPropertyNames();
+        return _properties.getMapNames();
     }
 
+
     public void setBoolean(String string, boolean b) throws JMSException
     {
         checkWritable();
-        _map.setBoolean(string, b);
+        _properties.setBoolean(string, b);
     }
 
     public void setByte(String string, byte b) throws JMSException
     {
         checkWritable();
-        _map.setByte(string, b);
+        _properties.setByte(string, b);
     }
 
     public void setShort(String string, short i) throws JMSException
     {
         checkWritable();
-        _map.setShort(string, i);
+        _properties.setShort(string, i);
     }
 
     public void setChar(String string, char c) throws JMSException
     {
         checkWritable();
-        _map.setChar(string, c);
+        _properties.setChar(string, c);
     }
 
     public void setInt(String string, int i) throws JMSException
     {
         checkWritable();
-        _map.setInteger(string, i);
+        _properties.setInteger(string, i);
     }
 
     public void setLong(String string, long l) throws JMSException
     {
         checkWritable();
-        _map.setLong(string, l);
+        _properties.setLong(string, l);
     }
 
     public void setFloat(String string, float v) throws JMSException
     {
         checkWritable();
-        _map.setFloat(string, v);
+        _properties.setFloat(string, v);
     }
 
     public void setDouble(String string, double v) throws JMSException
     {
         checkWritable();
-        _map.setDouble(string, v);
+        _properties.setDouble(string, v);
     }
 
     public void setString(String string, String string1) throws JMSException
     {
         checkWritable();
-        _map.setString(string, string1);
+        _properties.setString(string, string1);
     }
 
     public void setBytes(String string, byte[] bytes) throws JMSException
@@ -353,25 +244,18 @@
     public void setBytes(String string, byte[] bytes, int i, int i1) throws JMSException
     {
         checkWritable();
-        _map.setBytes(string, bytes, i, i1);
+        _properties.setBytes(string, bytes, i, i1);
     }
 
     public void setObject(String string, Object object) throws JMSException
     {
         checkWritable();
-        _map.setObject(string, object);
+        _properties.setObject(string, object);
     }
 
     public boolean itemExists(String string) throws JMSException
     {
-        return _map.itemExists(string);
-    }
-
-    public ByteBuffer getData()
-    {
-        //What if _data is null?
-        _map.writeToBuffer(_data);
-        return super.getData();
+        return _properties.itemExists(string);
     }
 
 }

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java?view=diff&rev=489461&r1=489460&r2=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSObjectMessage.java Thu Dec 21 13:08:38 2006
@@ -72,6 +72,7 @@
             _data.release();
         }
         _data = null;
+
     }
 
     public String toBodyString() throws JMSException
@@ -97,6 +98,7 @@
         {
             _data.rewind();
         }
+
         try
         {
             ObjectOutputStream out = new ObjectOutputStream(_data.asOutputStream());
@@ -108,6 +110,7 @@
         {
             throw new MessageFormatException("Message not serializable: " + e);
         }
+
     }
 
     public Serializable getObject() throws JMSException
@@ -120,15 +123,18 @@
 
         try
         {
+        	_data.rewind();
             in = new ObjectInputStream(_data.asInputStream());
             return (Serializable) in.readObject();
         }
         catch (IOException e)
-        {
-            throw new MessageFormatException("Could not deserialize message: " + e);
+        {           
+           e.printStackTrace();
+           throw new MessageFormatException("Could not deserialize message: " + e);
         }
         catch (ClassNotFoundException e)
         {
+        	e.printStackTrace();
             throw new MessageFormatException("Could not deserialize message: " + e);
         }
         finally

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java?view=diff&rev=489461&r1=489460&r2=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/message/JMSStreamMessage.java Thu Dec 21 13:08:38 2006
@@ -226,6 +226,10 @@
         byte wireType = readWireType();
         try
         {
+        	if(wireType == NULL_STRING_TYPE){
+        		throw new NullPointerException();
+        	}
+        	
             if (wireType != CHAR_TYPE)
             {
                 _data.position(position);
@@ -428,7 +432,7 @@
                     break;
                 case NULL_STRING_TYPE:
                     result = null;
-                    break;
+                    throw new NullPointerException("data is null");
                 case BOOLEAN_TYPE:
                     checkAvailable(1);
                     result = String.valueOf(readBooleanImpl());

Modified: incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?view=diff&rev=489461&r1=489460&r2=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ incubator/qpid/branches/new_persistence/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Thu Dec 21 13:08:38 2006
@@ -7,9 +7,9 @@
  * to you under the Apache License, Version 2.0 (the
  * "License"); you may not use this file except in compliance
  * with the License.  You may obtain a copy of the License at
- * 
+ *
  *   http://www.apache.org/licenses/LICENSE-2.0
- * 
+ *
  * Unless required by applicable law or agreed to in writing,
  * software distributed under the License is distributed on an
  * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
@@ -104,6 +104,8 @@
         frame2handlerMap.put(BasicDeliverBody.class, BasicDeliverMethodHandler.getInstance());
         frame2handlerMap.put(BasicReturnBody.class, BasicReturnMethodHandler.getInstance());
         frame2handlerMap.put(ChannelFlowOkBody.class, ChannelFlowOkMethodHandler.getInstance());
+        frame2handlerMap.put(QueueDeleteOkBody.class, QueueDeleteOkMethodHandler.getInstance());
+        frame2handlerMap.put(ExchangeBoundOkBody.class, ExchangeBoundOkMethodHandler.getInstance());
         _state2HandlersMap.put(AMQState.CONNECTION_OPEN, frame2handlerMap);
     }
 

Added: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Config.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Config.java?view=auto&rev=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Config.java (added)
+++ incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Config.java Thu Dec 21 13:08:38 2006
@@ -0,0 +1,110 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transacted;
+
+import org.apache.qpid.config.ConnectorConfig;
+import org.apache.qpid.config.AbstractConfig;
+import org.apache.qpid.config.Connector;
+
+import javax.jms.Connection;
+
+class Config extends AbstractConfig implements ConnectorConfig
+{
+    private String host = "localhost";
+    private int port = 5672;
+    private String factory;
+    private boolean echo;
+    private int batch = 100;
+    private boolean persistent = true;
+
+    Config(String[] argv)
+    {
+        setOptions(argv);
+    }
+
+    Connection createConnection() throws Exception
+    {
+        return new Connector().createConnection(this);
+    }
+
+    public boolean isEchoOn()
+    {
+        return echo;
+    }
+
+    public boolean usePersistentMessages()
+    {
+        return persistent;
+    }
+
+    public int getBatchSize()
+    {
+        return batch;
+    }
+
+    public String getHost()
+    {
+        return host;
+    }
+
+    public int getPort()
+    {
+        return port;
+    }
+
+    public String getFactory()
+    {
+        return factory;
+    }
+
+    public void setOption(String key, String value)
+    {
+        if("-host".equalsIgnoreCase(key))
+        {
+            host = value;
+        }
+        else if("-port".equalsIgnoreCase(key))
+        {
+            port = parseInt("Bad port number", value);
+        }
+        else if("-factory".equalsIgnoreCase(key))
+        {
+            factory = value;
+        }
+        else if("-echo".equalsIgnoreCase(key))
+        {
+            echo = "true".equalsIgnoreCase(value);
+        }
+        else if("-persistent".equalsIgnoreCase(key))
+        {
+            persistent = "true".equalsIgnoreCase(value);
+        }
+        else if("-batch".equalsIgnoreCase(key))
+        {
+            batch = parseInt("Bad batch size", value);
+        }
+        else
+        {
+            System.out.println("Ignoring nrecognised option " + key);
+        }
+    }
+
+}

Propchange: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Config.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Ping.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Ping.java?view=auto&rev=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Ping.java (added)
+++ incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Ping.java Thu Dec 21 13:08:38 2006
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transacted;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQQueue;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import java.util.Arrays;
+
+public class Ping
+{
+    public static void main(String[] argv) throws Exception
+    {
+        Config config = new Config(argv);
+        Connection con = config.createConnection();
+        con.setClientID("ping");
+        new Relay(new AMQQueue("ping"), new AMQQueue("pong"), con,
+                  config.isEchoOn(),
+                  config.getBatchSize(),
+                  config.usePersistentMessages()).start();
+    }
+}

Propchange: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Ping.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Pong.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Pong.java?view=auto&rev=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Pong.java (added)
+++ incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Pong.java Thu Dec 21 13:08:38 2006
@@ -0,0 +1,43 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transacted;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQQueue;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+
+public class Pong
+{
+    public static void main(String[] argv) throws Exception
+    {
+        Config config = new Config(argv);
+        Connection con = config.createConnection();
+        con.setClientID("pong");
+        new Relay(new AMQQueue("pong"), new AMQQueue("ping"), con,
+                  config.isEchoOn(),
+                  config.getBatchSize(),
+                  config.usePersistentMessages()).start();
+
+    }
+}

Propchange: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Pong.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Relay.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Relay.java?view=auto&rev=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Relay.java (added)
+++ incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Relay.java Thu Dec 21 13:08:38 2006
@@ -0,0 +1,127 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transacted;
+
+import org.apache.qpid.client.AMQSession;
+
+import javax.jms.MessageProducer;
+import javax.jms.MessageConsumer;
+import javax.jms.Session;
+import javax.jms.Destination;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+import javax.jms.DeliveryMode;
+
+class Relay implements Runnable
+{
+    private final Connection _con;
+    private final Session _session;
+    private final MessageConsumer _src;
+    private final MessageProducer _dest;
+    private final int _batch;
+    private final boolean _echo;
+    private int _counter;
+    private long start;
+    private boolean _running;
+
+    Relay(Destination src, Destination dest, Connection con) throws JMSException
+    {
+        this(src, dest, con, false, 100, true);
+    }
+
+    Relay(Destination src, Destination dest, Connection con, boolean echo, int batch, boolean persistent) throws JMSException
+    {
+        _echo = echo;
+        _batch = batch;
+        _con = con;
+        _session = con.createSession(true, AMQSession.NO_ACKNOWLEDGE);
+        _src = _session.createConsumer(src);
+        _dest = _session.createProducer(dest);
+        _dest.setDeliveryMode(persistent ? DeliveryMode.PERSISTENT : DeliveryMode.NON_PERSISTENT);
+
+    }
+
+    public void run()
+    {
+        start = System.currentTimeMillis();
+        try{
+            while(true) relay();
+        }
+        catch(JMSException e)
+        {
+            e.printStackTrace();
+        }
+        try
+        {
+            _session.close();
+        }
+        catch (JMSException e)
+        {
+            e.printStackTrace();
+        }
+    }
+
+    void relay() throws JMSException
+    {
+        _dest.send(relay(_src.receive()));
+        _session.commit();
+    }
+
+    Message relay(Message in) throws JMSException
+    {
+        if(!_running)
+        {
+            System.out.println(_con.getClientID() + " started.");
+            _running = true;
+        }
+        if(++_counter % _batch == 0)
+        {
+            long time = System.currentTimeMillis() - start;
+            System.out.println(_batch + " iterations performed in " + time + " ms");
+            try
+            {
+                Thread.sleep(100);
+            }
+            catch (InterruptedException e)
+            {
+                e.printStackTrace();
+            }
+            start = System.currentTimeMillis();
+        }
+        if(_echo)
+        {
+            System.out.println("Received: " + ((TextMessage) in).getText());
+        }
+        return _session.createTextMessage(_con.getClientID() + _counter);
+    }
+
+    void start() throws InterruptedException, JMSException
+    {
+        Thread runner = new Thread(this);
+        runner.start();
+        _con.start();
+        System.out.println(_con.getClientID() + " waiting...");
+        runner.join();
+        _con.close();
+    }
+}

Propchange: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Relay.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Start.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Start.java?view=auto&rev=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Start.java (added)
+++ incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Start.java Thu Dec 21 13:08:38 2006
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.transacted;
+
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.AMQQueue;
+
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Session;
+
+public class Start
+{
+    public static void main(String[] argv) throws Exception
+    {
+        Connection con = new Config(argv).createConnection();
+        AMQQueue ping = new AMQQueue("ping");
+        Session session = con.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        session.createProducer(ping).send(session.createTextMessage("start"));
+        session.close();
+        con.close();
+    }
+}

Propchange: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/transacted/Start.java
------------------------------------------------------------------------------
    svn:eol-style = native

Added: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceProvider.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceProvider.java?view=auto&rev=489461
==============================================================================
--- incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceProvider.java (added)
+++ incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceProvider.java Thu Dec 21 13:08:38 2006
@@ -0,0 +1,151 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ * 
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ * 
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.weblogic;
+
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.AMQDestination;
+
+import javax.jms.*;
+import javax.naming.InitialContext;
+import javax.naming.NamingException;
+import javax.naming.Context;
+import java.net.InetAddress;
+import java.util.Hashtable;
+
+public class ServiceProvider
+{
+    private static final String JNDI_FACTORY = "weblogic.jndi.WLInitialContextFactory";
+    private static final String JMS_FACTORY = "transientJMSConnectionFactory";
+
+    private static final Logger _logger = Logger.getLogger(ServiceProvider.class);
+
+    private static MessageProducer _destinationProducer;
+
+    private static Queue _destinationQ;
+
+    public static void main(String[] args)
+    {
+        _logger.info("Starting...");
+
+        if (args.length != 2)
+        {
+            System.out.println("Usage: <WLS URI> <service queue>");
+            System.exit(1);
+        }
+        try
+        {
+            String url = args[0];
+            String receiveQueue = args[1];
+
+            final InitialContext ctx = getInitialContext(url);
+
+            QueueConnectionFactory qconFactory = (QueueConnectionFactory) ctx.lookup(JMS_FACTORY);
+            QueueConnection qcon = qconFactory.createQueueConnection();
+            final QueueSession qsession = qcon.createQueueSession(false, Session.CLIENT_ACKNOWLEDGE);
+            Queue receiveQ = (Queue) ctx.lookup(receiveQueue);
+
+            _logger.info("Service (queue) name is '" + receiveQ + "'...");
+
+            String selector = (args.length > 2 && args[2] != null && args[2].length() > 1) ? args[2] : null;
+
+            _logger.info("Message selector is <" + selector + ">...");
+
+            MessageConsumer consumer = qsession.createConsumer(receiveQ, selector);
+
+            consumer.setMessageListener(new MessageListener()
+            {
+                private int _messageCount;
+
+                public void onMessage(javax.jms.Message message)
+                {
+                    //_logger.info("Got message '" + message + "'");
+
+                    TextMessage tm = (TextMessage) message;
+
+                    try
+                    {
+                        Queue responseQueue = (Queue)tm.getJMSReplyTo();
+                        if (!responseQueue.equals(_destinationQ))
+                        {
+                            _destinationQ = responseQueue;
+                            _logger.info("Creating destination for " + responseQueue);
+
+                            try
+                            {
+                                _destinationProducer = qsession.createProducer(_destinationQ);
+                                _destinationProducer.setDeliveryMode(DeliveryMode.NON_PERSISTENT);
+                            }
+                            catch (JMSException e)
+                            {
+                                // TODO Auto-generated catch block
+                                e.printStackTrace();
+                            }
+                        }
+                        _messageCount++;
+                        if (_messageCount % 1000 == 0)
+                        {
+                            _logger.info("Received message total: " + _messageCount);
+                            _logger.info("Sending response to '" + responseQueue + "'");
+                        }
+
+                        String payload = "This is a response: sing together: 'Mahnah mahnah...'" + tm.getText();
+                        TextMessage msg = qsession.createTextMessage(payload);
+                        if (tm.propertyExists("timeSent"))
+                        {
+                            _logger.info("timeSent property set on message");
+                            final long timeSent = tm.getLongProperty("timeSent");
+                            msg.setLongProperty("timeSent", timeSent);
+                            _logger.info("time taken to go from service request to provider is: " + (System.currentTimeMillis() - timeSent));
+                        }
+                        _destinationProducer.send(msg);
+                        if (_messageCount % 1000 == 0)
+                        {
+                            tm.acknowledge();
+                            _logger.info("Sent response to '" + responseQueue + "'");
+                        }
+                    }
+                    catch (JMSException e)
+                    {
+                        _logger.error("Error sending message: " + e, e);
+                    }
+                }
+            });
+            qcon.start();
+        }
+        catch (Throwable t)
+        {
+            System.err.println("Fatal error: " + t);
+            t.printStackTrace();
+        }
+
+
+        System.out.println("Waiting...");
+    }
+
+    private static InitialContext getInitialContext(String url) throws NamingException
+    {
+        Hashtable env = new Hashtable();
+        env.put(Context.INITIAL_CONTEXT_FACTORY, JNDI_FACTORY);
+        env.put(Context.PROVIDER_URL, url);
+        return new InitialContext(env);
+    }
+}

Propchange: incubator/qpid/branches/new_persistence/java/client/src/old_test/java/org/apache/qpid/weblogic/ServiceProvider.java
------------------------------------------------------------------------------
    svn:eol-style = native