You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by kw...@apache.org on 2016/05/06 13:47:10 UTC

svn commit: r1742544 - in /qpid/java/trunk/client/src/main/java/org/apache/qpid/client: AMQConnectionDelegate_0_10.java AMQConnectionDelegate_8_0.java

Author: kwall
Date: Fri May  6 13:47:10 2016
New Revision: 1742544

URL: http://svn.apache.org/viewvc?rev=1742544&view=rev
Log:
QPID-7253: [Java Client] [0-10] Ensure session creation awaits failover completion

Modified:
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
    qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java?rev=1742544&r1=1742543&r2=1742544&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_0_10.java Fri May  6 13:47:10 2016
@@ -40,6 +40,7 @@ import org.apache.qpid.QpidException;
 import org.apache.qpid.AMQException;
 import org.apache.qpid.client.failover.FailoverException;
 import org.apache.qpid.client.failover.FailoverProtectedOperation;
+import org.apache.qpid.client.failover.FailoverRetrySupport;
 import org.apache.qpid.client.transport.ClientConnectionDelegate;
 import org.apache.qpid.client.util.JMSExceptionHelper;
 import org.apache.qpid.common.ServerPropertyNames;
@@ -100,7 +101,7 @@ public class AMQConnectionDelegate_0_10
         return createSession(transacted,acknowledgeMode,prefetchHigh,prefetchLow,null);
     }
 
-    public Session createSession(boolean transacted, int acknowledgeMode, int prefetchHigh, int prefetchLow, String name)
+    private Session createSession(final boolean transacted, final int acknowledgeMode, final int prefetchHigh, final int prefetchLow, final String name)
             throws JMSException
     {
         _conn.checkNotClosed();
@@ -110,24 +111,36 @@ public class AMQConnectionDelegate_0_10
             throw new ChannelLimitReachedException(_conn.getMaximumChannelCount());
         }
 
-        int channelId = _conn.getNextChannelID();
-        AMQSession session;
-        try
+        return new FailoverRetrySupport<>(new FailoverProtectedOperation<Session, JMSException>()
         {
-            session = new AMQSession_0_10(_qpidConnection, _conn, channelId, transacted, acknowledgeMode, prefetchHigh,
-                    prefetchLow,name);
-            _conn.registerSession(channelId, session);
-            if (_conn.started())
+            @Override
+            public Session execute() throws JMSException, FailoverException
             {
-                session.start();
+                int channelId = _conn.getNextChannelID();
+                try
+                {
+                    AMQSession session = new AMQSession_0_10(_qpidConnection,
+                                                             _conn,
+                                                             channelId,
+                                                             transacted,
+                                                             acknowledgeMode,
+                                                             prefetchHigh,
+                                                             prefetchLow,
+                                                             name);
+                    _conn.registerSession(channelId, session);
+                    if (_conn.started())
+                    {
+                        session.start();
+                    }
+                    return session;
+                }
+                catch (Exception e)
+                {
+                    _logger.error("exception creating session:", e);
+                    throw JMSExceptionHelper.chainJMSException(new JMSException("cannot create session"), e);
+                }
             }
-        }
-        catch (Exception e)
-        {
-            _logger.error("exception creating session:", e);
-            throw JMSExceptionHelper.chainJMSException(new JMSException("cannot create session"), e);
-        }
-        return session;
+        }, _conn).execute();
     }
 
     /**
@@ -271,10 +284,10 @@ public class AMQConnectionDelegate_0_10
 
     public void resubscribeSessions() throws JMSException, QpidException, FailoverException
     {
-        _logger.info("Resuming connection");
+        _logger.debug("Resuming connection");
         getQpidConnection().resume();
-        List<AMQSession> sessions = new ArrayList<AMQSession>(_conn.getSessions().values());
-        _logger.info(String.format("Resubscribing sessions = %s sessions.size=%d", sessions, sessions.size()));
+        List<AMQSession> sessions = _conn.getSessions().values();
+        _logger.debug("Resubscribing sessions = {} sessions.size = {}", sessions, sessions.size());
         for (AMQSession s : sessions)
         {
             s.resubscribe();

Modified: qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java?rev=1742544&r1=1742543&r2=1742544&view=diff
==============================================================================
--- qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java (original)
+++ qpid/java/trunk/client/src/main/java/org/apache/qpid/client/AMQConnectionDelegate_8_0.java Fri May  6 13:47:10 2016
@@ -23,7 +23,6 @@ package org.apache.qpid.client;
 import java.net.ConnectException;
 import java.nio.ByteBuffer;
 import java.nio.channels.UnresolvedAddressException;
-import java.text.MessageFormat;
 import java.util.*;
 import java.util.concurrent.CountDownLatch;
 import java.util.concurrent.TimeUnit;
@@ -365,8 +364,8 @@ public class AMQConnectionDelegate_8_0 i
      */
     public void resubscribeSessions() throws JMSException, QpidException, FailoverException
     {
-        ArrayList sessions = new ArrayList(_conn.getSessions().values());
-        _logger.info(MessageFormat.format("Resubscribing sessions = {0} sessions.size={1}", sessions, sessions.size())); // FIXME: removeKey?
+        List<AMQSession> sessions = _conn.getSessions().values();
+        _logger.debug("Resubscribing sessions = {} sessions.size = {}", sessions, sessions.size());
         for (Iterator it = sessions.iterator(); it.hasNext();)
         {
             AMQSession_0_8 s = (AMQSession_0_8) it.next();



---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org