You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2016/05/06 13:47:10 UTC
svn commit: r1742544 - in /qpid/java/trunk/client/src/main/java/org/apache/qpid/client: AMQConnectionDelegate_0_10.java AMQConnectionDelegate_8_0.java
Author: kwall
Date: Fri May 6 13:47:10 2016
New Revision: 1742544
URL: http://svn.apache.org/viewvc?rev=1742544&view=rev
Log:
QPID-7253: [Java Client] [0-10] Ensure session creation awaits failover completion
Modified:
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1742544&r1=1742543&r2=1742544&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Fri May 6 13:47:10 2016
@@ -40,6 +40,7 @@ import org.apache.qpid.QpidException;
import org.apache.qpid.AMQException;
import org.apache.qpid.client.failover.FailoverException;
import org.apache.qpid.client.failover.FailoverProtectedOperation;
+import org.apache.qpid.client.failover.FailoverRetrySupport;
import org.apache.qpid.client.transport.ClientConnectionDelegate;
import org.apache.qpid.client.util.JMSExceptionHelper;
import org.apache.qpid.common.ServerPropertyNames;
@@ -100,7 +101,7 @@ public class AMQConnectionDelegate_0_10
return createSession(transacted,acknowledgeMode,prefetchHigh,prefetchLow,null);
}
- public Session createSession(boolean transacted, int acknowledgeMode, int prefetchHigh, int prefetchLow, String name)
+ private Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetchHigh, final int prefetchLow, final String name)
throws JMSException
{
_conn.checkNotClosed();
@@ -110,24 +111,36 @@ public class AMQConnectionDelegate_0_10
throw new ChannelLimitReachedException(_conn.getMaximumChannelCount());
}
- int channelId = _conn.getNextChannelID();
- AMQSession session;
- try
+ return new FailoverRetrySupport<>(new FailoverProtectedOperation<Session, JMSException>()
{
- session = new AMQSession_0_10(_qpidConnection, _conn, channelId, transacted, acknowledgeMode, prefetchHigh,
- prefetchLow,name);
- _conn.registerSession(channelId, session);
- if (_conn.started())
+ @Override
+ public Session execute() throws JMSException, FailoverException
{
- session.start();
+ int channelId = _conn.getNextChannelID();
+ try
+ {
+ AMQSession session = new AMQSession_0_10(_qpidConnection,
+ _conn,
+ channelId,
+ transacted,
+ acknowledgeMode,
+ prefetchHigh,
+ prefetchLow,
+ name);
+ _conn.registerSession(channelId, session);
+ if (_conn.started())
+ {
+ session.start();
+ }
+ return session;
+ }
+ catch (Exception e)
+ {
+ _logger.error("exception creating session:", e);
+ throw JMSExceptionHelper.chainJMSException(new JMSException("cannot create session"), e);
+ }
}
- }
- catch (Exception e)
- {
- _logger.error("exception creating session:", e);
- throw JMSExceptionHelper.chainJMSException(new JMSException("cannot create session"), e);
- }
- return session;
+ }, _conn).execute();
}
/**
@@ -271,10 +284,10 @@ public class AMQConnectionDelegate_0_10
public void resubscribeSessions() throws JMSException, QpidException, FailoverException
{
- _logger.info("Resuming connection");
+ _logger.debug("Resuming connection");
getQpidConnection().resume();
- List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values());
- _logger.info(String.format("Resubscribing sessions = %s sessions.size=%d", sessions, sessions.size()));
+ List<AMQSession> sessions = _conn.getSessions().values();
+ _logger.debug("Resubscribing sessions = {} sessions.size = {}", sessions, sessions.size());
for (AMQSession s : sessions)
{
s.resubscribe();
Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1742544&r1=1742543&r2=1742544&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Fri May 6 13:47:10 2016
@@ -23,7 +23,6 @@ package org.apache.qpid.client;
import java.net.ConnectException;
import java.nio.ByteBuffer;
import java.nio.channels.UnresolvedAddressException;
-import java.text.MessageFormat;
import java.util.*;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
@@ -365,8 +364,8 @@ public class AMQConnectionDelegate_8_0 i
*/
public void resubscribeSessions() throws JMSException, QpidException, FailoverException
{
- ArrayList sessions = new ArrayList(_conn.getSessions().values());
- _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey?
+ List<AMQSession> sessions = _conn.getSessions().values();
+ _logger.debug("Resubscribing sessions = {} sessions.size = {}", sessions, sessions.size());
for (Iterator it = sessions.iterator(); it.hasNext();)
{
AMQSession_0_8 s = (AMQSession_0_8) it.next();
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org