You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by ru...@apache.org on 2007/06/01 16:33:09 UTC
svn commit: r543496 [1/4] - in /incubator/qpid/branches/M2/java:
client/src/main/java/org/apache/qpid/client/
client/src/main/java/org/apache/qpid/client/failover/
client/src/main/java/org/apache/qpid/client/protocol/
client/src/main/java/org/apache/qp...
Author: rupertlssmith
Date: Fri Jun 1 07:33:07 2007
New Revision: 543496
URL: http://svn.apache.org/viewvc?view=rev&rev=543496
Log:
QPID-402: FailoverException falling through to client. All blocking operations now wrapped in failover support wrappers.
Added:
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/failover/FailoverNoopSupport.java
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/failover/FailoverProtectedOperation.java
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/failover/FailoverRetrySupport.java
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnectionHelper.java
Removed:
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/testutil/QpidClientConnection.java
Modified:
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQSession.java
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/BasicMessageConsumer.java
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/JMSAMQException.java
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/failover/FailoverException.java
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/failover/FailoverHandler.java
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/failover/FailoverSupport.java
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/AMQProtocolHandler.java
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/util/FlowControllingBlockingQueue.java
incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/client/channelclose/ChannelCloseTest.java
incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/close/MessageRequeueTest.java
incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/test/unit/transacted/CommitRollbackTest.java
incubator/qpid/branches/M2/java/client/src/test/java/org/apache/qpid/testutil/QpidClientConnection.java
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/AMQConnectionClosedException.java
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/AMQPInvalidClassException.java
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/protocol/AMQMethodListener.java
incubator/qpid/branches/M2/java/common/src/main/java/org/apache/qpid/util/PrettyPrintingUtils.java
incubator/qpid/branches/M2/java/perftests/RunningPerformanceTests.txt
incubator/qpid/branches/M2/java/perftests/pom.xml
incubator/qpid/branches/M2/java/systests/src/main/java/org/apache/qpid/server/failure/HeapExhaustion.java
Modified: incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java
URL: http://svn.apache.org/viewvc/incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java?view=diff&rev=543496&r1=543495&r2=543496
==============================================================================
--- incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java (original)
+++ incubator/qpid/branches/M2/java/client/src/main/java/org/apache/qpid/client/AMQConnection.java Fri Jun 1 07:33:07 2007
@@ -20,34 +20,15 @@
*/
package org.apache.qpid.client;
-import java.io.IOException;
-import java.net.ConnectException;
-import java.nio.channels.UnresolvedAddressException;
-import java.text.MessageFormat;
-import java.util.ArrayList;
-import java.util.Iterator;
-import java.util.LinkedHashMap;
-import java.util.LinkedList;
-import java.util.Map;
-import java.util.concurrent.ExecutorService;
-import java.util.concurrent.Executors;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.atomic.AtomicInteger;
-
-import javax.jms.*;
-import javax.jms.IllegalStateException;
-import javax.naming.NamingException;
-import javax.naming.Reference;
-import javax.naming.Referenceable;
-import javax.naming.StringRefAddr;
-
import org.apache.log4j.Logger;
import org.apache.qpid.AMQConnectionFailureException;
import org.apache.qpid.AMQException;
import org.apache.qpid.AMQUndeliveredException;
import org.apache.qpid.AMQUnresolvedAddressException;
-import org.apache.qpid.client.failover.FailoverSupport;
+import org.apache.qpid.client.failover.FailoverException;
+import org.apache.qpid.client.failover.FailoverProtectedOperation;
+import org.apache.qpid.client.failover.FailoverRetrySupport;
import org.apache.qpid.client.protocol.AMQProtocolHandler;
import org.apache.qpid.client.state.AMQState;
import org.apache.qpid.client.transport.TransportConnection;
@@ -67,6 +48,27 @@
import org.apache.qpid.jms.FailoverPolicy;
import org.apache.qpid.url.URLSyntaxException;
+import javax.jms.*;
+import javax.jms.IllegalStateException;
+import javax.naming.NamingException;
+import javax.naming.Reference;
+import javax.naming.Referenceable;
+import javax.naming.StringRefAddr;
+
+import java.io.IOException;
+import java.net.ConnectException;
+import java.nio.channels.UnresolvedAddressException;
+import java.text.MessageFormat;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.LinkedHashMap;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
public class AMQConnection extends Closeable implements Connection, QueueConnection, TopicConnection, Referenceable
{
private static final Logger _logger = Logger.getLogger(AMQConnection.class);
@@ -96,8 +98,7 @@
private AMQProtocolHandler _protocolHandler;
/** Maps from session id (Integer) to AMQSession instance */
- private final Map<Integer,AMQSession> _sessions = new LinkedHashMap<Integer,AMQSession>();
-
+ private final Map<Integer, AMQSession> _sessions = new LinkedHashMap<Integer, AMQSession>();
private String _clientName;
@@ -486,72 +487,72 @@
final int prefetchHigh, final int prefetchLow) throws JMSException
{
checkNotClosed();
+
if (channelLimitReached())
{
throw new ChannelLimitReachedException(_maximumChannelCount);
}
- else
- {
- return (org.apache.qpid.jms.Session) new FailoverSupport()
+
+ return new FailoverRetrySupport<org.apache.qpid.jms.Session, JMSException>(
+ new FailoverProtectedOperation<org.apache.qpid.jms.Session, JMSException>()
+ {
+ public org.apache.qpid.jms.Session execute() throws JMSException, FailoverException
{
- public Object operation() throws JMSException
+ int channelId = _idFactory.incrementAndGet();
+
+ if (_logger.isDebugEnabled())
{
- int channelId = _idFactory.incrementAndGet();
+ _logger.debug("Write channel open frame for channel id " + channelId);
+ }
+
+ // We must create the session and register it before actually sending the frame to the server to
+ // open it, so that there is no window where we could receive data on the channel and not be set
+ // up to handle it appropriately.
+ AMQSession session =
+ new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh,
+ prefetchLow);
+ // _protocolHandler.addSessionByChannel(channelId, session);
+ registerSession(channelId, session);
- if (_logger.isDebugEnabled())
+ boolean success = false;
+ try
+ {
+ createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
+ success = true;
+ }
+ catch (AMQException e)
+ {
+ JMSException jmse = new JMSException("Error creating session: " + e);
+ jmse.setLinkedException(e);
+ throw jmse;
+ }
+ finally
+ {
+ if (!success)
{
- _logger.debug("Write channel open frame for channel id " + channelId);
+ deregisterSession(channelId);
}
+ }
- // We must create the session and register it before actually sending the frame to the server to
- // open it, so that there is no window where we could receive data on the channel and not be set
- // up to handle it appropriately.
- AMQSession session =
- new AMQSession(AMQConnection.this, channelId, transacted, acknowledgeMode, prefetchHigh,
- prefetchLow);
- //_protocolHandler.addSessionByChannel(channelId, session);
- registerSession(channelId, session);
-
- boolean success = false;
+ if (_started)
+ {
try
{
- createChannelOverWire(channelId, prefetchHigh, prefetchLow, transacted);
- success = true;
+ session.start();
}
catch (AMQException e)
{
- JMSException jmse = new JMSException("Error creating session: " + e);
- jmse.setLinkedException(e);
- throw jmse;
- }
- finally
- {
- if (!success)
- {
- deregisterSession(channelId);
- }
+ throw new JMSAMQException(e);
}
-
- if (_started)
- {
- try
- {
- session.start();
- }
- catch (AMQException e)
- {
- throw new JMSAMQException(e);
- }
- }
-
- return session;
}
- }.execute(this);
- }
+
+ return session;
+ }
+ }, this).execute();
}
private void createChannelOverWire(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
- throws AMQException
+ throws AMQException, FailoverException
{
// TODO: Be aware of possible changes to parameter order as versions change.
@@ -581,7 +582,8 @@
}
}
- private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted) throws AMQException
+ private void reopenChannel(int channelId, int prefetchHigh, int prefetchLow, boolean transacted)
+ throws AMQException, FailoverException
{
try
{
@@ -1128,14 +1130,14 @@
* For all sessions, and for all consumers in those sessions, resubscribe. This is called during failover handling.
* The caller must hold the failover mutex before calling this method.
*/
- public void resubscribeSessions() throws JMSException, AMQException
+ public void resubscribeSessions() throws JMSException, AMQException, FailoverException
{
ArrayList sessions = new ArrayList(_sessions.values());
_logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey?
for (Iterator it = sessions.iterator(); it.hasNext();)
{
AMQSession s = (AMQSession) it.next();
- //_protocolHandler.addSessionByChannel(s.getChannelId(), s);
+ // _protocolHandler.addSessionByChannel(s.getChannelId(), s);
reopenChannel(s.getChannelId(), s.getDefaultPrefetchHigh(), s.getDefaultPrefetchLow(), s.getTransacted());
s.resubscribe();
}
@@ -1222,7 +1224,6 @@
{
_taskPool.execute(task);
}
-
public AMQSession getSession(int channelId)
{