You are viewing a plain text version of this content. The canonical link for it is here.
Posted to dev@bookkeeper.apache.org by "Sijie Guo (JIRA)" <ji...@apache.org> on 2013/01/09 07:36:13 UTC

[jira] [Commented] (BOOKKEEPER-539) ClientNotSubscribedException & doesn't receive enough messages in TestThrottlingDelivery#testServerSideThrottle

    [ https://issues.apache.org/jira/browse/BOOKKEEPER-539?page=com.atlassian.jira.plugin.system.issuetabpanels:comment-tabpanel&focusedCommentId=13547689#comment-13547689 ] 

Sijie Guo commented on BOOKKEEPER-539:
--------------------------------------

the test case failed in Multiplex hedwig client. it was a race condition in FIFODeliveryManager#stopServingSubscriber.

it could be reproduced in following sequence.

1) subscriber #subscribe. FIFODeliveryManager#startServingSubscriber.
2) subscriber #closesub. FIFODeliveryManager#stopServingSubscriber (it is important that currently #stopServingSubscriber doesn't remove active subscriber from the mapping)
3) subscriber #subscribe. #startServingSubscriber replaced the active subscriber created at 2) with a newly active subscriber. after the replacement, it inserted another #stopServingSubscriber operatio in the queue.
4) the #stopServingSubscriber operation enqueued in 3) executed and delivered a FORCED_CLOSE_SUBSCRIPTION event to client,which would clear the subscription state established in 3) at the client side.

so 

a) if the clear operation executed before startDelivery, #startDelivery would throw ClientNotSubscribedException.
b) if the clear operation executed after startDelivery, it might cause 'doesn't receive enough messages'.

beside this critical issue, I found that there is a race condition over connected variable in ActiveSubscriber from the attached log. 'connected' variable isn't volatile, but it was used between DeliveryManager thread and ReadAheadCacheThread. some message would still be delivered to client side even someone close the subscription, because two threads see different value of 'connected' in different CPU cores.
                
> ClientNotSubscribedException & doesn't receive enough messages in TestThrottlingDelivery#testServerSideThrottle
> ---------------------------------------------------------------------------------------------------------------
>
>                 Key: BOOKKEEPER-539
>                 URL: https://issues.apache.org/jira/browse/BOOKKEEPER-539
>             Project: Bookkeeper
>          Issue Type: Bug
>          Components: hedwig-server
>            Reporter: Sijie Guo
>            Assignee: Sijie Guo
>            Priority: Blocker
>             Fix For: 4.2.0
>
>         Attachments: org.apache.hedwig.server.delivery.TestThrottlingDelivery-output.txt, org.apache.hedwig.server.delivery.TestThrottlingDelivery-output.txt
>
>
> ClientNotSubscribedException & doesn't receive enough messages failure in TestThrottlingDelivery#testServerSideThrottle.
> {code}
> -------------------------------------------------------------------------------
> Test set: org.apache.hedwig.server.delivery.TestThrottlingDelivery
> -------------------------------------------------------------------------------
> Tests run: 2, Failures: 1, Errors: 0, Skipped: 0, Time elapsed: 18.544 sec <<< FAILURE!
> testServerSideThrottle[1](org.apache.hedwig.server.delivery.TestThrottlingDelivery)  Time elapsed: 6.776 sec  <<< FAILURE!   junit.framework.AssertionFailedError: Should be expected messages with only 6 expected:<6> but was:<2>
>         at junit.framework.Assert.fail(Assert.java:47)
>         at junit.framework.Assert.failNotEquals(Assert.java:283)
>         at junit.framework.Assert.assertEquals(Assert.java:64)
>         at junit.framework.Assert.assertEquals(Assert.java:195)
>         at org.apache.hedwig.server.delivery.TestThrottlingDelivery.throttleX(TestThrottlingDelivery.java:151)
>         at org.apache.hedwig.server.delivery.TestThrottlingDelivery.testServerSideThrottle(TestThrottlingDelivery.java:216)
>         at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)        at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:601)
>         at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
>         at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
>         at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
>         at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
>         at org.junit.internal.runners.statements.FailOnTimeout$1.run(FailOnTimeout.java:28)
> {code}
> {code}
> -------------------------------------------------------------------------------
> Test set: org.apache.hedwig.server.delivery.TestThrottlingDelivery
> -------------------------------------------------------------------------------
> Tests run: 2, Failures: 0, Errors: 1, Skipped: 0, Time elapsed: 18.294 sec <<< FAILURE!
> testServerSideThrottle[1](org.apache.hedwig.server.delivery.TestThrottlingDelivery)  Time elapsed: 6.763 sec  <<< ERROR!
> org.apache.hedwig.exceptions.PubSubException$ClientNotSubscribedException: Client is not yet subscribed to Topic:            testServerSideThrottleWithHigherValue, SubscriberId: serverThrottleSub
>         at org.apache.hedwig.client.netty.impl.multiplex.MultiplexHChannelManager.startDelivery(MultiplexHChannelManager.    java:221)
>         at org.apache.hedwig.client.netty.impl.multiplex.MultiplexHChannelManager.startDelivery(MultiplexHChannelManager.    java:199)
>         at org.apache.hedwig.client.netty.HedwigSubscriber.startDelivery(HedwigSubscriber.java:358)
>         at org.apache.hedwig.server.delivery.TestThrottlingDelivery.throttleX(TestThrottlingDelivery.java:113)
>         at org.apache.hedwig.server.delivery.TestThrottlingDelivery.testServerSideThrottle(TestThrottlingDelivery.java:226)          at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
>         at sun.reflect.NativeMethodAccessorImpl.invoke(NativeMethodAccessorImpl.java:57)
>         at sun.reflect.DelegatingMethodAccessorImpl.invoke(DelegatingMethodAccessorImpl.java:43)
>         at java.lang.reflect.Method.invoke(Method.java:601)
>         at org.junit.runners.model.FrameworkMethod$1.runReflectiveCall(FrameworkMethod.java:44)
>         at org.junit.internal.runners.model.ReflectiveCallable.run(ReflectiveCallable.java:15)
>         at org.junit.runners.model.FrameworkMethod.invokeExplosively(FrameworkMethod.java:41)
>         at org.junit.internal.runners.statements.InvokeMethod.evaluate(InvokeMethod.java:20)
>         at org.junit.internal.runners.statements.FailOnTimeout$1.run(FailOnTimeout.java:28)
> {code}

--
This message is automatically generated by JIRA.
If you think it was sent incorrectly, please contact your JIRA administrators
For more information on JIRA, see: http://www.atlassian.com/software/jira