You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/03/06 15:12:49 UTC

svn commit: r515127 [2/2] - in /incubator/qpid/trunk/qpid/java: broker/src/main/java/org/apache/qpid/server/ broker/src/main/java/org/apache/qpid/server/ack/ broker/src/main/java/org/apache/qpid/server/handler/ broker/src/main/java/org/apache/qpid/serv...

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Mar  6 06:12:47 2007
@@ -198,9 +198,10 @@
 
     private final Object _suspensionLock = new Object();
 
-
     /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */
 
+    private static final Logger _dispatcherLogger = Logger.getLogger(Dispatcher.class);
+
     private class Dispatcher extends Thread
     {
 
@@ -212,12 +213,37 @@
         public Dispatcher()
         {
             super("Dispatcher-Channel-" + _channelId);
+            if (_dispatcherLogger.isInfoEnabled())
+            {
+                _dispatcherLogger.info(getName() + " created");
+            }
         }
 
         public void run()
         {
+            if (_dispatcherLogger.isInfoEnabled())
+            {
+                _dispatcherLogger.info(getName() + " started");
+            }
+
             UnprocessedMessage message;
 
+            // Allow dispatcher to start stopped
+            synchronized (_lock)
+            {
+                while (connectionStopped())
+                {
+                    try
+                    {
+                        _lock.wait();
+                    }
+                    catch (InterruptedException e)
+                    {
+                        // ignore
+                    }
+                }
+            }
+
             try
             {
                 while (!_closed.get() && (message = (UnprocessedMessage) _queue.take()) != null)
@@ -243,10 +269,12 @@
             }
             catch (InterruptedException e)
             {
-                ;
+                //ignore
+            }
+            if (_dispatcherLogger.isInfoEnabled())
+            {
+                _dispatcherLogger.info(getName() + " thread terminating for channel " + _channelId);
             }
-
-            _logger.info("Dispatcher thread terminating for channel " + _channelId);
         }
 
         // only call while holding lock
@@ -263,6 +291,12 @@
                 currently = _connectionStopped;
                 _connectionStopped = connectionStopped;
                 _lock.notify();
+
+                if (_dispatcherLogger.isDebugEnabled())
+                {
+                    _dispatcherLogger.debug("Dispatcher Connection " + (connectionStopped ? "Started" : "Stopped") +
+                                            ": Currently " + (currently ? "Started" : "Stopped"));
+                }
             }
             return currently;
         }
@@ -275,9 +309,14 @@
 
                 if (consumer == null)
                 {
-                    _logger.warn("Received a message from queue " + message.getDeliverBody().consumerTag + " without a handler - ignoring...");
-                    _logger.warn("Consumers that exist: " + _consumers);
-                    _logger.warn("Session hashcode: " + System.identityHashCode(this));
+                    if (_dispatcherLogger.isInfoEnabled())
+                    {
+                        _dispatcherLogger.info("Received a message(" + System.identityHashCode(message) + ")" +
+                                               "[" + message.getDeliverBody().deliveryTag + "] from queue "
+                                               + message.getDeliverBody().consumerTag + " without a handler - rejecting(requeue)...");
+                    }
+
+                    rejectMessage(message, true);
                 }
                 else
                 {
@@ -311,7 +350,7 @@
 
                 rejectAllMessages(true);
 
-                _logger.debug("Session Pre Dispatch Queue cleared");
+                _dispatcherLogger.debug("Session Pre Dispatch Queue cleared");
 
                 for (BasicMessageConsumer consumer : _consumers.values())
                 {
@@ -323,20 +362,28 @@
 
         }
 
-        public void rejectPending(AMQShortString consumerTag)
+        public void rejectPending(BasicMessageConsumer consumer)
         {
             synchronized (_lock)
             {
-                boolean stopped = connectionStopped();
+                boolean stopped = _dispatcher.connectionStopped();
 
-                _dispatcher.setConnectionStopped(false);
-
-                rejectMessagesForConsumerTag(consumerTag, true);
-
-                if (stopped)
+                if (!stopped)
                 {
-                    _dispatcher.setConnectionStopped(stopped);
+                    _dispatcher.setConnectionStopped(true);
                 }
+
+                // Reject messages on pre-receive queue
+                consumer.rollback();
+
+                // Reject messages on pre-dispatch queue
+                rejectMessagesForConsumerTag(consumer.getConsumerTag(), true);
+
+                // Remove consumer from map.
+                deregisterConsumer(consumer);
+
+                _dispatcher.setConnectionStopped(stopped);
+
             }
         }
     }
@@ -549,14 +596,15 @@
                     suspendChannel(true);
                 }
 
-                _connection.getProtocolHandler().syncWrite(
-                        TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
-
                 if (_dispatcher != null)
                 {
                     _dispatcher.rollback();
                 }
 
+                _connection.getProtocolHandler().syncWrite(
+                        TxRollbackBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
+
+
                 if (!isSuspended)
                 {
                     suspendChannel(false);
@@ -663,14 +711,10 @@
                 jmse = e;
             }
         }
-        finally
+        if (jmse != null)
         {
-            if (jmse != null)
-            {
-                throw jmse;
-            }
+            throw jmse;
         }
-
     }
 
 
@@ -835,6 +879,11 @@
                 consumer.clearUnackedMessages();
             }
 
+            if (_dispatcher != null)
+            {
+                _dispatcher.rollback();
+            }
+
             // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
             // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
             // Be aware of possible changes to parameter order as versions change.
@@ -844,11 +893,6 @@
                                                                                        false)    // requeue
                     , BasicRecoverOkBody.class);
 
-            if (_dispatcher != null)
-            {
-                _dispatcher.rollback();
-            }
-
             if (!isSuspended)
             {
                 suspendChannel(false);
@@ -1223,35 +1267,17 @@
         return (counter != null) && (counter.get() != 0);
     }
 
-
-    public void declareExchange(AMQShortString name, AMQShortString type) throws AMQException
+    public void declareExchange(AMQShortString name, AMQShortString type, boolean nowait) throws AMQException
     {
-        declareExchange(name, type, getProtocolHandler());
+        declareExchange(name, type, getProtocolHandler(), nowait);
     }
 
-    public void declareExchangeSynch(AMQShortString name, AMQShortString type) throws AMQException
+    private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
     {
-        // TODO: Be aware of possible changes to parameter order as versions change.
-        AMQFrame frame = ExchangeDeclareBody.createAMQFrame(_channelId,
-                                                            getProtocolMajorVersion(), getProtocolMinorVersion(),    // AMQP version (major, minor)
-                                                            null,    // arguments
-                                                            false,    // autoDelete
-                                                            false,    // durable
-                                                            name,    // exchange
-                                                            false,    // internal
-                                                            false,    // nowait
-                                                            false,    // passive
-                                                            getTicket(),    // ticket
-                                                            type);    // type
-        getProtocolHandler().syncWrite(frame, ExchangeDeclareOkBody.class);
+        declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler, nowait);
     }
 
-    private void declareExchange(AMQDestination amqd, AMQProtocolHandler protocolHandler) throws AMQException
-    {
-        declareExchange(amqd.getExchangeName(), amqd.getExchangeClass(), protocolHandler);
-    }
-
-    private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler) throws AMQException
+    private void declareExchange(AMQShortString name, AMQShortString type, AMQProtocolHandler protocolHandler, boolean nowait) throws AMQException
     {
         // TODO: Be aware of possible changes to parameter order as versions change.
         AMQFrame exchangeDeclare = ExchangeDeclareBody.createAMQFrame(_channelId,
@@ -1261,7 +1287,7 @@
                                                                       false,    // durable
                                                                       name,    // exchange
                                                                       false,    // internal
-                                                                      false,    // nowait
+                                                                      nowait,    // nowait
                                                                       false,    // passive
                                                                       getTicket(),    // ticket
                                                                       type);    // type
@@ -1874,15 +1900,21 @@
 
     synchronized void startDistpatcherIfNecessary()
     {
+        startDistpatcherIfNecessary(false);
+    }
+
+    synchronized void startDistpatcherIfNecessary(boolean initiallyStopped)
+    {
         if (_dispatcher == null)
         {
             _dispatcher = new Dispatcher();
             _dispatcher.setDaemon(true);
+            _dispatcher.setConnectionStopped(initiallyStopped);
             _dispatcher.start();
         }
         else
         {
-            _dispatcher.setConnectionStopped(false);
+            _dispatcher.setConnectionStopped(initiallyStopped);
         }
     }
 
@@ -1910,7 +1942,7 @@
 
         AMQProtocolHandler protocolHandler = getProtocolHandler();
 
-        declareExchange(amqd, protocolHandler);
+        declareExchange(amqd, protocolHandler, false);
 
         AMQShortString queueName = declareQueue(amqd, protocolHandler);
 
@@ -1950,12 +1982,6 @@
                     _destinationConsumerCount.remove(dest);
                 }
             }
-
-            //ensure we remove the messages from the consumer even if the dispatcher hasn't started
-            if (_dispatcher == null)
-            {
-                rejectMessagesForConsumerTag(consumer.getConsumerTag(), true);
-            }// if the dispatcher is running we have to do the clean up in the Ok Handler.
         }
     }
 
@@ -2033,6 +2059,8 @@
 
     public void confirmConsumerCancelled(AMQShortString consumerTag)
     {
+
+        // Remove the consumer from the map
         BasicMessageConsumer consumer = (BasicMessageConsumer) _consumers.get(consumerTag);
         if (consumer != null)
         {
@@ -2040,26 +2068,33 @@
             {
                 consumer.closeWhenNoMessages(true);
             }
+
+            //Clean the Maps up first
+            //Flush any pending messages for this consumerTag
+            if (_dispatcher != null)
+            {
+                _logger.info("Dispatcher is not null");
+            }
             else
             {
-                consumer.rollback();
+                _logger.info("Dispatcher is null so created stopped dispatcher");
+
+                startDistpatcherIfNecessary(true);
             }
-        }
 
-        //Flush any pending messages for this consumerTag
-        if (_dispatcher != null)
-        {
-            _dispatcher.rejectPending(consumerTag);
+            _dispatcher.rejectPending(consumer);
         }
         else
         {
-            rejectMessagesForConsumerTag(consumerTag, true);
+            _logger.warn("Unable to confirm cancellation of consumer (" + consumerTag + "). Not found in consumer map.");
         }
+
+
     }
 
     /*
-     * I could have combined the last 3 methods, but this way it improves readability
-     */
+    * I could have combined the last 3 methods, but this way it improves readability
+    */
     private AMQTopic checkValidTopic(Topic topic) throws JMSException
     {
         if (topic == null)
@@ -2189,16 +2224,20 @@
 
             if (consumerTag == null || message.getDeliverBody().consumerTag.equals(consumerTag))
             {
-                if (_logger.isTraceEnabled())
+                if (_logger.isDebugEnabled())
                 {
-                    _logger.trace("Removing message from _queue:" + message);
+                    _logger.debug("Removing message(" + System.identityHashCode(message) +
+                                  ") from _queue DT:" + message.getDeliverBody().deliveryTag);
                 }
 
                 messages.remove();
 
-                rejectMessage(message.getDeliverBody().deliveryTag, requeue);
+                rejectMessage(message, requeue);
 
-                _logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag);
+                if (_logger.isDebugEnabled())
+                {
+                    _logger.debug("Rejected the message(" + message.getDeliverBody() + ") for consumer :" + consumerTag);
+                }
             }
             else
             {
@@ -2207,15 +2246,45 @@
         }
     }
 
+
+    public void rejectMessage(UnprocessedMessage message, boolean requeue)
+    {
+
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Rejecting Unacked message:" + message.getDeliverBody().deliveryTag);
+        }
+
+        rejectMessage(message.getDeliverBody().deliveryTag, requeue);
+    }
+
+    public void rejectMessage(AbstractJMSMessage message, boolean requeue)
+    {
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Rejecting Abstract message:" + message.getDeliveryTag());
+        }
+        rejectMessage(message.getDeliveryTag(), requeue);
+
+    }
+
     public void rejectMessage(long deliveryTag, boolean requeue)
     {
-        AMQFrame basicRejectBody = BasicRejectBody.createAMQFrame(_channelId,
-                                                                  getProtocolMajorVersion(),
-                                                                  getProtocolMinorVersion(),
-                                                                  deliveryTag,
-                                                                  requeue);
+        if (_acknowledgeMode == CLIENT_ACKNOWLEDGE ||
+            _acknowledgeMode == SESSION_TRANSACTED)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Rejecting delivery tag:" + deliveryTag);
+            }
+            AMQFrame basicRejectBody = BasicRejectBody.createAMQFrame(_channelId,
+                                                                      getProtocolMajorVersion(),
+                                                                      getProtocolMinorVersion(),
+                                                                      deliveryTag,
+                                                                      requeue);
 
-        _connection.getProtocolHandler().writeFrame(basicRejectBody);
+            _connection.getProtocolHandler().writeFrame(basicRejectBody);
+        }
     }
 
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Tue Mar  6 06:12:47 2007
@@ -21,6 +21,7 @@
 package org.apache.qpid.client;
 
 import java.util.Iterator;
+import java.util.List;
 import java.util.concurrent.ArrayBlockingQueue;
 import java.util.concurrent.ConcurrentLinkedQueue;
 import java.util.concurrent.TimeUnit;
@@ -109,9 +110,6 @@
     /** Number of messages unacknowledged in DUPS_OK_ACKNOWLEDGE mode */
     private int _outstanding;
 
-    /** Tag of last message delievered, whoch should be acknowledged on commit in transaction mode. */
-    private long _lastDeliveryTag;
-
     /**
      * Switch to enable sending of acknowledgements when using DUPS_OK_ACKNOWLEDGE mode. Enabled when _outstannding
      * number of msgs >= _prefetchHigh and disabled at < _prefetchLow
@@ -120,6 +118,9 @@
 
     private ConcurrentLinkedQueue<Long> _unacknowledgedDeliveryTags = new ConcurrentLinkedQueue<Long>();
 
+    /** List of tags delivered, the last of which should be acknowledged on commit in transaction mode. */
+    private ConcurrentLinkedQueue<Long> _receivedDeliveryTags = new ConcurrentLinkedQueue<Long>();
+
     /**
      * The thread that was used to call receive(). This is important for being able to interrupt that thread if a
      * receive() is in progress.
@@ -432,6 +433,11 @@
 
     public void close(boolean sendClose) throws JMSException
     {
+        if (_logger.isInfoEnabled())
+        {
+            _logger.info("Closing consumer:" + debugIdentity());
+        }
+
         synchronized (_connection.getFailoverMutex())
         {
             if (!_closed.getAndSet(true))
@@ -448,6 +454,12 @@
                     try
                     {
                         _protocolHandler.syncWrite(cancelFrame, BasicCancelOkBody.class);
+
+                        if (_logger.isDebugEnabled())
+                        {
+                            _logger.debug("CancelOk'd for consumer:" + debugIdentity());
+                        }
+
                     }
                     catch (AMQException e)
                     {
@@ -456,11 +468,14 @@
                     }
                 }
 
-                deregisterConsumer();
-                _unacknowledgedDeliveryTags.clear();
+                //done in BasicCancelOK Handler
+                //deregisterConsumer();
                 if (_messageListener != null && _receiving.get())
                 {
-                    _logger.info("Interrupting thread: " + _receivingThread);
+                    if (_logger.isInfoEnabled())
+                    {
+                        _logger.info("Interrupting thread: " + _receivingThread);
+                    }
                     _receivingThread.interrupt();
                 }
             }
@@ -616,7 +631,7 @@
                 }
                 else
                 {
-                    _lastDeliveryTag = msg.getDeliveryTag();
+                    _receivedDeliveryTags.add(msg.getDeliveryTag());
                 }
                 break;
         }
@@ -625,10 +640,16 @@
     /** Acknowledge up to last message delivered (if any). Used when commiting. */
     void acknowledgeLastDelivered()
     {
-        if (_lastDeliveryTag > 0)
+        if (!_receivedDeliveryTags.isEmpty())
         {
-            _session.acknowledgeMessage(_lastDeliveryTag, true);
-            _lastDeliveryTag = -1;
+            long lastDeliveryTag = _receivedDeliveryTags.poll();
+
+            while (!_receivedDeliveryTags.isEmpty())
+            {
+                lastDeliveryTag = _receivedDeliveryTags.poll();
+            }
+
+            _session.acknowledgeMessage(lastDeliveryTag, true);
         }
     }
 
@@ -738,43 +759,76 @@
 
     public void rollback()
     {
+        clearUnackedMessages();
+
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Rejecting received messages");
+        }
 
+        //rollback received but not committed messages
+        while (!_receivedDeliveryTags.isEmpty())
+        {
+            Long tag = _receivedDeliveryTags.poll();
+
+            if (tag != null)
+            {
+                if (_logger.isTraceEnabled())
+                {
+                    _logger.trace("Rejecting tag from _receivedDTs:" + tag);
+                }
+
+                _session.rejectMessage(tag, true);
+            }
+        }
+
+        //rollback pending messages
         if (_synchronousQueue.size() > 0)
         {
             if (_logger.isDebugEnabled())
             {
-                _logger.debug("Rejecting the messages for consumer with tag:" + _consumerTag);
+                _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ")" +
+                              "for consumer with tag:" + _consumerTag);
             }
             Iterator iterator = _synchronousQueue.iterator();
+
             while (iterator.hasNext())
             {
-                Object o = iterator.next();
 
+                Object o = iterator.next();
                 if (o instanceof AbstractJMSMessage)
                 {
-                    _session.rejectMessage(((AbstractJMSMessage) o).getDeliveryTag(), true);
+                    _session.rejectMessage(((AbstractJMSMessage) o), true);
 
                     if (_logger.isTraceEnabled())
                     {
-                        _logger.trace("Rejected message" + o);
-                        iterator.remove();
+                        _logger.trace("Rejected message:" + ((AbstractJMSMessage) o).getDeliveryTag());
                     }
+                    iterator.remove();
 
                 }
                 else
                 {
                     _logger.error("Queue contained a :" + o.getClass() +
                                   " unable to reject as it is not an AbstractJMSMessage. Will be cleared");
+                    iterator.remove();
                 }
             }
 
             if (_synchronousQueue.size() != 0)
             {
                 _logger.warn("Queue was not empty after rejecting all messages Remaining:" + _synchronousQueue.size());
+                rollback();
             }
 
             _synchronousQueue.clear();
         }
+    }
+
+
+    public String debugIdentity()
+    {
+        return String.valueOf(_consumerTag);
     }
 
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/handler/BasicCancelOkMethodHandler.java Tue Mar  6 06:12:47 2007
@@ -28,27 +28,29 @@
 import org.apache.qpid.framing.BasicCancelOkBody;
 import org.apache.qpid.protocol.AMQMethodEvent;
 
-/**
- * @author Apache Software Foundation
- */
 public class BasicCancelOkMethodHandler implements StateAwareMethodListener
 {
-     private static final Logger _logger = Logger.getLogger(BasicCancelOkMethodHandler.class);
-     private static final BasicCancelOkMethodHandler _instance = new BasicCancelOkMethodHandler();
+    private static final Logger _logger = Logger.getLogger(BasicCancelOkMethodHandler.class);
+    private static final BasicCancelOkMethodHandler _instance = new BasicCancelOkMethodHandler();
 
-     public static BasicCancelOkMethodHandler getInstance()
-     {
-         return _instance;
-     }
+    public static BasicCancelOkMethodHandler getInstance()
+    {
+        return _instance;
+    }
 
-     private BasicCancelOkMethodHandler()
-     {
-     }
+    private BasicCancelOkMethodHandler()
+    {
+    }
 
-     public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
-     {
-         _logger.debug("New BasicCancelOk method received");
-         BasicCancelOkBody body = (BasicCancelOkBody) evt.getMethod();
-         protocolSession.confirmConsumerCancelled(evt.getChannelId(), body.consumerTag);                  
-     }
+    public void methodReceived(AMQStateManager stateManager, AMQProtocolSession protocolSession, AMQMethodEvent evt) throws AMQException
+    {
+        BasicCancelOkBody body = (BasicCancelOkBody) evt.getMethod();
+
+        if (_logger.isInfoEnabled())
+        {
+            _logger.info("New BasicCancelOk method received for consumer:" + body.consumerTag);
+        }
+
+        protocolSession.confirmConsumerCancelled(evt.getChannelId(), body.consumerTag);
+    }
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/message/UnprocessedMessage.java Tue Mar  6 06:12:47 2007
@@ -30,13 +30,11 @@
 import org.apache.qpid.framing.ContentHeaderBody;
 
 /**
- * This class contains everything needed to process a JMS message. It assembles the
- * deliver body, the content header and the content body/ies.
- *
- * Note that the actual work of creating a JMS message for the client code's use is done
- * outside of the MINA dispatcher thread in order to minimise the amount of work done in
- * the MINA dispatcher thread.
+ * This class contains everything needed to process a JMS message. It assembles the deliver body, the content header and
+ * the content body/ies.
  *
+ * Note that the actual work of creating a JMS message for the client code's use is done outside of the MINA dispatcher
+ * thread in order to minimise the amount of work done in the MINA dispatcher thread.
  */
 public class UnprocessedMessage
 {
@@ -47,9 +45,7 @@
     private final int _channelId;
     private ContentHeaderBody _contentHeader;
 
-    /**
-     * List of ContentBody instances. Due to fragmentation you don't know how big this will be in general
-     */
+    /** List of ContentBody instances. Due to fragmentation you don't know how big this will be in general */
     private List<ContentBody> _bodies;
 
     public UnprocessedMessage(int channelId, BasicDeliverBody deliverBody)
@@ -74,9 +70,9 @@
         {
             final long payloadSize = body.payload.remaining();
 
-            if(_bodies == null)
+            if (_bodies == null)
             {
-                if(payloadSize == getContentHeader().bodySize)
+                if (payloadSize == getContentHeader().bodySize)
                 {
                     _bodies = Collections.singletonList(body);
                 }

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java Tue Mar  6 06:12:47 2007
@@ -58,6 +58,7 @@
                 {
                     _logger.debug("State " + _state + " not achieved so waiting...");
                     _monitor.wait(TIME_OUT);
+                    //fixme: a timeout here will not exit the loop; _throwable needs to be set
                 }
                 catch (InterruptedException e)
                 {

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java Tue Mar  6 06:12:47 2007
@@ -20,9 +20,6 @@
  */
 package org.apache.qpid.client.util;
 
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.client.message.UnprocessedMessage;
-import org.apache.log4j.Logger;
 
 import java.util.concurrent.BlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/ack/RecoverTest.java Tue Mar  6 06:12:47 2007
@@ -73,7 +73,8 @@
         Queue queue = new AMQQueue(consumerSession.getDefaultQueueExchangeName(),new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
         MessageConsumer consumer = consumerSession.createConsumer(queue);
         //force synch to ensure the consumer has resulted in a bound queue
-        ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+        //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+        // This is the default now
 
         AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
         Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);
@@ -130,7 +131,8 @@
         Queue queue = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("someQ"), new AMQShortString("someQ"), false, true);
         MessageConsumer consumer = consumerSession.createConsumer(queue);
         //force synch to ensure the consumer has resulted in a bound queue
-        ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+        //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
+        // This is the default now
 
         AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
         Session producerSession = con2.createSession(false, Session.CLIENT_ACKNOWLEDGE);

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java Tue Mar  6 06:12:47 2007
@@ -109,6 +109,10 @@
             }
             catch (AMQException e)
             {
+                if (_logger.isInfoEnabled())
+                {
+                    _logger.info("Exception occured was:" + e.getErrorCode());
+                }
                 assertEquals("Connection should be closed", AMQConstant.CHANNEL_ERROR, e.getErrorCode());
 
                 _connection = newConnection();
@@ -315,15 +319,15 @@
         }
         catch (JMSException e)
         {
-            fail("Creating new connection when:"+e.getMessage());
+            fail("Creating new connection when:" + e.getMessage());
         }
         catch (AMQException e)
         {
-            fail("Creating new connection when:"+e.getMessage());
+            fail("Creating new connection when:" + e.getMessage());
         }
         catch (URLSyntaxException e)
         {
-            fail("Creating new connection when:"+e.getMessage());
+            fail("Creating new connection when:" + e.getMessage());
         }
 
 

Added: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java?view=auto&rev=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java Tue Mar  6 06:12:47 2007
@@ -0,0 +1,603 @@
+/*
+ *  Licensed to the Apache Software Foundation (ASF) under one
+ *  or more contributor license agreements.  See the NOTICE file
+ *  distributed with this work for additional information
+ *  regarding copyright ownership.  The ASF licenses this file
+ *  to you under the Apache License, Version 2.0 (the
+ *  "License"); you may not use this file except in compliance
+ *  with the License.  You may obtain a copy of the License at
+ *
+ *    http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *  Unless required by applicable law or agreed to in writing,
+ *  software distributed under the License is distributed on an
+ *  "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ *  KIND, either express or implied.  See the License for the
+ *  specific language governing permissions and limitations
+ *  under the License.
+ *
+ *
+ */
+package org.apache.qpid.test.unit.close;
+
+import junit.framework.TestCase;
+
+import java.util.concurrent.atomic.AtomicInteger;
+
+
+import javax.jms.ExceptionListener;
+import javax.jms.Session;
+import javax.jms.Connection;
+import javax.jms.JMSException;
+import javax.jms.Queue;
+import javax.jms.MessageProducer;
+import javax.jms.Message;
+import javax.jms.TextMessage;
+import javax.jms.MessageConsumer;
+
+import org.apache.qpid.client.AMQConnectionFactory;
+import org.apache.qpid.client.AMQConnectionURL;
+import org.apache.qpid.client.AMQConnection;
+import org.apache.qpid.client.message.AbstractJMSMessage;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpid.AMQException;
+import org.apache.log4j.Logger;
+import org.apache.log4j.Level;
+
+public class MessageRequeueTest extends TestCase
+{
+
+    private static final Logger _logger = Logger.getLogger(MessageRequeueTest.class);
+
+    protected static AtomicInteger consumerIds = new AtomicInteger(0);
+    protected final Integer numTestMessages = 150;
+
+    protected final int consumeTimeout = 3000;
+
+    protected final String queue = "direct://amq.direct//queue";
+    protected String payload = "Message:";
+
+    protected final String BROKER = "vm://:1";
+    private boolean testReception = true;
+
+    private long[] receieved = new long[numTestMessages + 1];
+    private boolean passed=false;
+
+    /** Creates VM broker 1 via TransportConnection, empties the test queue and loads numTestMessages messages. */
+    protected void setUp() throws Exception
+    {
+        super.setUp();
+        TransportConnection.createVMBroker(1);
+
+        final QpidClientConnection loader = new QpidClientConnection();
+        loader.connect();
+        // Remove anything already on the queue; consume() commits once a receive times out.
+        loader.consume(queue, consumeTimeout);
+        // Load the test data; put() numbers the messages 1..n through their "index" property.
+        _logger.info("creating test data, " + numTestMessages + " messages");
+        loader.put(queue, payload, numTestMessages);
+        // Done with the loader connection (disconnect() commits and closes).
+        loader.disconnect();
+    }
+
+    /** Empties the test queue unless the test set the passed flag, then kills VM broker 1. */
+    protected void tearDown() throws Exception
+    {
+        super.tearDown();
+        if (!passed)
+        {
+            QpidClientConnection conn = new QpidClientConnection();
+            conn.connect();
+            // clear queue
+            conn.consume(queue, consumeTimeout);
+            conn.disconnect(); // close the cleanup session/connection instead of leaking them
+        }
+        TransportConnection.killVMBroker(1);
+    }
+
+    /**
+     * Drains the queue with a single consumer and checks that every index 1..numTestMessages was received.
+     */
+    public void testDrain() throws JMSException, InterruptedException
+    {
+        QpidClientConnection conn = new QpidClientConnection();
+        conn.connect();
+
+        _logger.info("consuming queue " + queue);
+        final Session drainSession = conn.getSession();
+        final Queue q = drainSession.createQueue(queue);
+        final MessageConsumer consumer = drainSession.createConsumer(q);
+
+        // tagsByIndex[i] is the delivery tag recorded for message index i; slot 0 stays unused
+        // because put() numbers messages from 1.
+        final long[] tagsByIndex = new long[numTestMessages + 1];
+        int messagesReceived = 0;
+
+        _logger.info("consuming...");
+        for (Message msg = consumer.receive(1000); msg != null; msg = consumer.receive(1000))
+        {
+            messagesReceived++;
+
+            final long dt = ((AbstractJMSMessage) msg).getDeliveryTag();
+            final int msgindex = msg.getIntProperty("index");
+
+            if (tagsByIndex[msgindex] != 0)
+            {
+                _logger.error("Received Message(" + msgindex + ":" + dt +
+                              ") more than once.");
+            }
+
+            // NOTE(review): this logs the identity hash of the boxed index, not of the message - confirm intent.
+            if (_logger.isInfoEnabled())
+            {
+                _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " +
+                             "DT:" + dt +
+                             "IN:" + msgindex);
+            }
+
+            if (dt == 0)
+            {
+                _logger.error("DT is zero for msg:" + msgindex);
+            }
+
+            tagsByIndex[msgindex] = dt;
+        }
+
+        drainSession.commit();
+        consumer.close();
+        assertEquals("number of consumed messages does not match initial data", (int) numTestMessages, messagesReceived);
+
+        // Collect every index that never got a (non-zero) delivery tag.
+        final StringBuilder list = new StringBuilder("Failed to receive:");
+        int failed = 0;
+
+        for (int index = 1; index < tagsByIndex.length; index++)
+        {
+            if (tagsByIndex[index] == 0) //delivery tag of zero shouldn't exist
+            {
+                _logger.error("Index: " + index + " was not received.");
+                list.append(" ");
+                list.append(index);
+                list.append(":");
+                list.append(tagsByIndex[index]);
+                failed++;
+            }
+        }
+
+        assertEquals(list.toString(), 0, failed);
+        _logger.info("consumed: " + messagesReceived);
+
+        // disconnect() commits once more and closes the session and connection
+        conn.disconnect();
+    }
+
+    /**
+     * Creates four consumers but starts only the first, then checks that every message index was received.
+     */
+    public void testTwoCompetingConsumers()
+    {
+        final Consumer[] consumers = {new Consumer(), new Consumer(), new Consumer(), new Consumer()};
+        final Thread[] threads = new Thread[consumers.length];
+        for (int i = 0; i < consumers.length; i++)
+        {
+            threads[i] = new Thread(consumers[i]);
+        }
+
+        // Only consumer 1 is started (starting 2-4 is currently disabled); those three never run,
+        // so they keep a count of zero and join() on them below returns immediately.
+        threads[0].start();
+
+        try
+        {
+            for (Thread worker : threads)
+            {
+                worker.join();
+            }
+        }
+        catch (InterruptedException e)
+        {
+            fail("Uanble to join to Consumer theads");
+        }
+
+        int sum = 0;
+        for (int i = 0; i < consumers.length; i++)
+        {
+            _logger.info("consumer " + (i + 1) + " count is " + consumers[i].getCount());
+            sum += consumers[i].getCount();
+        }
+        final Integer totalConsumed = sum;
+
+        // Check all messages were correctly delivered
+        final StringBuilder list = new StringBuilder("Failed to receive:");
+        int failed = 0;
+
+        // Start at 1: there is no message with index 0, so that slot is always empty.
+        for (int index = 1; index < receieved.length; index++)
+        {
+            if (receieved[index] == 0) //delivery tag of zero shouldn't exist
+            {
+                _logger.error("Index: " + index + " was not received.");
+                list.append(" ");
+                list.append(index);
+                list.append(":");
+                list.append(receieved[index]);
+                failed++;
+            }
+        }
+
+        assertEquals(list.toString() + "-" + numTestMessages + "-" + totalConsumed, 0, failed);
+        assertEquals("number of consumed messages does not match initial data", numTestMessages, totalConsumed);
+
+        // Tells tearDown() that the queue does not need to be drained again.
+        passed=true;
+    }
+
+    /**
+     * Pulls messages one at a time over its own connection until getNextMessage returns null. While testReception
+     * is set, each message's delivery tag is recorded in the shared receieved array under its "index" property.
+     */
+    class Consumer implements Runnable
+    {
+        private Integer count = 0;
+        private Integer id;
+
+        public Consumer()
+        {
+            id = consumerIds.addAndGet(1);
+        }
+
+        public void run()
+        {
+            try
+            {
+                _logger.info("consumer-" + id + ": starting");
+                QpidClientConnection conn = new QpidClientConnection();
+                conn.connect();
+
+                _logger.info("consumer-" + id + ": connected, consuming...");
+                Message result;
+                do
+                {
+                    result = conn.getNextMessage(queue, consumeTimeout);
+                    if (result != null)
+                    {
+                        long dt = ((AbstractJMSMessage) result).getDeliveryTag();
+                        if (testReception)
+                        {
+                            int msgindex = result.getIntProperty("index");
+                            if (receieved[msgindex] != 0)
+                            {
+                                _logger.error("Received Message(" + msgindex + ":" + ((AbstractJMSMessage) result).getDeliveryTag() +
+                                              ") more than once.");
+                            }
+
+                            if (_logger.isInfoEnabled())
+                            {
+                                _logger.info("Received Message(" + System.identityHashCode(msgindex) + ") " +
+                                             "DT:" + dt +
+                                             "IN:" + msgindex);
+                            }
+
+                            if (dt == 0)
+                            {
+                                _logger.error("DT is zero for msg:" + msgindex);
+                            }
+
+                            receieved[msgindex] = dt;
+                        }
+
+                        count++;
+                        if (count % 100 == 0)
+                        {
+                            _logger.info("consumer-" + id + ": got " + result + ", new count is " + count);
+                        }
+                    }
+                }
+                while (result != null);
+
+                _logger.info("consumer-" + id + ": complete");
+                conn.disconnect();
+            }
+            catch (Exception e)
+            {
+                // Log through the test logger (with the stack trace) instead of printing to stderr.
+                _logger.error("consumer-" + id + ": failed after " + count + " messages", e);
+            }
+        }
+
+        public Integer getCount()
+        {
+            return count;
+        }
+
+        public Integer getId()
+        {
+            return id;
+        }
+    }
+
+
+    public class QpidClientConnection implements ExceptionListener
+    {
+        private boolean transacted = true;
+        private int ackMode = Session.CLIENT_ACKNOWLEDGE;
+        private Connection connection;
+
+        private String virtualHost;
+        private String brokerlist;
+        private int prefetch;
+        protected Session session;
+        protected boolean connected;
+
+        /** Defaults: virtual host "/test", broker list BROKER, prefetch 5000. */
+        public QpidClientConnection()
+        {
+            setVirtualHost("/test");
+            setBrokerList(BROKER);
+            setPrefetch(5000);
+        }
+
+
+        /**
+         * Opens and starts the connection and creates the session, unless already connected. Broker URL format:
+         * amqp://[user:pass@][clientid]/virtualhost?
+         * brokerlist='[transport://]host[:port][?option='value'[&option='value']];'
+         * [&failover='method[?option='value'[&option='value']]']
+         * [&option='value']"
+         * @throws JMSException on a JMS failure, or (with the cause linked) when the broker URL is malformed
+         */
+        public void connect() throws JMSException
+        {
+            if (!connected)
+            {
+                String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
+                try
+                {
+                    AMQConnectionFactory factory = new AMQConnectionFactory(new AMQConnectionURL(brokerUrl));
+                    _logger.info("connecting to Qpid :" + brokerUrl);
+                    connection = factory.createConnection();
+                    // register exception listener
+                    connection.setExceptionListener(this);
+                    session = ((AMQConnection) connection).createSession(transacted, ackMode, prefetch);
+                    _logger.info("starting connection");
+                    connection.start();
+                    connected = true;
+                }
+                catch (URLSyntaxException e)
+                {
+                    // Keep the original exception reachable instead of flattening it to its message.
+                    JMSException jmse = new JMSException("URL syntax error in [" + brokerUrl + "]: " + e.getMessage());
+                    jmse.setLinkedException(e);
+                    throw jmse;
+                }
+            }
+        }
+
+        public void disconnect() throws JMSException
+        {
+            if (connected) // no-op unless connect() has completed
+            {
+                session.commit(); // commit first: connect() creates the session as transacted
+                session.close();
+                connection.close();
+                connected = false;
+                _logger.info("disconnected");
+            }
+        }
+
+        public void disconnectWithoutCommit() throws JMSException
+        {
+            if (connected) // no-op unless connect() has completed
+            {
+                session.close(); // unlike disconnect(), no commit is issued first
+                connection.close();
+                connected = false;
+                _logger.info("disconnected without commit");
+            }
+        }
+
+        public String getBrokerList() // the value connect() places in the URL's brokerlist option
+        {
+            return brokerlist;
+        }
+
+        public void setBrokerList(String brokerlist) // only read by connect(), i.e. when not yet connected
+        {
+            this.brokerlist = brokerlist;
+        }
+
+        public String getVirtualHost() // the value connect() appends after "amqp://guest:guest@"
+        {
+            return virtualHost;
+        }
+
+        public void setVirtualHost(String virtualHost) // only read by connect(), i.e. when not yet connected
+        {
+            this.virtualHost = virtualHost;
+        }
+
+        public void setPrefetch(int prefetch) // handed to createSession(...) by connect()
+        {
+            this.prefetch = prefetch;
+        }
+
+
+        /** ExceptionListener callback (registered in connect()): logs the error code and message; override as necessary */
+        public void onException(JMSException exception)
+        {
+            _logger.info("ExceptionListener event: error " + exception.getErrorCode() + ", message: " + exception.getMessage());
+        }
+
+        public boolean isConnected() // true between a completed connect() and the next disconnect*()
+        {
+            return connected;
+        }
+
+        public Session getSession() // the session created by connect(); null before the first connect()
+        {
+            return session;
+        }
+
+        /**
+         * Sends the given number of text messages to a queue and commits once, connecting first if necessary.
+         * Message i (counting from 0) carries the text payload + i and the int property "index" = i + 1.
+         *
+         * @param queueName The queue name to put to
+         * @param payload   the prefix of each message's text
+         * @param copies    the number of messages to put
+         * @throws javax.jms.JMSException any exception that occurs
+         */
+        public void put(String queueName, String payload, int copies) throws JMSException
+        {
+            if (!connected)
+            {
+                connect();
+            }
+
+            _logger.info("putting to queue " + queueName);
+            final Queue destination = session.createQueue(queueName);
+            final MessageProducer producer = session.createProducer(destination);
+
+            for (int sent = 0; sent < copies; sent++)
+            {
+                final Message message = session.createTextMessage(payload + sent);
+                message.setIntProperty("index", sent + 1);
+                producer.send(message);
+            }
+
+            // One commit for the whole batch (connect() creates the session as transacted).
+            session.commit();
+            producer.close();
+            _logger.info("put " + copies + " copies");
+        }
+
+        /**
+         * GET the top message on a queue. Consumes the message. Accepts timeout value.
+         * <p>
+         * Connects first if necessary, creates a consumer for this one call, waits up to readTimeout for a
+         * message, commits the session and closes the consumer. The commit is issued whether or not a
+         * message arrived.
+         * <p>
+         * Every message is returned as received; one that is not a TextMessage additionally logs a warning.
+         *
+         * @param queueName   The queue name to get from
+         * @param readTimeout The timeout handed to MessageConsumer.receive(long); per JMS, 0 waits indefinitely
+         *
+         * @return the received message, or null if none arrived
+         *         within the timeout
+         *
+         * @throws javax.jms.JMSException any exception that occurred
+         */
+        public Message getNextMessage(String queueName, long readTimeout) throws JMSException
+        {
+            if (!connected)
+            {
+                connect();
+            }
+
+            // A fresh consumer is created for every call and closed again before returning.
+            final Queue source = session.createQueue(queueName);
+            final MessageConsumer consumer = session.createConsumer(source);
+
+            final Message message = consumer.receive(readTimeout);
+
+            // Commit first, then close the consumer (same order for the no-message case).
+            session.commit();
+            consumer.close();
+
+            // all messages we consume should be TextMessages; anything else is
+            // still handed back to the caller, but flagged in the log
+            final boolean unexpectedType = message != null && !(message instanceof TextMessage);
+            if (unexpectedType)
+            {
+                _logger.info("warning: received non-text message");
+            }
+
+            return message;
+        }
+
+        /**
+         * GET the top message on a queue. Consumes the message. Delegates with a read timeout of 0.
+         *
+         * @param queueName The queue name to get from
+         *
+         * @return The received message, exactly as returned by getNextMessage(String, long)
+         *
+         * @throws javax.jms.JMSException any exception that occurs
+         */
+        public Message getNextMessage(String queueName) throws JMSException
+        {
+            return getNextMessage(queueName, 0); // per JMS, receive(0) never times out
+        }
+
+        /**
+         * Receives and discards messages until a receive times out, then commits. See javax.jms.MessageConsumer.
+         *
+         * @param queueName   The queue name to consume from
+         * @param readTimeout The timeout for each receive
+         *
+         * @throws javax.jms.JMSException Any exception that occurs during the consume
+         * @throws InterruptedException   Declared for callers; nothing in this method's body throws it
+         */
+        public void consume(String queueName, int readTimeout) throws JMSException, InterruptedException
+        {
+            if (!connected)
+            {
+                connect();
+            }
+
+            _logger.info("consuming queue " + queueName);
+            final Queue source = session.createQueue(queueName);
+            final MessageConsumer drain = session.createConsumer(source);
+
+            _logger.info("consuming...");
+            int drained = 0;
+            for (Message next = drain.receive(readTimeout); next != null; next = drain.receive(readTimeout))
+            {
+                drained++;
+            }
+
+            // Commit the transacted session, then release the consumer.
+            session.commit();
+            drain.close();
+            _logger.info("consumed: " + drained);
+        }
+    }
+
+
+    /** Receives one message on a non-transacted AUTO_ACKNOWLEDGE session, then closes consumer and connection. */
+    public void testRequeue() throws JMSException, AMQException, URLSyntaxException
+    {
+        String virtualHost = "/test";
+        String brokerlist = "vm://:1";
+        String brokerUrl = "amqp://guest:guest@" + virtualHost + "?brokerlist='" + brokerlist + "'";
+        Connection conn = new AMQConnection(brokerUrl);
+        Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
+        Queue q = session.createQueue(queue);
+
+        _logger.info("Create Consumer");
+        MessageConsumer consumer = session.createConsumer(q);
+
+        try
+        {
+            Thread.sleep(2000);
+        }
+        catch (InterruptedException e)
+        {
+            Thread.currentThread().interrupt(); // restore the flag rather than swallowing the interrupt
+        }
+
+        // NOTE(review): conn.start() is never called before this untimed receive() - confirm this cannot block.
+        _logger.info("Receiving msg");
+        Message msg = consumer.receive();
+        assertNotNull("Message should not be null", msg);
+
+        _logger.info("Close Consumer");
+        consumer.close();
+
+        _logger.info("Close Connection");
+        conn.close();
+    }
+
+}
\ No newline at end of file

Propchange: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/message/StreamMessageTest.java Tue Mar  6 06:12:47 2007
@@ -80,7 +80,8 @@
 
 
         //force synch to ensure the consumer has resulted in a bound queue
-        ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+        //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+        // This is the default now
 
         Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
 

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java Tue Mar  6 06:12:47 2007
@@ -43,7 +43,8 @@
 public class CommitRollbackTest extends TestCase
 {
     protected AMQConnection conn;
-    protected final String queue = "direct://amq.direct//Qpid.Client.Transacted.CommitRollback.queue";
+    protected String queue = "direct://amq.direct//Qpid.Client.Transacted.CommitRollback.queue";
+    protected static int testMethod = 0;
     protected String payload = "xyzzy";
     private Session _session;
     private MessageProducer _publisher;
@@ -57,6 +58,11 @@
     {
         super.setUp();
         TransportConnection.createVMBroker(1);
+
+        testMethod++;
+        queue += testMethod;
+
+
         newConnection();
     }
 
@@ -84,7 +90,11 @@
         TransportConnection.killVMBroker(1);
     }
 
-    /** PUT a text message, disconnect before commit, confirm it is gone. */
+    /**
+     * PUT a text message, disconnect before commit, confirm it is gone.
+     *
+     * @throws Exception On error
+     */
     public void testPutThenDisconnect() throws Exception
     {
         assertTrue("session is not transacted", _session.getTransacted());
@@ -109,7 +119,11 @@
         assertNull("test message was put and disconnected before commit, but is still present", result);
     }
 
-    /** PUT a text message, disconnect before commit, confirm it is gone. */
+    /**
+     * PUT a text message, disconnect before commit, confirm it is gone.
+     *
+     * @throws Exception On error
+     */
     public void testPutThenCloseDisconnect() throws Exception
     {
         assertTrue("session is not transacted", _session.getTransacted());
@@ -140,6 +154,8 @@
     /**
      * PUT a text message, rollback, confirm message is gone. The consumer is on the same connection but different
      * session as producer
+     *
+     * @throws Exception On error
      */
     public void testPutThenRollback() throws Exception
     {
@@ -160,7 +176,11 @@
         assertNull("test message was put and rolled back, but is still present", result);
     }
 
-    /** GET a text message, disconnect before commit, confirm it is still there. The consumer is on a new connection */
+    /**
+     * GET a text message, disconnect before commit, confirm it is still there. The consumer is on a new connection
+     *
+     * @throws Exception On error
+     */
     public void testGetThenDisconnect() throws Exception
     {
         assertTrue("session is not transacted", _session.getTransacted());
@@ -194,6 +214,8 @@
     /**
      * GET a text message, close consumer, disconnect before commit, confirm it is still there. The consumer is on the
      * same connection but different session as producer
+     *
+     * @throws Exception On error
      */
     public void testGetThenCloseDisconnect() throws Exception
     {
@@ -230,6 +252,8 @@
     /**
      * GET a text message, rollback, confirm it is still there. The consumer is on the same connection but differnt
      * session to the producer
+     *
+     * @throws Exception On error
      */
     public void testGetThenRollback() throws Exception
     {
@@ -266,6 +290,8 @@
     /**
      * GET a text message, close message producer, rollback, confirm it is still there. The consumer is on the same
      * connection but different session as producer
+     *
+     * @throws Exception On error
      */
     public void testGetThenCloseRollback() throws Exception
     {
@@ -304,7 +330,11 @@
     }
 
 
-    /** Test that rolling back a session purges the dispatcher queue, and the messages arrive in the correct order */
+    /**
+     * Test that rolling back a session purges the dispatcher queue, and the messages arrive in the correct order
+     *
+     * @throws Exception On error
+     */
     public void testSend2ThenRollback() throws Exception
     {
         assertTrue("session is not transacted", _session.getTransacted());
@@ -339,37 +369,41 @@
 
     public void testSend2ThenCloseAfter1andTryAgain() throws Exception
     {
-//        assertTrue("session is not transacted", _session.getTransacted());
-//        assertTrue("session is not transacted", _pubSession.getTransacted());
-//
-//        _logger.info("sending two test messages");
-//        _publisher.send(_pubSession.createTextMessage("1"));
-//        _publisher.send(_pubSession.createTextMessage("2"));
-//        _pubSession.commit();
-//
-//        _logger.info("getting test message");
-//        assertEquals("1", ((TextMessage) _consumer.receive(1000)).getText());
-//
-//        _consumer.close();
-//
-//        _consumer = _session.createConsumer(_jmsQueue);
-//
-//        _logger.info("receiving result");
-//        Message result = _consumer.receive(1000);
-//        _logger.error("1:" + result);
-////        assertNotNull("test message was consumed and rolled back, but is gone", result);
-////        assertEquals("1" , ((TextMessage) result).getText());
-////        assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered());
-//
-//        result = _consumer.receive(1000);
-//        _logger.error("2" + result);
-////        assertNotNull("test message was consumed and rolled back, but is gone", result);
-////        assertEquals("2", ((TextMessage) result).getText());
-////        assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered());
-//
-//        result = _consumer.receive(1000);
-//        _logger.error("3" + result);
-//        assertNull("test message should be null:" + result, result);
+        assertTrue("session is not transacted", _session.getTransacted());
+        assertTrue("session is not transacted", _pubSession.getTransacted());
+
+        _logger.info("sending two test messages");
+        _publisher.send(_pubSession.createTextMessage("1"));
+        _publisher.send(_pubSession.createTextMessage("2"));
+        _pubSession.commit();
+
+        _logger.info("getting test message");
+        Message result = _consumer.receive(1000);
+
+        assertNotNull("Message received should not be null", result);
+        assertEquals("1", ((TextMessage) result).getText());
+        assertTrue("Messasge is marked as redelivered" + result, !result.getJMSRedelivered());
+
+
+        _logger.info("Closing Consumer");
+        _consumer.close();
+
+        _logger.info("Creating New consumer");
+        _consumer = _session.createConsumer(_jmsQueue);
+
+        _logger.info("receiving result");
+        result = _consumer.receive(1000);
+        assertNotNull("test message was consumed and rolled back, but is gone", result);
+        assertEquals("1", ((TextMessage) result).getText());
+        assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered());
+
+        result = _consumer.receive(1000);
+        assertNotNull("test message was consumed and rolled back, but is gone", result);
+        assertEquals("2", ((TextMessage) result).getText());
+        assertTrue("Messasge is not marked as redelivered" + result, result.getJMSRedelivered());
+
+        result = _consumer.receive(1000);
+        assertNull("test message should be null:" + result, result);
     }
 
 }

Modified: incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/test/java/org/apache/qpid/test/unit/transacted/TransactedTest.java Tue Mar  6 06:12:47 2007
@@ -62,69 +62,125 @@
     {
         super.setUp();
         TransportConnection.createVMBroker(1);
+        _logger.info("Create Connection");
         con = new AMQConnection("vm://:1", "guest", "guest", "TransactedTest", "test");
+
+        _logger.info("Create Session");
         session = con.createSession(true, Session.SESSION_TRANSACTED);
+        _logger.info("Create Q1");
         queue1 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q1"), new AMQShortString("Q1"), false, true);
+        _logger.info("Create Q2");
         queue2 = new AMQQueue(session.getDefaultQueueExchangeName(), new AMQShortString("Q2"), false);
 
-
+        _logger.info("Create Consumer of Q1");
         consumer1 = session.createConsumer(queue1);
-        //Dummy just to create the queue. 
+        //Dummy consumer, created only so that the queue exists.
+        _logger.info("Create Consumer of Q2");
         MessageConsumer consumer2 = session.createConsumer(queue2);
+        _logger.info("Close Consumer of Q2");
         consumer2.close();
+
+        _logger.info("Create producer to Q2");
         producer2 = session.createProducer(queue2);
+
+        _logger.info("Start Connection");
         con.start();
 
+        _logger.info("Create prep connection");
         prepCon = new AMQConnection("vm://:1", "guest", "guest", "PrepConnection", "test");
+
+        _logger.info("Create prep session");
         prepSession = prepCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+
+        _logger.info("Create prep producer to Q1");
         prepProducer1 = prepSession.createProducer(queue1);
+
+        _logger.info("Create prep connection start");
         prepCon.start();
 
-        //add some messages
-        prepProducer1.send(prepSession.createTextMessage("A"));
-        prepProducer1.send(prepSession.createTextMessage("B"));
-        prepProducer1.send(prepSession.createTextMessage("C"));
 
+        _logger.info("Create test connection");
         testCon = new AMQConnection("vm://:1", "guest", "guest", "TestConnection", "test");
+        _logger.info("Create test session");
         testSession = testCon.createSession(false, AMQSession.NO_ACKNOWLEDGE);
+        _logger.info("Create test consumer of q2");
         testConsumer2 = testSession.createConsumer(queue2);
-
     }
 
     protected void tearDown() throws Exception
     {
+        _logger.info("Close connection");
         con.close();
+        _logger.info("Close test connection");
         testCon.close();
+        _logger.info("Close prep connection");
         prepCon.close();
+        _logger.info("Kill broker");
         TransportConnection.killAllVMBrokers();
         super.tearDown();
     }
 
     public void testCommit() throws Exception
     {
+        //add some messages
+        _logger.info("Send prep A");
+        prepProducer1.send(prepSession.createTextMessage("A"));
+        _logger.info("Send prep B");
+        prepProducer1.send(prepSession.createTextMessage("B"));
+        _logger.info("Send prep C");
+        prepProducer1.send(prepSession.createTextMessage("C"));
+
         //send and receive some messages
+        _logger.info("Send X to Q2");
         producer2.send(session.createTextMessage("X"));
+        _logger.info("Send Y to Q2");
         producer2.send(session.createTextMessage("Y"));
+        _logger.info("Send Z to Q2");
         producer2.send(session.createTextMessage("Z"));
+
+
+        _logger.info("Read A from Q1");
         expect("A", consumer1.receive(1000));
+        _logger.info("Read B from Q1");
         expect("B", consumer1.receive(1000));
+        _logger.info("Read C from Q1");
         expect("C", consumer1.receive(1000));
 
         //commit
+        _logger.info("session commit");
         session.commit();
+        _logger.info("Start test Connection");
         testCon.start();
+
         //ensure sent messages can be received and received messages are gone
+        _logger.info("Read X from Q2");
         expect("X", testConsumer2.receive(1000));
+        _logger.info("Read Y from Q2");
         expect("Y", testConsumer2.receive(1000));
+        _logger.info("Read Z from Q2");
         expect("Z", testConsumer2.receive(1000));
 
+        _logger.info("create test session on Q1");
         testConsumer1 = testSession.createConsumer(queue1);
+        _logger.info("Read null from Q1");
         assertTrue(null == testConsumer1.receive(1000));
+        _logger.info("Read null from Q2");
         assertTrue(null == testConsumer2.receive(1000));
     }
 
     public void testRollback() throws Exception
     {
+        //add some messages
+        _logger.info("Send prep A");
+        prepProducer1.send(prepSession.createTextMessage("A"));
+        _logger.info("Send prep B");
+        prepProducer1.send(prepSession.createTextMessage("B"));
+        _logger.info("Send prep C");
+        prepProducer1.send(prepSession.createTextMessage("C"));
+
+        //Quick sleep to ensure all three get pre-fetched
+        Thread.sleep(500);
+
         _logger.info("Sending X Y Z");
         producer2.send(session.createTextMessage("X"));
         producer2.send(session.createTextMessage("Y"));
@@ -140,9 +196,9 @@
 
         _logger.info("Receiving A B C");
         //ensure sent messages are not visible and received messages are requeued
-        expect("A", consumer1.receive(1000));
-        expect("B", consumer1.receive(1000));
-        expect("C", consumer1.receive(1000));
+        expect("A", consumer1.receive(1000), true);
+        expect("B", consumer1.receive(1000), true);
+        expect("C", consumer1.receive(1000), true);
 
         _logger.info("Starting new connection");
         testCon.start();
@@ -152,20 +208,22 @@
         assertTrue(null == testConsumer2.receive(1000));
 
         session.commit();
+
+        _logger.info("Testing we have no messages left after commit");
+        assertTrue(null == testConsumer1.receive(1000));
+        assertTrue(null == testConsumer2.receive(1000));        
     }
 
     public void testResendsMsgsAfterSessionClose() throws Exception
     {
         AMQConnection con = new AMQConnection("vm://:1", "guest", "guest", "consumer1", "test");
 
-        Session consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+        Session consumerSession = con.createSession(true, Session.SESSION_TRANSACTED);
         AMQQueue queue3 = new AMQQueue(consumerSession.getDefaultQueueExchangeName(), new AMQShortString("Q3"), false);
         MessageConsumer consumer = consumerSession.createConsumer(queue3);
-        //force synch to ensure the consumer has resulted in a bound queue
-        ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.DIRECT_EXCHANGE_NAME, ExchangeDefaults.DIRECT_EXCHANGE_CLASS);
 
         AMQConnection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "test");
-        Session producerSession = con2.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+        Session producerSession = con2.createSession(true, Session.SESSION_TRANSACTED);
         MessageProducer producer = producerSession.createProducer(queue3);
 
         _logger.info("Sending four messages");
@@ -176,65 +234,77 @@
 
         producerSession.commit();
 
-
         _logger.info("Starting connection");
         con.start();
         TextMessage tm = (TextMessage) consumer.receive();
+        assertNotNull(tm);
+        assertEquals("msg1", tm.getText());
 
-        tm.acknowledge();
         consumerSession.commit();
 
-        _logger.info("Received and acknowledged first message");
+        _logger.info("Received and committed first message");
         tm = (TextMessage) consumer.receive(1000);
         assertNotNull(tm);
+        assertEquals("msg2", tm.getText());
+
         tm = (TextMessage) consumer.receive(1000);
         assertNotNull(tm);
+        assertEquals("msg3", tm.getText());
+
         tm = (TextMessage) consumer.receive(1000);
         assertNotNull(tm);
+        assertEquals("msg4", tm.getText());
+
         _logger.info("Received all four messages. Closing connection with three outstanding messages");
 
         consumerSession.close();
 
-        consumerSession = con.createSession(true, Session.CLIENT_ACKNOWLEDGE);
+        consumerSession = con.createSession(true, Session.SESSION_TRANSACTED);
 
         consumer = consumerSession.createConsumer(queue3);
 
         // no ack for last three messages so when I call recover I expect to get three messages back
-
         tm = (TextMessage) consumer.receive(3000);
         assertNotNull(tm);
         assertEquals("msg2", tm.getText());
+        assertTrue("Message is not redelivered", tm.getJMSRedelivered());
 
         tm = (TextMessage) consumer.receive(3000);
         assertNotNull(tm);
         assertEquals("msg3", tm.getText());
+        assertTrue("Message is not redelivered", tm.getJMSRedelivered());
 
         tm = (TextMessage) consumer.receive(3000);
         assertNotNull(tm);
         assertEquals("msg4", tm.getText());
+        assertTrue("Message is not redelivered", tm.getJMSRedelivered());
+
+        _logger.info("Received redelivery of three messages. Committing");
 
-        _logger.info("Received redelivery of three messages. Acknowledging last message");
-        tm.acknowledge();
         consumerSession.commit();
-        _logger.info("Calling acknowledge with no outstanding messages");
-        // all acked so no messages to be delivered
 
+        _logger.info("Called commit");
 
-        tm = (TextMessage) consumer.receiveNoWait();
+        tm = (TextMessage) consumer.receive(1000);
         assertNull(tm);
+
         _logger.info("No messages redelivered as is expected");
 
         con.close();
         con2.close();
-
     }
 
-
     private void expect(String text, Message msg) throws JMSException
     {
+        expect(text, msg, false);
+    }
+
+    private void expect(String text, Message msg, boolean requeued) throws JMSException
+    {
         assertNotNull("Message should not be null", msg);
         assertTrue("Message should be a text message", msg instanceof TextMessage);
         assertEquals("Message content does not match expected", text, ((TextMessage) msg).getText());
+        assertEquals("Message should " + (requeued ? "" : "not") + " be requeued", requeued, msg.getJMSRedelivered());
     }
 
     public static junit.framework.Test suite()

Modified: incubator/qpid/trunk/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java (original)
+++ incubator/qpid/trunk/qpid/java/cluster/src/test/java/org/apache/qpid/server/cluster/SimpleClusterTest.java Tue Mar  6 06:12:47 2007
@@ -37,7 +37,7 @@
         AMQConnection con = new AMQConnection("localhost:9000", "guest", "guest", "test", "/test");
         AMQSession session = (AMQSession) con.createSession(false, AMQSession.NO_ACKNOWLEDGE);
         System.out.println("Session created");
-        session.declareExchange(new AMQShortString("my_exchange"), new AMQShortString("direct"));
+        session.declareExchange(new AMQShortString("my_exchange"), new AMQShortString("direct"), true);
         System.out.println("Exchange declared");
         con.close();
         System.out.println("Connection closed");

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/protocol/AMQConstant.java Tue Mar  6 06:12:47 2007
@@ -100,7 +100,7 @@
 
     public static final AMQConstant RESOURCE_ERROR = new AMQConstant(506, "resource error", true);
 
-    public static final AMQConstant NOT_ALLOWED = new AMQConstant(507, "not allowed", true);
+    public static final AMQConstant NOT_ALLOWED = new AMQConstant(530, "not allowed", true);
 
     public static final AMQConstant NOT_IMPLEMENTED = new AMQConstant(540, "not implemented", true);
 

Modified: incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java (original)
+++ incubator/qpid/trunk/qpid/java/common/src/main/java/org/apache/qpid/util/ConcurrentLinkedMessageQueueAtomicSize.java Tue Mar  6 06:12:47 2007
@@ -41,6 +41,11 @@
         return super.size() + _messageHeadSize.get();
     }
 
+    public int headSize()
+    {
+        return _messageHeadSize.get();
+    }
+
     @Override
     public E poll()
     {
@@ -50,10 +55,14 @@
         }
         else
         {
-            _logger.debug("Providing item from message head");
-
             E e = _messageHead.poll();
 
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Providing item(" + e + ")from message head");
+            }
+
+
             if (e != null)
             {
                 _messageHeadSize.decrementAndGet();
@@ -159,8 +168,12 @@
         }
         else
         {
-            _logger.debug("Providing item from message head");
-            return _messageHead.peek();
+            E o = _messageHead.peek();
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Peeking item (" + o + ") from message head");
+            }
+            return o;
         }
 
     }
@@ -186,7 +199,10 @@
 
     public boolean pushHead(E o)
     {
-        _logger.debug("Adding item to head of queue");
+        if (_logger.isDebugEnabled())
+        {
+            _logger.debug("Adding item(" + o + ") to head of queue");
+        }
         if (_messageHead.offer(o))
         {
             _messageHeadSize.incrementAndGet();

Modified: incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java?view=diff&rev=515127&r1=515126&r2=515127
==============================================================================
--- incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java (original)
+++ incubator/qpid/trunk/qpid/java/systests/src/main/java/org/apache/qpid/server/exchange/ReturnUnroutableMandatoryMessageTest.java Tue Mar  6 06:12:47 2007
@@ -67,7 +67,8 @@
         MessageConsumer consumer = consumerSession.createConsumer(queue, AMQSession.DEFAULT_PREFETCH_LOW_MARK, AMQSession.DEFAULT_PREFETCH_HIGH_MARK, false, false, (String) null, ft);
 
         //force synch to ensure the consumer has resulted in a bound queue
-        ((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+        //((AMQSession) consumerSession).declareExchangeSynch(ExchangeDefaults.HEADERS_EXCHANGE_NAME, ExchangeDefaults.HEADERS_EXCHANGE_CLASS);
+        // The explicit synch above is disabled because synchronising is now the default behaviour.
 
         Connection con2 = new AMQConnection("vm://:1", "guest", "guest", "producer1", "/test");