You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@qpid.apache.org by rg...@apache.org on 2015/06/05 16:46:03 UTC
svn commit: r1683773 -
/qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
Author: rgodfrey
Date: Fri Jun 5 14:46:02 2015
New Revision: 1683773
URL: http://svn.apache.org/r1683773
Log:
QPID-6573 : Add broker connection close guard to protect against a client that does not respond to the connection close command in a timely manner
Modified:
qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
Modified: qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java
URL: http://svn.apache.org/viewvc/qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java?rev=1683773&r1=1683772&r2=1683773&view=diff
==============================================================================
--- qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java (original)
+++ qpid/java/trunk/systests/src/test/java/org/apache/qpid/transport/MaxFrameSizeTest.java Fri Jun 5 14:46:02 2015
@@ -43,17 +43,7 @@ import javax.security.sasl.SaslException
import org.apache.qpid.codec.AMQDecoder;
import org.apache.qpid.codec.ClientDecoder;
-import org.apache.qpid.framing.AMQDataBlock;
-import org.apache.qpid.framing.AMQFrame;
-import org.apache.qpid.framing.AMQFrameDecodingException;
-import org.apache.qpid.framing.AMQProtocolVersionException;
-import org.apache.qpid.framing.AMQShortString;
-import org.apache.qpid.framing.ConnectionCloseBody;
-import org.apache.qpid.framing.ConnectionStartOkBody;
-import org.apache.qpid.framing.ConnectionTuneOkBody;
-import org.apache.qpid.framing.FieldTable;
-import org.apache.qpid.framing.FrameCreatingMethodProcessor;
-import org.apache.qpid.framing.ProtocolVersion;
+import org.apache.qpid.framing.*;
import org.apache.qpid.jms.BrokerDetails;
import org.apache.qpid.server.model.AuthenticationProvider;
import org.apache.qpid.server.model.Broker;
@@ -111,18 +101,40 @@ public class MaxFrameSizeTest extends Qp
{
@Override
- public void evaluate(final Socket socket, final List<AMQDataBlock> frames)
+ public boolean evaluate(final Socket socket, final List<AMQDataBlock> frames)
{
- if(!socket.isClosed())
+ if(containsFrame(frames, ConnectionOpenOkBody.class))
{
- AMQFrame lastFrame = (AMQFrame) frames.get(frames.size() - 1);
- assertTrue("Connection should not be possible with a frame size < " + Constant.MIN_MAX_FRAME_SIZE, lastFrame.getBodyFrame() instanceof ConnectionCloseBody);
+ fail("Connection should not be possible with a frame size < " + Constant.MIN_MAX_FRAME_SIZE);
+ return false;
+ }
+ else if(containsFrame(frames, ConnectionCloseBody.class))
+ {
+ return false;
+ }
+ else
+ {
+ return true;
}
}
});
}
}
+ private boolean containsFrame(final List<AMQDataBlock> frames,
+ final Class<? extends AMQMethodBodyImpl> frameClass)
+ {
+ for(AMQDataBlock block : frames)
+ {
+ AMQFrame frame = (AMQFrame) block;
+ if(frameClass.isInstance(frame.getBodyFrame()))
+ {
+ return true;
+ }
+ }
+ return false;
+ }
+
public void testTooLargeFrameSize() throws Exception
{
@@ -160,12 +172,20 @@ public class MaxFrameSizeTest extends Qp
{
@Override
- public void evaluate(final Socket socket, final List<AMQDataBlock> frames)
+ public boolean evaluate(final Socket socket, final List<AMQDataBlock> frames)
{
- if(!socket.isClosed())
+ if(containsFrame(frames, ConnectionOpenOkBody.class))
{
- AMQFrame lastFrame = (AMQFrame) frames.get(frames.size() - 1);
- assertTrue("Connection should not be possible with a frame size larger than the broker requested", lastFrame.getBodyFrame() instanceof ConnectionCloseBody);
+ fail("Connection should not be possible with a frame size larger than the broker requested");
+ return false;
+ }
+ else if(containsFrame(frames, ConnectionCloseBody.class))
+ {
+ return false;
+ }
+ else
+ {
+ return true;
}
}
});
@@ -174,7 +194,7 @@ public class MaxFrameSizeTest extends Qp
private static interface ResultEvaluator
{
- void evaluate(Socket socket, List<AMQDataBlock> frames);
+ boolean evaluate(Socket socket, List<AMQDataBlock> frames);
}
private void doAMQP08test(int frameSize, ResultEvaluator evaluator)
@@ -187,17 +207,22 @@ public class MaxFrameSizeTest extends Qp
OutputStream os = socket.getOutputStream();
byte[] protocolHeader;
+ ConnectionCloseOkBody closeOk;
+
Protocol protocol = getBrokerProtocol();
switch(protocol)
{
case AMQP_0_8:
protocolHeader = (ProtocolEngineCreator_0_8.getInstance().getHeaderIdentifier());
+ closeOk = ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_8;
break;
case AMQP_0_9:
protocolHeader = (ProtocolEngineCreator_0_9.getInstance().getHeaderIdentifier());
+ closeOk = ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_9;
break;
case AMQP_0_9_1:
protocolHeader = (ProtocolEngineCreator_0_9_1.getInstance().getHeaderIdentifier());
+ closeOk = ConnectionCloseOkBody.CONNECTION_CLOSE_OK_0_9;
break;
default:
throw new RuntimeException("Unexpected Protocol Version: " + protocol);
@@ -225,21 +250,51 @@ public class MaxFrameSizeTest extends Qp
ConnectionTuneOkBody tuneOk = new ConnectionTuneOkBody(256, frameSize, 0);
new AMQFrame(0, tuneOk).writePayload(dos);
dos.flush();
- socket.setSoTimeout(5000);
- ByteArrayOutputStream baos = new ByteArrayOutputStream();
+ ConnectionOpenBody open = new ConnectionOpenBody(AMQShortString.valueOf(""),AMQShortString.EMPTY_STRING, false);
+
+ try
+ {
+ new AMQFrame(0, open).writePayload(dos);
+ dos.flush();
+
+ socket.setSoTimeout(5000);
+ }
+ catch (IOException e)
+ {
+ // ignore - the broker may have closed the socket already
+ }
+
+ final FrameCreatingMethodProcessor methodProcessor = new FrameCreatingMethodProcessor(ProtocolVersion.v0_91);
+ AMQDecoder decoder = new ClientDecoder(methodProcessor);
+
+
byte[] buffer = new byte[1024];
+
+
int size;
while((size = is.read(buffer)) > 0)
{
- baos.write(buffer,0,size);
+ decoder.decodeBuffer(ByteBuffer.wrap(buffer, 0, size));
+ if(!evaluator.evaluate(socket,methodProcessor.getProcessedMethods()))
+ {
+ break;
+ }
+ }
+
+
+
+ try
+ {
+ new AMQFrame(0, closeOk).writePayload(dos);
+ dos.flush();
+
+ }
+ catch (IOException e)
+ {
+ // ignore - the broker may have closed the socket already
}
- byte[] serverData = baos.toByteArray();
- final FrameCreatingMethodProcessor methodProcessor = new FrameCreatingMethodProcessor(ProtocolVersion.v0_91);
- AMQDecoder decoder = new ClientDecoder(methodProcessor);
- decoder.decodeBuffer(ByteBuffer.wrap(serverData));
- evaluator.evaluate(socket, methodProcessor.getProcessedMethods());
}
private static class TestClientDelegate extends ClientDelegate
---------------------------------------------------------------------
To unsubscribe, e-mail: commits-unsubscribe@qpid.apache.org
For additional commands, e-mail: commits-help@qpid.apache.org