You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/05/10 16:29:10 UTC

[2/2] qpid-jms-amqp-0-x git commit: QPID-8185: [JMS AMQP 0-x][AMQP 0-8..0-91] Make sure that the client closes the TCP connection when sending connection.close fails

QPID-8185: [JMS AMQP 0-x][AMQP 0-8..0-91] Make sure that the client closes the TCP connection when sending connection.close fails


Project: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/commit/98382759
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/tree/98382759
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/diff/98382759

Branch: refs/heads/master
Commit: 983827591c27fd7d5d7289bbd9373a71728ba191
Parents: f89f6c2
Author: Alex Rudyy <or...@apache.org>
Authored: Tue May 8 16:34:27 2018 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Thu May 10 17:11:06 2018 +0100

----------------------------------------------------------------------
 .../org/apache/qpid/client/AMQConnection.java   |  4 ++--
 .../apache/qpid/client/AMQProtocolHandler.java  | 20 +++++++++++++++-----
 .../org/apache/qpid/client/AMQSession_0_8.java  |  2 +-
 .../qpid/client/BasicMessageProducer_0_8.java   | 12 +++++++-----
 .../protocol/BlockingMethodFrameListener.java   | 10 +++++++++-
 .../apache/qpid/client/state/StateWaiter.java   |  6 ++++++
 .../listener/SpecificMethodFrameListener.java   |  4 ++--
 .../apache/qpid/client/util/BlockingWaiter.java |  6 ++++--
 .../client/protocol/AMQProtocolHandlerTest.java |  2 +-
 9 files changed, 47 insertions(+), 19 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/98382759/client/src/main/java/org/apache/qpid/client/AMQConnection.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index c629414..4426fb6 100644
--- a/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -1203,7 +1203,7 @@ public class AMQConnection extends Closeable implements CommonConnection, Refere
                 }
                 catch (JMSException e)
                 {
-                    _logger.error("Error closing connection", e);
+                    _logger.warn("Error closing connection", e);
                     throw JMSExceptionHelper.chainJMSException(new JMSException("Error closing connection: " + e), e);
                 }
                 finally
@@ -1271,7 +1271,7 @@ public class AMQConnection extends Closeable implements CommonConnection, Refere
                 }
                 catch (JMSException e)
                 {
-                    _logger.error("Error closing session: " + e);
+                    _logger.warn("Error closing session: " + e);
                     sessionException = e;
                 }
             }

http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/98382759/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java b/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
index 5d59e50..6b50f90 100644
--- a/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
+++ b/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
@@ -672,10 +672,18 @@ public class AMQProtocolHandler implements ExceptionHandlingByteBufferReceiver,
     /** More convenient method to write a frame and wait for its response. */
     public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws QpidException, FailoverException
     {
-        return writeCommandFrameAndWaitForReply(frame, new SpecificMethodFrameListener(frame.getChannel(), responseClass),
+        return writeCommandFrameAndWaitForReply(frame,
+                                                new SpecificMethodFrameListener(frame.getChannel(),
+                                                                                responseClass,
+                                                                                getConnectionDetails()),
                                                 timeout);
     }
 
+    public String getConnectionDetails()
+    {
+        return getLocalAddress() + "-" + getRemoteAddress();
+    }
+
     public void closeSession(AMQSession session) throws QpidException
     {
         _protocolSession.closeSession(session);
@@ -707,17 +715,19 @@ public class AMQProtocolHandler implements ExceptionHandlingByteBufferReceiver,
                 final AMQFrame frame = body.generateFrame(0);
 
                 syncWrite(frame, ConnectionCloseOkBody.class, timeout);
-                _network.close();
-                closed();
-            }
+             }
             catch (AMQTimeoutException e)
             {
-                closed();
+                _logger.debug("Timeout on sending connection close : " + e);
             }
             catch (FailoverException e)
             {
                 _logger.debug("FailoverException interrupted connection close, ignoring as connection closed anyway.");
             }
+            finally
+            {
+                _network.close();
+            }
         }
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/98382759/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 1acb3a1..6c7738a 100644
--- a/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -929,7 +929,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
 
         public QueueDeclareOkHandler()
         {
-            super(getChannelId(), QueueDeclareOkBody.class);
+            super(getChannelId(), QueueDeclareOkBody.class, getProtocolHandler().getConnectionDetails());
         }
 
         public boolean processMethod(int channelId, AMQMethodBody frame) //throws AMQException

http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/98382759/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
index 41166e0..8412849 100644
--- a/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
+++ b/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
@@ -362,17 +362,19 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
                               && (connectionDelegate80.isConfirmedPublishSupported()
                                || (!getSession().isTransacted() && connectionDelegate80.isConfirmedPublishNonTransactionalSupported()));
 
+        AMQProtocolHandler protocolHandler = getConnection().getProtocolHandler();
         if(!useConfirms)
         {
-            getConnection().getProtocolHandler().writeFrame(compositeFrame);
+            protocolHandler.writeFrame(compositeFrame);
         }
         else
         {
-            final PublishConfirmMessageListener frameListener = new PublishConfirmMessageListener(getChannelId());
+            final PublishConfirmMessageListener frameListener = new PublishConfirmMessageListener(getChannelId(),
+                                                                                                  protocolHandler.getConnectionDetails());
             try
             {
 
-                getConnection().getProtocolHandler().writeCommandFrameAndWaitForReply(compositeFrame,
+                protocolHandler.writeCommandFrameAndWaitForReply(compositeFrame,
                                                                                       frameListener);
 
                 if(frameListener.isRejected())
@@ -468,9 +470,9 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
          *
          * @param channelId The channel id to filter incoming methods with.
          */
-        public PublishConfirmMessageListener(final int channelId)
+        public PublishConfirmMessageListener(final int channelId, String connectionDetails)
         {
-            super(channelId);
+            super(channelId, connectionDetails);
         }
 
         @Override

http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/98382759/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
index 6618b34..5c56ad7 100644
--- a/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
+++ b/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
@@ -55,6 +55,7 @@ import org.apache.qpid.protocol.AMQMethodListener;
 public abstract class BlockingMethodFrameListener extends BlockingWaiter<AMQMethodEvent> implements AMQMethodListener
 {
 
+    private final String _connectionDetails;
     /** Holds the channel id for the channel upon which this listener is waiting for a response. */
     private int _channelId;
 
@@ -62,10 +63,12 @@ public abstract class BlockingMethodFrameListener extends BlockingWaiter<AMQMeth
      * Creates a new method listener, that filters incoming method to just those that match the specified channel id.
      *
      * @param channelId The channel id to filter incoming methods with.
+     * @param connectionDetails description of the connection, returned as-is by {@link #getConnectionDetails()}
      */
-    public BlockingMethodFrameListener(int channelId)
+    public BlockingMethodFrameListener(int channelId, final String connectionDetails)
     {
         _channelId = channelId;
+        _connectionDetails = connectionDetails;
     }
 
     /**
@@ -121,4 +124,9 @@ public abstract class BlockingMethodFrameListener extends BlockingWaiter<AMQMeth
         }
     }
 
+    @Override
+    public String getConnectionDetails()
+    {
+        return _connectionDetails;
+    }
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/98382759/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java b/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
index 5f0a935..ce7c03a 100644
--- a/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
+++ b/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
@@ -82,6 +82,12 @@ public class StateWaiter extends BlockingWaiter<AMQState>
         return _awaitStates.contains(state);
     }
 
+    @Override
+    public String getConnectionDetails()
+    {
+        return null;
+    }
+
     /**
      * Await for the required State to be achieved within the default timeout.
      * @return The achieved state that was requested.

http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/98382759/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java b/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
index f0d7feb..9a3f733 100644
--- a/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
+++ b/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
@@ -28,9 +28,9 @@ public class SpecificMethodFrameListener extends BlockingMethodFrameListener
 {
     private final Class _expectedClass;
 
-    public SpecificMethodFrameListener(int channelId, Class expectedClass)
+    public SpecificMethodFrameListener(int channelId, Class expectedClass, final String connectionDetails)
     {
-        super(channelId);
+        super(channelId, connectionDetails);
         _expectedClass = expectedClass;
     }
 

http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/98382759/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java b/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
index 66be535..23adf3c 100644
--- a/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
+++ b/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
@@ -170,8 +170,8 @@ public abstract class BlockingWaiter<T>
                             final String errorMsg = String.format(
                                     "The server's response was not received within the time-out period of %d ms. "
                                     + "Possible reasons include: the server may be too busy, the network may be "
-                                    + "overloaded, or this JVM itself may be too busy to process the response.",
-                                    timeout);
+                                    + "overloaded, or this JVM itself may be too busy to process the response. [%s]",
+                                    timeout, getConnectionDetails() == null ? "" : getConnectionDetails());
                             _error = new AMQTimeoutException(errorMsg, null);
                             _ready = true;
                         }
@@ -338,4 +338,6 @@ public abstract class BlockingWaiter<T>
         return new QpidException("Waiter was closed.", null);
     }
 
+    public abstract String getConnectionDetails();
+
 }

http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/98382759/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java b/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
index 74a1809..69d180e 100644
--- a/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
+++ b/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
@@ -286,7 +286,7 @@ public class AMQProtocolHandlerTest extends QpidTestCase
          */
         public BlockToAccessFrameListener(int channelId)
         {
-            super(channelId);
+            super(channelId, "Test");
             _logger.info("Creating a listener:" + this);
         }
 


---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org