Posted to dev@bookkeeper.apache.org by "Jiannan Wang (JIRA)" <ji...@apache.org> on 2013/01/04 21:28:12 UTC
[jira] [Updated] (BOOKKEEPER-503) The test case of TestThrottlingDelivery#testServerSideThrottle failed sometimes
[ https://issues.apache.org/jira/browse/BOOKKEEPER-503?page=com.atlassian.jira.plugin.system.issuetabpanels:all-tabpanel ]
Jiannan Wang updated BOOKKEEPER-503:
------------------------------------
Description:
Running the following script in the hedwig-server project:
{code:java}
while mvn test -Dtest=TestThrottlingDelivery#testServerSideThrottle; do echo .; done
{code}
We may get an assertion failure:
{code:java}
testServerSideThrottle[1](org.apache.hedwig.server.delivery.TestThrottlingDelivery) Time elapsed: 3.314 sec <<< FAILURE!
junit.framework.AssertionFailedError: Received more messages than throttle value 5
at junit.framework.Assert.fail(Assert.java:47)
at junit.framework.Assert.assertTrue(Assert.java:20)
at junit.framework.Assert.assertFalse(Assert.java:34)
at org.apache.hedwig.server.delivery.TestThrottlingDelivery.throttleX(TestThrottlingDelivery.java:149)
at org.apache.hedwig.server.delivery.TestThrottlingDelivery.testServerSideThrottle(TestThrottlingDelivery.java:216)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
{code}
This is a race condition that can cause messages to be throttled by mistake:
# Suppose messages 1-12 are to be delivered and the message window size on the Hub server side is 10.
# Messages 1-10 are delivered to the subscriber, while messages 11-12 are throttled by the window size limit.
# The subscriber sends a separate CONSUME for each of messages 1-10.
#* CONSUME 1 is handled and FIFODeliveryManager goes on to deliver message 11. (Message 12 is still throttled at this point.)
#* The subscriber receives message 11 and quickly acknowledges it by sending CONSUME 11 to the Hub.
#* If *the Hub handles CONSUME 11 before CONSUME [2-10]*, then CONSUME [2-10] are simply ignored by the first if statement in the code below, since lastSeqIdConsumedUtil is now 11.
#* Message 12 then has no chance of being delivered.
{code:java}
// FIFODeliveryManager.java, line 475
protected void messageConsumed(long newSeqIdConsumed) {
    if (newSeqIdConsumed <= lastSeqIdConsumedUtil) {
        return;
    }
    if (logger.isDebugEnabled()) {
        logger.debug("Subscriber ({}) moved consumed ptr from {} to {}.",
                     va(this, lastSeqIdConsumedUtil, newSeqIdConsumed));
    }
    lastSeqIdConsumedUtil = newSeqIdConsumed;
    // after updated seq id check whether it still exceed msg limitation
    if (msgLimitExceeded()) {
        return;
    }
    if (isThrottled) {
        isThrottled = false;
        logger.info("Try to wake up subscriber ({}) to deliver messages again : last delivered {}, last consumed {}.",
                    va(this, lastLocalSeqIdDelivered, lastSeqIdConsumedUtil));
        enqueueWithoutFailure(new DeliveryManagerRequest() {
            @Override
            public void performRequest() {
                // enqueue
                clearRetryDelayForSubscriber(ActiveSubscriberState.this);
            }
        });
    }
}
{code}
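The effect of the first if statement on out-of-order CONSUME requests can be reproduced in isolation. The following is only a rough sketch, not the Hedwig implementation: the class ConsumeOrderSketch, the nested ConsumePointer and the helper ignoredAcks are hypothetical names made up for illustration, and only the guard on lastSeqIdConsumedUtil is modeled (no delivery, no window accounting, no msgLimitExceeded).

```java
import java.util.ArrayList;
import java.util.List;

public class ConsumeOrderSketch {

    // Hypothetical stand-in for the per-subscriber state; only the guard is modeled.
    static class ConsumePointer {
        long lastSeqIdConsumedUtil = 0;

        // Returns true if the consumed pointer moved, false if the guard ignored the ack.
        boolean messageConsumed(long newSeqIdConsumed) {
            if (newSeqIdConsumed <= lastSeqIdConsumedUtil) {
                return false;
            }
            lastSeqIdConsumedUtil = newSeqIdConsumed;
            return true;
        }
    }

    // Feeds CONSUME seq ids in the given arrival order and collects the ignored ones.
    static List<Long> ignoredAcks(long[] arrivalOrder) {
        ConsumePointer state = new ConsumePointer();
        List<Long> ignored = new ArrayList<>();
        for (long seqId : arrivalOrder) {
            if (!state.messageConsumed(seqId)) {
                ignored.add(seqId);
            }
        }
        return ignored;
    }

    public static void main(String[] args) {
        // Acks handled in sequence order: none is ignored.
        long[] inOrder = {1, 2, 3, 4, 5, 6, 7, 8, 9, 10, 11};
        // CONSUME 11 overtakes CONSUME 2-10, as in the scenario described above.
        long[] raced = {1, 11, 2, 3, 4, 5, 6, 7, 8, 9, 10};

        System.out.println("in order, ignored: " + ignoredAcks(inOrder));
        System.out.println("raced, ignored:    " + ignoredAcks(raced));
    }
}
```

In the raced order, CONSUME 2 through CONSUME 10 all return early at the guard, so none of them reaches the throttle check further down in messageConsumed.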
was:
Running follow script in hedwig-server project
{code:java}
while mvn test -Dtest=TestThrottlingDelivery#testServerSideThrottle; do echo .; done
{code}
We may get assertion failure:
{code:java}
testServerSideThrottle[1](org.apache.hedwig.server.delivery.TestThrottlingDelivery) Time elapsed: 3.314 sec <<< FAILURE!
junit.framework.AssertionFailedError: Received more messages than throttle value 5
at junit.framework.Assert.fail(Assert.java:47)
at junit.framework.Assert.assertTrue(Assert.java:20)
at junit.framework.Assert.assertFalse(Assert.java:34)
at org.apache.hedwig.server.delivery.TestThrottlingDelivery.throttleX(TestThrottlingDelivery.java:149)
at org.apache.hedwig.server.delivery.TestThrottlingDelivery.testServerSideThrottle(TestThrottlingDelivery.java:216)
at sun.reflect.NativeMethodAccessorImpl.invoke0(Native Method)
{code}
> The test case of TestThrottlingDelivery#testServerSideThrottle failed sometimes
> -------------------------------------------------------------------------------
>
> Key: BOOKKEEPER-503
> URL: https://issues.apache.org/jira/browse/BOOKKEEPER-503
> Project: Bookkeeper
> Issue Type: Bug
> Components: hedwig-server
> Reporter: Jiannan Wang
> Fix For: 4.2.0
>