Posted to dev@bookkeeper.apache.org by "Ivan Kelly (JIRA)" <ji...@apache.org> on 2013/02/13 16:49:16 UTC

[jira] [Closed] (BOOKKEEPER-503) The test case of TestThrottlingDelivery#testServerSideThrottle failed sometimes

     [ https://issues.apache.org/jira/browse/BOOKKEEPER-503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]

Ivan Kelly closed BOOKKEEPER-503.
---------------------------------

    
> The test case of TestThrottlingDelivery#testServerSideThrottle failed sometimes
> -------------------------------------------------------------------------------
>
>                 Key: BOOKKEEPER-503
>                 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-503
>             Project: Bookkeeper
>          Issue Type: Bug
>          Components: hedwig-server
>            Reporter: Jiannan Wang
>            Assignee: Jiannan Wang
>             Fix For: 4.2.0
>
>         Attachments: 0001-BOOKKEEPER-503-Test-case.patch, BOOKKEEPER-503.patch, BOOKKEEPER-503.patch, logs.tar
>
>
> Running the following script in the hedwig-server project reruns the test until a run fails:
> {code:java}
> while mvn test -Dtest=TestThrottlingDelivery; do echo .; done
> {code}
> We may get an assertion failure:
> {code:java}
> testServerSideThrottle[0](org.apache.hedwig.server.delivery.TestThrottlingDelivery)  Time elapsed: 14.922 sec  <<< FAILURE!
> junit.framework.AssertionFailedError: Timed out waiting for messages 31
> 	at junit.framework.Assert.fail(Assert.java:47)
> 	at junit.framework.Assert.assertTrue(Assert.java:20)
> 	at org.apache.hedwig.server.delivery.TestThrottlingDelivery.throttleX(TestThrottlingDelivery.java:159)
> 	at org.apache.hedwig.server.delivery.TestThrottlingDelivery.testServerSideThrottle(TestThrottlingDelivery.java:206)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
> 	at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
> 	at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
> 	at java.lang.reflect.Method.invoke(Method.java:601)
> 	at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
> 	at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
> 	at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
> 	at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
> 	at org.junit.internal.runners.statements.FailOnTimeout$1.run(FailOnTimeout.java:28)
> {code}
> This is a race that can cause messages to be throttled by mistake. The root cause is that ActiveSubscriberState.messageConsumed() and ActiveSubscriberState.deliverNextMessage() may be executed on different threads, by AbstractSubscriptionManager and FIFODeliveryManager respectively.
> For more detail, read the attached log around line 2420. Here I replay the log against the code (Line (XX) refers to the numbered code listing below):
> # Messages 1-30 are to be delivered, and the message window size on the Hub server is 10.
> # Messages 1-10 are delivered to the subscriber, while messages 11-30 are throttled by the window size limit.
> # The subscriber consumes messages 1-10 asynchronously.
> # CONSUME 1 is handled, and FIFODeliveryManager goes on to deliver message 11.
> # The subscriber receives message 11 and quickly acks it with CONSUME 11 to the Hub.
> # Now two threads operate on the same ActiveSubscriberState:
> #* Thread in AbstractSubscriptionManager: calls ActiveSubscriberState.messageConsumed() for messages 2, 3 and 11 (CONSUME 4-10 are still in flight because the consumes are asynchronous). Assume this thread happens to be at Line (14) for message 11.
> #* Thread in FIFODeliveryManager: coincidentally, it is at Line (36) (with last delivered 11, last consumed 1, and isThrottled still false).
> # If the AbstractSubscriptionManager thread runs before the FIFODeliveryManager thread, it sees isThrottled == false at Line (14), so the consume of message 11 does nothing more. The FIFODeliveryManager thread then sets isThrottled = true at Line (38), and no wake-up request is queued.
> # CONSUME 4-10 are then ignored by the if statement at Line (02), since lastSeqIdConsumedUtil is already 11.
> # Further messages such as 12 have no chance of being delivered at this point.
> {code:java}
> (01) protected void messageConsumed(long newSeqIdConsumed) {
> (02)     if (newSeqIdConsumed <= lastSeqIdConsumedUtil) {
> (03)         return;
> (04)     }
> (05)     if (logger.isDebugEnabled()) {
> (06)         logger.debug("Subscriber ({}) moved consumed ptr from {} to {}.",
> (07)                      va(this, lastSeqIdConsumedUtil, newSeqIdConsumed));
> (08)     }
> (09)     lastSeqIdConsumedUtil = newSeqIdConsumed;
> (10)     // after updated seq id check whether it still exceed msg limitation
> (11)     if (msgLimitExceeded()) {
> (12)         return;
> (13)     }
> (14)     if (isThrottled) {
> (15)         isThrottled = false;
> (16)         logger.info("Try to wake up subscriber ({}) to deliver messages again : last delivered {}, last consumed {}.",
> (17)                     va(this, lastLocalSeqIdDelivered, lastSeqIdConsumedUtil));
> (18) 
> (19)         enqueueWithoutFailure(new DeliveryManagerRequest() {
> (20)             @Override
> (21)             public void performRequest() {
> (22)                 // enqueue 
> (23)                 clearRetryDelayForSubscriber(ActiveSubscriberState.this);            
> (24)             }
> (25)         });
> (26)     }
> (27) }
> (28) 
> (29) public void deliverNextMessage() {
> (30)     if (!isConnected()) {
> (31)         return;
> (32)     }
> (33) 
> (34)     // check whether we have delivered enough messages without receiving their consumes
> (35)     if (msgLimitExceeded()) {
> (36)         logger.info("Subscriber ({}) is throttled : last delivered {}, last consumed {}.",
> (37)                     va(this, lastLocalSeqIdDelivered, lastSeqIdConsumedUtil));
> (38)         isThrottled = true;
> (39)         // do nothing, since the delivery process would be throttled.
> (40)         // After message consumed, it would be added back to retry queue.
> (41)         return;
> (42)     }
> (43) 
> (44)     localSeqIdDeliveringNow = persistenceMgr.getSeqIdAfterSkipping(topic, lastLocalSeqIdDelivered, 1);
> (45) 
> (46)     ScanRequest scanRequest = new ScanRequest(topic, localSeqIdDeliveringNow,
> (47)             /* callback= */this, /* ctx= */null);
> (48) 
> (49)     persistenceMgr.scanSingleMessage(scanRequest);
> (50) }
> {code}
> By the way, we should also check the thread safety of the other methods of ActiveSubscriberState, which implements some other callback interfaces.
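
The interleaving replayed in the description can be reproduced deterministically with a small model. The sketch below is a hypothetical simplification, not the Hedwig class: the class name ThrottleRaceSketch, the wakeUps counter (standing in for enqueueWithoutFailure), the tryDeliver() and isStuck() helpers, and the definition of msgLimitExceeded() as "last delivered minus last consumed is at least the window size" are all assumptions made for illustration. It replays the racy order on a single thread by splitting the check at Line (35) from the assignment at Line (38), then shows that if the check and the assignment run as one atomic step (here via synchronized, which is only one possible approach and not necessarily what the attached patch does), both remaining orderings let delivery continue.

```java
/** Hypothetical, simplified model of the throttling state; not the real Hedwig class. */
class ThrottleRaceSketch {
    static final long WINDOW = 10;  // message window size from the report

    long lastDelivered;
    long lastConsumed;
    boolean isThrottled;
    int wakeUps;                    // stands in for enqueueWithoutFailure(...)

    ThrottleRaceSketch(long lastDelivered, long lastConsumed) {
        this.lastDelivered = lastDelivered;
        this.lastConsumed = lastConsumed;
    }

    // Assumed definition: the window is full when WINDOW messages are unconsumed.
    boolean msgLimitExceeded() {
        return lastDelivered - lastConsumed >= WINDOW;
    }

    // Mirrors Lines (01)-(27) of the listing.
    synchronized void messageConsumed(long seqId) {
        if (seqId <= lastConsumed) {
            return;
        }
        lastConsumed = seqId;
        if (msgLimitExceeded()) {
            return;
        }
        if (isThrottled) {
            isThrottled = false;
            wakeUps++;
        }
    }

    // Mirrors Lines (35)-(42) as one atomic step; true means delivery may proceed.
    synchronized boolean tryDeliver() {
        if (msgLimitExceeded()) {
            isThrottled = true;
            return false;
        }
        return true;
    }

    // True if the subscriber is left throttled with no wake-up queued.
    boolean isStuck() {
        return isThrottled && wakeUps == 0;
    }

    // Replays the reported interleaving by splitting the check from the assignment.
    static ThrottleRaceSketch replayRace() {
        ThrottleRaceSketch s = new ThrottleRaceSketch(11, 1);
        boolean exceeded = s.msgLimitExceeded(); // delivery thread: Line (35), then paused
        s.messageConsumed(11);                   // consume thread: Line (14) sees false
        if (exceeded) {
            s.isThrottled = true;                // delivery thread resumes: Line (38)
        }
        for (long id = 4; id <= 10; id++) {
            s.messageConsumed(id);               // ignored at Line (02)
        }
        return s;
    }

    // With the check and the assignment atomic, only two orderings remain.
    static ThrottleRaceSketch deliverThenConsume() {
        ThrottleRaceSketch s = new ThrottleRaceSketch(11, 1);
        s.tryDeliver();
        s.messageConsumed(11);
        return s;
    }

    static ThrottleRaceSketch consumeThenDeliver() {
        ThrottleRaceSketch s = new ThrottleRaceSketch(11, 1);
        s.messageConsumed(11);
        s.tryDeliver();
        return s;
    }

    public static void main(String[] args) {
        System.out.println("racy interleaving stuck: " + replayRace().isStuck());
        System.out.println("deliver then consume stuck: " + deliverThenConsume().isStuck());
        System.out.println("consume then deliver stuck: " + consumeThenDeliver().isStuck());
    }
}
```

Under these assumptions, only the racy interleaving ends with isThrottled set and no wake-up queued. In the deliver-then-consume order the consume of message 11 clears the flag and queues a wake-up, and in the consume-then-deliver order the window is no longer full, so the subscriber is never throttled.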
