You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/09/11 14:59:02 UTC

svn commit: r574582 - in /incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client: AMQConnection.java AMQConnectionDelegate.java AMQConnectionDelegate_0_10.java AMQConnectionDelegate_0_8.java

Author: rajith
Date: Tue Sep 11 05:59:01 2007
New Revision: 574582

URL: http://svn.apache.org/viewvc?rev=574582&view=rev
Log:
This is the first pass at refactoring the Connection stuff.
AMQConnection was not made abstract, in order to preserve the AMQConnection constructors that are widely used in test cases.
Instead, the 0-8 and 0-10 specific functionality was delegated to a version-specific delegate.

The version is selected via a JVM argument (and based on the URL format).
Currently this means that we can test only a single code path (0-8 or 0-10) at a time.

The next step is to refactor the URL stuff.


Added:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java
Modified:
    incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java

Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=574582&r1=574581&r2=574582&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Tue Sep 11 05:59:01 2007
@@ -20,34 +20,17 @@
  */
 package org.apache.qpid.client;
 
-import org.apache.qpid.AMQConnectionFailureException;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQUndeliveredException;
-import org.apache.qpid.AMQUnresolvedAddressException;
-import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.failover.FailoverProtectedOperation;
-import org.apache.qpid.client.failover.FailoverRetrySupport;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicQosBody;
-import org.apache.qpid.framing.BasicQosOkBody;
-import org.apache.qpid.framing.ChannelOpenBody;
-import org.apache.qpid.framing.ChannelOpenOkBody;
-import org.apache.qpid.framing.TxSelectBody;
-import org.apache.qpid.framing.TxSelectOkBody;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.jms.ChannelLimitReachedException;
-import org.apache.qpid.jms.Connection;
-import org.apache.qpid.jms.ConnectionListener;
-import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.jms.FailoverPolicy;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.url.URLSyntaxException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.nio.channels.UnresolvedAddressException;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
 
 import javax.jms.ConnectionConsumer;
 import javax.jms.ConnectionMetaData;
@@ -66,103 +49,111 @@
 import javax.naming.Reference;
 import javax.naming.Referenceable;
 import javax.naming.StringRefAddr;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.nio.channels.UnresolvedAddressException;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.AMQConnectionFailureException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQUndeliveredException;
+import org.apache.qpid.AMQUnresolvedAddressException;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.Connection;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.jms.FailoverPolicy;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpidity.url.QpidURL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
 {
     private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
 
-    private AtomicInteger _idFactory = new AtomicInteger(0);
+    protected AtomicInteger _idFactory = new AtomicInteger(0);
 
     /**
      * This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be
      * held by any child objects of this connection such as the session, producers and consumers.
      */
-    private final Object _failoverMutex = new Object();
+    protected final Object _failoverMutex = new Object();
 
     /**
      * A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session
      * and we must prevent the client from opening too many. Zero means unlimited.
      */
-    private long _maximumChannelCount;
+    protected long _maximumChannelCount;
 
     /** The maximum size of frame supported by the server */
-    private long _maximumFrameSize;
+    protected long _maximumFrameSize;
 
     /**
      * The protocol handler dispatches protocol events for this connection. For example, when the connection is dropped
      * the handler deals with this. It also deals with the initial dispatch of any protocol frames to their appropriate
      * handler.
      */
-    private AMQProtocolHandler _protocolHandler;
+    protected AMQProtocolHandler _protocolHandler;
 
     /** Maps from session id (Integer) to AMQSession instance */
-    private final Map<Integer, AMQSession> _sessions = new LinkedHashMap<Integer, AMQSession>();
+    protected final Map<Integer, AMQSession> _sessions = new LinkedHashMap<Integer, AMQSession>();
 
-    private String _clientName;
+    protected String _clientName;
 
     /** The user name to use for authentication */
-    private String _username;
+    protected String _username;
 
     /** The password to use for authentication */
-    private String _password;
+    protected String _password;
 
     /** The virtual path to connect to on the AMQ server */
-    private String _virtualHost;
+    protected String _virtualHost;
 
-    private ExceptionListener _exceptionListener;
+    protected ExceptionListener _exceptionListener;
 
-    private ConnectionListener _connectionListener;
+    protected ConnectionListener _connectionListener;
 
-    private ConnectionURL _connectionURL;
+    protected ConnectionURL _connectionURL;
 
     /**
      * Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for message
      * publication.
      */
-    private boolean _started;
+    protected boolean _started;
 
     /** Policy dictating how to failover */
-    private FailoverPolicy _failoverPolicy;
+    protected FailoverPolicy _failoverPolicy;
 
     /*
      * _Connected should be refactored with a suitable wait object.
      */
-    private boolean _connected;
+    protected boolean _connected;
 
     /*
      * The last error code that occured on the connection. Used to return the correct exception to the client
      */
-    private AMQException _lastAMQException = null;
+    protected AMQException _lastAMQException = null;
 
     /*
      * The connection meta data
      */
-    private QpidConnectionMetaData _connectionMetaData;
+    protected QpidConnectionMetaData _connectionMetaData;
 
     /** Configuration info for SSL */
-    private SSLConfiguration _sslConfiguration;
+    protected SSLConfiguration _sslConfiguration;
 
-    private AMQShortString _defaultTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
-    private AMQShortString _defaultQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
-    private AMQShortString _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
-    private AMQShortString _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+    protected AMQShortString _defaultTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
+    protected AMQShortString _defaultQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+    protected AMQShortString _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
+    protected AMQShortString _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
 
     /** Thread Pool for executing connection level processes. Such as returning bounced messages. */
-    private final ExecutorService _taskPool = Executors.newCachedThreadPool();
-    private static final long DEFAULT_TIMEOUT = 1000 * 30;
+    protected final ExecutorService _taskPool = Executors.newCachedThreadPool();
+    protected static final long DEFAULT_TIMEOUT = 1000 * 30;
+
+    private AMQConnectionDelegate _delegate;
 
     /**
      * @param broker      brokerdetails
@@ -237,12 +228,27 @@
         this(new AMQConnectionURL(connection), sslConfig);
     }
 
+    // 0-10 stuff
+    public AMQConnection(QpidURL connectionURL) throws AMQException
+    {
+
+    }
+
     /**
      * @todo Some horrible stuff going on here with setting exceptions to be non-null to detect if an exception
      *       was thrown during the connection! Intention not clear. Use a flag anyway, not exceptions... Will fix soon.
      */
     public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
     {
+        if (Boolean.getBoolean("0-10"))
+        {
+            _delegate = new AMQConnectionDelegate_0_10(this);
+        }
+        else
+        {
+            _delegate = new AMQConnectionDelegate_0_8(this);
+        }
+
         if (_logger.isInfoEnabled())
         {
             _logger.info("Connection:" + connectionURL);
@@ -366,18 +372,6 @@
         _connectionMetaData = new QpidConnectionMetaData(this);
     }
 
-    protected boolean checkException(Throwable thrown)
-    {
-        Throwable cause = thrown.getCause();
-
-        if (cause == null)
-        {
-            cause = thrown;
-        }
-
-        return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException));
-    }
-
     protected AMQConnection(String username, String password, String clientName, String virtualHost)
     {
         _clientName = clientName;
@@ -396,26 +390,20 @@
         _virtualHost = virtualHost;
     }
 
-    private void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
+    protected boolean checkException(Throwable thrown)
     {
-        try
-        {
-            TransportConnection.getInstance(brokerDetail).connect(_protocolHandler, brokerDetail);
-            // this blocks until the connection has been set up or when an error
-            // has prevented the connection being set up
-            _protocolHandler.attainState(AMQState.CONNECTION_OPEN);
-            _failoverPolicy.attainedConnection();
+        Throwable cause = thrown.getCause();
 
-            // Again this should be changed to a suitable notify
-            _connected = true;
-        }
-        catch (AMQException e)
+        if (cause == null)
         {
-            _lastAMQException = e;
-            throw e;
+            cause = thrown;
         }
+
+        return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException));
     }
 
+
+
     public boolean attemptReconnection(String host, int port)
     {
         BrokerDetails bd = new AMQBrokerDetails(host, port, _sslConfiguration);
@@ -475,6 +463,11 @@
         return false;
     }
 
+    public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
+    {
+        _delegate.makeBrokerConnection(brokerDetail);
+    }
+
     /**
      * Get the details of the currently active broker
      *
@@ -512,114 +505,8 @@
     public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
                                                      final int prefetchHigh, final int prefetchLow) throws JMSException
     {
-        checkNotClosed();
-
-        if (channelLimitReached())
-        {
-            throw new ChannelLimitReachedException(_maximumChannelCount);
-        }
-
-        return new FailoverRetrySupport<org.apache.qpid.jms.Session, JMSException>(
-                new FailoverProtectedOperation<org.apache.qpid.jms.Session, JMSException>()
-                {
-                    public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException
-                    {
-                        int channelId = _idFactory.incrementAndGet();
-
-                        if (_logger.isDebugEnabled())
-                        {
-                            _logger.debug("Write channel open frame for channel id " + channelId);
-                        }
-
-                        // We must create the session and register it before actually sending the frame to the server to
-                        // open it, so that there is no window where we could receive data on the channel and not be set
-                        // up to handle it appropriately.
-                        AMQSession session =
-                                new AMQSession_0_8(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh,
-                                               prefetchLow);
-                        // _protocolHandler.addSessionByChannel(channelId, session);
-                        registerSession(channelId, session);
-
-                        boolean success = false;
-                        try
-                        {
-                            createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
-                            success = true;
-                        }
-                        catch (AMQException e)
-                        {
-                            JMSException jmse = new JMSException("Error creating session: " + e);
-                            jmse.setLinkedException(e);
-                            throw jmse;
-                        }
-                        finally
-                        {
-                            if (!success)
-                            {
-                                deregisterSession(channelId);
-                            }
-                        }
-
-                        if (_started)
-                        {
-                            try
-                            {
-                                session.start();
-                            }
-                            catch (AMQException e)
-                            {
-                                throw new JMSAMQException(e);
-                            }
-                        }
-
-                        return session;
-                    }
-                }, this).execute();
-    }
-
-    private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
-            throws AMQException, FailoverException
-    {
-
-        // TODO: Be aware of possible changes to parameter order as versions change.
-
-        _protocolHandler.syncWrite(ChannelOpenBody.createAMQFrame(channelId, _protocolHandler.getProtocolMajorVersion(),
-                                                                  _protocolHandler.getProtocolMinorVersion(), null), // outOfBand
-                                                                                                                     ChannelOpenOkBody.class);
-
-        // todo send low water mark when protocol allows.
-        // todo Be aware of possible changes to parameter order as versions change.
-        _protocolHandler.syncWrite(BasicQosBody.createAMQFrame(channelId, _protocolHandler.getProtocolMajorVersion(),
-                                                               _protocolHandler.getProtocolMinorVersion(), false, // global
-                                                               prefetchHigh, // prefetchCount
-                                                               0), // prefetchSize
-                                                                   BasicQosOkBody.class);
-
-        if (transacted)
-        {
-            if (_logger.isDebugEnabled())
-            {
-                _logger.debug("Issuing TxSelect for " + channelId);
-            }
-
-            // TODO: Be aware of possible changes to parameter order as versions change.
-            _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, _protocolHandler.getProtocolMajorVersion(),
-                                                                   _protocolHandler.getProtocolMinorVersion()), TxSelectOkBody.class);
-        }
-    }
 
-    private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
-            throws AMQException, FailoverException
-    {
-        try
-        {
-            createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
-        }
-        catch (AMQException e)
-        {
-            deregisterSession(channelId);
-            throw new AMQException(null, "Error reopening channel " + channelId + " after failover: " + e, e);
-        }
+        return _delegate.createSession(transacted, acknowledgeMode, prefetchHigh, prefetchLow);
     }
 
     public void setFailoverPolicy(FailoverPolicy policy)
@@ -664,7 +551,7 @@
         return new AMQTopicSessionAdaptor(createSession(transacted, acknowledgeMode));
     }
 
-    private boolean channelLimitReached()
+    public boolean channelLimitReached()
     {
         return (_maximumChannelCount != 0) && (_sessions.size() == _maximumChannelCount);
     }
@@ -805,18 +692,6 @@
         }
     }
 
-    private long adjustTimeout(long timeout, long startTime)
-    {
-        long now = System.currentTimeMillis();
-        timeout -= now - startTime;
-        if (timeout < 0)
-        {
-            timeout = 0;
-        }
-
-        return timeout;
-    }
-
     /**
      * Marks all sessions and their children as closed without sending any protocol messages. Useful when you need to
      * mark objects "visible" in userland as closed after failover or other significant event that impacts the
@@ -880,6 +755,19 @@
         }
     }
 
+
+    private long adjustTimeout(long timeout, long startTime)
+    {
+        long now = System.currentTimeMillis();
+        timeout -= now - startTime;
+        if (timeout < 0)
+        {
+            timeout = 0;
+        }
+
+        return timeout;
+    }
+
     public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
                                                        ServerSessionPool sessionPool, int maxMessages) throws JMSException
     {
@@ -1055,6 +943,11 @@
         return _failoverMutex;
     }
 
+    public void resubscribeSessions() throws JMSException, AMQException, FailoverException
+    {
+        _delegate.resubscribeSessions();
+    }
+
     /**
      * If failover is taking place this will block until it has completed. If failover is not taking place it will
      * return immediately.
@@ -1068,7 +961,18 @@
 
     /**
      * Invoked by the AMQProtocolSession when a protocol session exception has occurred. This method sends the exception
-     * to a JMS exception listener, if configured, and propagates the exception to sessions, which in turn will
+     * to a JMS exception liste
+    {
+        ArrayList sessions = new ArrayList(_sessions.values());
+        _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey?
+        for (Iterator it = sessions.iterator(); it.hasNext();)
+        {
+            AMQSession s = (AMQSession) it.next();
+            // _protocolHandler.addSessionByChannel(s.getChannelId(), s);
+            reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted());
+            s.resubscribe();
+        }
+    }ner, if configured, and propagates the exception to sessions, which in turn will
      * propagate to consumers. This allows synchronous consumers to have exceptions thrown to them.
      *
      * @param cause the exception
@@ -1158,23 +1062,6 @@
     void deregisterSession(int channelId)
     {
         _sessions.remove(channelId);
-    }
-
-    /**
-     * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling.
-     * The caller must hold the failover mutex before calling this method.
-     */
-    public void resubscribeSessions() throws JMSException, AMQException, FailoverException
-    {
-        ArrayList sessions = new ArrayList(_sessions.values());
-        _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey?
-        for (Iterator it = sessions.iterator(); it.hasNext();)
-        {
-            AMQSession s = (AMQSession) it.next();
-            // _protocolHandler.addSessionByChannel(s.getChannelId(), s);
-            reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted());
-            s.resubscribe();
-        }
     }
 
     public String toString()

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=574582&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java Tue Sep 11 05:59:01 2007
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import java.io.IOException;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.jms.BrokerDetails;
+
+public interface AMQConnectionDelegate
+{
+    public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException;
+
+    public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
+            final int prefetchHigh, final int prefetchLow) throws JMSException;
+
+    public void resubscribeSessions() throws JMSException, AMQException, FailoverException;
+}

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=574582&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Tue Sep 11 05:59:01 2007
@@ -0,0 +1,43 @@
+package org.apache.qpid.client;
+
+import java.io.IOException;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate
+{
+
+    private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_0_10.class);
+    private AMQConnection _conn;
+
+    public AMQConnectionDelegate_0_10(AMQConnection conn)
+    {
+        _conn = conn;
+    }
+
+    public Session createSession(boolean transacted, int acknowledgeMode, int prefetchHigh, int prefetchLow) throws JMSException
+    {
+        // TODO Auto-generated method stub
+        return null;
+    }
+
+    public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
+    {
+        // TODO Auto-generated method stub
+
+    }
+
+    public void resubscribeSessions() throws JMSException, AMQException, FailoverException
+    {
+        // TODO Auto-generated method stub
+
+    }
+
+}

Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java?rev=574582&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java Tue Sep 11 05:59:01 2007
@@ -0,0 +1,226 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *   http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied.  See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.nio.channels.UnresolvedAddressException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.failover.FailoverProtectedOperation;
+import org.apache.qpid.client.failover.FailoverRetrySupport;
+import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.framing.BasicQosBody;
+import org.apache.qpid.framing.BasicQosOkBody;
+import org.apache.qpid.framing.ChannelOpenBody;
+import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.framing.TxSelectBody;
+import org.apache.qpid.framing.TxSelectOkBody;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.ChannelLimitReachedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * AMQP 0-8 implementation of {@link AMQConnectionDelegate}.
+ *
+ * <p>Performs the version specific work on behalf of an {@link AMQConnection}: establishing the broker
+ * connection, creating sessions (each on its own channel) and reopening the channels of the existing
+ * sessions during failover. The delegate reads and updates fields of the connection directly.
+ */
+public class AMQConnectionDelegate_0_8 implements AMQConnectionDelegate
+{
+    private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_0_8.class);
+
+    /** The connection this delegate works for; assigned once at construction. */
+    private final AMQConnection _conn;
+
+    /**
+     * @param conn the connection whose 0-8 specific operations are handled by this delegate
+     */
+    public AMQConnectionDelegate_0_8(AMQConnection conn)
+    {
+        _conn = conn;
+    }
+
+    /**
+     * Reports whether a failure is a {@link ConnectException} or an {@link UnresolvedAddressException}.
+     * The direct cause of {@code thrown} is examined when it has one, otherwise {@code thrown} itself;
+     * deeper levels of the cause chain are not inspected.
+     *
+     * @param thrown the failure to classify; must not be null
+     * @return true if the examined throwable is one of the two types named above
+     */
+    protected boolean checkException(Throwable thrown)
+    {
+        Throwable cause = thrown.getCause();
+
+        if (cause == null)
+        {
+            cause = thrown;
+        }
+
+        return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException));
+    }
+
+    /**
+     * Connects to a broker and waits for the protocol handler to attain the CONNECTION_OPEN state.
+     *
+     * <p>On success the failover policy is told that a connection was attained and the connection is
+     * flagged as connected. An {@link AMQException} is stored in the connection as its last AMQ exception
+     * before being rethrown unchanged.
+     *
+     * @param brokerDetail the broker to connect to
+     * @throws IOException if raised while connecting; it is not caught here
+     * @throws AMQException if raised while connecting or while waiting for the open state
+     */
+    public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
+    {
+        try
+        {
+            TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail);
+            // this blocks until the connection has been set up or when an error
+            // has prevented the connection being set up
+            _conn._protocolHandler.attainState(AMQState.CONNECTION_OPEN);
+            _conn._failoverPolicy.attainedConnection();
+
+            // Again this should be changed to a suitable notify
+            _conn._connected = true;
+        }
+        catch (AMQException e)
+        {
+            _conn._lastAMQException = e;
+            throw e;
+        }
+    }
+
+    /**
+     * Creates a session that uses the same value for the high and the low prefetch mark.
+     *
+     * @param transacted      whether the session is transacted
+     * @param acknowledgeMode the acknowledge mode handed to the new session
+     * @param prefetch        value used for both prefetch marks
+     * @return the new session
+     * @throws JMSException under the same conditions as the four argument overload
+     */
+    public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch)
+            throws JMSException
+    {
+        return createSession(transacted, acknowledgeMode, prefetch, prefetch);
+    }
+
+    /**
+     * Creates a new 0-8 session on a freshly allocated channel id.
+     *
+     * <p>The session is registered with the connection before the channel is opened on the broker and is
+     * deregistered again if opening the channel fails for any reason. If the connection is already
+     * started the new session is started before it is returned. The work runs as a failover protected
+     * operation.
+     *
+     * @param transacted      whether the session is transacted; a transacted session issues TxSelect
+     * @param acknowledgeMode the acknowledge mode handed to the new session
+     * @param prefetchHigh    high prefetch mark; sent to the broker as the prefetch count
+     * @param prefetchLow     low prefetch mark; handed to the session but not sent to the broker
+     * @return the new session
+     * @throws JMSException if the channel limit is reached, or if opening the channel or starting the
+     *                      session raises an {@link AMQException} (which is wrapped)
+     */
+    public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
+                                                     final int prefetchHigh, final int prefetchLow) throws JMSException
+    {
+        _conn.checkNotClosed();
+
+        if (_conn.channelLimitReached())
+        {
+            throw new ChannelLimitReachedException(_conn._maximumChannelCount);
+        }
+
+        return new FailoverRetrySupport<org.apache.qpid.jms.Session, JMSException>(
+                new FailoverProtectedOperation<org.apache.qpid.jms.Session, JMSException>()
+                {
+                    public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException
+                    {
+                        int channelId = _conn._idFactory.incrementAndGet();
+
+                        if (_logger.isDebugEnabled())
+                        {
+                            _logger.debug("Write channel open frame for channel id " + channelId);
+                        }
+
+                        // We must create the session and register it before actually sending the frame to the server to
+                        // open it, so that there is no window where we could receive data on the channel and not be set
+                        // up to handle it appropriately.
+                        AMQSession session =
+                                new AMQSession_0_8(_conn, channelId, transacted, acknowledgeMode, prefetchHigh,
+                                               prefetchLow);
+                        _conn.registerSession(channelId, session);
+
+                        // The flag makes the finally block undo the registration on every failure path,
+                        // including a FailoverException that is not caught here.
+                        boolean success = false;
+                        try
+                        {
+                            createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
+                            success = true;
+                        }
+                        catch (AMQException e)
+                        {
+                            JMSException jmse = new JMSException("Error creating session: " + e);
+                            jmse.setLinkedException(e);
+                            throw jmse;
+                        }
+                        finally
+                        {
+                            if (!success)
+                            {
+                                _conn.deregisterSession(channelId);
+                            }
+                        }
+
+                        if (_conn._started)
+                        {
+                            try
+                            {
+                                session.start();
+                            }
+                            catch (AMQException e)
+                            {
+                                // NOTE(review): the session stays registered when start() fails - confirm this is intended.
+                                throw new JMSAMQException(e);
+                            }
+                        }
+
+                        return session;
+                    }
+                }, _conn).execute();
+    }
+
+    /**
+     * Sends the frames that set up a channel on the broker: ChannelOpen, then BasicQos and, for a
+     * transacted channel, TxSelect. Each frame is passed to {@code syncWrite} together with the class
+     * of its "Ok" response body.
+     *
+     * @param channelId    id of the channel to set up
+     * @param prefetchHigh sent as the BasicQos prefetch count
+     * @param prefetchLow  currently unused; see the low water mark todo below
+     * @param transacted   whether to issue TxSelect on the channel
+     * @throws AMQException      if raised by one of the synchronous writes
+     * @throws FailoverException if raised by one of the synchronous writes
+     */
+    private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
+            throws AMQException, FailoverException
+    {
+        // TODO: Be aware of possible changes to parameter order as versions change.
+        _conn._protocolHandler.syncWrite(ChannelOpenBody.createAMQFrame(channelId, _conn._protocolHandler.getProtocolMajorVersion(),
+                _conn._protocolHandler.getProtocolMinorVersion(), null), // outOfBand
+                ChannelOpenOkBody.class);
+
+        // todo send low water mark when protocol allows.
+        // todo Be aware of possible changes to parameter order as versions change.
+        _conn._protocolHandler.syncWrite(BasicQosBody.createAMQFrame(channelId, _conn._protocolHandler.getProtocolMajorVersion(),
+                _conn._protocolHandler.getProtocolMinorVersion(), false, // global
+                prefetchHigh, // prefetchCount
+                0), // prefetchSize
+                BasicQosOkBody.class);
+
+        if (transacted)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Issuing TxSelect for " + channelId);
+            }
+
+            // TODO: Be aware of possible changes to parameter order as versions change.
+            _conn._protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, _conn._protocolHandler.getProtocolMajorVersion(),
+                    _conn._protocolHandler.getProtocolMinorVersion()), TxSelectOkBody.class);
+        }
+    }
+
+    /**
+     * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling.
+     * The caller must hold the failover mutex before calling this method.
+     *
+     * <p>Each session first has its channel reopened on the broker and is then asked to resubscribe.
+     * Processing stops at the first session that fails.
+     *
+     * @throws JMSException      if raised while a session resubscribes
+     * @throws AMQException      if a channel cannot be reopened or a session fails to resubscribe
+     * @throws FailoverException if raised while talking to the broker
+     */
+    public void resubscribeSessions() throws JMSException, AMQException, FailoverException
+    {
+        // Iterate over a snapshot of the sessions currently held by the connection.
+        ArrayList<AMQSession> sessions = new ArrayList<AMQSession>(_conn._sessions.values());
+
+        // The message text is unchanged; the guard only avoids building it when INFO logging is off.
+        if (_logger.isInfoEnabled())
+        {
+            _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey?
+        }
+
+        for (AMQSession s : sessions)
+        {
+            reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted());
+            s.resubscribe();
+        }
+    }
+
+    /**
+     * Sets up an existing session's channel on the broker again. If that raises an {@link AMQException}
+     * the session is deregistered from the connection and a new {@link AMQException} describing the
+     * failure, with the original as its cause, is thrown.
+     *
+     * @param channelId    id of the channel to reopen
+     * @param prefetchHigh high prefetch mark of the session
+     * @param prefetchLow  low prefetch mark of the session
+     * @param transacted   whether the session is transacted
+     * @throws AMQException      if the channel could not be set up
+     * @throws FailoverException if raised while talking to the broker; the session stays registered
+     */
+    private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
+    throws AMQException, FailoverException
+    {
+        try
+        {
+            createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
+        }
+        catch (AMQException e)
+        {
+            _conn.deregisterSession(channelId);
+            throw new AMQException(null, "Error reopening channel " + channelId + " after failover: " + e, e);
+        }
+    }
+}