You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@bookkeeper.apache.org by "Ivan Kelly (JIRA)" <ji...@apache.org> on 2013/02/13 16:49:16 UTC
[jira] [Closed] (BOOKKEEPER-503) The test case of
TestThrottlingDelivery#testServerSideThrottle failed sometimes
[ https://issues.apache.org/jira/browse/BOOKKEEPER-503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Ivan Kelly closed BOOKKEEPER-503.
---------------------------------
> The test case of TestThrottlingDelivery#testServerSideThrottle failed sometimes
> -------------------------------------------------------------------------------
>
> Key: BOOKKEEPER-503
> URL: https://issues.apache.org/jira/browse/BOOKKEEPER-503
> Project: Bookkeeper
> Issue Type: Bug
> Components: hedwig-server
> Reporter: Jiannan Wang
> Assignee: Jiannan Wang
> Fix For: 4.2.0
>
> Attachments: 0001-BOOKKEEPER-503-Test-case.patch, BOOKKEEPER-503.patch, BOOKKEEPER-503.patch, logs.tar
>
>
> Running follow script in hedwig-server project
> {code:java}
> while mvn test -Dtest=TestThrottlingDelivery; do echo .; done
> {code}
> We may get assertion failure:
> {code:java}
> testServerSideThrottle[0](org.apache.hedwig.server.delivery.TestThrottlingDelivery) Time elapsed: 14.922 sec <<< FAILURE!
> junit.framework.AssertionFailedError: Timed out waiting for messages 31
> at junit.framework.Assert.fail(Assert.java:47)
> at junit.framework.Assert.assertTrue(Assert.java:20)
> at org.apache.hedwig.server.delivery.TestThrottlingDelivery.throttleX(TestThrottlingDelivery.java:159)
> at org.apache.hedwig.server.delivery.TestThrottlingDelivery.testServerSideThrottle(TestThrottlingDelivery.java:206)
> at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> at java.lang.reflect.Method.invoke(Method.java:601)
> at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
> at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
> at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
> at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
> at org.junit.internal.runners.statements.FailOnTimeout$1.run(FailOnTimeout.java:28)
> {code}
> This is a race issue which may cause messages been throttled by mistake, the root cause is ActiveSubscriberState.messageConsumed() and ActiveSubscriberState.deliverNextMessage() may be executed in different threads by AbstractSubscriptionManager and FIFODeliveryManager.
> Read the log in attachement around Line 2420 if you want to get more information, here I replay the logs onto the code (Line XX denotes code listed below):
> # Messages 1-30 are to be delivered and the message window size in Hub server is 10.
> # Messages 1-10 are delivered to subscriber while message 11-30 is throttled by the window size limitation.
> # Subscriber calls consume 1-10 asynchronously.
> # CONSUME 1 is handled and FIFODeliveryManager continue to deliver message 11.
> # Subscriber receive message 11 and quickly ack CONSUME 11 to Hub.
> # Now there are two threads operate on a same ActiveSubscriberState:
> #* Thread in AbstractSubscriptionManager: call ActiveSubscriberState.messageConsumed() for message 2, 3, 11 (4-10 is still on the way since it's asynchronous consume). Let's assume this thread happen to run in Line (14) for message 11.
> #* Thread in FIFODeliveryManager: Coincidently, it's in Line (36) now (with last delivered 11, last consumed 1 and variable isThrottled is still false).
> # If thread in AbstractSubscriptionManager executed before FIFODeliveryManager, then consume operator for 11 does nothing more.
> # CONSUME [4-10] will be just ignored by the if statement in Line (2) since lastSeqIdConsumedUtil is now 11.
> # Further messages like 12 have no chance to been delivered at this time.
> {code:java}
> (01) protected void messageConsumed(long newSeqIdConsumed) {
> (02) if (newSeqIdConsumed <= lastSeqIdConsumedUtil) {
> (03) return;
> (04) }
> (05) if (logger.isDebugEnabled()) {
> (06) logger.debug("Subscriber ({}) moved consumed ptr from {} to {}.",
> (07) va(this, lastSeqIdConsumedUtil, newSeqIdConsumed));
> (08) }
> (09) lastSeqIdConsumedUtil = newSeqIdConsumed;
> (10) // after updated seq id check whether it still exceed msg limitation
> (11) if (msgLimitExceeded()) {
> (12) return;
> (13) }
> (14) if (isThrottled) {
> (15) isThrottled = false;
> (16) logger.info("Try to wake up subscriber ({}) to deliver messages again : last delivered {}, last consumed {}.",
> (17) va(this, lastLocalSeqIdDelivered, lastSeqIdConsumedUtil));
> (18)
> (19) enqueueWithoutFailure(new DeliveryManagerRequest() {
> (20) @Override
> (21) public void performRequest() {
> (22) // enqueue
> (23) clearRetryDelayForSubscriber(ActiveSubscriberState.this);
> (24) }
> (25) });
> (26) }
> (27) }
> (28)
> (29) public void deliverNextMessage() {
> (30) if (!isConnected()) {
> (31) return;
> (32) }
> (33)
> (34) // check whether we have delivered enough messages without receiving their consumes
> (35) if (msgLimitExceeded()) {
> (36) logger.info("Subscriber ({}) is throttled : last delivered {}, last consumed {}.",
> (37) va(this, lastLocalSeqIdDelivered, lastSeqIdConsumedUtil));
> (38) isThrottled = true;
> (39) // do nothing, since the delivery process would be throttled.
> (40) // After message consumed, it would be added back to retry queue.
> (41) return;
> (42) }
> (43)
> (44) localSeqIdDeliveringNow = persistenceMgr.getSeqIdAfterSkipping(topic, lastLocalSeqIdDelivered, 1);
> (45)
> (46) ScanRequest scanRequest = new ScanRequest(topic, localSeqIdDeliveringNow,
> (47) /* callback= */this, /* ctx= */null);
> (48)
> (49) persistenceMgr.scanSingleMessage(scanRequest);
> (50) }
> {code}
> By the way, we should also take care of thread-safe issue in other methods for ActiveSubscriberState, which implements some other callback interface.
--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira