Posted to dev@qpid.apache.org by "Keith Wall (JIRA)" <ji...@apache.org> on 2015/10/16 16:51:05 UTC

[jira] [Resolved] (QPID-6735) [Java Broker] Out of memory when consuming messages in very large transaction

     [ https://issues.apache.org/jira/browse/QPID-6735?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Keith Wall resolved QPID-6735.
------------------------------
       Resolution: Fixed
    Fix Version/s: qpid-java-6.0

> [Java Broker] Out of memory when consuming messages in very large transaction
> -----------------------------------------------------------------------------
>
>                 Key: QPID-6735
>                 URL: https://issues.apache.org/jira/browse/QPID-6735
>             Project: Qpid
>          Issue Type: Bug
>          Components: Java Broker
>            Reporter: Rob Godfrey
>            Assignee: Keith Wall
>             Fix For: qpid-java-6.0
>
>
> The Java Broker's flow-to-disk mechanism does not cope with the case where large (or large numbers of) flowed-to-disk messages are sent from the broker to a client within a transaction. When such a message is sent, it is read back from disk and memory is allocated for its content. Unless the housekeeping thread evicts it again, that content stays in memory until the transaction is committed.
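> The following simple test program reproduces the problem:
> {code:java}
> import javax.jms.BytesMessage;
> import javax.jms.MessageConsumer;
> import javax.jms.MessageProducer;
> import javax.jms.Queue;
> import javax.jms.Session;
>
> import org.apache.qpid.client.AMQConnection;
>
> public class Test
> {
>     public static void main(String[] args) throws Exception
>     {
>         // Make the client use AMQP 0-9-1.
>         System.setProperty("qpid.amqp.version","0-9-1");
>         AMQConnection conn = new AMQConnection("127.0.0.1","admin","admin", "foo","test");
>         conn.start();
>         // Publish 20000 messages of 1 MiB each on a non-transacted session.
>         Session session = conn.createSession(false, Session.AUTO_ACKNOWLEDGE);
>         Queue queue = session.createQueue("queue");
>         final MessageProducer prod = session.createProducer(queue);
>         for(int i = 0; i < 20000; i++)
>         {
>             final BytesMessage bytesMessage = session.createBytesMessage();
>             bytesMessage.writeBytes(new byte[1024*1024]);
>             prod.send(bytesMessage);
>         }
>         // Consume everything inside one transaction; nothing is committed
>         // until receive() times out on an empty queue.
>         Session session2 = conn.createSession(true, Session.SESSION_TRANSACTED);
>         MessageConsumer cons = session2.createConsumer(queue);
>         while(cons.receive(1000) != null);
>         session2.commit();
>         conn.close();
>     }
> }
> {code}
> The broker exits with the following exception: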
> {noformat}
> ########################################################################
> #
> # Unhandled Exception java.lang.OutOfMemoryError: Direct buffer memory in Thread virtualhost-test-iopool-3
> #
> # Exiting
> #
> ########################################################################
> java.lang.OutOfMemoryError: Direct buffer memory
> 	at java.nio.Bits.reserveMemory(Bits.java:658)
> 	at java.nio.DirectByteBuffer.<init>(DirectByteBuffer.java:123)
> 	at java.nio.ByteBuffer.allocateDirect(ByteBuffer.java:306)
> 	at org.apache.qpid.bytebuffer.QpidByteBuffer.allocateDirect(QpidByteBuffer.java:464)
> 	at org.apache.qpid.bytebuffer.QpidByteBuffer.allocateDirectCollection(QpidByteBuffer.java:514)
> 	at org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore.getAllContent(AbstractBDBMessageStore.java:413)
> 	at org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore$StoredBDBMessage.getContentAsByteBuffer(AbstractBDBMessageStore.java:1100)
> 	at org.apache.qpid.server.store.berkeleydb.AbstractBDBMessageStore$StoredBDBMessage.getContent(AbstractBDBMessageStore.java:1126)
> 	at org.apache.qpid.server.message.AbstractServerMessageImpl.getContent(AbstractServerMessageImpl.java:172)
> 	at org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverterImpl$MessageContentSourceBody.writePayload(ProtocolOutputConverterImpl.java:292)
> 	at org.apache.qpid.framing.AMQFrame.writePayload(AMQFrame.java:81)
> 	at org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverterImpl$CompositeAMQBodyBlock.writePayload(ProtocolOutputConverterImpl.java:513)
> 	at org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8.writeFrame(AMQPConnection_0_8.java:434)
> 	at org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverterImpl.writeFrame(ProtocolOutputConverterImpl.java:465)
> 	at org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverterImpl.writeMessageDeliveryUnchanged(ProtocolOutputConverterImpl.java:224)
> 	at org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverterImpl.writeMessageDelivery(ProtocolOutputConverterImpl.java:148)
> 	at org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverterImpl.writeMessageDelivery(ProtocolOutputConverterImpl.java:101)
> 	at org.apache.qpid.server.protocol.v0_8.ProtocolOutputConverterImpl.writeDeliver(ProtocolOutputConverterImpl.java:78)
> 	at org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8$WriteDeliverMethod.deliverToClient(AMQPConnection_0_8.java:1421)
> 	at org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8.sendToClient(ConsumerTarget_0_8.java:489)
> 	at org.apache.qpid.server.protocol.v0_8.ConsumerTarget_0_8$AckConsumer.doSend(ConsumerTarget_0_8.java:293)
> 	at org.apache.qpid.server.consumer.AbstractConsumerTarget.sendNextMessage(AbstractConsumerTarget.java:211)
> 	at org.apache.qpid.server.consumer.AbstractConsumerTarget.processPending(AbstractConsumerTarget.java:63)
> 	at org.apache.qpid.server.protocol.v0_8.AMQChannel.processPending(AMQChannel.java:3757)
> 	at org.apache.qpid.server.protocol.v0_8.AMQPConnection_0_8.processPending(AMQPConnection_0_8.java:1549)
> 	at org.apache.qpid.server.transport.MultiVersionProtocolEngine.processPending(MultiVersionProtocolEngine.java:197)
> 	at org.apache.qpid.server.transport.NonBlockingConnection.doWork(NonBlockingConnection.java:220)
> 	at org.apache.qpid.server.transport.NetworkConnectionScheduler.processConnection(NetworkConnectionScheduler.java:121)
> 	at org.apache.qpid.server.transport.NetworkConnectionScheduler.access$000(NetworkConnectionScheduler.java:37)
> 	at org.apache.qpid.server.transport.NetworkConnectionScheduler$2.run(NetworkConnectionScheduler.java:102)
> 	at java.util.concurrent.ThreadPoolExecutor.runWorker(ThreadPoolExecutor.java:1145)
> 	at java.util.concurrent.ThreadPoolExecutor$Worker.run(ThreadPoolExecutor.java:615)
> 	at java.lang.Thread.run(Thread.java:724)
> {noformat}
> The broker JVM was started with the following memory settings:
> {noformat}
> -Dvirtualhost.connectionThreadPool.maximum=20 -Dvirtualhost.connectionThreadPool.minimum=16 -verbose:gc -Xmx2048m -Xms1024m -XX:MaxDirectMemorySize=3072m
> {noformat}
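
A rough sanity check of the numbers in the report can be sketched as below. It assumes, beyond what the report states, that each delivered message keeps its full payload in direct memory until the consuming transaction commits and that nothing else uses direct memory. The class and method names are hypothetical and exist only for this illustration.

```java
// Back-of-the-envelope check of the reproduction above.
// Assumption (not stated in the report): every delivered message keeps its
// full payload in direct memory until the transaction commits, and no other
// direct buffers are in use.
final class DirectMemoryBudget
{
    static final long MIB = 1024L * 1024L;

    /** Total payload bytes the transaction would hold if nothing were evicted. */
    static long totalPayloadBytes(long messageCount, long payloadBytes)
    {
        return messageCount * payloadBytes;
    }

    /** Largest number of whole payloads that fit within the direct memory limit. */
    static long payloadsThatFit(long maxDirectBytes, long payloadBytes)
    {
        return maxDirectBytes / payloadBytes;
    }

    public static void main(String[] args)
    {
        long payload = 1024L * 1024L;   // new byte[1024*1024] in the test program
        long messages = 20000;          // loop bound in the test program
        long maxDirect = 3072 * MIB;    // -XX:MaxDirectMemorySize=3072m

        System.out.println("Payload held by one transaction: "
                + totalPayloadBytes(messages, payload) / MIB + " MiB");
        System.out.println("Direct memory limit: " + maxDirect / MIB + " MiB");
        System.out.println("Payloads that fit before the limit: "
                + payloadsThatFit(maxDirect, payload));
    }
}
```

Under these assumptions the single transaction would need about 20000 MiB of payload against a 3072 MiB limit, so the limit would be hit after roughly 3072 deliveries. The real figure is likely lower, since framing overhead and other buffers also consume direct memory.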


