You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/10/04 15:30:32 UTC
svn commit: r1394066 - in /zookeeper/bookkeeper/trunk: ./
hedwig-client/src/main/java/org/apache/hedwig/client/netty/
hedwig-protocol/src/main/java/org/apache/hedwig/protocol/
hedwig-protocol/src/main/protobuf/
hedwig-server/src/main/java/org/apache/he...
Author: ivank
Date: Thu Oct 4 13:30:31 2012
New Revision: 1394066
URL: http://svn.apache.org/viewvc?rev=1394066&view=rev
Log:
BOOKKEEPER-367: Server-Side Message Delivery Flow Control (sijie via ivank)
Added:
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java
zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java
zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1394066&r1=1394065&r2=1394066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Oct 4 13:30:31 2012
@@ -168,6 +168,8 @@ Trunk (unreleased changes)
BOOKKEEPER-397: Make the hedwig client in RegionManager configurable. (Aniruddha via sijie)
+ BOOKKEEPER-367: Server-Side Message Delivery Flow Control (sijie via ivank)
+
hedwig-client:
BOOKKEEPER-306: Change C++ client to use gtest for testing (ivank via sijie)
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java?rev=1394066&r1=1394065&r2=1394066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java Thu Oct 4 13:30:31 2012
@@ -192,6 +192,11 @@ public class NetUtils {
preferencesBuilder.setOptions(options.getOptions());
}
+ // set delivery throttle value
+ if (options.getDeliveryThrottleValue() > 0) {
+ preferencesBuilder.setDeliveryThrottleValue(options.getDeliveryThrottleValue());
+ }
+
return preferencesBuilder;
}
Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java?rev=1394066&r1=1394065&r2=1394066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java Thu Oct 4 13:30:31 2012
@@ -5704,6 +5704,10 @@ public final class PubSubProtocol {
// optional string messageFilter = 3;
boolean hasMessageFilter();
String getMessageFilter();
+
+ // optional uint32 deliveryThrottleValue = 4;
+ boolean hasDeliveryThrottleValue();
+ int getDeliveryThrottleValue();
}
public static final class SubscriptionPreferences extends
com.google.protobuf.GeneratedMessage
@@ -5789,10 +5793,21 @@ public final class PubSubProtocol {
}
}
+ // optional uint32 deliveryThrottleValue = 4;
+ public static final int DELIVERYTHROTTLEVALUE_FIELD_NUMBER = 4;
+ private int deliveryThrottleValue_;
+ public boolean hasDeliveryThrottleValue() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ public int getDeliveryThrottleValue() {
+ return deliveryThrottleValue_;
+ }
+
private void initFields() {
options_ = org.apache.hedwig.protocol.PubSubProtocol.Map.getDefaultInstance();
messageBound_ = 0;
messageFilter_ = "";
+ deliveryThrottleValue_ = 0;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -5815,6 +5830,9 @@ public final class PubSubProtocol {
if (((bitField0_ & 0x00000004) == 0x00000004)) {
output.writeBytes(3, getMessageFilterBytes());
}
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ output.writeUInt32(4, deliveryThrottleValue_);
+ }
getUnknownFields().writeTo(output);
}
@@ -5836,6 +5854,10 @@ public final class PubSubProtocol {
size += com.google.protobuf.CodedOutputStream
.computeBytesSize(3, getMessageFilterBytes());
}
+ if (((bitField0_ & 0x00000008) == 0x00000008)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeUInt32Size(4, deliveryThrottleValue_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -5971,6 +5993,8 @@ public final class PubSubProtocol {
bitField0_ = (bitField0_ & ~0x00000002);
messageFilter_ = "";
bitField0_ = (bitField0_ & ~0x00000004);
+ deliveryThrottleValue_ = 0;
+ bitField0_ = (bitField0_ & ~0x00000008);
return this;
}
@@ -6025,6 +6049,10 @@ public final class PubSubProtocol {
to_bitField0_ |= 0x00000004;
}
result.messageFilter_ = messageFilter_;
+ if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+ to_bitField0_ |= 0x00000008;
+ }
+ result.deliveryThrottleValue_ = deliveryThrottleValue_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -6050,6 +6078,9 @@ public final class PubSubProtocol {
if (other.hasMessageFilter()) {
setMessageFilter(other.getMessageFilter());
}
+ if (other.hasDeliveryThrottleValue()) {
+ setDeliveryThrottleValue(other.getDeliveryThrottleValue());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -6100,6 +6131,11 @@ public final class PubSubProtocol {
messageFilter_ = input.readBytes();
break;
}
+ case 32: {
+ bitField0_ |= 0x00000008;
+ deliveryThrottleValue_ = input.readUInt32();
+ break;
+ }
}
}
}
@@ -6253,6 +6289,27 @@ public final class PubSubProtocol {
onChanged();
}
+ // optional uint32 deliveryThrottleValue = 4;
+ private int deliveryThrottleValue_ ;
+ public boolean hasDeliveryThrottleValue() {
+ return ((bitField0_ & 0x00000008) == 0x00000008);
+ }
+ public int getDeliveryThrottleValue() {
+ return deliveryThrottleValue_;
+ }
+ public Builder setDeliveryThrottleValue(int value) {
+ bitField0_ |= 0x00000008;
+ deliveryThrottleValue_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearDeliveryThrottleValue() {
+ bitField0_ = (bitField0_ & ~0x00000008);
+ deliveryThrottleValue_ = 0;
+ onChanged();
+ return this;
+ }
+
// @@protoc_insertion_point(builder_scope:Hedwig.SubscriptionPreferences)
}
@@ -7089,6 +7146,10 @@ public final class PubSubProtocol {
boolean hasMessageFilter();
String getMessageFilter();
+ // optional uint32 deliveryThrottleValue = 6;
+ boolean hasDeliveryThrottleValue();
+ int getDeliveryThrottleValue();
+
// optional bool enableResubscribe = 7 [default = true];
boolean hasEnableResubscribe();
boolean getEnableResubscribe();
@@ -7197,11 +7258,21 @@ public final class PubSubProtocol {
}
}
+ // optional uint32 deliveryThrottleValue = 6;
+ public static final int DELIVERYTHROTTLEVALUE_FIELD_NUMBER = 6;
+ private int deliveryThrottleValue_;
+ public boolean hasDeliveryThrottleValue() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ public int getDeliveryThrottleValue() {
+ return deliveryThrottleValue_;
+ }
+
// optional bool enableResubscribe = 7 [default = true];
public static final int ENABLERESUBSCRIBE_FIELD_NUMBER = 7;
private boolean enableResubscribe_;
public boolean hasEnableResubscribe() {
- return ((bitField0_ & 0x00000020) == 0x00000020);
+ return ((bitField0_ & 0x00000040) == 0x00000040);
}
public boolean getEnableResubscribe() {
return enableResubscribe_;
@@ -7213,6 +7284,7 @@ public final class PubSubProtocol {
messageBound_ = 0;
options_ = org.apache.hedwig.protocol.PubSubProtocol.Map.getDefaultInstance();
messageFilter_ = "";
+ deliveryThrottleValue_ = 0;
enableResubscribe_ = true;
}
private byte memoizedIsInitialized = -1;
@@ -7243,6 +7315,9 @@ public final class PubSubProtocol {
output.writeBytes(5, getMessageFilterBytes());
}
if (((bitField0_ & 0x00000020) == 0x00000020)) {
+ output.writeUInt32(6, deliveryThrottleValue_);
+ }
+ if (((bitField0_ & 0x00000040) == 0x00000040)) {
output.writeBool(7, enableResubscribe_);
}
getUnknownFields().writeTo(output);
@@ -7276,6 +7351,10 @@ public final class PubSubProtocol {
}
if (((bitField0_ & 0x00000020) == 0x00000020)) {
size += com.google.protobuf.CodedOutputStream
+ .computeUInt32Size(6, deliveryThrottleValue_);
+ }
+ if (((bitField0_ & 0x00000040) == 0x00000040)) {
+ size += com.google.protobuf.CodedOutputStream
.computeBoolSize(7, enableResubscribe_);
}
size += getUnknownFields().getSerializedSize();
@@ -7417,8 +7496,10 @@ public final class PubSubProtocol {
bitField0_ = (bitField0_ & ~0x00000008);
messageFilter_ = "";
bitField0_ = (bitField0_ & ~0x00000010);
- enableResubscribe_ = true;
+ deliveryThrottleValue_ = 0;
bitField0_ = (bitField0_ & ~0x00000020);
+ enableResubscribe_ = true;
+ bitField0_ = (bitField0_ & ~0x00000040);
return this;
}
@@ -7484,6 +7565,10 @@ public final class PubSubProtocol {
if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
to_bitField0_ |= 0x00000020;
}
+ result.deliveryThrottleValue_ = deliveryThrottleValue_;
+ if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+ to_bitField0_ |= 0x00000040;
+ }
result.enableResubscribe_ = enableResubscribe_;
result.bitField0_ = to_bitField0_;
onBuilt();
@@ -7516,6 +7601,9 @@ public final class PubSubProtocol {
if (other.hasMessageFilter()) {
setMessageFilter(other.getMessageFilter());
}
+ if (other.hasDeliveryThrottleValue()) {
+ setDeliveryThrottleValue(other.getDeliveryThrottleValue());
+ }
if (other.hasEnableResubscribe()) {
setEnableResubscribe(other.getEnableResubscribe());
}
@@ -7585,8 +7673,13 @@ public final class PubSubProtocol {
messageFilter_ = input.readBytes();
break;
}
- case 56: {
+ case 48: {
bitField0_ |= 0x00000020;
+ deliveryThrottleValue_ = input.readUInt32();
+ break;
+ }
+ case 56: {
+ bitField0_ |= 0x00000040;
enableResubscribe_ = input.readBool();
break;
}
@@ -7788,22 +7881,43 @@ public final class PubSubProtocol {
onChanged();
}
+ // optional uint32 deliveryThrottleValue = 6;
+ private int deliveryThrottleValue_ ;
+ public boolean hasDeliveryThrottleValue() {
+ return ((bitField0_ & 0x00000020) == 0x00000020);
+ }
+ public int getDeliveryThrottleValue() {
+ return deliveryThrottleValue_;
+ }
+ public Builder setDeliveryThrottleValue(int value) {
+ bitField0_ |= 0x00000020;
+ deliveryThrottleValue_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearDeliveryThrottleValue() {
+ bitField0_ = (bitField0_ & ~0x00000020);
+ deliveryThrottleValue_ = 0;
+ onChanged();
+ return this;
+ }
+
// optional bool enableResubscribe = 7 [default = true];
private boolean enableResubscribe_ = true;
public boolean hasEnableResubscribe() {
- return ((bitField0_ & 0x00000020) == 0x00000020);
+ return ((bitField0_ & 0x00000040) == 0x00000040);
}
public boolean getEnableResubscribe() {
return enableResubscribe_;
}
public Builder setEnableResubscribe(boolean value) {
- bitField0_ |= 0x00000020;
+ bitField0_ |= 0x00000040;
enableResubscribe_ = value;
onChanged();
return this;
}
public Builder clearEnableResubscribe() {
- bitField0_ = (bitField0_ & ~0x00000020);
+ bitField0_ = (bitField0_ & ~0x00000040);
enableResubscribe_ = true;
onChanged();
return this;
@@ -15382,73 +15496,75 @@ public final class PubSubProtocol {
"\033.Hedwig.StopDeliveryRequest\022:\n\024startDel" +
"iveryRequest\0309 \001(\0132\034.Hedwig.StartDeliver" +
"yRequest\".\n\016PublishRequest\022\034\n\003msg\030\002 \002(\0132" +
- "\017.Hedwig.Message\"d\n\027SubscriptionPreferen" +
- "ces\022\034\n\007options\030\001 \001(\0132\013.Hedwig.Map\022\024\n\014mes" +
- "sageBound\030\002 \001(\r\022\025\n\rmessageFilter\030\003 \001(\t\"\277" +
- "\002\n\020SubscribeRequest\022\024\n\014subscriberId\030\002 \002(" +
- "\014\022Q\n\016createOrAttach\030\003 \001(\0162\'.Hedwig.Subsc",
- "ribeRequest.CreateOrAttach:\020CREATE_OR_AT" +
- "TACH\022\032\n\013synchronous\030\004 \001(\010:\005false\022\024\n\014mess" +
- "ageBound\030\005 \001(\r\0224\n\013preferences\030\006 \001(\0132\037.He" +
- "dwig.SubscriptionPreferences\022\032\n\013forceAtt" +
- "ach\030\007 \001(\010:\005false\">\n\016CreateOrAttach\022\n\n\006CR" +
- "EATE\020\000\022\n\n\006ATTACH\020\001\022\024\n\020CREATE_OR_ATTACH\020\002" +
- "\"\363\001\n\023SubscriptionOptions\022\032\n\013forceAttach\030" +
- "\001 \001(\010:\005false\022Q\n\016createOrAttach\030\002 \001(\0162\'.H" +
- "edwig.SubscribeRequest.CreateOrAttach:\020C" +
- "REATE_OR_ATTACH\022\027\n\014messageBound\030\003 \001(\r:\0010",
- "\022\034\n\007options\030\004 \001(\0132\013.Hedwig.Map\022\025\n\rmessag" +
- "eFilter\030\005 \001(\t\022\037\n\021enableResubscribe\030\007 \001(\010" +
- ":\004true\"K\n\016ConsumeRequest\022\024\n\014subscriberId" +
- "\030\002 \002(\014\022#\n\005msgId\030\003 \002(\0132\024.Hedwig.MessageSe" +
- "qId\"*\n\022UnsubscribeRequest\022\024\n\014subscriberI" +
- "d\030\002 \002(\014\"+\n\023StopDeliveryRequest\022\024\n\014subscr" +
- "iberId\030\002 \002(\014\",\n\024StartDeliveryRequest\022\024\n\014" +
- "subscriberId\030\002 \002(\014\"\377\001\n\016PubSubResponse\0220\n" +
- "\017protocolVersion\030\001 \002(\0162\027.Hedwig.Protocol" +
- "Version\022&\n\nstatusCode\030\002 \002(\0162\022.Hedwig.Sta",
- "tusCode\022\r\n\005txnId\030\003 \002(\004\022\021\n\tstatusMsg\030\004 \001(" +
- "\t\022 \n\007message\030\005 \001(\0132\017.Hedwig.Message\022\r\n\005t" +
- "opic\030\006 \001(\014\022\024\n\014subscriberId\030\007 \001(\014\022*\n\014resp" +
- "onseBody\030\010 \001(\0132\024.Hedwig.ResponseBody\"?\n\017" +
- "PublishResponse\022,\n\016publishedMsgId\030\001 \002(\0132" +
- "\024.Hedwig.MessageSeqId\"I\n\021SubscribeRespon" +
- "se\0224\n\013preferences\030\002 \001(\0132\037.Hedwig.Subscri" +
- "ptionPreferences\"v\n\014ResponseBody\0220\n\017publ" +
- "ishResponse\030\001 \001(\0132\027.Hedwig.PublishRespon" +
- "se\0224\n\021subscribeResponse\030\002 \001(\0132\031.Hedwig.S",
- "ubscribeResponse\"N\n\021SubscriptionState\022#\n" +
- "\005msgId\030\001 \002(\0132\024.Hedwig.MessageSeqId\022\024\n\014me" +
- "ssageBound\030\002 \001(\r\"r\n\020SubscriptionData\022(\n\005" +
- "state\030\001 \001(\0132\031.Hedwig.SubscriptionState\0224" +
- "\n\013preferences\030\002 \001(\0132\037.Hedwig.Subscriptio" +
- "nPreferences\"O\n\013LedgerRange\022\020\n\010ledgerId\030" +
- "\001 \002(\004\022.\n\020endSeqIdIncluded\030\002 \001(\0132\024.Hedwig" +
- ".MessageSeqId\"3\n\014LedgerRanges\022#\n\006ranges\030" +
- "\001 \003(\0132\023.Hedwig.LedgerRange\":\n\013ManagerMet" +
- "a\022\023\n\013managerImpl\030\002 \002(\t\022\026\n\016managerVersion",
- "\030\003 \002(\r\".\n\013HubInfoData\022\020\n\010hostname\030\002 \002(\t\022" +
- "\r\n\005czxid\030\003 \002(\004\" \n\013HubLoadData\022\021\n\tnumTopi" +
- "cs\030\002 \002(\004*\"\n\017ProtocolVersion\022\017\n\013VERSION_O" +
- "NE\020\001*p\n\rOperationType\022\013\n\007PUBLISH\020\000\022\r\n\tSU" +
- "BSCRIBE\020\001\022\013\n\007CONSUME\020\002\022\017\n\013UNSUBSCRIBE\020\003\022" +
- "\022\n\016START_DELIVERY\020\004\022\021\n\rSTOP_DELIVERY\020\005*D" +
- "\n\021SubscriptionEvent\022\017\n\013TOPIC_MOVED\020\001\022\036\n\032" +
- "SUBSCRIPTION_FORCED_CLOSED\020\002*\205\004\n\nStatusC" +
- "ode\022\013\n\007SUCCESS\020\000\022\026\n\021MALFORMED_REQUEST\020\221\003" +
- "\022\022\n\rNO_SUCH_TOPIC\020\222\003\022\036\n\031CLIENT_ALREADY_S",
- "UBSCRIBED\020\223\003\022\032\n\025CLIENT_NOT_SUBSCRIBED\020\224\003" +
- "\022\026\n\021COULD_NOT_CONNECT\020\225\003\022\017\n\nTOPIC_BUSY\020\226" +
- "\003\022\036\n\031NOT_RESPONSIBLE_FOR_TOPIC\020\365\003\022\021\n\014SER" +
- "VICE_DOWN\020\366\003\022\024\n\017UNCERTAIN_STATE\020\367\003\022\033\n\026IN" +
- "VALID_MESSAGE_FILTER\020\370\003\022\020\n\013BAD_VERSION\020\210" +
- "\004\022\036\n\031NO_TOPIC_PERSISTENCE_INFO\020\211\004\022\"\n\035TOP" +
- "IC_PERSISTENCE_INFO_EXISTS\020\212\004\022\032\n\025NO_SUBS" +
- "CRIPTION_STATE\020\213\004\022\036\n\031SUBSCRIPTION_STATE_" +
- "EXISTS\020\214\004\022\030\n\023NO_TOPIC_OWNER_INFO\020\215\004\022\034\n\027T" +
- "OPIC_OWNER_INFO_EXISTS\020\216\004\022\031\n\024UNEXPECTED_",
- "CONDITION\020\330\004\022\016\n\tCOMPOSITE\020\274\005B\036\n\032org.apac" +
- "he.hedwig.protocolH\001"
+ "\017.Hedwig.Message\"\203\001\n\027SubscriptionPrefere" +
+ "nces\022\034\n\007options\030\001 \001(\0132\013.Hedwig.Map\022\024\n\014me" +
+ "ssageBound\030\002 \001(\r\022\025\n\rmessageFilter\030\003 \001(\t\022" +
+ "\035\n\025deliveryThrottleValue\030\004 \001(\r\"\277\002\n\020Subsc" +
+ "ribeRequest\022\024\n\014subscriberId\030\002 \002(\014\022Q\n\016cre",
+ "ateOrAttach\030\003 \001(\0162\'.Hedwig.SubscribeRequ" +
+ "est.CreateOrAttach:\020CREATE_OR_ATTACH\022\032\n\013" +
+ "synchronous\030\004 \001(\010:\005false\022\024\n\014messageBound" +
+ "\030\005 \001(\r\0224\n\013preferences\030\006 \001(\0132\037.Hedwig.Sub" +
+ "scriptionPreferences\022\032\n\013forceAttach\030\007 \001(" +
+ "\010:\005false\">\n\016CreateOrAttach\022\n\n\006CREATE\020\000\022\n" +
+ "\n\006ATTACH\020\001\022\024\n\020CREATE_OR_ATTACH\020\002\"\222\002\n\023Sub" +
+ "scriptionOptions\022\032\n\013forceAttach\030\001 \001(\010:\005f" +
+ "alse\022Q\n\016createOrAttach\030\002 \001(\0162\'.Hedwig.Su" +
+ "bscribeRequest.CreateOrAttach:\020CREATE_OR",
+ "_ATTACH\022\027\n\014messageBound\030\003 \001(\r:\0010\022\034\n\007opti" +
+ "ons\030\004 \001(\0132\013.Hedwig.Map\022\025\n\rmessageFilter\030" +
+ "\005 \001(\t\022\035\n\025deliveryThrottleValue\030\006 \001(\r\022\037\n\021" +
+ "enableResubscribe\030\007 \001(\010:\004true\"K\n\016Consume" +
+ "Request\022\024\n\014subscriberId\030\002 \002(\014\022#\n\005msgId\030\003" +
+ " \002(\0132\024.Hedwig.MessageSeqId\"*\n\022Unsubscrib" +
+ "eRequest\022\024\n\014subscriberId\030\002 \002(\014\"+\n\023StopDe" +
+ "liveryRequest\022\024\n\014subscriberId\030\002 \002(\014\",\n\024S" +
+ "tartDeliveryRequest\022\024\n\014subscriberId\030\002 \002(" +
+ "\014\"\377\001\n\016PubSubResponse\0220\n\017protocolVersion\030",
+ "\001 \002(\0162\027.Hedwig.ProtocolVersion\022&\n\nstatus" +
+ "Code\030\002 \002(\0162\022.Hedwig.StatusCode\022\r\n\005txnId\030" +
+ "\003 \002(\004\022\021\n\tstatusMsg\030\004 \001(\t\022 \n\007message\030\005 \001(" +
+ "\0132\017.Hedwig.Message\022\r\n\005topic\030\006 \001(\014\022\024\n\014sub" +
+ "scriberId\030\007 \001(\014\022*\n\014responseBody\030\010 \001(\0132\024." +
+ "Hedwig.ResponseBody\"?\n\017PublishResponse\022," +
+ "\n\016publishedMsgId\030\001 \002(\0132\024.Hedwig.MessageS" +
+ "eqId\"I\n\021SubscribeResponse\0224\n\013preferences" +
+ "\030\002 \001(\0132\037.Hedwig.SubscriptionPreferences\"" +
+ "v\n\014ResponseBody\0220\n\017publishResponse\030\001 \001(\013",
+ "2\027.Hedwig.PublishResponse\0224\n\021subscribeRe" +
+ "sponse\030\002 \001(\0132\031.Hedwig.SubscribeResponse\"" +
+ "N\n\021SubscriptionState\022#\n\005msgId\030\001 \002(\0132\024.He" +
+ "dwig.MessageSeqId\022\024\n\014messageBound\030\002 \001(\r\"" +
+ "r\n\020SubscriptionData\022(\n\005state\030\001 \001(\0132\031.Hed" +
+ "wig.SubscriptionState\0224\n\013preferences\030\002 \001" +
+ "(\0132\037.Hedwig.SubscriptionPreferences\"O\n\013L" +
+ "edgerRange\022\020\n\010ledgerId\030\001 \002(\004\022.\n\020endSeqId" +
+ "Included\030\002 \001(\0132\024.Hedwig.MessageSeqId\"3\n\014" +
+ "LedgerRanges\022#\n\006ranges\030\001 \003(\0132\023.Hedwig.Le",
+ "dgerRange\":\n\013ManagerMeta\022\023\n\013managerImpl\030" +
+ "\002 \002(\t\022\026\n\016managerVersion\030\003 \002(\r\".\n\013HubInfo" +
+ "Data\022\020\n\010hostname\030\002 \002(\t\022\r\n\005czxid\030\003 \002(\004\" \n" +
+ "\013HubLoadData\022\021\n\tnumTopics\030\002 \002(\004*\"\n\017Proto" +
+ "colVersion\022\017\n\013VERSION_ONE\020\001*p\n\rOperation" +
+ "Type\022\013\n\007PUBLISH\020\000\022\r\n\tSUBSCRIBE\020\001\022\013\n\007CONS" +
+ "UME\020\002\022\017\n\013UNSUBSCRIBE\020\003\022\022\n\016START_DELIVERY" +
+ "\020\004\022\021\n\rSTOP_DELIVERY\020\005*D\n\021SubscriptionEve" +
+ "nt\022\017\n\013TOPIC_MOVED\020\001\022\036\n\032SUBSCRIPTION_FORC" +
+ "ED_CLOSED\020\002*\205\004\n\nStatusCode\022\013\n\007SUCCESS\020\000\022",
+ "\026\n\021MALFORMED_REQUEST\020\221\003\022\022\n\rNO_SUCH_TOPIC" +
+ "\020\222\003\022\036\n\031CLIENT_ALREADY_SUBSCRIBED\020\223\003\022\032\n\025C" +
+ "LIENT_NOT_SUBSCRIBED\020\224\003\022\026\n\021COULD_NOT_CON" +
+ "NECT\020\225\003\022\017\n\nTOPIC_BUSY\020\226\003\022\036\n\031NOT_RESPONSI" +
+ "BLE_FOR_TOPIC\020\365\003\022\021\n\014SERVICE_DOWN\020\366\003\022\024\n\017U" +
+ "NCERTAIN_STATE\020\367\003\022\033\n\026INVALID_MESSAGE_FIL" +
+ "TER\020\370\003\022\020\n\013BAD_VERSION\020\210\004\022\036\n\031NO_TOPIC_PER" +
+ "SISTENCE_INFO\020\211\004\022\"\n\035TOPIC_PERSISTENCE_IN" +
+ "FO_EXISTS\020\212\004\022\032\n\025NO_SUBSCRIPTION_STATE\020\213\004" +
+ "\022\036\n\031SUBSCRIPTION_STATE_EXISTS\020\214\004\022\030\n\023NO_T",
+ "OPIC_OWNER_INFO\020\215\004\022\034\n\027TOPIC_OWNER_INFO_E" +
+ "XISTS\020\216\004\022\031\n\024UNEXPECTED_CONDITION\020\330\004\022\016\n\tC" +
+ "OMPOSITE\020\274\005B\036\n\032org.apache.hedwig.protoco" +
+ "lH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -15524,7 +15640,7 @@ public final class PubSubProtocol {
internal_static_Hedwig_SubscriptionPreferences_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Hedwig_SubscriptionPreferences_descriptor,
- new java.lang.String[] { "Options", "MessageBound", "MessageFilter", },
+ new java.lang.String[] { "Options", "MessageBound", "MessageFilter", "DeliveryThrottleValue", },
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.class,
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.Builder.class);
internal_static_Hedwig_SubscribeRequest_descriptor =
@@ -15540,7 +15656,7 @@ public final class PubSubProtocol {
internal_static_Hedwig_SubscriptionOptions_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Hedwig_SubscriptionOptions_descriptor,
- new java.lang.String[] { "ForceAttach", "CreateOrAttach", "MessageBound", "Options", "MessageFilter", "EnableResubscribe", },
+ new java.lang.String[] { "ForceAttach", "CreateOrAttach", "MessageBound", "Options", "MessageFilter", "DeliveryThrottleValue", "EnableResubscribe", },
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.class,
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.Builder.class);
internal_static_Hedwig_ConsumeRequest_descriptor =
Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto?rev=1394066&r1=1394065&r2=1394066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto Thu Oct 4 13:30:31 2012
@@ -117,6 +117,8 @@ message SubscriptionPreferences {
optional uint32 messageBound = 2;
// server-side message filter
optional string messageFilter = 3;
+ // server-side delivery throttle value
+ optional uint32 deliveryThrottleValue = 4;
}
message SubscribeRequest{
@@ -154,6 +156,8 @@ message SubscriptionOptions {
optional Map options = 4;
// server-side message filter
optional string messageFilter = 5;
+ // server-side delivery throttle value
+ optional uint32 deliveryThrottleValue = 6;
// enable resubscribe
optional bool enableResubscribe = 7 [default = true];
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java?rev=1394066&r1=1394065&r2=1394066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java Thu Oct 4 13:30:31 2012
@@ -61,6 +61,9 @@ public class ServerConfiguration extends
protected final static String BK_ENSEMBLE_SIZE = "bk_ensemble_size";
protected final static String BK_QUORUM_SIZE = "bk_quorum_size";
protected final static String RETRY_REMOTE_SUBSCRIBE_THREAD_RUN_INTERVAL = "retry_remote_subscribe_thread_run_interval";
+ protected final static String DEFAULT_DELIVERY_THROTTLE_VALUE =
+ "default_delivery_throttle_value";
+
protected final static String MAX_ENTRIES_PER_LEDGER = "max_entries_per_ledger";
// manager related settings
@@ -282,6 +285,13 @@ public class ServerConfiguration extends
return conf.getInt(RETRY_REMOTE_SUBSCRIBE_THREAD_RUN_INTERVAL, 120000);
}
+ // This parameter is for setting the default maximum number of messages delivered
+ // to a subscriber without being consumed.
+ // we throttle delivery messages to a subscriber when reaching the threshold.
+ public int getDefaultDeliveryThrottleValue() {
+ return conf.getInt(DEFAULT_DELIVERY_THROTTLE_VALUE, 0);
+ }
+
// This parameter is used when Bookkeeper is the persistence store
// and indicates what the ensemble size is (i.e. how many bookie
// servers to stripe the ledger entries across).
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java?rev=1394066&r1=1394065&r2=1394066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java Thu Oct 4 13:30:31 2012
@@ -19,17 +19,34 @@ package org.apache.hedwig.server.deliver
import com.google.protobuf.ByteString;
import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
import org.apache.hedwig.filter.ServerMessageFilter;
public interface DeliveryManager {
public void start();
- public void startServingSubscription(ByteString topic, ByteString subscriberId, MessageSeqId seqIdToStartFrom,
- DeliveryEndPoint endPoint, ServerMessageFilter filter);
+ public void startServingSubscription(ByteString topic, ByteString subscriberId,
+ SubscriptionPreferences preferences,
+ MessageSeqId seqIdToStartFrom,
+ DeliveryEndPoint endPoint,
+ ServerMessageFilter filter);
public void stopServingSubscriber(ByteString topic, ByteString subscriberId);
/**
+ * Tell the delivery manager where that a subscriber has consumed
+ *
+ * @param topic
+ * Topic Name
+ * @param subscriberId
+ * Subscriber Id
+ * @param consumedSeqId
+ * Max consumed seq id.
+ */
+ public void messageConsumed(ByteString topic, ByteString subscriberId,
+ MessageSeqId consumedSeqId);
+
+ /**
* Stop delivery manager
*/
public void stop();
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java?rev=1394066&r1=1394065&r2=1394066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java Thu Oct 4 13:30:31 2012
@@ -17,6 +17,7 @@
*/
package org.apache.hedwig.server.delivery;
+import java.util.Comparator;
import java.util.HashMap;
import java.util.HashSet;
import java.util.Map;
@@ -25,7 +26,7 @@ import java.util.Set;
import java.util.SortedMap;
import java.util.TreeMap;
import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.PriorityBlockingQueue;
import java.util.concurrent.LinkedBlockingQueue;
import java.util.concurrent.TimeUnit;
@@ -42,6 +43,7 @@ import org.apache.hedwig.protocol.PubSub
import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
import org.apache.hedwig.server.common.ServerConfiguration;
import org.apache.hedwig.server.common.UnexpectedError;
import org.apache.hedwig.server.netty.ServerStats;
@@ -50,6 +52,7 @@ import org.apache.hedwig.server.persiste
import org.apache.hedwig.server.persistence.PersistenceManager;
import org.apache.hedwig.server.persistence.ScanCallback;
import org.apache.hedwig.server.persistence.ScanRequest;
+import static org.apache.hedwig.util.VarArgs.va;
public class FIFODeliveryManager implements Runnable, DeliveryManager {
@@ -68,7 +71,14 @@ public class FIFODeliveryManager impleme
* The queue of all subscriptions that are facing a transient error either
* in scanning from the persistence manager, or in sending to the consumer
*/
- Queue<ActiveSubscriberState> retryQueue = new ConcurrentLinkedQueue<ActiveSubscriberState>();
+ Queue<ActiveSubscriberState> retryQueue =
+ new PriorityBlockingQueue<ActiveSubscriberState>(32, new Comparator<ActiveSubscriberState>() {
+ @Override
+ public int compare(ActiveSubscriberState as1, ActiveSubscriberState as2) {
+ long s = as1.lastScanErrorTime - as2.lastScanErrorTime;
+ return s > 0 ? 1 : (s < 0 ? -1 : 0);
+ }
+ });
/**
* Stores a mapping from topic to the delivery pointers on the topic. The
@@ -136,11 +146,16 @@ public class FIFODeliveryManager impleme
* Only messages passing this filter should be sent to this
* subscriber
*/
- public void startServingSubscription(ByteString topic, ByteString subscriberId, MessageSeqId seqIdToStartFrom,
+ public void startServingSubscription(ByteString topic, ByteString subscriberId,
+ SubscriptionPreferences preferences,
+ MessageSeqId seqIdToStartFrom,
DeliveryEndPoint endPoint, ServerMessageFilter filter) {
- ActiveSubscriberState subscriber = new ActiveSubscriberState(topic, subscriberId, seqIdToStartFrom
- .getLocalComponent() - 1, endPoint, filter);
+ ActiveSubscriberState subscriber =
+ new ActiveSubscriberState(topic, subscriberId,
+ preferences,
+ seqIdToStartFrom.getLocalComponent() - 1,
+ endPoint, filter);
enqueueWithoutFailure(subscriber);
}
@@ -179,6 +194,34 @@ public class FIFODeliveryManager impleme
}
}
+ public void clearRetryDelayForSubscriber(ActiveSubscriberState subscriber) {
+ subscriber.clearLastScanErrorTime();
+ if (!retryQueue.offer(subscriber)) {
+ throw new UnexpectedError("Could not enqueue to delivery manager retry queue");
+ }
+ // no request in request queue now
+ // issue a empty delivery request to not waiting for polling requests queue
+ if (requestQueue.isEmpty()) {
+ enqueueWithoutFailure(new DeliveryManagerRequest() {
+ @Override
+ public void performRequest() {
+ // do nothing
+ }
+ });
+ }
+ }
+
+ @Override
+ public void messageConsumed(ByteString topic, ByteString subscriberId,
+ MessageSeqId consumedSeqId) {
+ ActiveSubscriberState subState =
+ subscriberStates.get(new TopicSubscriber(topic, subscriberId));
+ if (null == subState) {
+ return;
+ }
+ subState.messageConsumed(consumedSeqId.getLocalComponent());
+ }
+
/**
* Instructs the delivery manager to move the delivery pointer for a given
* subscriber
@@ -291,6 +334,9 @@ public class FIFODeliveryManager impleme
}
public class ActiveSubscriberState implements ScanCallback, DeliveryCallback, DeliveryManagerRequest {
+
+ static final int UNLIMITED = 0;
+
ByteString topic;
ByteString subscriberId;
long lastLocalSeqIdDelivered;
@@ -299,18 +345,35 @@ public class FIFODeliveryManager impleme
long lastScanErrorTime = -1;
long localSeqIdDeliveringNow;
long lastSeqIdCommunicatedExternally;
+ long lastSeqIdConsumedUtil;
+ boolean isThrottled = false;
+ final int throttleThreshold;
ServerMessageFilter filter;
// TODO make use of these variables
final static int SEQ_ID_SLACK = 10;
- public ActiveSubscriberState(ByteString topic, ByteString subscriberId, long lastLocalSeqIdDelivered,
- DeliveryEndPoint deliveryEndPoint, ServerMessageFilter filter) {
+ public ActiveSubscriberState(ByteString topic, ByteString subscriberId,
+ SubscriptionPreferences preferences,
+ long lastLocalSeqIdDelivered,
+ DeliveryEndPoint deliveryEndPoint,
+ ServerMessageFilter filter) {
this.topic = topic;
this.subscriberId = subscriberId;
this.lastLocalSeqIdDelivered = lastLocalSeqIdDelivered;
+ this.lastSeqIdConsumedUtil = lastLocalSeqIdDelivered;
this.deliveryEndPoint = deliveryEndPoint;
this.filter = filter;
+ if (preferences.hasDeliveryThrottleValue()) {
+ throttleThreshold = preferences.getDeliveryThrottleValue();
+ } else {
+ if (FIFODeliveryManager.this.cfg.getDefaultDeliveryThrottleValue() > 0) {
+ throttleThreshold =
+ FIFODeliveryManager.this.cfg.getDefaultDeliveryThrottleValue();
+ } else {
+ throttleThreshold = UNLIMITED;
+ }
+ }
}
public void setNotConnected() {
@@ -340,16 +403,71 @@ public class FIFODeliveryManager impleme
this.lastScanErrorTime = lastScanErrorTime;
}
+ /**
+ * Clear the last scan error time so it could be retry immediately.
+ */
+ protected void clearLastScanErrorTime() {
+ this.lastScanErrorTime = -1;
+ }
+
protected boolean isConnected() {
return connected;
}
+ protected void messageConsumed(long newSeqIdConsumed) {
+ if (newSeqIdConsumed <= lastSeqIdConsumedUtil) {
+ return;
+ }
+ if (logger.isDebugEnabled()) {
+ logger.debug("Subscriber ({}) moved consumed ptr from {} to {}.",
+ va(this, lastSeqIdConsumedUtil, newSeqIdConsumed));
+ }
+ lastSeqIdConsumedUtil = newSeqIdConsumed;
+ // after updated seq id check whether it still exceed msg limitation
+ if (msgLimitExceeded()) {
+ return;
+ }
+ if (isThrottled) {
+ isThrottled = false;
+ logger.info("Try to wake up subscriber ({}) to deliver messages again : last delivered {}, last consumed {}.",
+ va(this, lastLocalSeqIdDelivered, lastSeqIdConsumedUtil));
+
+ enqueueWithoutFailure(new DeliveryManagerRequest() {
+ @Override
+ public void performRequest() {
+ // enqueue
+ clearRetryDelayForSubscriber(ActiveSubscriberState.this);
+ }
+ });
+ }
+ }
+
+ protected boolean msgLimitExceeded() {
+ if (throttleThreshold == UNLIMITED) {
+ return false;
+ }
+ if (lastLocalSeqIdDelivered - lastSeqIdConsumedUtil >= throttleThreshold) {
+ return true;
+ }
+ return false;
+ }
+
public void deliverNextMessage() {
if (!isConnected()) {
return;
}
+ // check whether we have delivered enough messages without receiving their consumes
+ if (msgLimitExceeded()) {
+ logger.info("Subscriber ({}) is throttled : last delivered {}, last consumed {}.",
+ va(this, lastLocalSeqIdDelivered, lastSeqIdConsumedUtil));
+ isThrottled = true;
+ // do nothing, since the delivery process would be throttled.
+ // After message consumed, it would be added back to retry queue.
+ return;
+ }
+
localSeqIdDeliveringNow = persistenceMgr.getSeqIdAfterSkipping(topic, lastLocalSeqIdDelivered, 1);
ScanRequest scanRequest = new ScanRequest(topic, localSeqIdDeliveringNow,
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java?rev=1394066&r1=1394065&r2=1394066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java Thu Oct 4 13:30:31 2012
@@ -260,7 +260,9 @@ public class SubscribeHandler extends Ba
MessageSeqId lastConsumedSeqId = subData.getState().getMsgId();
MessageSeqId seqIdToStartFrom = MessageSeqId.newBuilder(lastConsumedSeqId).setLocalComponent(
lastConsumedSeqId.getLocalComponent() + 1).build();
- deliveryMgr.startServingSubscription(topic, subscriberId, seqIdToStartFrom,
+ deliveryMgr.startServingSubscription(topic, subscriberId,
+ subData.getPreferences(),
+ seqIdToStartFrom,
new ChannelEndPoint(channel), filter);
}
}, null);
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java?rev=1394066&r1=1394065&r2=1394066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java Thu Oct 4 13:30:31 2012
@@ -575,7 +575,10 @@ public abstract class AbstractSubscripti
}
cb.operationFinished(ctx, null);
}
-
+ // tell delivery manage about the consume event
+ if (null != dm) {
+ dm.messageConsumed(topic, subscriberId, consumeSeqId);
+ }
}
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java?rev=1394066&r1=1394065&r2=1394066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java Thu Oct 4 13:30:31 2012
@@ -136,6 +136,14 @@ public class InMemorySubscriptionState {
changed = true;
}
}
+ if (preferences.hasDeliveryThrottleValue()) {
+ if (!subscriptionPreferences.hasDeliveryThrottleValue() ||
+ subscriptionPreferences.getDeliveryThrottleValue() !=
+ preferences.getDeliveryThrottleValue()) {
+ newPreferencesBuilder.setDeliveryThrottleValue(preferences.getDeliveryThrottleValue());
+ changed = true;
+ }
+ }
if (preferences.hasOptions()) {
Map<String, ByteString> userOptions = SubscriptionStateUtils.buildUserOptions(subscriptionPreferences);
Map<String, ByteString> optUpdates = SubscriptionStateUtils.buildUserOptions(preferences);
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java?rev=1394066&r1=1394065&r2=1394066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java Thu Oct 4 13:30:31 2012
@@ -577,10 +577,14 @@ public class TestBackwardCompat extends
}
ClientCurrent() {
+ this(true);
+ }
+
+ ClientCurrent(final boolean autoConsumeEnabled) {
conf = new org.apache.hedwig.client.conf.ClientConfiguration() {
@Override
public boolean isAutoSendConsumeMessageEnabled() {
- return true;
+ return autoConsumeEnabled;
}
@Override
public int getConsumedMessagesBufferSize() {
@@ -756,6 +760,65 @@ public class TestBackwardCompat extends
subscriber.stopDelivery(topic, subscriberId);
}
+ // throttle doesn't work talking with 41 server
+ void throttleX41(ByteString topic, ByteString subid, final int X)
+ throws Exception {
+ org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions options =
+ org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH)
+ .setDeliveryThrottleValue(X) .build();
+ subscribe(topic, subid, options);
+ closeSubscription(topic, subid);
+ publishInts(topic, 1, 3*X);
+ subscribe(topic, subid);
+
+ final AtomicInteger expected = new AtomicInteger(1);
+ final CountDownLatch throttleLatch = new CountDownLatch(1);
+ final CountDownLatch nonThrottleLatch = new CountDownLatch(1);
+ subscriber.startDelivery(topic, subid, new org.apache.hedwig.client.api.MessageHandler() {
+ @Override
+ public synchronized void deliver(ByteString topic, ByteString subscriberId,
+ org.apache.hedwig.protocol.PubSubProtocol.Message msg,
+ org.apache.hedwig.util.Callback<Void> callback, Object context) {
+ try {
+ int value = Integer.valueOf(msg.getBody().toStringUtf8());
+ logger.debug("Received message {},", value);
+
+ if (value == expected.get()) {
+ expected.incrementAndGet();
+ } else {
+ // error condition
+ logger.error("Did not receive expected value, expected {}, got {}",
+ expected.get(), value);
+ expected.set(0);
+ throttleLatch.countDown();
+ nonThrottleLatch.countDown();
+ }
+ if (expected.get() > X+1) {
+ throttleLatch.countDown();
+ }
+ if (expected.get() == (3 * X + 1)) {
+ nonThrottleLatch.countDown();
+ }
+ callback.operationFinished(context, null);
+ } catch (Exception e) {
+ logger.error("Received bad message", e);
+ throttleLatch.countDown();
+ nonThrottleLatch.countDown();
+ }
+ }
+ });
+ assertTrue("Should Receive more messages than throttle value " + X,
+ throttleLatch.await(10, TimeUnit.SECONDS));
+
+ assertTrue("Timed out waiting for messages " + (3*X + 1),
+ nonThrottleLatch.await(10, TimeUnit.SECONDS));
+ assertEquals("Should be expected message with " + (3*X + 1),
+ 3*X + 1, expected.get());
+
+ subscriber.stopDelivery(topic, subid);
+ closeSubscription(topic, subid);
+ }
}
/**
@@ -1029,4 +1092,34 @@ public class TestBackwardCompat extends
// stop bookkeeper cluster
bkc410.stop();
}
+
+ /**
+ * Test compatability between version 4.1.0 and the current version.
+ *
+ * Server side throttling does't work when current client connects to old version
+ * server.
+ */
+ @Test
+ public void testServerSideThrottleCompat410() throws Exception {
+ ByteString topic = ByteString.copyFromUtf8("TestServerSideThrottleCompat410");
+ ByteString subid = ByteString.copyFromUtf8("mysub");
+
+ // start bookkeeper
+ BookKeeperCluster410 bkc410 = new BookKeeperCluster410(3);
+ bkc410.start();
+
+ // start hub server 410
+ Server410 s410 = new Server410(zkUtil.getZooKeeperConnectString());
+ s410.start();
+
+ ClientCurrent ccur = new ClientCurrent(false);
+ ccur.throttleX41(topic, subid, 10);
+
+ ccur.close();
+
+ // stop 410 server
+ s410.stop();
+ // stop bookkeeper cluster
+ bkc410.stop();
+ }
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java?rev=1394066&r1=1394065&r2=1394066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java Thu Oct 4 13:30:31 2012
@@ -24,6 +24,7 @@ import com.google.protobuf.ByteString;
import org.apache.hedwig.client.data.TopicSubscriber;
import org.apache.hedwig.filter.ServerMessageFilter;
import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
public class StubDeliveryManager implements DeliveryManager {
@@ -34,8 +35,11 @@ public class StubDeliveryManager impleme
public DeliveryEndPoint endPoint;
public ServerMessageFilter filter;
- public StartServingRequest(ByteString topic, ByteString subscriberId, MessageSeqId seqIdToStartFrom,
- DeliveryEndPoint endPoint, ServerMessageFilter filter) {
+ public StartServingRequest(ByteString topic, ByteString subscriberId,
+ SubscriptionPreferences preferences,
+ MessageSeqId seqIdToStartFrom,
+ DeliveryEndPoint endPoint,
+ ServerMessageFilter filter) {
this.topic = topic;
this.subscriberId = subscriberId;
this.seqIdToStartFrom = seqIdToStartFrom;
@@ -48,9 +52,13 @@ public class StubDeliveryManager impleme
public Queue<Object> lastRequest = new LinkedList<Object>();
@Override
- public void startServingSubscription(ByteString topic, ByteString subscriberId, MessageSeqId seqIdToStartFrom,
- DeliveryEndPoint endPoint, ServerMessageFilter filter) {
- lastRequest.add(new StartServingRequest(topic, subscriberId, seqIdToStartFrom, endPoint, filter));
+ public void startServingSubscription(ByteString topic, ByteString subscriberId,
+ SubscriptionPreferences preferences,
+ MessageSeqId seqIdToStartFrom,
+ DeliveryEndPoint endPoint,
+ ServerMessageFilter filter) {
+ lastRequest.add(new StartServingRequest(topic, subscriberId, preferences,
+ seqIdToStartFrom, endPoint, filter));
}
@Override
@@ -59,6 +67,12 @@ public class StubDeliveryManager impleme
}
@Override
+ public void messageConsumed(ByteString topic, ByteString subscriberId,
+ MessageSeqId seqId) {
+ // do nothing
+ }
+
+ @Override
public void start() {
}
Added: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java?rev=1394066&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java Thu Oct 4 13:30:31 2012
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.delivery;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import com.google.protobuf.ByteString;
+
+import org.apache.hedwig.client.HedwigClient;
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.api.Publisher;
+import org.apache.hedwig.client.api.Subscriber;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
+import org.apache.hedwig.server.HedwigHubTestBase;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.util.Callback;
+
+public class TestThrottlingDelivery extends HedwigHubTestBase {
+
+ private static final int DEFAULT_THROTTLE_VALUE = 10;
+
+ protected class ThrottleDeliveryServerConfiguration extends HubServerConfiguration {
+
+ ThrottleDeliveryServerConfiguration(int serverPort, int sslServerPort) {
+ super(serverPort, sslServerPort);
+ }
+
+ @Override
+ public int getDefaultDeliveryThrottleValue() {
+ return DEFAULT_THROTTLE_VALUE;
+ }
+ }
+
+ protected class ThrottleDeliveryClientConfiguration extends ClientConfiguration {
+
+ int throttleValue;
+
+ ThrottleDeliveryClientConfiguration() {
+ this(DEFAULT_THROTTLE_VALUE);
+ }
+
+ ThrottleDeliveryClientConfiguration(int throttleValue) {
+ this.throttleValue = throttleValue;
+ }
+
+ @Override
+ public int getMaximumOutstandingMessages() {
+ return throttleValue;
+ }
+
+ void setThrottleValue(int throttleValue) {
+ this.throttleValue = throttleValue;
+ }
+
+ @Override
+ public boolean isAutoSendConsumeMessageEnabled() {
+ return false;
+ }
+ }
+
+ private void throttleX(Publisher pub, final Subscriber sub,
+ ByteString topic, ByteString subid,
+ final int X) throws Exception {
+ for (int i=1; i<=3*X; i++) {
+ pub.publish(topic, Message.newBuilder().setBody(
+ ByteString.copyFromUtf8(String.valueOf(i))).build());
+ }
+ sub.subscribe(topic, subid, CreateOrAttach.ATTACH);
+
+ final AtomicInteger expected = new AtomicInteger(1);
+ final CountDownLatch throttleLatch = new CountDownLatch(1);
+ final CountDownLatch nonThrottleLatch = new CountDownLatch(1);
+ sub.startDelivery(topic, subid, new MessageHandler() {
+ @Override
+ public synchronized void deliver(ByteString topic, ByteString subscriberId,
+ Message msg,
+ Callback<Void> callback, Object context) {
+ try {
+ int value = Integer.valueOf(msg.getBody().toStringUtf8());
+ logger.debug("Received message {},", value);
+
+ if (value == expected.get()) {
+ expected.incrementAndGet();
+ } else {
+ // error condition
+ logger.error("Did not receive expected value, expected {}, got {}",
+ expected.get(), value);
+ expected.set(0);
+ throttleLatch.countDown();
+ nonThrottleLatch.countDown();
+ }
+ if (expected.get() > X+1) {
+ throttleLatch.countDown();
+ }
+ if (expected.get() == (3 * X + 1)) {
+ nonThrottleLatch.countDown();
+ }
+ callback.operationFinished(context, null);
+ if (expected.get() > X + 1) {
+ sub.consume(topic, subscriberId, msg.getMsgId());
+ }
+ } catch (Exception e) {
+ logger.error("Received bad message", e);
+ throttleLatch.countDown();
+ nonThrottleLatch.countDown();
+ }
+ }
+ });
+ assertFalse("Received more messages than throttle value " + X,
+ throttleLatch.await(3, TimeUnit.SECONDS));
+ assertEquals("Should be expected messages with only " + (X+1), X+1, expected.get());
+
+ // consume messages to not throttle it
+ for (int i=1; i<=X; i++) {
+ sub.consume(topic, subid,
+ MessageSeqId.newBuilder().setLocalComponent(i).build());
+ }
+
+ assertTrue("Timed out waiting for messages " + (3*X + 1),
+ nonThrottleLatch.await(10, TimeUnit.SECONDS));
+ assertEquals("Should be expected message with " + (3*X + 1),
+ 3*X + 1, expected.get());
+
+ sub.stopDelivery(topic, subid);
+ sub.closeSubscription(topic, subid);
+ }
+
+ @Override
+ @Before
+ public void setUp() throws Exception {
+ numServers = 1;
+ super.setUp();
+ }
+
+ @Override
+ protected ServerConfiguration getServerConfiguration(int port, int sslPort) {
+ return new ThrottleDeliveryServerConfiguration(port, sslPort);
+ }
+
+ @Test
+ public void testServerSideThrottle() throws Exception {
+ int throttleValue = DEFAULT_THROTTLE_VALUE;
+ ThrottleDeliveryClientConfiguration conf =
+ new ThrottleDeliveryClientConfiguration();
+ HedwigClient client = new HedwigClient(conf);
+ Publisher pub = client.getPublisher();
+ Subscriber sub = client.getSubscriber();
+
+ ByteString topic = ByteString.copyFromUtf8("testServerSideThrottle");
+ ByteString subid = ByteString.copyFromUtf8("serverThrottleSub");
+ sub.subscribe(topic, subid, CreateOrAttach.CREATE);
+ sub.closeSubscription(topic, subid);
+
+ // throttle with hub server's setting
+ throttleX(pub, sub, topic, subid, DEFAULT_THROTTLE_VALUE);
+
+ throttleValue = DEFAULT_THROTTLE_VALUE / 2;
+ // throttle with a lower value than hub server's setting
+ SubscriptionOptions.Builder optionsBuilder = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE)
+ .setDeliveryThrottleValue(throttleValue);
+ topic = ByteString.copyFromUtf8("testServerSideThrottleWithLowerValue");
+ sub.subscribe(topic, subid, optionsBuilder.build());
+ sub.closeSubscription(topic, subid);
+ throttleX(pub, sub, topic, subid, throttleValue);
+
+ throttleValue = DEFAULT_THROTTLE_VALUE + 5;
+ // throttle with a higher value than hub server's setting
+ optionsBuilder = SubscriptionOptions.newBuilder()
+ .setCreateOrAttach(CreateOrAttach.CREATE)
+ .setDeliveryThrottleValue(throttleValue);
+ topic = ByteString.copyFromUtf8("testServerSideThrottleWithHigherValue");
+ sub.subscribe(topic, subid, optionsBuilder.build());
+ sub.closeSubscription(topic, subid);
+ throttleX(pub, sub, topic, subid, throttleValue);
+
+ client.close();
+ }
+
+}