You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by or...@apache.org on 2018/05/10 16:29:10 UTC
[2/2] qpid-jms-amqp-0-x git commit: QPID-8185: [JMS AMQP 0-x][AMQP
0-8..0-91] Make sure that client closes TCP connection on failure with
sending connection.close
QPID-8185: [JMS AMQP 0-x][AMQP 0-8..0-91] Make sure that client closes TCP connection on failure with sending connection.close
Project: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/repo
Commit: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/commit/98382759
Tree: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/tree/98382759
Diff: http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/diff/98382759
Branch: refs/heads/master
Commit: 983827591c27fd7d5d7289bbd9373a71728ba191
Parents: f89f6c2
Author: Alex Rudyy <or...@apache.org>
Authored: Tue May 8 16:34:27 2018 +0100
Committer: Alex Rudyy <or...@apache.org>
Committed: Thu May 10 17:11:06 2018 +0100
----------------------------------------------------------------------
.../org/apache/qpid/client/AMQConnection.java | 4 ++--
.../apache/qpid/client/AMQProtocolHandler.java | 20 +++++++++++++++-----
.../org/apache/qpid/client/AMQSession_0_8.java | 2 +-
.../qpid/client/BasicMessageProducer_0_8.java | 12 +++++++-----
.../protocol/BlockingMethodFrameListener.java | 10 +++++++++-
.../apache/qpid/client/state/StateWaiter.java | 6 ++++++
.../listener/SpecificMethodFrameListener.java | 4 ++--
.../apache/qpid/client/util/BlockingWaiter.java | 6 ++++--
.../client/protocol/AMQProtocolHandlerTest.java | 2 +-
9 files changed, 47 insertions(+), 19 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/98382759/client/src/main/java/org/apache/qpid/client/AMQConnection.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/client/AMQConnection.java b/client/src/main/java/org/apache/qpid/client/AMQConnection.java
index c629414..4426fb6 100644
--- a/client/src/main/java/org/apache/qpid/client/AMQConnection.java
+++ b/client/src/main/java/org/apache/qpid/client/AMQConnection.java
@@ -1203,7 +1203,7 @@ public class AMQConnection extends Closeable implements CommonConnection, Refere
}
catch (JMSException e)
{
- _logger.error("Error closing connection", e);
+ _logger.warn("Error closing connection", e);
throw JMSExceptionHelper.chainJMSException(new JMSException("Error closing connection: " + e), e);
}
finally
@@ -1271,7 +1271,7 @@ public class AMQConnection extends Closeable implements CommonConnection, Refere
}
catch (JMSException e)
{
- _logger.error("Error closing session: " + e);
+ _logger.warn("Error closing session: " + e);
sessionException = e;
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/98382759/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java b/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
index 5d59e50..6b50f90 100644
--- a/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
+++ b/client/src/main/java/org/apache/qpid/client/AMQProtocolHandler.java
@@ -672,10 +672,18 @@ public class AMQProtocolHandler implements ExceptionHandlingByteBufferReceiver,
/** More convenient method to write a frame and wait for its response. */
public AMQMethodEvent syncWrite(AMQFrame frame, Class responseClass, long timeout) throws QpidException, FailoverException
{
- return writeCommandFrameAndWaitForReply(frame, new SpecificMethodFrameListener(frame.getChannel(), responseClass),
+ return writeCommandFrameAndWaitForReply(frame,
+ new SpecificMethodFrameListener(frame.getChannel(),
+ responseClass,
+ getConnectionDetails()),
timeout);
}
+ public String getConnectionDetails()
+ {
+ return getLocalAddress() + "-" + getRemoteAddress();
+ }
+
public void closeSession(AMQSession session) throws QpidException
{
_protocolSession.closeSession(session);
@@ -707,17 +715,19 @@ public class AMQProtocolHandler implements ExceptionHandlingByteBufferReceiver,
final AMQFrame frame = body.generateFrame(0);
syncWrite(frame, ConnectionCloseOkBody.class, timeout);
- _network.close();
- closed();
- }
+ }
catch (AMQTimeoutException e)
{
- closed();
+ _logger.debug("Timeout on sending connection close : " + e);
}
catch (FailoverException e)
{
_logger.debug("FailoverException interrupted connection close, ignoring as connection closed anyway.");
}
+ finally
+ {
+ _network.close();
+ }
}
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/98382759/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java b/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
index 1acb3a1..6c7738a 100644
--- a/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
+++ b/client/src/main/java/org/apache/qpid/client/AMQSession_0_8.java
@@ -929,7 +929,7 @@ public class AMQSession_0_8 extends AMQSession<BasicMessageConsumer_0_8, BasicMe
public QueueDeclareOkHandler()
{
- super(getChannelId(), QueueDeclareOkBody.class);
+ super(getChannelId(), QueueDeclareOkBody.class, getProtocolHandler().getConnectionDetails());
}
public boolean processMethod(int channelId, AMQMethodBody frame) //throws AMQException
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/98382759/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java b/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
index 41166e0..8412849 100644
--- a/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
+++ b/client/src/main/java/org/apache/qpid/client/BasicMessageProducer_0_8.java
@@ -362,17 +362,19 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
&& (connectionDelegate80.isConfirmedPublishSupported()
|| (!getSession().isTransacted() && connectionDelegate80.isConfirmedPublishNonTransactionalSupported()));
+ AMQProtocolHandler protocolHandler = getConnection().getProtocolHandler();
if(!useConfirms)
{
- getConnection().getProtocolHandler().writeFrame(compositeFrame);
+ protocolHandler.writeFrame(compositeFrame);
}
else
{
- final PublishConfirmMessageListener frameListener = new PublishConfirmMessageListener(getChannelId());
+ final PublishConfirmMessageListener frameListener = new PublishConfirmMessageListener(getChannelId(),
+ protocolHandler.getConnectionDetails());
try
{
- getConnection().getProtocolHandler().writeCommandFrameAndWaitForReply(compositeFrame,
+ protocolHandler.writeCommandFrameAndWaitForReply(compositeFrame,
frameListener);
if(frameListener.isRejected())
@@ -468,9 +470,9 @@ public class BasicMessageProducer_0_8 extends BasicMessageProducer
*
* @param channelId The channel id to filter incoming methods with.
*/
- public PublishConfirmMessageListener(final int channelId)
+ public PublishConfirmMessageListener(final int channelId, String connectionDetails)
{
- super(channelId);
+ super(channelId, connectionDetails);
}
@Override
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/98382759/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java b/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
index 6618b34..5c56ad7 100644
--- a/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
+++ b/client/src/main/java/org/apache/qpid/client/protocol/BlockingMethodFrameListener.java
@@ -55,6 +55,7 @@ import org.apache.qpid.protocol.AMQMethodListener;
public abstract class BlockingMethodFrameListener extends BlockingWaiter<AMQMethodEvent> implements AMQMethodListener
{
+ private final String _connectionDetails;
/** Holds the channel id for the channel upon which this listener is waiting for a response. */
private int _channelId;
@@ -62,10 +63,12 @@ public abstract class BlockingMethodFrameListener extends BlockingWaiter<AMQMeth
* Creates a new method listener that filters incoming methods to just those that match the specified channel id.
*
* @param channelId The channel id to filter incoming methods with.
+ * @param connectionDetails description of the connection, reported in the time-out error message
*/
- public BlockingMethodFrameListener(int channelId)
+ public BlockingMethodFrameListener(int channelId, final String connectionDetails)
{
_channelId = channelId;
+ _connectionDetails = connectionDetails;
}
/**
@@ -121,4 +124,9 @@ public abstract class BlockingMethodFrameListener extends BlockingWaiter<AMQMeth
}
}
+ @Override
+ public String getConnectionDetails()
+ {
+ return _connectionDetails;
+ }
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/98382759/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java b/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
index 5f0a935..ce7c03a 100644
--- a/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
+++ b/client/src/main/java/org/apache/qpid/client/state/StateWaiter.java
@@ -82,6 +82,12 @@ public class StateWaiter extends BlockingWaiter<AMQState>
return _awaitStates.contains(state);
}
+ @Override
+ public String getConnectionDetails()
+ {
+ return null;
+ }
+
/**
* Await for the required State to be achieved within the default timeout.
* @return The achieved state that was requested.
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/98382759/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java b/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
index f0d7feb..9a3f733 100644
--- a/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
+++ b/client/src/main/java/org/apache/qpid/client/state/listener/SpecificMethodFrameListener.java
@@ -28,9 +28,9 @@ public class SpecificMethodFrameListener extends BlockingMethodFrameListener
{
private final Class _expectedClass;
- public SpecificMethodFrameListener(int channelId, Class expectedClass)
+ public SpecificMethodFrameListener(int channelId, Class expectedClass, final String connectionDetails)
{
- super(channelId);
+ super(channelId, connectionDetails);
_expectedClass = expectedClass;
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/98382759/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
----------------------------------------------------------------------
diff --git a/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java b/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
index 66be535..23adf3c 100644
--- a/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
+++ b/client/src/main/java/org/apache/qpid/client/util/BlockingWaiter.java
@@ -170,8 +170,8 @@ public abstract class BlockingWaiter<T>
final String errorMsg = String.format(
"The server's response was not received within the time-out period of %d ms. "
+ "Possible reasons include: the server may be too busy, the network may be "
- + "overloaded, or this JVM itself may be too busy to process the response.",
- timeout);
+ + "overloaded, or this JVM itself may be too busy to process the response. [%s]",
+ timeout, getConnectionDetails() == null ? "" : getConnectionDetails());
_error = new AMQTimeoutException(errorMsg, null);
_ready = true;
}
@@ -338,4 +338,6 @@ public abstract class BlockingWaiter<T>
return new QpidException("Waiter was closed.", null);
}
+ public abstract String getConnectionDetails();
+
}
http://git-wip-us.apache.org/repos/asf/qpid-jms-amqp-0-x/blob/98382759/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
----------------------------------------------------------------------
diff --git a/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java b/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
index 74a1809..69d180e 100644
--- a/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
+++ b/client/src/test/java/org/apache/qpid/client/protocol/AMQProtocolHandlerTest.java
@@ -286,7 +286,7 @@ public class AMQProtocolHandlerTest extends QpidTestCase
*/
public BlockToAccessFrameListener(int channelId)
{
- super(channelId);
+ super(channelId, "Test");
_logger.info("Creating a listener:" + this);
}
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org