You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ru...@apache.org on 2007/06/01 16:33:09 UTC

svn commit: r543496 [1/4] - in /incubator/qpid/branches/M2/java: client/src/main/java/org/apache/qpid/client/ client/src/main/java/org/apache/qpid/client/failover/ client/src/main/java/org/apache/qpid/client/protocol/ client/src/main/java/org/apache/qp...

Author: rupertlssmith
Date: Fri Jun  1 07:33:07 2007
New Revision: 543496

URL: http://svn.apache.org/viewvc?view=rev&rev=543496
Log:
QPID-402: Fixed FailoverException falling through to the client. All blocking operations are now wrapped in failover support wrappers.

Added:
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/failover/FailoverProtectedOperation.java
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
    incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnectionHelper.java
Removed:
    incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java
Modified:
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/failover/FailoverException.java
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
    incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
    incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
    incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
    incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
    incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
    incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java
    incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/AMQPInvalidClassException.java
    incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java
    incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/util/PrettyPrintingUtils.java
    incubator/qpid/branches/M2/java/perftests/RunningPerformanceTests.txt
    incubator/qpid/branches/M2/java/perftests/pom.xml
    incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java

Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=543496&r1=543495&r2=543496
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Fri Jun  1 07:33:07 2007
@@ -20,34 +20,15 @@
  */
 package org.apache.qpid.client;
 
-import java.io.IOException;
-import java.net.ConnectException;
-import java.nio.channels.UnresolvedAddressException;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.*;
-import javax.jms.IllegalStateException;
-import javax.naming.NamingException;
-import javax.naming.Reference;
-import javax.naming.Referenceable;
-import javax.naming.StringRefAddr;
-
 import org.apache.log4j.Logger;
 
 import org.apache.qpid.AMQConnectionFailureException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.AMQUndeliveredException;
 import org.apache.qpid.AMQUnresolvedAddressException;
-import org.apache.qpid.client.failover.FailoverSupport;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.failover.FailoverProtectedOperation;
+import org.apache.qpid.client.failover.FailoverRetrySupport;
 import org.apache.qpid.client.protocol.AMQProtocolHandler;
 import org.apache.qpid.client.state.AMQState;
 import org.apache.qpid.client.transport.TransportConnection;
@@ -67,6 +48,27 @@
 import org.apache.qpid.jms.FailoverPolicy;
 import org.apache.qpid.url.URLSyntaxException;
 
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import javax.naming.Referenceable;
+import javax.naming.StringRefAddr;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.nio.channels.UnresolvedAddressException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
 public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
 {
     private static final Logger _logger = Logger.getLogger(AMQConnection.class);
@@ -96,8 +98,7 @@
     private AMQProtocolHandler _protocolHandler;
 
     /** Maps from session id (Integer) to AMQSession instance */
-    private final Map<Integer,AMQSession> _sessions = new LinkedHashMap<Integer,AMQSession>();
-
+    private final Map<Integer, AMQSession> _sessions = new LinkedHashMap<Integer, AMQSession>();
 
     private String _clientName;
 
@@ -486,72 +487,72 @@
         final int prefetchHigh, final int prefetchLow) throws JMSException
     {
         checkNotClosed();
+
         if (channelLimitReached())
         {
             throw new ChannelLimitReachedException(_maximumChannelCount);
         }
-        else
-        {
-            return (org.apache.qpid.jms.Session) new FailoverSupport()
+
+        return new FailoverRetrySupport<org.apache.qpid.jms.Session, JMSException>(
+                new FailoverProtectedOperation<org.apache.qpid.jms.Session, JMSException>()
+                {
+                    public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException
                     {
-                        public Object operation() throws JMSException
+                        int channelId = _idFactory.incrementAndGet();
+
+                        if (_logger.isDebugEnabled())
                         {
-                            int channelId = _idFactory.incrementAndGet();
+                            _logger.debug("Write channel open frame for channel id " + channelId);
+                        }
+
+                        // We must create the session and register it before actually sending the frame to the server to
+                        // open it, so that there is no window where we could receive data on the channel and not be set
+                        // up to handle it appropriately.
+                        AMQSession session =
+                            new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh,
+                                prefetchLow);
+                        // _protocolHandler.addSessionByChannel(channelId, session);
+                        registerSession(channelId, session);
 
-                            if (_logger.isDebugEnabled())
+                        boolean success = false;
+                        try
+                        {
+                            createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
+                            success = true;
+                        }
+                        catch (AMQException e)
+                        {
+                            JMSException jmse = new JMSException("Error creating session: " + e);
+                            jmse.setLinkedException(e);
+                            throw jmse;
+                        }
+                        finally
+                        {
+                            if (!success)
                             {
-                                _logger.debug("Write channel open frame for channel id " + channelId);
+                                deregisterSession(channelId);
                             }
+                        }
 
-                            // We must create the session and register it before actually sending the frame to the server to
-                            // open it, so that there is no window where we could receive data on the channel and not be set
-                            // up to handle it appropriately.
-                            AMQSession session =
-                                new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh,
-                                    prefetchLow);
-                            //_protocolHandler.addSessionByChannel(channelId, session);
-                            registerSession(channelId, session);
-
-                            boolean success = false;
+                        if (_started)
+                        {
                             try
                             {
-                                createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
-                                success = true;
+                                session.start();
                             }
                             catch (AMQException e)
                             {
-                                JMSException jmse = new JMSException("Error creating session: " + e);
-                                jmse.setLinkedException(e);
-                                throw jmse;
-                            }
-                            finally
-                            {
-                                if (!success)
-                                {
-                                    deregisterSession(channelId);
-                                }
+                                throw new JMSAMQException(e);
                             }
-
-                            if (_started)
-                            {
-                                try
-                                {
-                                    session.start();
-                                }
-                                catch (AMQException e)
-                                {
-                                    throw new JMSAMQException(e);
-                                }
-                            }
-
-                            return session;
                         }
-                    }.execute(this);
-        }
+
+                        return session;
+                    }
+                }, this).execute();
     }
 
     private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
-        throws AMQException
+        throws AMQException, FailoverException
     {
 
         // TODO: Be aware of possible changes to parameter order as versions change.
@@ -581,7 +582,8 @@
         }
     }
 
-    private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) throws AMQException
+    private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
+        throws AMQException, FailoverException
     {
         try
         {
@@ -1128,14 +1130,14 @@
      * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling.
      * The caller must hold the failover mutex before calling this method.
      */
-    public void resubscribeSessions() throws JMSException, AMQException
+    public void resubscribeSessions() throws JMSException, AMQException, FailoverException
     {
         ArrayList sessions = new ArrayList(_sessions.values());
         _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey?
         for (Iterator it = sessions.iterator(); it.hasNext();)
         {
             AMQSession s = (AMQSession) it.next();
-            //_protocolHandler.addSessionByChannel(s.getChannelId(), s);
+            // _protocolHandler.addSessionByChannel(s.getChannelId(), s);
             reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted());
             s.resubscribe();
         }
@@ -1222,7 +1224,6 @@
     {
         _taskPool.execute(task);
     }
-
 
     public AMQSession getSession(int channelId)
     {