You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ra...@apache.org on 2007/09/11 14:59:02 UTC
svn commit: r574582 - in
/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client:
AMQConnection.java AMQConnectionDelegate.java
AMQConnectionDelegate_0_10.java AMQConnectionDelegate_0_8.java
Author: rajith
Date: Tue Sep 11 05:59:01 2007
New Revision: 574582
URL: http://svn.apache.org/viewvc?rev=574582&view=rev
Log:
This is the first pass at refactoring the Connection stuff.
AMQConnection was not made abstract, in order to preserve the widely used AMQConnection constructor in test cases.
Instead, the 0-8 and 0-10 specific functionality was delegated to a version-specific delegate.
The version is selected via a JVM argument (and based on the URL format)
Currently this means that we can test only a single code path (0-8 or 0-10) at a time.
Next Step is to refactor the URL stuff.
Added:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java
Modified:
incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
Modified: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?rev=574582&r1=574581&r2=574582&view=diff
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Tue Sep 11 05:59:01 2007
@@ -20,34 +20,17 @@
*/
package org.apache.qpid.client;
-import org.apache.qpid.AMQConnectionFailureException;
-import org.apache.qpid.AMQException;
-import org.apache.qpid.AMQUndeliveredException;
-import org.apache.qpid.AMQUnresolvedAddressException;
-import org.apache.qpid.client.failover.FailoverException;
-import org.apache.qpid.client.failover.FailoverProtectedOperation;
-import org.apache.qpid.client.failover.FailoverRetrySupport;
-import org.apache.qpid.client.protocol.AMQProtocolHandler;
-import org.apache.qpid.client.state.AMQState;
-import org.apache.qpid.client.transport.TransportConnection;
-import org.apache.qpid.exchange.ExchangeDefaults;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.BasicQosBody;
-import org.apache.qpid.framing.BasicQosOkBody;
-import org.apache.qpid.framing.ChannelOpenBody;
-import org.apache.qpid.framing.ChannelOpenOkBody;
-import org.apache.qpid.framing.TxSelectBody;
-import org.apache.qpid.framing.TxSelectOkBody;
-import org.apache.qpid.jms.BrokerDetails;
-import org.apache.qpid.jms.ChannelLimitReachedException;
-import org.apache.qpid.jms.Connection;
-import org.apache.qpid.jms.ConnectionListener;
-import org.apache.qpid.jms.ConnectionURL;
-import org.apache.qpid.jms.FailoverPolicy;
-import org.apache.qpid.protocol.AMQConstant;
-import org.apache.qpid.url.URLSyntaxException;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
+import java.io.IOException;
+import java.net.ConnectException;
+import java.nio.channels.UnresolvedAddressException;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
import javax.jms.ConnectionConsumer;
import javax.jms.ConnectionMetaData;
@@ -66,103 +49,111 @@
import javax.naming.Reference;
import javax.naming.Referenceable;
import javax.naming.StringRefAddr;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.nio.channels.UnresolvedAddressException;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
+
+import org.apache.qpid.AMQConnectionFailureException;
+import org.apache.qpid.AMQException;
+import org.apache.qpid.AMQUndeliveredException;
+import org.apache.qpid.AMQUnresolvedAddressException;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.protocol.AMQProtocolHandler;
+import org.apache.qpid.exchange.ExchangeDefaults;
+import org.apache.qpid.framing.AMQShortString;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.Connection;
+import org.apache.qpid.jms.ConnectionListener;
+import org.apache.qpid.jms.ConnectionURL;
+import org.apache.qpid.jms.FailoverPolicy;
+import org.apache.qpid.protocol.AMQConstant;
+import org.apache.qpid.url.URLSyntaxException;
+import org.apache.qpidity.url.QpidURL;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
{
private static final Logger _logger = LoggerFactory.getLogger(AMQConnection.class);
- private AtomicInteger _idFactory = new AtomicInteger(0);
+ protected AtomicInteger _idFactory = new AtomicInteger(0);
/**
* This is the "root" mutex that must be held when doing anything that could be impacted by failover. This must be
* held by any child objects of this connection such as the session, producers and consumers.
*/
- private final Object _failoverMutex = new Object();
+ protected final Object _failoverMutex = new Object();
/**
* A channel is roughly analogous to a session. The server can negotiate the maximum number of channels per session
* and we must prevent the client from opening too many. Zero means unlimited.
*/
- private long _maximumChannelCount;
+ protected long _maximumChannelCount;
/** The maximum size of frame supported by the server */
- private long _maximumFrameSize;
+ protected long _maximumFrameSize;
/**
* The protocol handler dispatches protocol events for this connection. For example, when the connection is dropped
* the handler deals with this. It also deals with the initial dispatch of any protocol frames to their appropriate
* handler.
*/
- private AMQProtocolHandler _protocolHandler;
+ protected AMQProtocolHandler _protocolHandler;
/** Maps from session id (Integer) to AMQSession instance */
- private final Map<Integer, AMQSession> _sessions = new LinkedHashMap<Integer, AMQSession>();
+ protected final Map<Integer, AMQSession> _sessions = new LinkedHashMap<Integer, AMQSession>();
- private String _clientName;
+ protected String _clientName;
/** The user name to use for authentication */
- private String _username;
+ protected String _username;
/** The password to use for authentication */
- private String _password;
+ protected String _password;
/** The virtual path to connect to on the AMQ server */
- private String _virtualHost;
+ protected String _virtualHost;
- private ExceptionListener _exceptionListener;
+ protected ExceptionListener _exceptionListener;
- private ConnectionListener _connectionListener;
+ protected ConnectionListener _connectionListener;
- private ConnectionURL _connectionURL;
+ protected ConnectionURL _connectionURL;
/**
* Whether this connection is started, i.e. whether messages are flowing to consumers. It has no meaning for message
* publication.
*/
- private boolean _started;
+ protected boolean _started;
/** Policy dictating how to failover */
- private FailoverPolicy _failoverPolicy;
+ protected FailoverPolicy _failoverPolicy;
/*
* _Connected should be refactored with a suitable wait object.
*/
- private boolean _connected;
+ protected boolean _connected;
/*
* The last error code that occured on the connection. Used to return the correct exception to the client
*/
- private AMQException _lastAMQException = null;
+ protected AMQException _lastAMQException = null;
/*
* The connection meta data
*/
- private QpidConnectionMetaData _connectionMetaData;
+ protected QpidConnectionMetaData _connectionMetaData;
/** Configuration info for SSL */
- private SSLConfiguration _sslConfiguration;
+ protected SSLConfiguration _sslConfiguration;
- private AMQShortString _defaultTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
- private AMQShortString _defaultQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
- private AMQShortString _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
- private AMQShortString _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+ protected AMQShortString _defaultTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
+ protected AMQShortString _defaultQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
+ protected AMQShortString _temporaryTopicExchangeName = ExchangeDefaults.TOPIC_EXCHANGE_NAME;
+ protected AMQShortString _temporaryQueueExchangeName = ExchangeDefaults.DIRECT_EXCHANGE_NAME;
/** Thread Pool for executing connection level processes. Such as returning bounced messages. */
- private final ExecutorService _taskPool = Executors.newCachedThreadPool();
- private static final long DEFAULT_TIMEOUT = 1000 * 30;
+ protected final ExecutorService _taskPool = Executors.newCachedThreadPool();
+ protected static final long DEFAULT_TIMEOUT = 1000 * 30;
+
+ private AMQConnectionDelegate _delegate;
/**
* @param broker brokerdetails
@@ -237,12 +228,27 @@
this(new AMQConnectionURL(connection), sslConfig);
}
+ // 0-10 stuff
+ public AMQConnection(QpidURL connectionURL) throws AMQException
+ {
+
+ }
+
/**
* @todo Some horrible stuff going on here with setting exceptions to be non-null to detect if an exception
* was thrown during the connection! Intention not clear. Use a flag anyway, not exceptions... Will fix soon.
*/
public AMQConnection(ConnectionURL connectionURL, SSLConfiguration sslConfig) throws AMQException
{
+ if (Boolean.getBoolean("0-10"))
+ {
+ _delegate = new AMQConnectionDelegate_0_10(this);
+ }
+ else
+ {
+ _delegate = new AMQConnectionDelegate_0_8(this);
+ }
+
if (_logger.isInfoEnabled())
{
_logger.info("Connection:" + connectionURL);
@@ -366,18 +372,6 @@
_connectionMetaData = new QpidConnectionMetaData(this);
}
- protected boolean checkException(Throwable thrown)
- {
- Throwable cause = thrown.getCause();
-
- if (cause == null)
- {
- cause = thrown;
- }
-
- return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException));
- }
-
protected AMQConnection(String username, String password, String clientName, String virtualHost)
{
_clientName = clientName;
@@ -396,26 +390,20 @@
_virtualHost = virtualHost;
}
- private void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
+ protected boolean checkException(Throwable thrown)
{
- try
- {
- TransportConnection.getInstance(brokerDetail).connect(_protocolHandler, brokerDetail);
- // this blocks until the connection has been set up or when an error
- // has prevented the connection being set up
- _protocolHandler.attainState(AMQState.CONNECTION_OPEN);
- _failoverPolicy.attainedConnection();
+ Throwable cause = thrown.getCause();
- // Again this should be changed to a suitable notify
- _connected = true;
- }
- catch (AMQException e)
+ if (cause == null)
{
- _lastAMQException = e;
- throw e;
+ cause = thrown;
}
+
+ return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException));
}
+
+
public boolean attemptReconnection(String host, int port)
{
BrokerDetails bd = new AMQBrokerDetails(host, port, _sslConfiguration);
@@ -475,6 +463,11 @@
return false;
}
+ public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
+ {
+ _delegate.makeBrokerConnection(brokerDetail);
+ }
+
/**
* Get the details of the currently active broker
*
@@ -512,114 +505,8 @@
public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
final int prefetchHigh, final int prefetchLow) throws JMSException
{
- checkNotClosed();
-
- if (channelLimitReached())
- {
- throw new ChannelLimitReachedException(_maximumChannelCount);
- }
-
- return new FailoverRetrySupport<org.apache.qpid.jms.Session, JMSException>(
- new FailoverProtectedOperation<org.apache.qpid.jms.Session, JMSException>()
- {
- public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException
- {
- int channelId = _idFactory.incrementAndGet();
-
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Write channel open frame for channel id " + channelId);
- }
-
- // We must create the session and register it before actually sending the frame to the server to
- // open it, so that there is no window where we could receive data on the channel and not be set
- // up to handle it appropriately.
- AMQSession session =
- new AMQSession_0_8(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh,
- prefetchLow);
- // _protocolHandler.addSessionByChannel(channelId, session);
- registerSession(channelId, session);
-
- boolean success = false;
- try
- {
- createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
- success = true;
- }
- catch (AMQException e)
- {
- JMSException jmse = new JMSException("Error creating session: " + e);
- jmse.setLinkedException(e);
- throw jmse;
- }
- finally
- {
- if (!success)
- {
- deregisterSession(channelId);
- }
- }
-
- if (_started)
- {
- try
- {
- session.start();
- }
- catch (AMQException e)
- {
- throw new JMSAMQException(e);
- }
- }
-
- return session;
- }
- }, this).execute();
- }
-
- private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
- throws AMQException, FailoverException
- {
-
- // TODO: Be aware of possible changes to parameter order as versions change.
-
- _protocolHandler.syncWrite(ChannelOpenBody.createAMQFrame(channelId, _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(), null), // outOfBand
- ChannelOpenOkBody.class);
-
- // todo send low water mark when protocol allows.
- // todo Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(BasicQosBody.createAMQFrame(channelId, _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion(), false, // global
- prefetchHigh, // prefetchCount
- 0), // prefetchSize
- BasicQosOkBody.class);
-
- if (transacted)
- {
- if (_logger.isDebugEnabled())
- {
- _logger.debug("Issuing TxSelect for " + channelId);
- }
-
- // TODO: Be aware of possible changes to parameter order as versions change.
- _protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, _protocolHandler.getProtocolMajorVersion(),
- _protocolHandler.getProtocolMinorVersion()), TxSelectOkBody.class);
- }
- }
- private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
- throws AMQException, FailoverException
- {
- try
- {
- createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
- }
- catch (AMQException e)
- {
- deregisterSession(channelId);
- throw new AMQException(null, "Error reopening channel " + channelId + " after failover: " + e, e);
- }
+ return _delegate.createSession(transacted, acknowledgeMode, prefetchHigh, prefetchLow);
}
public void setFailoverPolicy(FailoverPolicy policy)
@@ -664,7 +551,7 @@
return new AMQTopicSessionAdaptor(createSession(transacted, acknowledgeMode));
}
- private boolean channelLimitReached()
+ public boolean channelLimitReached()
{
return (_maximumChannelCount != 0) && (_sessions.size() == _maximumChannelCount);
}
@@ -805,18 +692,6 @@
}
}
- private long adjustTimeout(long timeout, long startTime)
- {
- long now = System.currentTimeMillis();
- timeout -= now - startTime;
- if (timeout < 0)
- {
- timeout = 0;
- }
-
- return timeout;
- }
-
/**
* Marks all sessions and their children as closed without sending any protocol messages. Useful when you need to
* mark objects "visible" in userland as closed after failover or other significant event that impacts the
@@ -880,6 +755,19 @@
}
}
+
+ private long adjustTimeout(long timeout, long startTime)
+ {
+ long now = System.currentTimeMillis();
+ timeout -= now - startTime;
+ if (timeout < 0)
+ {
+ timeout = 0;
+ }
+
+ return timeout;
+ }
+
public ConnectionConsumer createConnectionConsumer(Destination destination, String messageSelector,
ServerSessionPool sessionPool, int maxMessages) throws JMSException
{
@@ -1055,6 +943,11 @@
return _failoverMutex;
}
+ public void resubscribeSessions() throws JMSException, AMQException, FailoverException
+ {
+ _delegate.resubscribeSessions();
+ }
+
/**
* If failover is taking place this will block until it has completed. If failover is not taking place it will
* return immediately.
@@ -1068,7 +961,18 @@
/**
* Invoked by the AMQProtocolSession when a protocol session exception has occurred. This method sends the exception
- * to a JMS exception listener, if configured, and propagates the exception to sessions, which in turn will
+ * to a JMS exception liste
+ {
+ ArrayList sessions = new ArrayList(_sessions.values());
+ _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey?
+ for (Iterator it = sessions.iterator(); it.hasNext();)
+ {
+ AMQSession s = (AMQSession) it.next();
+ // _protocolHandler.addSessionByChannel(s.getChannelId(), s);
+ reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted());
+ s.resubscribe();
+ }
+ }ner, if configured, and propagates the exception to sessions, which in turn will
* propagate to consumers. This allows synchronous consumers to have exceptions thrown to them.
*
* @param cause the exception
@@ -1158,23 +1062,6 @@
void deregisterSession(int channelId)
{
_sessions.remove(channelId);
- }
-
- /**
- * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling.
- * The caller must hold the failover mutex before calling this method.
- */
- public void resubscribeSessions() throws JMSException, AMQException, FailoverException
- {
- ArrayList sessions = new ArrayList(_sessions.values());
- _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey?
- for (Iterator it = sessions.iterator(); it.hasNext();)
- {
- AMQSession s = (AMQSession) it.next();
- // _protocolHandler.addSessionByChannel(s.getChannelId(), s);
- reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted());
- s.resubscribe();
- }
}
public String toString()
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java?rev=574582&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate.java Tue Sep 11 05:59:01 2007
@@ -0,0 +1,39 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import java.io.IOException;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.jms.BrokerDetails;
+
+/**
+ * Protocol-version specific part of a client connection.
+ *
+ * <p>Implementations carry out the three operations below for one version of the protocol
+ * (for example 0-8 or 0-10).
+ */
+public interface AMQConnectionDelegate
+{
+    /**
+     * Establishes the connection to the given broker.
+     *
+     * @param brokerDetail details of the broker to connect to
+     * @throws IOException  if the connection attempt fails at the I/O level
+     * @throws AMQException if the connection attempt fails at the protocol level
+     */
+    void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException;
+
+    /**
+     * Creates a session on the connection.
+     *
+     * @param transacted      whether the session is transacted
+     * @param acknowledgeMode the acknowledge mode of the session
+     * @param prefetchHigh    the high prefetch mark
+     * @param prefetchLow     the low prefetch mark
+     * @return the newly created session
+     * @throws JMSException if the session cannot be created
+     */
+    org.apache.qpid.jms.Session createSession(boolean transacted, int acknowledgeMode,
+                                              int prefetchHigh, int prefetchLow) throws JMSException;
+
+    /**
+     * Resubscribes the sessions of the connection; used during failover handling.
+     *
+     * @throws JMSException      if a session fails to resubscribe
+     * @throws AMQException      if a protocol error occurs
+     * @throws FailoverException if failover interrupts the operation
+     */
+    void resubscribeSessions() throws JMSException, AMQException, FailoverException;
+}
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=574582&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Tue Sep 11 05:59:01 2007
@@ -0,0 +1,43 @@
+package org.apache.qpid.client;
+
+import java.io.IOException;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.Session;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * Placeholder {@link AMQConnectionDelegate} for the 0-10 code path.
+ *
+ * <p>None of the operations is implemented yet. Each one logs a warning, so that running with
+ * this delegate is visible in the logs, and otherwise behaves like an empty stub: the
+ * {@code void} operations do nothing and {@link #createSession} returns {@code null}.
+ */
+public class AMQConnectionDelegate_0_10 implements AMQConnectionDelegate
+{
+    private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_0_10.class);
+
+    /** The connection this delegate was created for; not yet used by any of the stubbed operations. */
+    private final AMQConnection _conn;
+
+    /**
+     * @param conn the connection this delegate acts for
+     */
+    public AMQConnectionDelegate_0_10(AMQConnection conn)
+    {
+        _conn = conn;
+    }
+
+    /**
+     * Not implemented yet.
+     *
+     * @return always {@code null}
+     */
+    public Session createSession(boolean transacted, int acknowledgeMode, int prefetchHigh, int prefetchLow) throws JMSException
+    {
+        // TODO implement session creation for 0-10
+        _logger.warn("createSession is not implemented for 0-10; returning null");
+        return null;
+    }
+
+    /**
+     * Not implemented yet; returns without connecting to the broker.
+     */
+    public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
+    {
+        // TODO implement broker connection for 0-10
+        _logger.warn("makeBrokerConnection is not implemented for 0-10; no connection was made");
+    }
+
+    /**
+     * Not implemented yet; returns without resubscribing anything.
+     */
+    public void resubscribeSessions() throws JMSException, AMQException, FailoverException
+    {
+        // TODO implement session resubscription for 0-10
+        _logger.warn("resubscribeSessions is not implemented for 0-10; nothing was resubscribed");
+    }
+
+}
Added: incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java
URL: http://svn.apache.org/viewvc/incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java?rev=574582&view=auto
==============================================================================
--- incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java (added)
+++ incubator/qpid/trunk/qpid/java/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_8.java Tue Sep 11 05:59:01 2007
@@ -0,0 +1,226 @@
+/*
+ *
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing,
+ * software distributed under the License is distributed on an
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
+ * KIND, either express or implied. See the License for the
+ * specific language governing permissions and limitations
+ * under the License.
+ *
+ */
+package org.apache.qpid.client;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.nio.channels.UnresolvedAddressException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Iterator;
+
+import javax.jms.JMSException;
+
+import org.apache.qpid.AMQException;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.failover.FailoverProtectedOperation;
+import org.apache.qpid.client.failover.FailoverRetrySupport;
+import org.apache.qpid.client.state.AMQState;
+import org.apache.qpid.client.transport.TransportConnection;
+import org.apache.qpid.framing.BasicQosBody;
+import org.apache.qpid.framing.BasicQosOkBody;
+import org.apache.qpid.framing.ChannelOpenBody;
+import org.apache.qpid.framing.ChannelOpenOkBody;
+import org.apache.qpid.framing.TxSelectBody;
+import org.apache.qpid.framing.TxSelectOkBody;
+import org.apache.qpid.jms.BrokerDetails;
+import org.apache.qpid.jms.ChannelLimitReachedException;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+/**
+ * {@link AMQConnectionDelegate} holding the 0-8 specific logic of an {@link AMQConnection}:
+ * establishing the broker connection, creating sessions (one channel each) and re-opening the
+ * channels of existing sessions after failover.
+ */
+public class AMQConnectionDelegate_0_8 implements AMQConnectionDelegate
+{
+    private static final Logger _logger = LoggerFactory.getLogger(AMQConnectionDelegate_0_8.class);
+
+    /** The connection whose protocol handler, sessions and failover policy this delegate operates on. */
+    private final AMQConnection _conn;
+
+    /**
+     * @param conn the connection this delegate acts for
+     */
+    public AMQConnectionDelegate_0_8(AMQConnection conn)
+    {
+        _conn = conn;
+    }
+
+    /**
+     * Tells whether a failure was caused by a refused connection or an unresolved address.
+     *
+     * @param thrown the failure to inspect; its cause is examined, or the throwable itself when it has no cause
+     * @return {@code true} if the examined throwable is a {@link ConnectException} or an
+     *         {@link UnresolvedAddressException}
+     */
+    protected boolean checkException(Throwable thrown)
+    {
+        Throwable cause = thrown.getCause();
+
+        if (cause == null)
+        {
+            cause = thrown;
+        }
+
+        return ((cause instanceof ConnectException) || (cause instanceof UnresolvedAddressException));
+    }
+
+    /**
+     * Connects the protocol handler of the connection to the given broker and waits for the
+     * connection to reach the open state.
+     *
+     * @param brokerDetail details of the broker to connect to
+     * @throws IOException  if the transport fails to connect
+     * @throws AMQException if the connection cannot be set up; the exception is also recorded on
+     *                      the connection as its last AMQ exception before being rethrown
+     */
+    public void makeBrokerConnection(BrokerDetails brokerDetail) throws IOException, AMQException
+    {
+        try
+        {
+            TransportConnection.getInstance(brokerDetail).connect(_conn._protocolHandler, brokerDetail);
+            // this blocks until the connection has been set up or when an error
+            // has prevented the connection being set up
+            _conn._protocolHandler.attainState(AMQState.CONNECTION_OPEN);
+            _conn._failoverPolicy.attainedConnection();
+
+            // Again this should be changed to a suitable notify
+            _conn._connected = true;
+        }
+        catch (AMQException e)
+        {
+            _conn._lastAMQException = e;
+            throw e;
+        }
+    }
+
+    /**
+     * Creates a session using the same value for the high and the low prefetch mark.
+     *
+     * @param transacted      whether the session is transacted
+     * @param acknowledgeMode the acknowledge mode of the session
+     * @param prefetch        value used for both prefetch marks
+     * @return the newly created session
+     * @throws JMSException if the session cannot be created
+     */
+    public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetch)
+            throws JMSException
+    {
+        return createSession(transacted, acknowledgeMode, prefetch, prefetch);
+    }
+
+    /**
+     * Creates a session on a newly allocated channel, retrying under failover protection.
+     *
+     * <p>The session is registered with the connection before the channel is opened on the wire
+     * and is deregistered again if opening the channel fails. If the connection is already
+     * started the new session is started before it is returned.
+     *
+     * @param transacted      whether the session is transacted
+     * @param acknowledgeMode the acknowledge mode of the session
+     * @param prefetchHigh    the high prefetch mark
+     * @param prefetchLow     the low prefetch mark
+     * @return the newly created session
+     * @throws JMSException if the connection is closed, the channel limit has been reached, or
+     *                      the channel cannot be opened or the session started
+     */
+    public org.apache.qpid.jms.Session createSession(final boolean transacted, final int acknowledgeMode,
+                                                     final int prefetchHigh, final int prefetchLow) throws JMSException
+    {
+        _conn.checkNotClosed();
+
+        if (_conn.channelLimitReached())
+        {
+            throw new ChannelLimitReachedException(_conn._maximumChannelCount);
+        }
+
+        return new FailoverRetrySupport<org.apache.qpid.jms.Session, JMSException>(
+                new FailoverProtectedOperation<org.apache.qpid.jms.Session, JMSException>()
+                {
+                    public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException
+                    {
+                        int channelId = _conn._idFactory.incrementAndGet();
+
+                        if (_logger.isDebugEnabled())
+                        {
+                            _logger.debug("Write channel open frame for channel id " + channelId);
+                        }
+
+                        // We must create the session and register it before actually sending the frame to the server to
+                        // open it, so that there is no window where we could receive data on the channel and not be set
+                        // up to handle it appropriately.
+                        AMQSession session =
+                                new AMQSession_0_8(_conn, channelId, transacted, acknowledgeMode, prefetchHigh,
+                                                   prefetchLow);
+                        // _protocolHandler.addSessionByChannel(channelId, session);
+                        _conn.registerSession(channelId, session);
+
+                        boolean success = false;
+                        try
+                        {
+                            createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
+                            success = true;
+                        }
+                        catch (AMQException e)
+                        {
+                            JMSException jmse = new JMSException("Error creating session: " + e);
+                            jmse.setLinkedException(e);
+                            throw jmse;
+                        }
+                        finally
+                        {
+                            // Covers every failure, including FailoverException, not only AMQException.
+                            if (!success)
+                            {
+                                _conn.deregisterSession(channelId);
+                            }
+                        }
+
+                        if (_conn._started)
+                        {
+                            try
+                            {
+                                session.start();
+                            }
+                            catch (AMQException e)
+                            {
+                                throw new JMSAMQException(e);
+                            }
+                        }
+
+                        return session;
+                    }
+                }, _conn).execute();
+    }
+
+    /**
+     * Opens a channel on the wire: sends ChannelOpen, then BasicQos, then TxSelect if the
+     * channel is transacted, waiting for the matching Ok reply to each.
+     *
+     * @param channelId    id of the channel to open
+     * @param prefetchHigh sent as the prefetch count of the BasicQos frame
+     * @param prefetchLow  currently not sent (see the todo below)
+     * @param transacted   whether to put the channel into transacted mode
+     * @throws AMQException      if one of the synchronous writes fails
+     * @throws FailoverException if failover interrupts one of the synchronous writes
+     */
+    private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
+            throws AMQException, FailoverException
+    {
+
+        // TODO: Be aware of possible changes to parameter order as versions change.
+
+        _conn._protocolHandler.syncWrite(ChannelOpenBody.createAMQFrame(channelId, _conn._protocolHandler.getProtocolMajorVersion(),
+                _conn._protocolHandler.getProtocolMinorVersion(), null), // outOfBand
+            ChannelOpenOkBody.class);
+
+        // todo send low water mark when protocol allows.
+        // todo Be aware of possible changes to parameter order as versions change.
+        _conn._protocolHandler.syncWrite(BasicQosBody.createAMQFrame(channelId, _conn._protocolHandler.getProtocolMajorVersion(),
+                _conn._protocolHandler.getProtocolMinorVersion(), false, // global
+                prefetchHigh, // prefetchCount
+                0), // prefetchSize
+            BasicQosOkBody.class);
+
+        if (transacted)
+        {
+            if (_logger.isDebugEnabled())
+            {
+                _logger.debug("Issuing TxSelect for " + channelId);
+            }
+
+            // TODO: Be aware of possible changes to parameter order as versions change.
+            _conn._protocolHandler.syncWrite(TxSelectBody.createAMQFrame(channelId, _conn._protocolHandler.getProtocolMajorVersion(),
+                    _conn._protocolHandler.getProtocolMinorVersion()), TxSelectOkBody.class);
+        }
+    }
+
+    /**
+     * For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling.
+     * The caller must hold the failover mutex before calling this method.
+     *
+     * @throws JMSException      if a session fails to resubscribe
+     * @throws AMQException      if a channel cannot be re-opened
+     * @throws FailoverException if failover interrupts the operation
+     */
+    public void resubscribeSessions() throws JMSException, AMQException, FailoverException
+    {
+        // Work on a snapshot of the registered sessions rather than on the live map.
+        ArrayList<AMQSession> sessions = new ArrayList<AMQSession>(_conn._sessions.values());
+        if (_logger.isInfoEnabled())
+        {
+            _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey?
+        }
+
+        for (AMQSession s : sessions)
+        {
+            // _protocolHandler.addSessionByChannel(s.getChannelId(), s);
+            reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted());
+            s.resubscribe();
+        }
+    }
+
+    /**
+     * Re-opens the channel of an existing session after failover.
+     *
+     * @param channelId    id of the channel to re-open
+     * @param prefetchHigh the high prefetch mark of the session
+     * @param prefetchLow  the low prefetch mark of the session
+     * @param transacted   whether the session is transacted
+     * @throws AMQException      if the channel cannot be opened; the session is deregistered from
+     *                           the connection and the original exception is attached as the cause
+     * @throws FailoverException if failover interrupts the operation
+     */
+    private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
+            throws AMQException, FailoverException
+    {
+        try
+        {
+            createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
+        }
+        catch (AMQException e)
+        {
+            _conn.deregisterSession(channelId);
+            throw new AMQException(null, "Error reopening channel " + channelId + " after failover: " + e, e);
+        }
+    }
+}