You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ri...@apache.org on 2007/11/06 12:12:37 UTC

svn commit: r592374 - in /incubator/qpid/branches/M2.1/java: client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpid/client/protocol/ client/src/main/java/org/apache/qpid/client/state/ systests/src/main/java/org/apache/qpid/se...

Author: ritchiem
Date: Tue Nov  6 03:12:37 2007
New Revision: 592374

URL: http://svn.apache.org/viewvc?rev=592374&view=rev
Log:
QPID-662 Transactional state not correctly reported after fail over. We now record if we have sent any messages
from here we can check if we have failed over and so have lost messages from the transaction making it invalid.

Added:
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java   (with props)
Modified:
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
    incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/txn/TxnTest.java

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java?rev=592374&r1=592373&r2=592374&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSession.java Tue Nov  6 03:12:37 2007
@@ -20,11 +20,11 @@
  */
 package org.apache.qpid.client;
 
+import org.apache.qpid.AMQDisconnectedException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQInvalidArgumentException;
 import org.apache.qpid.AMQInvalidRoutingKeyException;
 import org.apache.qpid.AMQUndeliveredException;
-import org.apache.qpid.AMQDisconnectedException;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.failover.FailoverNoopSupport;
 import org.apache.qpid.client.failover.FailoverProtectedOperation;
@@ -100,6 +100,7 @@
 import javax.jms.TopicPublisher;
 import javax.jms.TopicSession;
 import javax.jms.TopicSubscriber;
+import javax.jms.TransactionRolledBackException;
 import java.io.Serializable;
 import java.text.MessageFormat;
 import java.util.ArrayList;
@@ -293,6 +294,11 @@
     private final boolean _strictAMQPFATAL;
     private final Object _messageDeliveryLock = new Object();
 
+    /** Session state : used to detect if commit is a) required b) allowed , i.e. does the tx span failover. */
+    private boolean _dirty;
+    /** Has failover occured on this session */
+    private boolean _failedOver;
+
     /**
      * Creates a new session on a connection.
      *
@@ -610,30 +616,65 @@
     {
         checkTransacted();
 
-        try
+        new FailoverNoopSupport<Object, JMSException>(new FailoverProtectedOperation<Object, JMSException>()
         {
-            // Acknowledge up to message last delivered (if any) for each consumer.
-            // need to send ack for messages delivered to consumers so far
-            for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
+            public Object execute() throws JMSException, FailoverException
             {
-                // Sends acknowledgement to server
-                i.next().acknowledgeLastDelivered();
-            }
+                //Check that we are clean to commit.
+                if (_failedOver && _dirty)
+                {
+                    rollback();
 
-            // Commits outstanding messages sent and outstanding acknowledgements.
-            final AMQProtocolHandler handler = getProtocolHandler();
+                    throw new TransactionRolledBackException("Connection failover has occured since last send. " +
+                                                             "Forced rollback");
+                }
 
-            handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()),
-                              TxCommitOkBody.class);
-        }
-        catch (AMQException e)
-        {
-            throw new JMSAMQException("Failed to commit: " + e.getMessage(), e);
-        }
-        catch (FailoverException e)
-        {
-            throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
-        }
+                try
+                {
+                    // Acknowledge up to message last delivered (if any) on this session.
+                    // We only need to find the highest value and ack that as commit is session level.
+                    Long lastTag = -1L;
+
+                    for (Iterator<BasicMessageConsumer> i = _consumers.values().iterator(); i.hasNext();)
+                    {
+//                        i.next().acknowledgeLastDelivered();
+//                    }
+
+                        // get next acknowledgement to server
+                        Long next = i.next().getLastDelivered();
+                        if (next != null && next > lastTag)
+                        {
+                            lastTag = next;
+                        }
+                    }
+
+                    if (lastTag != -1)
+                    {
+                        acknowledgeMessage(lastTag, true);
+                    }
+
+                    // Commits outstanding messages sent and outstanding acknowledgements.
+                    final AMQProtocolHandler handler = getProtocolHandler();
+
+                    handler.syncWrite(TxCommitBody.createAMQFrame(_channelId, getProtocolMajorVersion(), getProtocolMinorVersion()),
+                                      TxCommitOkBody.class);
+
+                    markClean();
+                }
+
+                catch (AMQException e)
+                {
+                    throw new JMSAMQException("Failed to commit: " + e.getMessage(), e);
+                }
+
+                catch (FailoverException e)
+                {
+                    throw new JMSAMQException("Fail-over interrupted commit. Status of the commit is uncertain.", e);
+                }
+
+                return null;
+            }
+        }, _connection).execute();
     }
 
     public void confirmConsumerCancelled(AMQShortString consumerTag)
@@ -1431,6 +1472,8 @@
                 _connection.getProtocolHandler().syncWrite(TxRollbackBody.createAMQFrame(_channelId,
                                                                                          getProtocolMajorVersion(), getProtocolMinorVersion()), TxRollbackOkBody.class);
 
+                markClean();
+
                 if (!isSuspended)
                 {
                     suspendChannel(false);
@@ -1731,6 +1774,7 @@
      */
     void resubscribe() throws AMQException
     {
+        _failedOver = true;
         resubscribeProducers();
         resubscribeConsumers();
     }
@@ -2530,6 +2574,41 @@
     Object getMessageDeliveryLock()
     {
         return _messageDeliveryLock;
+    }
+
+    /**
+     * Signifies that the session has pending sends to commit.
+     */
+    public void markDirty()
+    {
+        _dirty = true;
+    }
+
+    /**
+     * Signifies that the session has no pending sends to commit.
+     */
+    public void markClean()
+    {
+        _dirty = false;
+        _failedOver = false;
+    }
+
+    /**
+     * Check to see if failover has occured since the last call to markClean(commit or rollback).
+     * @return boolean true if failover has occured.
+     */
+    public boolean hasFailedOver()
+    {
+        return _failedOver;
+    }
+
+    /**
+     * Check to see if any message have been sent in this transaction and have not been commited.
+     * @return boolean true if a message has been sent but not commited
+     */
+    public boolean isDirty()
+    {
+        return _dirty;
     }
 
     /** Responsible for decoding a message fragment and passing it to the appropriate message consumer. */

Added: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java?rev=592374&view=auto
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java (added)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java Tue Nov  6 03:12:37 2007
@@ -0,0 +1,42 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.protocol.AMQConstant;
+
+/**
+ * AMQSessionDirtyException represents all failures to send data on a transacted session that is
+ * no longer in a state that the client expects. i.e. failover has occured so previously sent messages
+ * will not be part of the transaction.
+ *
+ * <p/><table id="crc"><caption>CRC Card</caption>
+ * <tr><th> Responsibilities <th> Collaborations
+ * <tr><td> Represent attempt to perform additional sends on a dirty session.
+ * </table>
+ */
+public class AMQSessionDirtyException extends AMQException
+{
+    public AMQSessionDirtyException(String msg)
+    {
+        super(AMQConstant.RESOURCE_ERROR, msg);
+    }
+}

Propchange: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/AMQSessionDirtyException.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java?rev=592374&r1=592373&r2=592374&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java Tue Nov  6 03:12:37 2007
@@ -754,6 +754,30 @@
         }
     }
 
+    /**
+     * Acknowledge up to last message delivered (if any). Used when commiting.
+     *
+     * @return the lastDeliveryTag to acknowledge
+     */
+    Long getLastDelivered()
+    {
+        if (!_receivedDeliveryTags.isEmpty())
+        {
+            Long lastDeliveryTag = _receivedDeliveryTags.poll();
+
+            while (!_receivedDeliveryTags.isEmpty())
+            {
+                lastDeliveryTag = _receivedDeliveryTags.poll();
+            }
+
+            assert _receivedDeliveryTags.isEmpty();
+
+            return lastDeliveryTag;
+        }
+
+        return null;
+    }
+
     /** Acknowledge up to last message delivered (if any). Used when commiting. */
     void acknowledgeLastDelivered()
     {
@@ -772,6 +796,7 @@
         }
     }
 
+
     void notifyError(Throwable cause)
     {
         // synchronized (_closed)
@@ -783,7 +808,7 @@
                 if (_closedStack != null)
                 {
                     _logger.trace(_consumerTag + " notifyError():"
-                        + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
+                                  + Arrays.asList(stackTrace).subList(3, stackTrace.length - 1));
                     _logger.trace(_consumerTag + " previously" + _closedStack.toString());
                 }
                 else
@@ -904,7 +929,7 @@
             if (_logger.isDebugEnabled())
             {
                 _logger.debug("Rejecting the messages(" + _receivedDeliveryTags.size() + ") in _receivedDTs (RQ)"
-                    + "for consumer with tag:" + _consumerTag);
+                              + "for consumer with tag:" + _consumerTag);
             }
 
             Long tag = _receivedDeliveryTags.poll();
@@ -934,7 +959,7 @@
             if (_logger.isDebugEnabled())
             {
                 _logger.debug("Rejecting the messages(" + _synchronousQueue.size() + ") in _syncQueue (PRQ)"
-                    + "for consumer with tag:" + _consumerTag);
+                              + "for consumer with tag:" + _consumerTag);
             }
 
             Iterator iterator = _synchronousQueue.iterator();

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java?rev=592374&r1=592373&r2=592374&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/BasicMessageProducer.java Tue Nov  6 03:12:37 2007
@@ -60,46 +60,30 @@
 
     private AMQConnection _connection;
 
-    /**
-     * If true, messages will not get a timestamp.
-     */
+    /** If true, messages will not get a timestamp. */
     private boolean _disableTimestamps;
 
-    /**
-     * Priority of messages created by this producer.
-     */
+    /** Priority of messages created by this producer. */
     private int _messagePriority;
 
-    /**
-     * Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution.
-     */
+    /** Time to live of messages. Specified in milliseconds but AMQ has 1 second resolution. */
     private long _timeToLive;
 
-    /**
-     * Delivery mode used for this producer.
-     */
+    /** Delivery mode used for this producer. */
     private int _deliveryMode = DeliveryMode.PERSISTENT;
 
-    /**
-     * The Destination used for this consumer, if specified upon creation.
-     */
+    /** The Destination used for this consumer, if specified upon creation. */
     protected AMQDestination _destination;
 
-    /**
-     * Default encoding used for messages produced by this producer.
-     */
+    /** Default encoding used for messages produced by this producer. */
     private String _encoding;
 
-    /**
-     * Default encoding used for message produced by this producer.
-     */
+    /** Default encoding used for message produced by this producer. */
     private String _mimeType;
 
     private AMQProtocolHandler _protocolHandler;
 
-    /**
-     * True if this producer was created from a transacted session
-     */
+    /** True if this producer was created from a transacted session */
     private boolean _transacted;
 
     private int _channelId;
@@ -112,9 +96,7 @@
      */
     private long _producerId;
 
-    /**
-     * The session used to create this producer
-     */
+    /** The session used to create this producer */
     private AMQSession _session;
 
     private final boolean _immediate;
@@ -128,8 +110,8 @@
     private static final ContentBody[] NO_CONTENT_BODIES = new ContentBody[0];
 
     protected BasicMessageProducer(AMQConnection connection, AMQDestination destination, boolean transacted, int channelId,
-        AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory,
-        boolean waitUntilSent)
+                                   AMQSession session, AMQProtocolHandler protocolHandler, long producerId, boolean immediate, boolean mandatory,
+                                   boolean waitUntilSent)
     {
         _connection = connection;
         _destination = destination;
@@ -162,16 +144,16 @@
         // Note that the durable and internal arguments are ignored since passive is set to false
         // TODO: Be aware of possible changes to parameter order as versions change.
         AMQFrame declare =
-            ExchangeDeclareBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
-                _protocolHandler.getProtocolMinorVersion(), null, // arguments
-                false, // autoDelete
-                false, // durable
-                destination.getExchangeName(), // exchange
-                false, // internal
-                true, // nowait
-                false, // passive
-                _session.getTicket(), // ticket
-                destination.getExchangeClass()); // type
+                ExchangeDeclareBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
+                                                   _protocolHandler.getProtocolMinorVersion(), null, // arguments
+                                                   false, // autoDelete
+                                                   false, // durable
+                                                   destination.getExchangeName(), // exchange
+                                                   false, // internal
+                                                   true, // nowait
+                                                   false, // passive
+                                                   _session.getTicket(), // ticket
+                                                   destination.getExchangeClass()); // type
         _protocolHandler.writeFrame(declare);
     }
 
@@ -208,7 +190,7 @@
         if ((i != DeliveryMode.NON_PERSISTENT) && (i != DeliveryMode.PERSISTENT))
         {
             throw new JMSException("DeliveryMode must be either NON_PERSISTENT or PERSISTENT. Value of " + i
-                + " is illegal");
+                                   + " is illegal");
         }
 
         _deliveryMode = i;
@@ -320,12 +302,12 @@
         {
             validateDestination(destination);
             sendImpl((AMQDestination) destination, message, _deliveryMode, _messagePriority, _timeToLive, _mandatory,
-                _immediate);
+                     _immediate);
         }
     }
 
     public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive)
-        throws JMSException
+            throws JMSException
     {
         checkPreConditions();
         checkDestination(destination);
@@ -337,7 +319,7 @@
     }
 
     public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
-        boolean mandatory) throws JMSException
+                     boolean mandatory) throws JMSException
     {
         checkPreConditions();
         checkDestination(destination);
@@ -349,7 +331,7 @@
     }
 
     public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
-        boolean mandatory, boolean immediate) throws JMSException
+                     boolean mandatory, boolean immediate) throws JMSException
     {
         checkPreConditions();
         checkDestination(destination);
@@ -361,7 +343,7 @@
     }
 
     public void send(Destination destination, Message message, int deliveryMode, int priority, long timeToLive,
-        boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException
+                     boolean mandatory, boolean immediate, boolean waitUntilSent) throws JMSException
     {
         checkPreConditions();
         checkDestination(destination);
@@ -369,7 +351,7 @@
         {
             validateDestination(destination);
             sendImpl((AMQDestination) destination, message, deliveryMode, priority, timeToLive, mandatory, immediate,
-                waitUntilSent);
+                     waitUntilSent);
         }
     }
 
@@ -415,7 +397,7 @@
             else
             {
                 throw new JMSException("Unable to send message, due to class conversion error: "
-                    + message.getClass().getName());
+                                       + message.getClass().getName());
             }
         }
     }
@@ -425,14 +407,14 @@
         if (!(destination instanceof AMQDestination))
         {
             throw new JMSException("Unsupported destination class: "
-                + ((destination != null) ? destination.getClass() : null));
+                                   + ((destination != null) ? destination.getClass() : null));
         }
 
         declareDestination((AMQDestination) destination);
     }
 
     protected void sendImpl(AMQDestination destination, Message message, int deliveryMode, int priority, long timeToLive,
-        boolean mandatory, boolean immediate) throws JMSException
+                            boolean mandatory, boolean immediate) throws JMSException
     {
         sendImpl(destination, message, deliveryMode, priority, timeToLive, mandatory, immediate, _waitUntilSent);
     }
@@ -447,16 +429,27 @@
      * @param timeToLive
      * @param mandatory
      * @param immediate
+     *
      * @throws JMSException
      */
     protected void sendImpl(AMQDestination destination, Message origMessage, int deliveryMode, int priority, long timeToLive,
-        boolean mandatory, boolean immediate, boolean wait) throws JMSException
+                            boolean mandatory, boolean immediate, boolean wait) throws JMSException
     {
         checkTemporaryDestination(destination);
         origMessage.setJMSDestination(destination);
 
         AbstractJMSMessage message = convertToNativeMessage(origMessage);
 
+        if (_transacted)
+        {
+            if (_session.hasFailedOver() && _session.isDirty())
+            {
+                throw new JMSAMQException("Failover has occurred and session is dirty so unable to send.",
+                                          new AMQSessionDirtyException("Failover has occurred and session is dirty " +
+                                                                       "so unable to send."));
+            }
+        }
+
         if (_disableMessageId)
         {
             message.setJMSMessageID(null);
@@ -489,12 +482,12 @@
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
         AMQFrame publishFrame =
-            BasicPublishBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
-                _protocolHandler.getProtocolMinorVersion(), destination.getExchangeName(), // exchange
-                immediate, // immediate
-                mandatory, // mandatory
-                destination.getRoutingKey(), // routingKey
-                _session.getTicket()); // ticket
+                BasicPublishBody.createAMQFrame(_channelId, _protocolHandler.getProtocolMajorVersion(),
+                                                _protocolHandler.getProtocolMinorVersion(), destination.getExchangeName(), // exchange
+                                                immediate, // immediate
+                                                mandatory, // mandatory
+                                                destination.getRoutingKey(), // routingKey
+                                                _session.getTicket()); // ticket
 
         message.prepareForSending();
         ByteBuffer payload = message.getData();
@@ -536,9 +529,9 @@
         // AMQP version change: Hardwire the version to 0-8 (major=8, minor=0)
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         AMQFrame contentHeaderFrame =
-            ContentHeaderBody.createAMQFrame(_channelId,
-                BasicConsumeBody.getClazz(_protocolHandler.getProtocolMajorVersion(),
-                    _protocolHandler.getProtocolMinorVersion()), 0, contentHeaderProperties, size);
+                ContentHeaderBody.createAMQFrame(_channelId,
+                                                 BasicConsumeBody.getClazz(_protocolHandler.getProtocolMajorVersion(),
+                                                                           _protocolHandler.getProtocolMinorVersion()), 0, contentHeaderProperties, size);
         if (_logger.isDebugEnabled())
         {
             _logger.debug("Sending content header frame to " + destination);
@@ -558,6 +551,11 @@
             origMessage.setJMSExpiration(message.getJMSExpiration());
             origMessage.setJMSMessageID(message.getJMSMessageID());
         }
+
+        if (_transacted)
+        {
+            _session.markDirty();
+        }
     }
 
     private void checkTemporaryDestination(AMQDestination destination) throws JMSException
@@ -669,7 +667,7 @@
         if ((_destination != null) && (suppliedDestination != null))
         {
             throw new UnsupportedOperationException(
-                "This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
+                    "This message producer was created with a Destination, therefore you cannot use an unidentified Destination");
         }
 
         if (suppliedDestination == null)

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java?rev=592374&r1=592373&r2=592374&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java Tue Nov  6 03:12:37 2007
@@ -104,23 +104,22 @@
  * <p/><table id="crc"><caption>CRC Card</caption>
  * <tr><th> Responsibilities <th> Collaborations
  * <tr><td> Create the filter chain to filter this handlers events.
- *     <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}.
+ * <td> {@link ProtocolCodecFilter}, {@link SSLContextFactory}, {@link SSLFilter}, {@link ReadWriteThreadModel}.
  *
  * <tr><td> Maintain fail-over state.
  * <tr><td>
  * </table>
  *
  * @todo Explain the system property: amqj.shared_read_write_pool. How does putting the protocol codec filter before the
- *       async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing
- *       anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec
- *       filter before it mean not doing the read/write asynchronously but in the main filter thread?
- *
+ * async write filter make it a shared pool? The pooling filter uses the same thread pool for reading and writing
+ * anyway, see {@link org.apache.qpid.pool.PoolingFilter}, docs for comments. Will putting the protocol codec
+ * filter before it mean not doing the read/write asynchronously but in the main filter thread?
  * @todo Use a single handler instance, by shifting everything to do with the 'protocol session' state, including
- *       failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of
- *       AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could
- *       be merged, although there is sense in keeping the session model seperate. Will clarify things by having data
- *       held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so
- *       that lifecycles of the fields match lifecycles of their containing objects.
+ * failover state, into AMQProtocolSession, and tracking that from AMQConnection? The lifecycles of
+ * AMQProtocolSesssion and AMQConnection will be the same, so if there is high cohesion between them, they could
+ * be merged, although there is sense in keeping the session model seperate. Will clarify things by having data
+ * held per protocol handler, per protocol session, per network connection, per channel, in seperate classes, so
+ * that lifecycles of the fields match lifecycles of their containing objects.
  */
 public class AMQProtocolHandler extends IoHandlerAdapter
 {
@@ -200,7 +199,7 @@
         {
             SSLConfiguration sslConfig = _connection.getSSLConfiguration();
             SSLContextFactory sslFactory =
-                new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
+                    new SSLContextFactory(sslConfig.getKeystorePath(), sslConfig.getKeystorePassword(), sslConfig.getCertType());
             SSLFilter sslFilter = new SSLFilter(sslFactory.buildClientContext());
             sslFilter.setUseClientMode(true);
             session.getFilterChain().addBefore("protocolFilter", "ssl", sslFilter);
@@ -235,7 +234,7 @@
      * @param session The MINA session.
      *
      * @todo Clarify: presumably exceptionCaught is called when the client is sending during a connection failure and
-     *       not otherwise? The above comment doesn't make that clear.
+     * not otherwise? The above comment doesn't make that clear.
      */
     public void sessionClosed(IoSession session)
     {
@@ -413,74 +412,74 @@
 
         switch (bodyFrame.getFrameType())
         {
-        case AMQMethodBody.TYPE:
+            case AMQMethodBody.TYPE:
 
-            if (debug)
-            {
-                _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame);
-            }
+                if (debug)
+                {
+                    _logger.debug("(" + System.identityHashCode(this) + ")Method frame received: " + frame);
+                }
 
-            final AMQMethodEvent<AMQMethodBody> evt =
-                new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
+                final AMQMethodEvent<AMQMethodBody> evt =
+                        new AMQMethodEvent<AMQMethodBody>(frame.getChannel(), (AMQMethodBody) bodyFrame);
 
-            try
-            {
-
-                boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
-                if (!_frameListeners.isEmpty())
+                try
                 {
-                    Iterator it = _frameListeners.iterator();
-                    while (it.hasNext())
+
+                    boolean wasAnyoneInterested = getStateManager().methodReceived(evt);
+                    if (!_frameListeners.isEmpty())
                     {
-                        final AMQMethodListener listener = (AMQMethodListener) it.next();
-                        wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+                        Iterator it = _frameListeners.iterator();
+                        while (it.hasNext())
+                        {
+                            final AMQMethodListener listener = (AMQMethodListener) it.next();
+                            wasAnyoneInterested = listener.methodReceived(evt) || wasAnyoneInterested;
+                        }
                     }
-                }
 
-                if (!wasAnyoneInterested)
-                {
-                    throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.  Listeners:"
-                        + _frameListeners);
+                    if (!wasAnyoneInterested)
+                    {
+                        throw new AMQException("AMQMethodEvent " + evt + " was not processed by any listener.  Listeners:"
+                                               + _frameListeners);
+                    }
                 }
-            }
-            catch (AMQException e)
-            {
-                getStateManager().error(e);
-                if (!_frameListeners.isEmpty())
+                catch (AMQException e)
                 {
-                    Iterator it = _frameListeners.iterator();
-                    while (it.hasNext())
+                    getStateManager().error(e);
+                    if (!_frameListeners.isEmpty())
                     {
-                        final AMQMethodListener listener = (AMQMethodListener) it.next();
-                        listener.error(e);
+                        Iterator it = _frameListeners.iterator();
+                        while (it.hasNext())
+                        {
+                            final AMQMethodListener listener = (AMQMethodListener) it.next();
+                            listener.error(e);
+                        }
                     }
-                }
 
-                exceptionCaught(session, e);
-            }
+                    exceptionCaught(session, e);
+                }
 
-            break;
+                break;
 
-        case ContentHeaderBody.TYPE:
+            case ContentHeaderBody.TYPE:
 
-            _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame);
-            break;
+                _protocolSession.messageContentHeaderReceived(frame.getChannel(), (ContentHeaderBody) bodyFrame);
+                break;
 
-        case ContentBody.TYPE:
+            case ContentBody.TYPE:
 
-            _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame);
-            break;
+                _protocolSession.messageContentBodyReceived(frame.getChannel(), (ContentBody) bodyFrame);
+                break;
 
-        case HeartbeatBody.TYPE:
+            case HeartbeatBody.TYPE:
 
-            if (debug)
-            {
-                _logger.debug("Received heartbeat");
-            }
+                if (debug)
+                {
+                    _logger.debug("Received heartbeat");
+                }
 
-            break;
+                break;
 
-        default:
+            default:
 
         }
 
@@ -491,6 +490,8 @@
 
     public void messageSent(IoSession session, Object message) throws Exception
     {
+//        System.err.println("Sent PS:" + System.identityHashCode(_protocolSession) + ":" + message);
+
         final long sentMessages = _messagesOut++;
 
         final boolean debug = _logger.isDebugEnabled();
@@ -547,7 +548,7 @@
      * @param listener the blocking listener. Note the calling thread will block.
      */
     public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener)
-        throws AMQException, FailoverException
+            throws AMQException, FailoverException
     {
         return writeCommandFrameAndWaitForReply(frame, listener, DEFAULT_SYNC_TIMEOUT);
     }
@@ -560,7 +561,7 @@
      * @param listener the blocking listener. Note the calling thread will block.
      */
     public AMQMethodEvent writeCommandFrameAndWaitForReply(AMQFrame frame, BlockingMethodFrameListener listener,
-        long timeout) throws AMQException, FailoverException
+                                                           long timeout) throws AMQException, FailoverException
     {
         try
         {
@@ -570,8 +571,8 @@
             AMQMethodEvent e = listener.blockForFrame(timeout);
 
             return e;
-                // When control resumes before this line, a reply will have been received
-                // that matches the criteria defined in the blocking listener
+            // When control resumes before this line, a reply will have been received
+            // that matches the criteria defined in the blocking listener
         }
         catch (AMQException e)
         {
@@ -595,7 +596,7 @@
     public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws AMQException, FailoverException
     {
         return writeCommandFrameAndWaitForReply(frame, new SpecificMethodFrameListener(frame.getChannel(), responseClass),
-                timeout);
+                                                timeout);
     }
 
     public void closeSession(AMQSession session) throws AMQException
@@ -621,12 +622,12 @@
         // TODO: Connect this to the session version obtained from ProtocolInitiation for this session.
         // Be aware of possible changes to parameter order as versions change.
         final AMQFrame frame =
-            ConnectionCloseBody.createAMQFrame(0, _protocolSession.getProtocolMajorVersion(),
-                _protocolSession.getProtocolMinorVersion(), // AMQP version (major, minor)
-                0, // classId
-                0, // methodId
-                AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
-                new AMQShortString("JMS client is closing the connection.")); // replyText
+                ConnectionCloseBody.createAMQFrame(0, _protocolSession.getProtocolMajorVersion(),
+                                                   _protocolSession.getProtocolMinorVersion(), // AMQP version (major, minor)
+                                                   0, // classId
+                                                   0, // methodId
+                                                   AMQConstant.REPLY_SUCCESS.getCode(), // replyCode
+                                                   new AMQShortString("JMS client is closing the connection.")); // replyText
 
         try
         {

Modified: incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java?rev=592374&r1=592373&r2=592374&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java (original)
+++ incubator/qpid/branches/M2.1/java/client/src/main/java/org/apache/qpid/client/state/AMQStateManager.java Tue Nov  6 03:12:37 2007
@@ -255,9 +255,9 @@
             if (_currentState != s)
             {
                 _logger.warn("State not achieved within permitted time.  Current state " + _currentState
-                    + ", desired state: " + s);
+                             + ", desired state: " + s);
                 throw new AMQException("State not achieved within permitted time.  Current state " + _currentState
-                    + ", desired state: " + s);
+                                       + ", desired state: " + s);
             }
         }
 

Modified: incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/txn/TxnTest.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/txn/TxnTest.java?rev=592374&r1=592373&r2=592374&view=diff
==============================================================================
--- incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/txn/TxnTest.java (original)
+++ incubator/qpid/branches/M2.1/java/systests/src/main/java/org/apache/qpid/server/txn/TxnTest.java Tue Nov  6 03:12:37 2007
@@ -22,23 +22,26 @@
 package org.apache.qpid.server.txn;
 
 import junit.framework.TestCase;
-import junit.framework.Assert;
+import org.apache.log4j.Logger;
+import org.apache.qpid.client.AMQSessionDirtyException;
 import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.client.vmbroker.AMQVMBrokerCreationException;
 import org.apache.qpid.jndi.PropertiesFileInitialContextFactory;
-import org.apache.log4j.Logger;
+import org.apache.qpid.server.registry.ApplicationRegistry;
 
+import javax.jms.Connection;
+import javax.jms.ConnectionFactory;
 import javax.jms.JMSException;
-import javax.jms.Session;
+import javax.jms.Message;
 import javax.jms.MessageConsumer;
+import javax.jms.MessageListener;
 import javax.jms.MessageProducer;
 import javax.jms.Queue;
-import javax.jms.ConnectionFactory;
-import javax.jms.Connection;
-import javax.jms.Message;
+import javax.jms.Session;
 import javax.jms.TextMessage;
-import javax.jms.MessageListener;
-import javax.naming.spi.InitialContextFactory;
+import javax.jms.TransactionRolledBackException;
 import javax.naming.Context;
+import javax.naming.spi.InitialContextFactory;
 import java.util.Hashtable;
 import java.util.concurrent.CountDownLatch;
 
@@ -49,7 +52,8 @@
     private static final Logger _logger = Logger.getLogger(TxnTest.class);
 
 
-    protected final String BROKER = "vm://:1";//"localhost";
+    //Set retries quite high to ensure that it continues to retry whilst the InVM broker is restarted.
+    protected final String BROKER = "vm://:1?retries='1000'";
     protected final String VHOST = "/test";
     protected final String QUEUE = "TxnTestQueue";
 
@@ -75,7 +79,11 @@
         Hashtable<String, String> env = new Hashtable<String, String>();
 
         env.put("connectionfactory.connection", "amqp://guest:guest@TTL_TEST_ID" + VHOST + "?brokerlist='" + BROKER + "'");
-        env.put("queue.queue", QUEUE);
+
+        // Ensure that the queue is unique for each test run.
+        // There appears to be other old sesssion/consumers when looping the tests this means that sometimes a message
+        // will disappear. When it has actually gone to the old client.
+        env.put("queue.queue", QUEUE + "-" + System.currentTimeMillis());
 
         _context = factory.getInitialContext(env);
 
@@ -109,7 +117,7 @@
         {
             _producerConnection.close();
         }
-        
+
         super.tearDown();
 
         if (BROKER.startsWith("vm://"))
@@ -124,10 +132,8 @@
         _consumer.setMessageListener(this);
         _clientConnection.start();
 
-        //Set TTL
         _producer.send(_producerSession.createTextMessage("TxtTestML"));
 
-
         try
         {
             //Wait for message to arrive
@@ -150,7 +156,6 @@
 
     public void onMessage(Message message)
     {
-
         try
         {
             assertEquals("Incorrect Message Received.", "TxtTestML", ((TextMessage) message).getText());
@@ -170,19 +175,235 @@
     {
         _clientConnection.start();
 
-        //Set TTL
         _producer.send(_producerSession.createTextMessage("TxtTestReceive"));
 
         //Receive Message
         Message received = _consumer.receive(1000);
 
+        _clientSession.commit();
+
         assertEquals("Incorrect Message Received.", "TxtTestReceive", ((TextMessage) received).getText());
-        //Receive Message
 
+        //Receive Message
         received = _consumer.receive(1000);
 
         assertNull("More messages received", received);
 
         _consumer.close();
     }
+
+    /**
+     * Test that after the connection has failed over that a sent message is still correctly receieved.
+     * Using Auto-Ack consumer.
+     *
+     * @throws JMSException
+     */
+    public void testReceiveAfterFailover() throws JMSException
+    {
+//        System.err.println("testReceiveAfterFailover");
+        _clientConnection.close();
+
+        MessageConsumer consumer = _producerSession.createConsumer(_queue);
+
+        failServer();
+
+//        System.err.println("Server restarted");
+
+        String MESSAGE_TXT = "TxtTestReceiveAfterFailoverTX";
+
+//        System.err.println("Prod Session:" + _producerSession + ":" + ((AMQSession) _producerSession).isClosed());
+
+        Message sent = _producerSession.createTextMessage(MESSAGE_TXT);
+//        System.err.println("Created message");
+
+        _producer.send(sent);
+//        System.err.println("Sent message");
+
+        //Verify correct message received
+        Message received = consumer.receive(10000);
+//        System.err.println("Message Receieved:" + received);
+
+        assertNotNull("Message should be received.", received);
+        assertEquals("Incorrect Message Received.", MESSAGE_TXT, ((TextMessage) received).getText());
+
+        //Check no more messages are received
+        received = consumer.receive(1000);
+        System.err.println("Second receive completed.");
+
+        assertNull("More messages received", received);
+
+        _producer.close();
+//        System.err.println("Close producer");
+
+        consumer.close();
+//        System.err.println("Close consumer");
+
+        _producerConnection.close();
+    }
+
+    /**
+     * Test that after the connection has failed over the dirty transaction is notified when calling commit
+     *
+     * @throws JMSException
+     */
+    public void testSendBeforeFailoverThenCommitTx() throws JMSException
+    {
+//        System.err.println("testSendBeforeFailoverThenCommitTx");
+        _clientConnection.start();
+
+        //Create a transacted producer.
+        MessageProducer txProducer = _clientSession.createProducer(_queue);
+
+        String MESSAGE_TXT = "testSendBeforeFailoverThenCommitTx";
+
+        //Send the first message
+        txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+
+        failServer();
+
+        //Check that the message isn't received.
+        Message received = _consumer.receive(1000);
+        assertNull("Message received after failover to clean broker!", received);
+
+        //Attempt to commit session
+        try
+        {
+            _clientSession.commit();
+            fail("TransactionRolledBackException not thrown");
+        }
+        catch (JMSException jmse)
+        {
+            if (!(jmse instanceof TransactionRolledBackException))
+            {
+                fail(jmse.toString());
+            }
+        }
+
+        //Close consumer & producer
+        _consumer.close();
+        txProducer.close();
+    }
+
+    /**
+     * Test that after the connection has failed over the dirty transaction is fast failed by throwing an
+     * Exception on the next send.
+     *
+     * @throws JMSException
+     */
+    public void testSendBeforeFailoverThenSendTx() throws JMSException
+    {
+//        System.err.println("testSendBeforeFailoverThenSendTx");
+
+        _clientConnection.start();
+        MessageProducer txProducer = _clientSession.createProducer(_queue);
+
+        String MESSAGE_TXT = "TxtTestSendBeforeFailoverTx";
+
+        //Send the first message
+        txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+
+        failServer();
+
+        //Check that the message isn't received.
+        Message received = _consumer.receive(1000);
+        assertNull("Message received after failover to clean broker!", received);
+
+        //Attempt to send another message on the session, here we should fast fail.
+        try
+        {
+            txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+            fail("JMSException not thrown");
+        }
+        catch (JMSException jmse)
+        {
+            if (!(jmse.getLinkedException() instanceof AMQSessionDirtyException))
+            {
+                fail(jmse.toString());
+            }
+        }
+
+
+        _consumer.close();
+    }
+
+    public void testSendBeforeFailoverThenSend2Tx() throws JMSException
+    {
+//        System.err.println("testSendBeforeFailoverThenSendTx");
+
+        _clientConnection.start();
+        MessageProducer txProducer = _clientSession.createProducer(_queue);
+
+        String MESSAGE_TXT = "TxtTestSendBeforeFailoverTx";
+
+        //Send the first message
+        txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+
+        failServer();
+
+        //Check that the message isn't received.
+        Message received = _consumer.receive(1000);
+        assertNull("Message received after failover to clean broker!", received);
+
+        _clientSession.rollback();
+
+        //Attempt to send another message on the session, here we should fast fail.
+        try
+        {
+            txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+            txProducer.send(_clientSession.createTextMessage(MESSAGE_TXT));
+        }
+        catch (JMSException jmse)
+        {
+            if (jmse.getLinkedException() instanceof AMQSessionDirtyException)
+            {
+                fail(jmse.toString());
+            }
+        }
+
+
+        _consumer.close();
+    }
+
+
+    private void failServer()
+    {
+        if (BROKER.startsWith("vm://"))
+        {
+            //Work around for MessageStore not being initialise and the send not fully completing before the failover occurs.
+            try
+            {
+                Thread.sleep(5000);
+            }
+            catch (InterruptedException e)
+            {
+
+            }
+
+            TransportConnection.killAllVMBrokers();
+            ApplicationRegistry.remove(1);
+            try
+            {
+                TransportConnection.createVMBroker(1);
+            }
+            catch (AMQVMBrokerCreationException e)
+            {
+                _logger.error("Unable to restart broker due to :" + e);
+            }
+
+            //Work around for receive not being failover aware.. because it is the first receive it trys to
+            // unsuspend the channel but in this case the ChannelFlow command goes on the old session and the response on the
+            // new one ... though I thought the statemanager recorded the listeners so should be ok.???
+            try
+            {
+                Thread.sleep(5000);
+            }
+            catch (InterruptedException e)
+            {
+
+            }
+
+        }
+
+    }
+
 }