You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2013/07/22 19:18:01 UTC
svn commit: r1505735 - in /zookeeper/bookkeeper/branches/branch-4.2: ./
hedwig-server/src/main/java/org/apache/hedwig/server/delivery/
hedwig-server/src/test/java/org/apache/hedwig/server/delivery/
Author: ivank
Date: Mon Jul 22 17:18:01 2013
New Revision: 1505735
URL: http://svn.apache.org/r1505735
Log:
BOOKKEEPER-607: Filtered Messages Require ACK from Client Causes User Being Throttled Incorrectly Forever (sijie via ivank)
Modified:
zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt
zookeeper/bookkeeper/branches/branch-4.2/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
zookeeper/bookkeeper/branches/branch-4.2/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java
Modified: zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt?rev=1505735&r1=1505734&r2=1505735&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt (original)
+++ zookeeper/bookkeeper/branches/branch-4.2/CHANGES.txt Mon Jul 22 17:18:01 2013
@@ -54,6 +54,8 @@ Release 4.2.2 - Unreleased
BOOKKEEPER-601: readahead cache size isn't updated correctly (sijie via fpj)
+ BOOKKEEPER-607: Filtered Messages Require ACK from Client Causes User Being Throttled Incorrectly Forever (sijie via ivank)
+
hedwig-client:
BOOKKEEPER-598: Fails to compile - RESUBSCRIBE_EXCEPTION conflict (Matthew Farrellee via ivank)
Modified: zookeeper/bookkeeper/branches/branch-4.2/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java?rev=1505735&r1=1505734&r2=1505735&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java (original)
+++ zookeeper/bookkeeper/branches/branch-4.2/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java Mon Jul 22 17:18:01 2013
@@ -139,6 +139,7 @@ public class FIFODeliveryManager impleme
this.cfg = cfg;
}
+ @Override
public void start() {
workerThread.start();
}
@@ -205,7 +206,7 @@ public class FIFODeliveryManager impleme
MessageSeqId seqIdToStartFrom,
DeliveryEndPoint endPoint, ServerMessageFilter filter,
Callback<Void> callback, Object ctx) {
- ActiveSubscriberState subscriber =
+ ActiveSubscriberState subscriber =
new ActiveSubscriberState(topic, subscriberId,
preferences,
seqIdToStartFrom.getLocalComponent() - 1,
@@ -262,7 +263,7 @@ public class FIFODeliveryManager impleme
if (null == subState) {
return;
}
- subState.messageConsumed(consumedSeqId.getLocalComponent());
+ subState.messageConsumed(consumedSeqId.getLocalComponent());
}
/**
@@ -313,6 +314,7 @@ public class FIFODeliveryManager impleme
/**
* Stop method which will enqueue a ShutdownDeliveryManagerRequest.
*/
+ @Override
public void stop() {
enqueueWithoutFailure(new ShutdownDeliveryManagerRequest());
}
@@ -528,8 +530,8 @@ public class FIFODeliveryManager impleme
enqueueWithoutFailure(new DeliveryManagerRequest() {
@Override
public void performRequest() {
- // enqueue
- clearRetryDelayForSubscriber(ActiveSubscriberState.this);
+ // enqueue
+ clearRetryDelayForSubscriber(ActiveSubscriberState.this);
}
});
}
@@ -621,6 +623,19 @@ public class FIFODeliveryManager impleme
}
if (!filter.testMessage(message)) {
+ // for filtered out messages, we don't deliver the message to client, so we would not
+ // receive its consume request which moves the <i>lastSeqIdConsumedUtil</i> pointer.
+ // we move the <i>lastSeqIdConsumedUtil</i> here for filtered out messages, which would
+ // avoid a subscriber being throttled due to the message gap introduced by filtering.
+ //
+ // it is OK to move <i>lastSeqIdConsumedUtil</i> here, since this pointer is the subscriber's
+ // delivery state, which is used to throttle delivery. changing <i>lastSeqIdConsumedUtil</i> would
+ // not affect the subscriber's consume pointer in zookeeper, which is managed by the subscription
+ // manager.
+ //
+ // Also, marking the message consumed before calling sendingFinished() avoids the subscriber
+ // being throttled first and then released from the throttled state later.
+ messageConsumed(message.getMsgId().getLocalComponent());
sendingFinished();
return;
}
@@ -641,6 +656,7 @@ public class FIFODeliveryManager impleme
}
+ @Override
public void scanFailed(Object ctx, Exception exception) {
if (!checkConnected()) {
return;
@@ -650,6 +666,7 @@ public class FIFODeliveryManager impleme
retryErroredSubscriberAfterDelay(this);
}
+ @Override
public void scanFinished(Object ctx, ReasonForFinish reason) {
checkConnected();
}
@@ -658,6 +675,7 @@ public class FIFODeliveryManager impleme
* ===============================================================
* {@link DeliveryCallback} methods
*/
+ @Override
public void sendingFinished() {
if (!isConnected()) {
return;
@@ -687,6 +705,7 @@ public class FIFODeliveryManager impleme
}
+ @Override
public void permanentErrorOnSend() {
// the underlying channel is broken, the channel will
// be closed in UmbrellaHandler when exception happened.
@@ -695,6 +714,7 @@ public class FIFODeliveryManager impleme
NOP_CALLBACK, null);
}
+ @Override
public void transientErrorOnSend() {
retryErroredSubscriberAfterDelay(this);
}
@@ -703,6 +723,7 @@ public class FIFODeliveryManager impleme
* ===============================================================
* {@link DeliveryManagerRequest} methods
*/
+ @Override
public void performRequest() {
// Put this subscriber in the channel to subscriber mapping
ActiveSubscriberState prevSubscriber =
@@ -847,6 +868,7 @@ public class FIFODeliveryManager impleme
// This is a simple type of Request we will enqueue when the
// PubSubServer is shut down and we want to stop the DeliveryManager
// thread.
+ @Override
public void performRequest() {
keepRunning = false;
}
Modified: zookeeper/bookkeeper/branches/branch-4.2/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/branches/branch-4.2/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java?rev=1505735&r1=1505734&r2=1505735&view=diff
==============================================================================
--- zookeeper/bookkeeper/branches/branch-4.2/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java (original)
+++ zookeeper/bookkeeper/branches/branch-4.2/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java Mon Jul 22 17:18:01 2013
@@ -17,14 +17,18 @@
*/
package org.apache.hedwig.server.delivery;
+import java.io.IOException;
+
import java.util.Arrays;
import java.util.Collection;
-import java.util.LinkedList;
-import java.util.List;
+import java.util.Map;
import java.util.concurrent.CountDownLatch;
import java.util.concurrent.TimeUnit;
import java.util.concurrent.atomic.AtomicInteger;
+import org.apache.commons.configuration.Configuration;
+import org.apache.commons.configuration.ConfigurationException;
+
import org.junit.Before;
import org.junit.Test;
import org.junit.runner.RunWith;
@@ -40,11 +44,19 @@ import org.apache.hedwig.client.HedwigCl
import org.apache.hedwig.client.api.MessageHandler;
import org.apache.hedwig.client.api.Publisher;
import org.apache.hedwig.client.api.Subscriber;
-import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.filter.ClientMessageFilter;
+import org.apache.hedwig.filter.MessageFilterBase;
+import org.apache.hedwig.filter.ServerMessageFilter;
+import org.apache.hedwig.protocol.PubSubProtocol;
+
import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageHeader;
import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
+import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
+
import org.apache.hedwig.server.HedwigHubTestBase;
import org.apache.hedwig.server.common.ServerConfiguration;
import org.apache.hedwig.util.Callback;
@@ -53,6 +65,43 @@ import org.apache.hedwig.util.Callback;
public class TestThrottlingDelivery extends HedwigHubTestBase {
private static final int DEFAULT_MESSAGE_WINDOW_SIZE = 10;
+ private static final String OPT_MOD = "MOD";
+
+ static class ModMessageFilter implements ServerMessageFilter, ClientMessageFilter {
+
+ int mod;
+
+ @Override
+ public MessageFilterBase setSubscriptionPreferences(ByteString topic, ByteString subscriberId,
+ SubscriptionPreferences preferences) {
+ Map<String, ByteString> userOptions = SubscriptionStateUtils.buildUserOptions(preferences);
+ ByteString modValue = userOptions.get(OPT_MOD);
+ if (null == modValue) {
+ mod = 0;
+ } else {
+ mod = Integer.valueOf(modValue.toStringUtf8());
+ }
+ return this;
+ }
+
+ @Override
+ public boolean testMessage(Message message) {
+ int value = Integer.valueOf(message.getBody().toStringUtf8());
+ return 0 == value % mod;
+ }
+
+ @Override
+ public ServerMessageFilter initialize(Configuration conf) throws ConfigurationException, IOException {
+ // do nothing
+ return this;
+ }
+
+ @Override
+ public void uninitialize() {
+ // do nothing
+ }
+
+ }
protected class ThrottleDeliveryServerConfiguration extends HubServerConfiguration {
@@ -62,7 +111,7 @@ public class TestThrottlingDelivery exte
@Override
public int getDefaultMessageWindowSize() {
- return TestThrottlingDelivery.this.DEFAULT_MESSAGE_WINDOW_SIZE;
+ return TestThrottlingDelivery.DEFAULT_MESSAGE_WINDOW_SIZE;
}
}
@@ -71,7 +120,7 @@ public class TestThrottlingDelivery exte
int messageWindowSize;
ThrottleDeliveryClientConfiguration() {
- this(TestThrottlingDelivery.this.DEFAULT_MESSAGE_WINDOW_SIZE);
+ this(TestThrottlingDelivery.DEFAULT_MESSAGE_WINDOW_SIZE);
}
ThrottleDeliveryClientConfiguration(int messageWindowSize) {
@@ -98,6 +147,73 @@ public class TestThrottlingDelivery exte
}
}
+ private void publishNums(Publisher pub, ByteString topic, int start, int num, int M) throws Exception {
+ for (int i = 1; i <= num; i++) {
+ PubSubProtocol.Map.Builder propsBuilder = PubSubProtocol.Map.newBuilder().addEntries(
+ PubSubProtocol.Map.Entry.newBuilder().setKey(OPT_MOD)
+ .setValue(ByteString.copyFromUtf8(String.valueOf((start + i) % M))));
+ MessageHeader.Builder headerBuilder = MessageHeader.newBuilder().setProperties(propsBuilder);
+ Message msg = Message.newBuilder().setBody(ByteString.copyFromUtf8(String.valueOf(start + i)))
+ .setHeader(headerBuilder).build();
+ pub.publish(topic, msg);
+ }
+ }
+
+ private void throttleWithFilter(Publisher pub, final Subscriber sub,
+ ByteString topic, ByteString subid,
+ final int X) throws Exception {
+ // publish numbers with header (so only 3 messages would be delivered)
+ publishNums(pub, topic, 0, 3 * X, X);
+
+ // subscribe the topic with filter
+ PubSubProtocol.Map userOptions = PubSubProtocol.Map
+ .newBuilder()
+ .addEntries(
+ PubSubProtocol.Map.Entry.newBuilder().setKey(OPT_MOD)
+ .setValue(ByteString.copyFromUtf8(String.valueOf(X)))).build();
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder().setCreateOrAttach(CreateOrAttach.ATTACH)
+ .setOptions(userOptions).setMessageFilter(ModMessageFilter.class.getName()).build();
+ sub.subscribe(topic, subid, opts);
+
+ final AtomicInteger expected = new AtomicInteger(X);
+ final CountDownLatch latch = new CountDownLatch(1);
+ sub.startDelivery(topic, subid, new MessageHandler() {
+ @Override
+ public synchronized void deliver(ByteString topic, ByteString subscriberId,
+ Message msg,
+ Callback<Void> callback, Object context) {
+ try {
+ int value = Integer.valueOf(msg.getBody().toStringUtf8());
+ logger.debug("Received message {},", value);
+
+ if (value == expected.get()) {
+ expected.addAndGet(X);
+ } else {
+ // error condition
+ logger.error("Did not receive expected value, expected {}, got {}",
+ expected.get(), value);
+ expected.set(0);
+ latch.countDown();
+ }
+ if (value == 3 * X) {
+ latch.countDown();
+ }
+ callback.operationFinished(context, null);
+ sub.consume(topic, subscriberId, msg.getMsgId());
+ } catch (Exception e) {
+ logger.error("Received bad message", e);
+ latch.countDown();
+ }
+ }
+ });
+
+ assertTrue("Timed out waiting for messages " + 3 * X, latch.await(10, TimeUnit.SECONDS));
+ assertEquals("Should be expected message with " + 4 * X, 4 * X, expected.get());
+
+ sub.stopDelivery(topic, subid);
+ sub.closeSubscription(topic, subid);
+ }
+
private void throttleX(Publisher pub, final Subscriber sub,
ByteString topic, ByteString subid,
final int X) throws Exception {
@@ -138,7 +254,7 @@ public class TestThrottlingDelivery exte
callback.operationFinished(context, null);
if (expected.get() > X + 1) {
sub.consume(topic, subscriberId, msg.getMsgId());
- }
+ }
} catch (Exception e) {
logger.error("Received bad message", e);
throttleLatch.countDown();
@@ -197,7 +313,7 @@ public class TestThrottlingDelivery exte
Publisher pub = client.getPublisher();
Subscriber sub = client.getSubscriber();
- ByteString topic = ByteString.copyFromUtf8("testServerSideThrottle");
+ ByteString topic = ByteString.copyFromUtf8("testServerSideThrottle");
ByteString subid = ByteString.copyFromUtf8("serverThrottleSub");
sub.subscribe(topic, subid, CreateOrAttach.CREATE);
sub.closeSubscription(topic, subid);
@@ -228,4 +344,26 @@ public class TestThrottlingDelivery exte
client.close();
}
+ @Test(timeout = 60000)
+ public void testThrottleWithServerSideFilter() throws Exception {
+ int messageWindowSize = DEFAULT_MESSAGE_WINDOW_SIZE;
+ ThrottleDeliveryClientConfiguration conf = new ThrottleDeliveryClientConfiguration();
+ HedwigClient client = new HedwigClient(conf);
+ Publisher pub = client.getPublisher();
+ Subscriber sub = client.getSubscriber();
+
+ ByteString topic = ByteString.copyFromUtf8("testThrottleWithServerSideFilter");
+ ByteString subid = ByteString.copyFromUtf8("mysub");
+ SubscriptionOptions opts = SubscriptionOptions.newBuilder().setCreateOrAttach(CreateOrAttach.CREATE).build();
+ sub.subscribe(topic, subid, opts);
+ sub.closeSubscription(topic, subid);
+
+ // message gap: half of the throttle threshold
+ throttleWithFilter(pub, sub, topic, subid, messageWindowSize / 2);
+ // message gap: equal to the throttle threshold
+ throttleWithFilter(pub, sub, topic, subid, messageWindowSize);
+ // message gap: larger than the throttle threshold
+ throttleWithFilter(pub, sub, topic, subid, messageWindowSize + messageWindowSize / 2);
+ }
+
}