Posted to dev@qpid.apache.org by "ASF GitHub Bot (Jira)" <ji...@apache.org> on 2022/06/07 07:37:00 UTC

[jira] [Commented] (QPID-8590) [Broker-J] Purge on flow to disk queue

    [ https://issues.apache.org/jira/browse/QPID-8590?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=17550850#comment-17550850 ] 

ASF GitHub Bot commented on QPID-8590:
--------------------------------------

dakirily opened a new pull request, #128:
URL: https://github.com/apache/qpid-broker-j/pull/128

   This PR addresses JIRA QPID-8590 to improve the flow-to-disk mechanism: messages should be flowed to disk when their number exceeds the overflow limit, and restored to memory when their number is less than the overflow limit. In addition, the performance of purging messages from a queue (the clearQueue() operation) is improved.
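
The threshold rule described above can be sketched as follows. This is only an illustrative model, not the code from the PR: the class `FlowToDiskRule`, the `Action` enum and the `decide` method are hypothetical names, and the sketch assumes the limit is a message count (the issue below mentions an "overflow count").

```java
// Hypothetical model of the rule stated in the PR description.
// None of these names come from Broker-J.
class FlowToDiskRule {

    enum Action { FLOW_TO_DISK, RESTORE_TO_MEMORY, NONE }

    // Assumption: the limit is expressed as a number of messages.
    static Action decide(long messageCount, long overflowLimit) {
        if (messageCount > overflowLimit) {
            return Action.FLOW_TO_DISK;       // above the limit: flow to disk
        }
        if (messageCount < overflowLimit) {
            return Action.RESTORE_TO_MEMORY;  // below the limit: restore to memory
        }
        return Action.NONE;                   // exactly at the limit: nothing to do
    }

    public static void main(String[] args) {
        System.out.println(decide(100_000, 10)); // FLOW_TO_DISK
        System.out.println(decide(5, 10));       // RESTORE_TO_MEMORY
        System.out.println(decide(10, 10));      // NONE
    }
}
```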




> [Broker-J] Purge on flow to disk queue
> --------------------------------------
>
>                 Key: QPID-8590
>                 URL: https://issues.apache.org/jira/browse/QPID-8590
>             Project: Qpid
>          Issue Type: Improvement
>          Components: Broker-J
>    Affects Versions: qpid-java-broker-8.0.6
>            Reporter: Daniil Kirilyuk
>            Priority: Minor
>
> Purging of flow-to-disk queues is very slow (~10 messages/s).
> Steps to replicate:
>  * send 100000 messages to a flow-to-disk queue with the overflow count set to 10
>  * initiate a queue purge via the REST API
> Note: When a broker stop is initiated, the queue is purged much faster (approximately 60000 messages/s).
> Thread dumps show the following stack trace:
> {code:java}
> "qtp1722120611-7283" #7283 prio=5 os_prio=0 cpu=47303542.52ms elapsed=76831.32s tid=0x00007fcf64097800 nid=0x3d5b runnable  [0x00007fcf3c7be000]
>    java.lang.Thread.State: RUNNABLE
> 	at org.apache.qpid.server.message.AbstractServerMessageImpl.decrementReference(AbstractServerMessageImpl.java:136)
> 	at org.apache.qpid.server.message.AbstractServerMessageImpl.access$500(AbstractServerMessageImpl.java:40)
> 	at org.apache.qpid.server.message.AbstractServerMessageImpl$Reference.release(AbstractServerMessageImpl.java:356)
> 	- locked <0x0000000600516aa0> (a org.apache.qpid.server.message.AbstractServerMessageImpl$Reference)
> 	at org.apache.qpid.server.message.AbstractServerMessageImpl$Reference.close(AbstractServerMessageImpl.java:363)
> 	at org.apache.qpid.server.queue.FlowToDiskOverflowPolicyHandler$Handler.flowToDisk(FlowToDiskOverflowPolicyHandler.java:135)
> 	at org.apache.qpid.server.queue.FlowToDiskOverflowPolicyHandler$Handler.flowTailToDiskIfNecessary(FlowToDiskOverflowPolicyHandler.java:105)
> 	at org.apache.qpid.server.queue.FlowToDiskOverflowPolicyHandler$Handler.checkOverflow(FlowToDiskOverflowPolicyHandler.java:68)
> 	at org.apache.qpid.server.queue.FlowToDiskOverflowPolicyHandler$Handler.access$100(FlowToDiskOverflowPolicyHandler.java:44)
> 	at org.apache.qpid.server.queue.FlowToDiskOverflowPolicyHandler.checkOverflow(FlowToDiskOverflowPolicyHandler.java:40)
> 	at org.apache.qpid.server.queue.AbstractQueue.checkCapacity(AbstractQueue.java:2117)
> 	at org.apache.qpid.server.queue.AbstractQueueEntryList.updateStatsOnStateChange(AbstractQueueEntryList.java:99)
> 	at org.apache.qpid.server.queue.OrderedQueueEntryList.updateStatsOnStateChange(OrderedQueueEntryList.java:33)
> 	at org.apache.qpid.server.queue.QueueEntryImpl.notifyStateChange(QueueEntryImpl.java:559)
> 	at org.apache.qpid.server.queue.QueueEntryImpl.dispose(QueueEntryImpl.java:578)
> 	at org.apache.qpid.server.queue.QueueEntryImpl.delete(QueueEntryImpl.java:596)
> 	at org.apache.qpid.server.queue.AbstractQueue$7.postCommit(AbstractQueue.java:1836)
> 	at org.apache.qpid.server.txn.LocalTransaction.doPostTransactionActions(LocalTransaction.java:473)
> 	at org.apache.qpid.server.txn.LocalTransaction.commit(LocalTransaction.java:402)
> 	at org.apache.qpid.server.txn.LocalTransaction.commit(LocalTransaction.java:374)
> 	at org.apache.qpid.server.queue.AbstractQueue.clearQueue(AbstractQueue.java:1816)
> 	at org.apache.qpid.server.queue.StandardQueueImplWithAccessChecking.clearQueue(StandardQueueImplWithAccessChecking.java:152)
> 	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke0(java.base@11.0.10/Native Method)
> 	at jdk.internal.reflect.NativeMethodAccessorImpl.invoke(java.base@11.0.10/NativeMethodAccessorImpl.java:62)
> 	at jdk.internal.reflect.DelegatingMethodAccessorImpl.invoke(java.base@11.0.10/DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(java.base@11.0.10/Method.java:566)
> 	at org.apache.qpid.server.model.ConfiguredObjectMethodOperation.perform(ConfiguredObjectMethodOperation.java:125)
> 	at org.apache.qpid.server.management.plugin.controller.latest.LatestManagementController.invoke(LatestManagementController.java:372)
> 	at org.apache.qpid.server.management.plugin.controller.AbstractManagementController.handlePostOrPut(AbstractManagementController.java:147)
> 	at org.apache.qpid.server.management.plugin.controller.AbstractManagementController.handlePost(AbstractManagementController.java:100)
> 	at org.apache.qpid.server.management.plugin.servlet.rest.RestServlet.doPost(RestServlet.java:136)
> 	at org.apache.qpid.server.management.plugin.servlet.rest.AbstractServlet.doPost(AbstractServlet.java:146)
> 	at javax.servlet.http.HttpServlet.service(HttpServlet.java:707)
> 	at javax.servlet.http.HttpServlet.service(HttpServlet.java:790)
> 	at org.eclipse.jetty.servlet.ServletHolder.handle(ServletHolder.java:791)
> 	at org.eclipse.jetty.servlet.ServletHandler$ChainEnd.doFilter(ServletHandler.java:1626)
> 	at org.apache.qpid.server.management.plugin.filter.AuthenticationCheckFilter$1.run(AuthenticationCheckFilter.java:161)
> 	at org.apache.qpid.server.management.plugin.filter.AuthenticationCheckFilter$1.run(AuthenticationCheckFilter.java:157)
> 	at java.security.AccessController.doPrivileged(java.base@11.0.10/Native Method)
> 	at javax.security.auth.Subject.doAs(java.base@11.0.10/Subject.java:423)
> 	at org.apache.qpid.server.management.plugin.filter.AuthenticationCheckFilter.doFilterChainAs(AuthenticationCheckFilter.java:156)
> 	at org.apache.qpid.server.management.plugin.filter.AuthenticationCheckFilter.doFilter(AuthenticationCheckFilter.java:126)
> 	at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193)
> 	at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601)
> 	at org.apache.qpid.server.management.plugin.filter.LoggingFilter.doFilter(LoggingFilter.java:63)
> 	at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193)
> 	at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601)
> 	at org.apache.qpid.server.management.plugin.filter.MethodFilter.doFilter(MethodFilter.java:67)
> 	at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193)
> 	at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601)
> 	at org.eclipse.jetty.servlets.CrossOriginFilter.handle(CrossOriginFilter.java:319)
> 	at org.eclipse.jetty.servlets.CrossOriginFilter.doFilter(CrossOriginFilter.java:273)
> 	at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193)
> 	at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601)
> 	at org.apache.qpid.server.management.plugin.filter.ExceptionHandlingFilter.doFilter(ExceptionHandlingFilter.java:59)
> 	at org.eclipse.jetty.servlet.FilterHolder.doFilter(FilterHolder.java:193)
> 	at org.eclipse.jetty.servlet.ServletHandler$Chain.doFilter(ServletHandler.java:1601)
> 	at org.eclipse.jetty.servlet.ServletHandler.doHandle(ServletHandler.java:548)
> 	at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
> 	at org.eclipse.jetty.server.session.SessionHandler.doHandle(SessionHandler.java:1624)
> 	at org.eclipse.jetty.server.handler.ScopedHandler.nextHandle(ScopedHandler.java:233)
> 	at org.eclipse.jetty.server.handler.ContextHandler.doHandle(ContextHandler.java:1435)
> 	at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:188)
> 	at org.eclipse.jetty.servlet.ServletHandler.doScope(ServletHandler.java:501)
> 	at org.eclipse.jetty.server.session.SessionHandler.doScope(SessionHandler.java:1594)
> 	at org.eclipse.jetty.server.handler.ScopedHandler.nextScope(ScopedHandler.java:186)
> 	at org.eclipse.jetty.server.handler.ContextHandler.doScope(ContextHandler.java:1350)
> 	at org.eclipse.jetty.server.handler.ScopedHandler.handle(ScopedHandler.java:141)
> 	at org.eclipse.jetty.server.handler.HandlerWrapper.handle(HandlerWrapper.java:127)
> 	at org.eclipse.jetty.server.Server.handle(Server.java:516)
> 	at org.eclipse.jetty.server.HttpChannel.lambda$handle$1(HttpChannel.java:388)
> 	at org.eclipse.jetty.server.HttpChannel$$Lambda$220/0x00000008403d1040.dispatch(Unknown Source)
> 	at org.eclipse.jetty.server.HttpChannel.dispatch(HttpChannel.java:633)
> 	at org.eclipse.jetty.server.HttpChannel.handle(HttpChannel.java:380)
> 	at org.eclipse.jetty.server.HttpConnection.onFillable(HttpConnection.java:273)
> 	at org.eclipse.jetty.io.AbstractConnection$ReadCallback.succeeded(AbstractConnection.java:311)
> 	at org.eclipse.jetty.io.FillInterest.fillable(FillInterest.java:105)
> 	at org.eclipse.jetty.io.ChannelEndPoint$1.run(ChannelEndPoint.java:104)
> 	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.runTask(EatWhatYouKill.java:336)
> 	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.doProduce(EatWhatYouKill.java:313)
> 	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.tryProduce(EatWhatYouKill.java:171)
> 	at org.eclipse.jetty.util.thread.strategy.EatWhatYouKill.run(EatWhatYouKill.java:129)
> 	at org.eclipse.jetty.util.thread.ReservedThreadExecutor$ReservedThread.run(ReservedThreadExecutor.java:375)
> 	at org.eclipse.jetty.util.thread.QueuedThreadPool.runJob(QueuedThreadPool.java:773)
> 	at org.eclipse.jetty.util.thread.QueuedThreadPool$Runner.run(QueuedThreadPool.java:905)
> 	at org.apache.qpid.server.bytebuffer.QpidByteBufferFactory.lambda$createQpidByteBufferTrackingThreadFactory$0(QpidByteBufferFactory.java:464)
> 	at org.apache.qpid.server.bytebuffer.QpidByteBufferFactory$$Lambda$73/0x0000000840157c40.run(Unknown Source)
> 	at java.lang.Thread.run(java.base@11.0.10/Thread.java:834)
> {code}
> It seems that deleting a queue entry in a queue with the FLOW_TO_DISK overflow policy triggers FlowToDiskOverflowPolicyHandler to iterate over the queue entries, leading to a significant performance drop.
>  
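
To illustrate why a scan on every deletion would hurt purge throughput, here is a toy cost model. It is a sketch under stated assumptions, not Broker-J code: `PurgeCostModel` and both methods are hypothetical, and the "scan" simply walks all remaining entries after each deletion, which is a simplification of what the stack trace suggests. The counter-based variant is shown only for comparison and is not claimed to be the approach taken in the PR.

```java
import java.util.ArrayDeque;
import java.util.Deque;

// Toy model only: counts how many entries are visited while purging a queue.
class PurgeCostModel {

    // Assumption: every deletion triggers a pass over all remaining entries.
    static long purgeWithScanPerDelete(int n) {
        Deque<Integer> queue = new ArrayDeque<>();
        for (int i = 0; i < n; i++) {
            queue.add(i);
        }
        long visited = 0;
        while (!queue.isEmpty()) {
            queue.poll();                    // delete the head entry
            for (Integer ignored : queue) {  // hypothetical overflow check
                visited++;
            }
        }
        return visited;                      // n * (n - 1) / 2 visits in total
    }

    // Comparison: a running counter is updated once per deletion, no rescans.
    static long purgeWithCounter(int n) {
        Deque<Integer> queue = new ArrayDeque<>();
        for (int i = 0; i < n; i++) {
            queue.add(i);
        }
        long inMemory = n;
        long steps = 0;
        while (!queue.isEmpty()) {
            queue.poll();
            inMemory--;                      // constant-time bookkeeping
            steps++;
        }
        return steps;                        // n steps in total
    }

    public static void main(String[] args) {
        System.out.println(purgeWithScanPerDelete(1000)); // 499500
        System.out.println(purgeWithCounter(1000));       // 1000
    }
}
```

In this model the work grows quadratically with queue depth when each deletion rescans the queue, and linearly when it does not.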



--
This message was sent by Atlassian Jira
(v8.20.7#820007)
