You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by si...@apache.org on 2012/10/10 11:45:30 UTC
svn commit: r1396524 - in /zookeeper/bookkeeper/trunk: ./
hedwig-client/src/main/java/org/apache/hedwig/client/netty/
hedwig-protocol/src/main/java/org/apache/hedwig/protocol/
hedwig-protocol/src/main/protobuf/
hedwig-server/src/main/java/org/apache/he...
Author: sijie
Date: Wed Oct 10 09:45:29 2012
New Revision: 1396524
URL: http://svn.apache.org/viewvc?rev=1396524&view=rev
Log:
BOOKKEEPER-415: Rename DeliveryThrottle to MessageWindowSize (ivank via sijie)
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java
zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java
zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1396524&r1=1396523&r2=1396524&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed Oct 10 09:45:29 2012
@@ -174,6 +174,8 @@ Trunk (unreleased changes)
BOOKKEEPER-367: Server-Side Message Delivery Flow Control (sijie via ivank)
+ BOOKKEEPER-415: Rename DeliveryThrottle to MessageWindowSize (ivank via sijie)
+
hedwig-client:
BOOKKEEPER-306: Change C++ client to use gtest for testing (ivank via sijie)
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java?rev=1396524&r1=1396523&r2=1396524&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java Wed Oct 10 09:45:29 2012
@@ -192,9 +192,9 @@ public class NetUtils {
preferencesBuilder.setOptions(options.getOptions());
}
- // set delivery throttle value
- if (options.getDeliveryThrottleValue() > 0) {
- preferencesBuilder.setDeliveryThrottleValue(options.getDeliveryThrottleValue());
+ // set message window size if set
+ if (options.hasMessageWindowSize() && options.getMessageWindowSize() > 0) {
+ preferencesBuilder.setMessageWindowSize(options.getMessageWindowSize());
}
return preferencesBuilder;
Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java?rev=1396524&r1=1396523&r2=1396524&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java Wed Oct 10 09:45:29 2012
@@ -5705,9 +5705,9 @@ public final class PubSubProtocol {
boolean hasMessageFilter();
String getMessageFilter();
- // optional uint32 deliveryThrottleValue = 4;
- boolean hasDeliveryThrottleValue();
- int getDeliveryThrottleValue();
+ // optional uint32 messageWindowSize = 4;
+ boolean hasMessageWindowSize();
+ int getMessageWindowSize();
}
public static final class SubscriptionPreferences extends
com.google.protobuf.GeneratedMessage
@@ -5793,21 +5793,21 @@ public final class PubSubProtocol {
}
}
- // optional uint32 deliveryThrottleValue = 4;
- public static final int DELIVERYTHROTTLEVALUE_FIELD_NUMBER = 4;
- private int deliveryThrottleValue_;
- public boolean hasDeliveryThrottleValue() {
+ // optional uint32 messageWindowSize = 4;
+ public static final int MESSAGEWINDOWSIZE_FIELD_NUMBER = 4;
+ private int messageWindowSize_;
+ public boolean hasMessageWindowSize() {
return ((bitField0_ & 0x00000008) == 0x00000008);
}
- public int getDeliveryThrottleValue() {
- return deliveryThrottleValue_;
+ public int getMessageWindowSize() {
+ return messageWindowSize_;
}
private void initFields() {
options_ = org.apache.hedwig.protocol.PubSubProtocol.Map.getDefaultInstance();
messageBound_ = 0;
messageFilter_ = "";
- deliveryThrottleValue_ = 0;
+ messageWindowSize_ = 0;
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -5831,7 +5831,7 @@ public final class PubSubProtocol {
output.writeBytes(3, getMessageFilterBytes());
}
if (((bitField0_ & 0x00000008) == 0x00000008)) {
- output.writeUInt32(4, deliveryThrottleValue_);
+ output.writeUInt32(4, messageWindowSize_);
}
getUnknownFields().writeTo(output);
}
@@ -5856,7 +5856,7 @@ public final class PubSubProtocol {
}
if (((bitField0_ & 0x00000008) == 0x00000008)) {
size += com.google.protobuf.CodedOutputStream
- .computeUInt32Size(4, deliveryThrottleValue_);
+ .computeUInt32Size(4, messageWindowSize_);
}
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
@@ -5993,7 +5993,7 @@ public final class PubSubProtocol {
bitField0_ = (bitField0_ & ~0x00000002);
messageFilter_ = "";
bitField0_ = (bitField0_ & ~0x00000004);
- deliveryThrottleValue_ = 0;
+ messageWindowSize_ = 0;
bitField0_ = (bitField0_ & ~0x00000008);
return this;
}
@@ -6052,7 +6052,7 @@ public final class PubSubProtocol {
if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
to_bitField0_ |= 0x00000008;
}
- result.deliveryThrottleValue_ = deliveryThrottleValue_;
+ result.messageWindowSize_ = messageWindowSize_;
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -6078,8 +6078,8 @@ public final class PubSubProtocol {
if (other.hasMessageFilter()) {
setMessageFilter(other.getMessageFilter());
}
- if (other.hasDeliveryThrottleValue()) {
- setDeliveryThrottleValue(other.getDeliveryThrottleValue());
+ if (other.hasMessageWindowSize()) {
+ setMessageWindowSize(other.getMessageWindowSize());
}
this.mergeUnknownFields(other.getUnknownFields());
return this;
@@ -6133,7 +6133,7 @@ public final class PubSubProtocol {
}
case 32: {
bitField0_ |= 0x00000008;
- deliveryThrottleValue_ = input.readUInt32();
+ messageWindowSize_ = input.readUInt32();
break;
}
}
@@ -6289,23 +6289,23 @@ public final class PubSubProtocol {
onChanged();
}
- // optional uint32 deliveryThrottleValue = 4;
- private int deliveryThrottleValue_ ;
- public boolean hasDeliveryThrottleValue() {
+ // optional uint32 messageWindowSize = 4;
+ private int messageWindowSize_ ;
+ public boolean hasMessageWindowSize() {
return ((bitField0_ & 0x00000008) == 0x00000008);
}
- public int getDeliveryThrottleValue() {
- return deliveryThrottleValue_;
+ public int getMessageWindowSize() {
+ return messageWindowSize_;
}
- public Builder setDeliveryThrottleValue(int value) {
+ public Builder setMessageWindowSize(int value) {
bitField0_ |= 0x00000008;
- deliveryThrottleValue_ = value;
+ messageWindowSize_ = value;
onChanged();
return this;
}
- public Builder clearDeliveryThrottleValue() {
+ public Builder clearMessageWindowSize() {
bitField0_ = (bitField0_ & ~0x00000008);
- deliveryThrottleValue_ = 0;
+ messageWindowSize_ = 0;
onChanged();
return this;
}
@@ -7146,9 +7146,9 @@ public final class PubSubProtocol {
boolean hasMessageFilter();
String getMessageFilter();
- // optional uint32 deliveryThrottleValue = 6;
- boolean hasDeliveryThrottleValue();
- int getDeliveryThrottleValue();
+ // optional uint32 messageWindowSize = 6;
+ boolean hasMessageWindowSize();
+ int getMessageWindowSize();
// optional bool enableResubscribe = 7 [default = true];
boolean hasEnableResubscribe();
@@ -7258,14 +7258,14 @@ public final class PubSubProtocol {
}
}
- // optional uint32 deliveryThrottleValue = 6;
- public static final int DELIVERYTHROTTLEVALUE_FIELD_NUMBER = 6;
- private int deliveryThrottleValue_;
- public boolean hasDeliveryThrottleValue() {
+ // optional uint32 messageWindowSize = 6;
+ public static final int MESSAGEWINDOWSIZE_FIELD_NUMBER = 6;
+ private int messageWindowSize_;
+ public boolean hasMessageWindowSize() {
return ((bitField0_ & 0x00000020) == 0x00000020);
}
- public int getDeliveryThrottleValue() {
- return deliveryThrottleValue_;
+ public int getMessageWindowSize() {
+ return messageWindowSize_;
}
// optional bool enableResubscribe = 7 [default = true];
@@ -7284,7 +7284,7 @@ public final class PubSubProtocol {
messageBound_ = 0;
options_ = org.apache.hedwig.protocol.PubSubProtocol.Map.getDefaultInstance();
messageFilter_ = "";
- deliveryThrottleValue_ = 0;
+ messageWindowSize_ = 0;
enableResubscribe_ = true;
}
private byte memoizedIsInitialized = -1;
@@ -7315,7 +7315,7 @@ public final class PubSubProtocol {
output.writeBytes(5, getMessageFilterBytes());
}
if (((bitField0_ & 0x00000020) == 0x00000020)) {
- output.writeUInt32(6, deliveryThrottleValue_);
+ output.writeUInt32(6, messageWindowSize_);
}
if (((bitField0_ & 0x00000040) == 0x00000040)) {
output.writeBool(7, enableResubscribe_);
@@ -7351,7 +7351,7 @@ public final class PubSubProtocol {
}
if (((bitField0_ & 0x00000020) == 0x00000020)) {
size += com.google.protobuf.CodedOutputStream
- .computeUInt32Size(6, deliveryThrottleValue_);
+ .computeUInt32Size(6, messageWindowSize_);
}
if (((bitField0_ & 0x00000040) == 0x00000040)) {
size += com.google.protobuf.CodedOutputStream
@@ -7496,7 +7496,7 @@ public final class PubSubProtocol {
bitField0_ = (bitField0_ & ~0x00000008);
messageFilter_ = "";
bitField0_ = (bitField0_ & ~0x00000010);
- deliveryThrottleValue_ = 0;
+ messageWindowSize_ = 0;
bitField0_ = (bitField0_ & ~0x00000020);
enableResubscribe_ = true;
bitField0_ = (bitField0_ & ~0x00000040);
@@ -7565,7 +7565,7 @@ public final class PubSubProtocol {
if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
to_bitField0_ |= 0x00000020;
}
- result.deliveryThrottleValue_ = deliveryThrottleValue_;
+ result.messageWindowSize_ = messageWindowSize_;
if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
to_bitField0_ |= 0x00000040;
}
@@ -7601,8 +7601,8 @@ public final class PubSubProtocol {
if (other.hasMessageFilter()) {
setMessageFilter(other.getMessageFilter());
}
- if (other.hasDeliveryThrottleValue()) {
- setDeliveryThrottleValue(other.getDeliveryThrottleValue());
+ if (other.hasMessageWindowSize()) {
+ setMessageWindowSize(other.getMessageWindowSize());
}
if (other.hasEnableResubscribe()) {
setEnableResubscribe(other.getEnableResubscribe());
@@ -7675,7 +7675,7 @@ public final class PubSubProtocol {
}
case 48: {
bitField0_ |= 0x00000020;
- deliveryThrottleValue_ = input.readUInt32();
+ messageWindowSize_ = input.readUInt32();
break;
}
case 56: {
@@ -7881,23 +7881,23 @@ public final class PubSubProtocol {
onChanged();
}
- // optional uint32 deliveryThrottleValue = 6;
- private int deliveryThrottleValue_ ;
- public boolean hasDeliveryThrottleValue() {
+ // optional uint32 messageWindowSize = 6;
+ private int messageWindowSize_ ;
+ public boolean hasMessageWindowSize() {
return ((bitField0_ & 0x00000020) == 0x00000020);
}
- public int getDeliveryThrottleValue() {
- return deliveryThrottleValue_;
+ public int getMessageWindowSize() {
+ return messageWindowSize_;
}
- public Builder setDeliveryThrottleValue(int value) {
+ public Builder setMessageWindowSize(int value) {
bitField0_ |= 0x00000020;
- deliveryThrottleValue_ = value;
+ messageWindowSize_ = value;
onChanged();
return this;
}
- public Builder clearDeliveryThrottleValue() {
+ public Builder clearMessageWindowSize() {
bitField0_ = (bitField0_ & ~0x00000020);
- deliveryThrottleValue_ = 0;
+ messageWindowSize_ = 0;
onChanged();
return this;
}
@@ -15496,75 +15496,74 @@ public final class PubSubProtocol {
"\033.Hedwig.StopDeliveryRequest\022:\n\024startDel" +
"iveryRequest\0309 \001(\0132\034.Hedwig.StartDeliver" +
"yRequest\".\n\016PublishRequest\022\034\n\003msg\030\002 \002(\0132" +
- "\017.Hedwig.Message\"\203\001\n\027SubscriptionPrefere" +
- "nces\022\034\n\007options\030\001 \001(\0132\013.Hedwig.Map\022\024\n\014me" +
- "ssageBound\030\002 \001(\r\022\025\n\rmessageFilter\030\003 \001(\t\022" +
- "\035\n\025deliveryThrottleValue\030\004 \001(\r\"\277\002\n\020Subsc" +
- "ribeRequest\022\024\n\014subscriberId\030\002 \002(\014\022Q\n\016cre",
- "ateOrAttach\030\003 \001(\0162\'.Hedwig.SubscribeRequ" +
- "est.CreateOrAttach:\020CREATE_OR_ATTACH\022\032\n\013" +
- "synchronous\030\004 \001(\010:\005false\022\024\n\014messageBound" +
- "\030\005 \001(\r\0224\n\013preferences\030\006 \001(\0132\037.Hedwig.Sub" +
- "scriptionPreferences\022\032\n\013forceAttach\030\007 \001(" +
- "\010:\005false\">\n\016CreateOrAttach\022\n\n\006CREATE\020\000\022\n" +
- "\n\006ATTACH\020\001\022\024\n\020CREATE_OR_ATTACH\020\002\"\222\002\n\023Sub" +
- "scriptionOptions\022\032\n\013forceAttach\030\001 \001(\010:\005f" +
- "alse\022Q\n\016createOrAttach\030\002 \001(\0162\'.Hedwig.Su" +
- "bscribeRequest.CreateOrAttach:\020CREATE_OR",
- "_ATTACH\022\027\n\014messageBound\030\003 \001(\r:\0010\022\034\n\007opti" +
- "ons\030\004 \001(\0132\013.Hedwig.Map\022\025\n\rmessageFilter\030" +
- "\005 \001(\t\022\035\n\025deliveryThrottleValue\030\006 \001(\r\022\037\n\021" +
- "enableResubscribe\030\007 \001(\010:\004true\"K\n\016Consume" +
- "Request\022\024\n\014subscriberId\030\002 \002(\014\022#\n\005msgId\030\003" +
- " \002(\0132\024.Hedwig.MessageSeqId\"*\n\022Unsubscrib" +
- "eRequest\022\024\n\014subscriberId\030\002 \002(\014\"+\n\023StopDe" +
- "liveryRequest\022\024\n\014subscriberId\030\002 \002(\014\",\n\024S" +
- "tartDeliveryRequest\022\024\n\014subscriberId\030\002 \002(" +
- "\014\"\377\001\n\016PubSubResponse\0220\n\017protocolVersion\030",
- "\001 \002(\0162\027.Hedwig.ProtocolVersion\022&\n\nstatus" +
- "Code\030\002 \002(\0162\022.Hedwig.StatusCode\022\r\n\005txnId\030" +
- "\003 \002(\004\022\021\n\tstatusMsg\030\004 \001(\t\022 \n\007message\030\005 \001(" +
- "\0132\017.Hedwig.Message\022\r\n\005topic\030\006 \001(\014\022\024\n\014sub" +
- "scriberId\030\007 \001(\014\022*\n\014responseBody\030\010 \001(\0132\024." +
- "Hedwig.ResponseBody\"?\n\017PublishResponse\022," +
- "\n\016publishedMsgId\030\001 \002(\0132\024.Hedwig.MessageS" +
- "eqId\"I\n\021SubscribeResponse\0224\n\013preferences" +
- "\030\002 \001(\0132\037.Hedwig.SubscriptionPreferences\"" +
- "v\n\014ResponseBody\0220\n\017publishResponse\030\001 \001(\013",
- "2\027.Hedwig.PublishResponse\0224\n\021subscribeRe" +
- "sponse\030\002 \001(\0132\031.Hedwig.SubscribeResponse\"" +
- "N\n\021SubscriptionState\022#\n\005msgId\030\001 \002(\0132\024.He" +
- "dwig.MessageSeqId\022\024\n\014messageBound\030\002 \001(\r\"" +
- "r\n\020SubscriptionData\022(\n\005state\030\001 \001(\0132\031.Hed" +
- "wig.SubscriptionState\0224\n\013preferences\030\002 \001" +
- "(\0132\037.Hedwig.SubscriptionPreferences\"O\n\013L" +
- "edgerRange\022\020\n\010ledgerId\030\001 \002(\004\022.\n\020endSeqId" +
- "Included\030\002 \001(\0132\024.Hedwig.MessageSeqId\"3\n\014" +
- "LedgerRanges\022#\n\006ranges\030\001 \003(\0132\023.Hedwig.Le",
- "dgerRange\":\n\013ManagerMeta\022\023\n\013managerImpl\030" +
- "\002 \002(\t\022\026\n\016managerVersion\030\003 \002(\r\".\n\013HubInfo" +
- "Data\022\020\n\010hostname\030\002 \002(\t\022\r\n\005czxid\030\003 \002(\004\" \n" +
- "\013HubLoadData\022\021\n\tnumTopics\030\002 \002(\004*\"\n\017Proto" +
- "colVersion\022\017\n\013VERSION_ONE\020\001*p\n\rOperation" +
- "Type\022\013\n\007PUBLISH\020\000\022\r\n\tSUBSCRIBE\020\001\022\013\n\007CONS" +
- "UME\020\002\022\017\n\013UNSUBSCRIBE\020\003\022\022\n\016START_DELIVERY" +
- "\020\004\022\021\n\rSTOP_DELIVERY\020\005*D\n\021SubscriptionEve" +
- "nt\022\017\n\013TOPIC_MOVED\020\001\022\036\n\032SUBSCRIPTION_FORC" +
- "ED_CLOSED\020\002*\205\004\n\nStatusCode\022\013\n\007SUCCESS\020\000\022",
- "\026\n\021MALFORMED_REQUEST\020\221\003\022\022\n\rNO_SUCH_TOPIC" +
- "\020\222\003\022\036\n\031CLIENT_ALREADY_SUBSCRIBED\020\223\003\022\032\n\025C" +
- "LIENT_NOT_SUBSCRIBED\020\224\003\022\026\n\021COULD_NOT_CON" +
- "NECT\020\225\003\022\017\n\nTOPIC_BUSY\020\226\003\022\036\n\031NOT_RESPONSI" +
- "BLE_FOR_TOPIC\020\365\003\022\021\n\014SERVICE_DOWN\020\366\003\022\024\n\017U" +
- "NCERTAIN_STATE\020\367\003\022\033\n\026INVALID_MESSAGE_FIL" +
- "TER\020\370\003\022\020\n\013BAD_VERSION\020\210\004\022\036\n\031NO_TOPIC_PER" +
- "SISTENCE_INFO\020\211\004\022\"\n\035TOPIC_PERSISTENCE_IN" +
- "FO_EXISTS\020\212\004\022\032\n\025NO_SUBSCRIPTION_STATE\020\213\004" +
- "\022\036\n\031SUBSCRIPTION_STATE_EXISTS\020\214\004\022\030\n\023NO_T",
- "OPIC_OWNER_INFO\020\215\004\022\034\n\027TOPIC_OWNER_INFO_E" +
- "XISTS\020\216\004\022\031\n\024UNEXPECTED_CONDITION\020\330\004\022\016\n\tC" +
- "OMPOSITE\020\274\005B\036\n\032org.apache.hedwig.protoco" +
- "lH\001"
+ "\017.Hedwig.Message\"\177\n\027SubscriptionPreferen" +
+ "ces\022\034\n\007options\030\001 \001(\0132\013.Hedwig.Map\022\024\n\014mes" +
+ "sageBound\030\002 \001(\r\022\025\n\rmessageFilter\030\003 \001(\t\022\031" +
+ "\n\021messageWindowSize\030\004 \001(\r\"\277\002\n\020SubscribeR" +
+ "equest\022\024\n\014subscriberId\030\002 \002(\014\022Q\n\016createOr",
+ "Attach\030\003 \001(\0162\'.Hedwig.SubscribeRequest.C" +
+ "reateOrAttach:\020CREATE_OR_ATTACH\022\032\n\013synch" +
+ "ronous\030\004 \001(\010:\005false\022\024\n\014messageBound\030\005 \001(" +
+ "\r\0224\n\013preferences\030\006 \001(\0132\037.Hedwig.Subscrip" +
+ "tionPreferences\022\032\n\013forceAttach\030\007 \001(\010:\005fa" +
+ "lse\">\n\016CreateOrAttach\022\n\n\006CREATE\020\000\022\n\n\006ATT" +
+ "ACH\020\001\022\024\n\020CREATE_OR_ATTACH\020\002\"\216\002\n\023Subscrip" +
+ "tionOptions\022\032\n\013forceAttach\030\001 \001(\010:\005false\022" +
+ "Q\n\016createOrAttach\030\002 \001(\0162\'.Hedwig.Subscri" +
+ "beRequest.CreateOrAttach:\020CREATE_OR_ATTA",
+ "CH\022\027\n\014messageBound\030\003 \001(\r:\0010\022\034\n\007options\030\004" +
+ " \001(\0132\013.Hedwig.Map\022\025\n\rmessageFilter\030\005 \001(\t" +
+ "\022\031\n\021messageWindowSize\030\006 \001(\r\022\037\n\021enableRes" +
+ "ubscribe\030\007 \001(\010:\004true\"K\n\016ConsumeRequest\022\024" +
+ "\n\014subscriberId\030\002 \002(\014\022#\n\005msgId\030\003 \002(\0132\024.He" +
+ "dwig.MessageSeqId\"*\n\022UnsubscribeRequest\022" +
+ "\024\n\014subscriberId\030\002 \002(\014\"+\n\023StopDeliveryReq" +
+ "uest\022\024\n\014subscriberId\030\002 \002(\014\",\n\024StartDeliv" +
+ "eryRequest\022\024\n\014subscriberId\030\002 \002(\014\"\377\001\n\016Pub" +
+ "SubResponse\0220\n\017protocolVersion\030\001 \002(\0162\027.H",
+ "edwig.ProtocolVersion\022&\n\nstatusCode\030\002 \002(" +
+ "\0162\022.Hedwig.StatusCode\022\r\n\005txnId\030\003 \002(\004\022\021\n\t" +
+ "statusMsg\030\004 \001(\t\022 \n\007message\030\005 \001(\0132\017.Hedwi" +
+ "g.Message\022\r\n\005topic\030\006 \001(\014\022\024\n\014subscriberId" +
+ "\030\007 \001(\014\022*\n\014responseBody\030\010 \001(\0132\024.Hedwig.Re" +
+ "sponseBody\"?\n\017PublishResponse\022,\n\016publish" +
+ "edMsgId\030\001 \002(\0132\024.Hedwig.MessageSeqId\"I\n\021S" +
+ "ubscribeResponse\0224\n\013preferences\030\002 \001(\0132\037." +
+ "Hedwig.SubscriptionPreferences\"v\n\014Respon" +
+ "seBody\0220\n\017publishResponse\030\001 \001(\0132\027.Hedwig",
+ ".PublishResponse\0224\n\021subscribeResponse\030\002 " +
+ "\001(\0132\031.Hedwig.SubscribeResponse\"N\n\021Subscr" +
+ "iptionState\022#\n\005msgId\030\001 \002(\0132\024.Hedwig.Mess" +
+ "ageSeqId\022\024\n\014messageBound\030\002 \001(\r\"r\n\020Subscr" +
+ "iptionData\022(\n\005state\030\001 \001(\0132\031.Hedwig.Subsc" +
+ "riptionState\0224\n\013preferences\030\002 \001(\0132\037.Hedw" +
+ "ig.SubscriptionPreferences\"O\n\013LedgerRang" +
+ "e\022\020\n\010ledgerId\030\001 \002(\004\022.\n\020endSeqIdIncluded\030" +
+ "\002 \001(\0132\024.Hedwig.MessageSeqId\"3\n\014LedgerRan" +
+ "ges\022#\n\006ranges\030\001 \003(\0132\023.Hedwig.LedgerRange",
+ "\":\n\013ManagerMeta\022\023\n\013managerImpl\030\002 \002(\t\022\026\n\016" +
+ "managerVersion\030\003 \002(\r\".\n\013HubInfoData\022\020\n\010h" +
+ "ostname\030\002 \002(\t\022\r\n\005czxid\030\003 \002(\004\" \n\013HubLoadD" +
+ "ata\022\021\n\tnumTopics\030\002 \002(\004*\"\n\017ProtocolVersio" +
+ "n\022\017\n\013VERSION_ONE\020\001*p\n\rOperationType\022\013\n\007P" +
+ "UBLISH\020\000\022\r\n\tSUBSCRIBE\020\001\022\013\n\007CONSUME\020\002\022\017\n\013" +
+ "UNSUBSCRIBE\020\003\022\022\n\016START_DELIVERY\020\004\022\021\n\rSTO" +
+ "P_DELIVERY\020\005*D\n\021SubscriptionEvent\022\017\n\013TOP" +
+ "IC_MOVED\020\001\022\036\n\032SUBSCRIPTION_FORCED_CLOSED" +
+ "\020\002*\205\004\n\nStatusCode\022\013\n\007SUCCESS\020\000\022\026\n\021MALFOR",
+ "MED_REQUEST\020\221\003\022\022\n\rNO_SUCH_TOPIC\020\222\003\022\036\n\031CL" +
+ "IENT_ALREADY_SUBSCRIBED\020\223\003\022\032\n\025CLIENT_NOT" +
+ "_SUBSCRIBED\020\224\003\022\026\n\021COULD_NOT_CONNECT\020\225\003\022\017" +
+ "\n\nTOPIC_BUSY\020\226\003\022\036\n\031NOT_RESPONSIBLE_FOR_T" +
+ "OPIC\020\365\003\022\021\n\014SERVICE_DOWN\020\366\003\022\024\n\017UNCERTAIN_" +
+ "STATE\020\367\003\022\033\n\026INVALID_MESSAGE_FILTER\020\370\003\022\020\n" +
+ "\013BAD_VERSION\020\210\004\022\036\n\031NO_TOPIC_PERSISTENCE_" +
+ "INFO\020\211\004\022\"\n\035TOPIC_PERSISTENCE_INFO_EXISTS" +
+ "\020\212\004\022\032\n\025NO_SUBSCRIPTION_STATE\020\213\004\022\036\n\031SUBSC" +
+ "RIPTION_STATE_EXISTS\020\214\004\022\030\n\023NO_TOPIC_OWNE",
+ "R_INFO\020\215\004\022\034\n\027TOPIC_OWNER_INFO_EXISTS\020\216\004\022" +
+ "\031\n\024UNEXPECTED_CONDITION\020\330\004\022\016\n\tCOMPOSITE\020" +
+ "\274\005B\036\n\032org.apache.hedwig.protocolH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -15640,7 +15639,7 @@ public final class PubSubProtocol {
internal_static_Hedwig_SubscriptionPreferences_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Hedwig_SubscriptionPreferences_descriptor,
- new java.lang.String[] { "Options", "MessageBound", "MessageFilter", "DeliveryThrottleValue", },
+ new java.lang.String[] { "Options", "MessageBound", "MessageFilter", "MessageWindowSize", },
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.class,
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.Builder.class);
internal_static_Hedwig_SubscribeRequest_descriptor =
@@ -15656,7 +15655,7 @@ public final class PubSubProtocol {
internal_static_Hedwig_SubscriptionOptions_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Hedwig_SubscriptionOptions_descriptor,
- new java.lang.String[] { "ForceAttach", "CreateOrAttach", "MessageBound", "Options", "MessageFilter", "DeliveryThrottleValue", "EnableResubscribe", },
+ new java.lang.String[] { "ForceAttach", "CreateOrAttach", "MessageBound", "Options", "MessageFilter", "MessageWindowSize", "EnableResubscribe", },
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.class,
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.Builder.class);
internal_static_Hedwig_ConsumeRequest_descriptor =
Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto?rev=1396524&r1=1396523&r2=1396524&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto Wed Oct 10 09:45:29 2012
@@ -117,8 +117,9 @@ message SubscriptionPreferences {
optional uint32 messageBound = 2;
// server-side message filter
optional string messageFilter = 3;
- // server-side delivery throttle value
- optional uint32 deliveryThrottleValue = 4;
+ // message window size, this is the maximum number of messages
+ // which will be delivered without being consumed
+ optional uint32 messageWindowSize = 4;
}
message SubscribeRequest{
@@ -156,8 +157,9 @@ message SubscriptionOptions {
optional Map options = 4;
// server-side message filter
optional string messageFilter = 5;
- // server-side delivery throttle value
- optional uint32 deliveryThrottleValue = 6;
+ // message window size, this is the maximum number of messages
+ // which will be delivered without being consumed
+ optional uint32 messageWindowSize = 6;
// enable resubscribe
optional bool enableResubscribe = 7 [default = true];
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java?rev=1396524&r1=1396523&r2=1396524&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java Wed Oct 10 09:45:29 2012
@@ -61,8 +61,8 @@ public class ServerConfiguration extends
protected final static String BK_ENSEMBLE_SIZE = "bk_ensemble_size";
protected final static String BK_QUORUM_SIZE = "bk_quorum_size";
protected final static String RETRY_REMOTE_SUBSCRIBE_THREAD_RUN_INTERVAL = "retry_remote_subscribe_thread_run_interval";
- protected final static String DEFAULT_DELIVERY_THROTTLE_VALUE =
- "default_delivery_throttle_value";
+ protected final static String DEFAULT_MESSAGE_WINDOW_SIZE =
+ "default_message_window_size";
protected final static String MAX_ENTRIES_PER_LEDGER = "max_entries_per_ledger";
@@ -285,11 +285,11 @@ public class ServerConfiguration extends
return conf.getInt(RETRY_REMOTE_SUBSCRIBE_THREAD_RUN_INTERVAL, 120000);
}
- // This parameter is for setting the default maximum number of messages delivered
- // to a subscriber without being consumed.
- // we throttle delivery messages to a subscriber when reaching the threshold.
- public int getDefaultDeliveryThrottleValue() {
- return conf.getInt(DEFAULT_DELIVERY_THROTTLE_VALUE, 0);
+ // This parameter is for setting the default maximum number of messages which
+ // can be delivered to a subscriber without being consumed.
+ // we pause messages delivery to a subscriber when reaching the window size
+ public int getDefaultMessageWindowSize() {
+ return conf.getInt(DEFAULT_MESSAGE_WINDOW_SIZE, 0);
}
// This parameter is used when Bookkeeper is the persistence store
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java?rev=1396524&r1=1396523&r2=1396524&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java Wed Oct 10 09:45:29 2012
@@ -347,7 +347,7 @@ public class FIFODeliveryManager impleme
long lastSeqIdCommunicatedExternally;
long lastSeqIdConsumedUtil;
boolean isThrottled = false;
- final int throttleThreshold;
+ final int messageWindowSize;
ServerMessageFilter filter;
// TODO make use of these variables
@@ -364,14 +364,14 @@ public class FIFODeliveryManager impleme
this.lastSeqIdConsumedUtil = lastLocalSeqIdDelivered;
this.deliveryEndPoint = deliveryEndPoint;
this.filter = filter;
- if (preferences.hasDeliveryThrottleValue()) {
- throttleThreshold = preferences.getDeliveryThrottleValue();
+ if (preferences.hasMessageWindowSize()) {
+ messageWindowSize = preferences.getMessageWindowSize();
} else {
- if (FIFODeliveryManager.this.cfg.getDefaultDeliveryThrottleValue() > 0) {
- throttleThreshold =
- FIFODeliveryManager.this.cfg.getDefaultDeliveryThrottleValue();
+ if (FIFODeliveryManager.this.cfg.getDefaultMessageWindowSize() > 0) {
+ messageWindowSize =
+ FIFODeliveryManager.this.cfg.getDefaultMessageWindowSize();
} else {
- throttleThreshold = UNLIMITED;
+ messageWindowSize = UNLIMITED;
}
}
}
@@ -443,10 +443,10 @@ public class FIFODeliveryManager impleme
}
protected boolean msgLimitExceeded() {
- if (throttleThreshold == UNLIMITED) {
+ if (messageWindowSize == UNLIMITED) {
return false;
}
- if (lastLocalSeqIdDelivered - lastSeqIdConsumedUtil >= throttleThreshold) {
+ if (lastLocalSeqIdDelivered - lastSeqIdConsumedUtil >= messageWindowSize) {
return true;
}
return false;
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java?rev=1396524&r1=1396523&r2=1396524&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java Wed Oct 10 09:45:29 2012
@@ -136,11 +136,11 @@ public class InMemorySubscriptionState {
changed = true;
}
}
- if (preferences.hasDeliveryThrottleValue()) {
- if (!subscriptionPreferences.hasDeliveryThrottleValue() ||
- subscriptionPreferences.getDeliveryThrottleValue() !=
- preferences.getDeliveryThrottleValue()) {
- newPreferencesBuilder.setDeliveryThrottleValue(preferences.getDeliveryThrottleValue());
+ if (preferences.hasMessageWindowSize()) {
+ if (!subscriptionPreferences.hasMessageWindowSize() ||
+ subscriptionPreferences.getMessageWindowSize() !=
+ preferences.getMessageWindowSize()) {
+ newPreferencesBuilder.setMessageWindowSize(preferences.getMessageWindowSize());
changed = true;
}
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java?rev=1396524&r1=1396523&r2=1396524&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java Wed Oct 10 09:45:29 2012
@@ -766,7 +766,7 @@ public class TestBackwardCompat extends
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions options =
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder()
.setCreateOrAttach(org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH)
- .setDeliveryThrottleValue(X) .build();
+ .setMessageWindowSize(X) .build();
subscribe(topic, subid, options);
closeSubscription(topic, subid);
publishInts(topic, 1, 3*X);
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java?rev=1396524&r1=1396523&r2=1396524&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java Wed Oct 10 09:45:29 2012
@@ -25,7 +25,9 @@ import java.util.concurrent.atomic.Atomi
import org.junit.Before;
import org.junit.Test;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
import com.google.protobuf.ByteString;
@@ -44,7 +46,7 @@ import org.apache.hedwig.util.Callback;
public class TestThrottlingDelivery extends HedwigHubTestBase {
- private static final int DEFAULT_THROTTLE_VALUE = 10;
+ private static final int DEFAULT_MESSAGE_WINDOW_SIZE = 10;
protected class ThrottleDeliveryServerConfiguration extends HubServerConfiguration {
@@ -53,30 +55,30 @@ public class TestThrottlingDelivery exte
}
@Override
- public int getDefaultDeliveryThrottleValue() {
- return DEFAULT_THROTTLE_VALUE;
+ public int getDefaultMessageWindowSize() {
+ return TestThrottlingDelivery.this.DEFAULT_MESSAGE_WINDOW_SIZE;
}
}
protected class ThrottleDeliveryClientConfiguration extends ClientConfiguration {
- int throttleValue;
+ int messageWindowSize;
ThrottleDeliveryClientConfiguration() {
- this(DEFAULT_THROTTLE_VALUE);
+ this(TestThrottlingDelivery.this.DEFAULT_MESSAGE_WINDOW_SIZE);
}
- ThrottleDeliveryClientConfiguration(int throttleValue) {
- this.throttleValue = throttleValue;
+ ThrottleDeliveryClientConfiguration(int messageWindowSize) {
+ this.messageWindowSize = messageWindowSize;
}
@Override
public int getMaximumOutstandingMessages() {
- return throttleValue;
+ return messageWindowSize;
}
- void setThrottleValue(int throttleValue) {
- this.throttleValue = throttleValue;
+ void setMessageWindowSize(int messageWindowSize) {
+ this.messageWindowSize = messageWindowSize;
}
@Override
@@ -166,7 +168,7 @@ public class TestThrottlingDelivery exte
@Test
public void testServerSideThrottle() throws Exception {
- int throttleValue = DEFAULT_THROTTLE_VALUE;
+ int messageWindowSize = DEFAULT_MESSAGE_WINDOW_SIZE;
ThrottleDeliveryClientConfiguration conf =
new ThrottleDeliveryClientConfiguration();
HedwigClient client = new HedwigClient(conf);
@@ -179,27 +181,27 @@ public class TestThrottlingDelivery exte
sub.closeSubscription(topic, subid);
// throttle with hub server's setting
- throttleX(pub, sub, topic, subid, DEFAULT_THROTTLE_VALUE);
+ throttleX(pub, sub, topic, subid, DEFAULT_MESSAGE_WINDOW_SIZE);
- throttleValue = DEFAULT_THROTTLE_VALUE / 2;
+ messageWindowSize = DEFAULT_MESSAGE_WINDOW_SIZE / 2;
// throttle with a lower value than hub server's setting
SubscriptionOptions.Builder optionsBuilder = SubscriptionOptions.newBuilder()
.setCreateOrAttach(CreateOrAttach.CREATE)
- .setDeliveryThrottleValue(throttleValue);
+ .setMessageWindowSize(messageWindowSize);
topic = ByteString.copyFromUtf8("testServerSideThrottleWithLowerValue");
sub.subscribe(topic, subid, optionsBuilder.build());
sub.closeSubscription(topic, subid);
- throttleX(pub, sub, topic, subid, throttleValue);
+ throttleX(pub, sub, topic, subid, messageWindowSize);
- throttleValue = DEFAULT_THROTTLE_VALUE + 5;
+ messageWindowSize = DEFAULT_MESSAGE_WINDOW_SIZE + 5;
// throttle with a higher value than hub server's setting
optionsBuilder = SubscriptionOptions.newBuilder()
.setCreateOrAttach(CreateOrAttach.CREATE)
- .setDeliveryThrottleValue(throttleValue);
+ .setMessageWindowSize(messageWindowSize);
topic = ByteString.copyFromUtf8("testServerSideThrottleWithHigherValue");
sub.subscribe(topic, subid, optionsBuilder.build());
sub.closeSubscription(topic, subid);
- throttleX(pub, sub, topic, subid, throttleValue);
+ throttleX(pub, sub, topic, subid, messageWindowSize);
client.close();
}