You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by si...@apache.org on 2012/10/10 11:45:30 UTC

svn commit: r1396524 - in /zookeeper/bookkeeper/trunk: ./ hedwig-client/src/main/java/org/apache/hedwig/client/netty/ hedwig-protocol/src/main/java/org/apache/hedwig/protocol/ hedwig-protocol/src/main/protobuf/ hedwig-server/src/main/java/org/apache/he...

Author: sijie
Date: Wed Oct 10 09:45:29 2012
New Revision: 1396524

URL: http://svn.apache.org/viewvc?rev=1396524&view=rev
Log:
BOOKKEEPER-415: Rename DeliveryThrottle to MessageWindowSize (ivank via sijie)

Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java
    zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java
    zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1396524&r1=1396523&r2=1396524&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Wed Oct 10 09:45:29 2012
@@ -174,6 +174,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-367: Server-Side Message Delivery Flow Control (sijie via ivank)
 
+        BOOKKEEPER-415: Rename DeliveryThrottle to MessageWindowSize (ivank via sijie)
+
       hedwig-client:
 
         BOOKKEEPER-306: Change C++ client to use gtest for testing (ivank via sijie)

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java?rev=1396524&r1=1396523&r2=1396524&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java Wed Oct 10 09:45:29 2012
@@ -192,9 +192,9 @@ public class NetUtils {
             preferencesBuilder.setOptions(options.getOptions());
         }
 
-        // set delivery throttle value
-        if (options.getDeliveryThrottleValue() > 0) {
-            preferencesBuilder.setDeliveryThrottleValue(options.getDeliveryThrottleValue());
+        // set message window size if set
+        if (options.hasMessageWindowSize() && options.getMessageWindowSize() > 0) {
+            preferencesBuilder.setMessageWindowSize(options.getMessageWindowSize());
         }
 
         return preferencesBuilder;

Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java?rev=1396524&r1=1396523&r2=1396524&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java Wed Oct 10 09:45:29 2012
@@ -5705,9 +5705,9 @@ public final class PubSubProtocol {
     boolean hasMessageFilter();
     String getMessageFilter();
     
-    // optional uint32 deliveryThrottleValue = 4;
-    boolean hasDeliveryThrottleValue();
-    int getDeliveryThrottleValue();
+    // optional uint32 messageWindowSize = 4;
+    boolean hasMessageWindowSize();
+    int getMessageWindowSize();
   }
   public static final class SubscriptionPreferences extends
       com.google.protobuf.GeneratedMessage
@@ -5793,21 +5793,21 @@ public final class PubSubProtocol {
       }
     }
     
-    // optional uint32 deliveryThrottleValue = 4;
-    public static final int DELIVERYTHROTTLEVALUE_FIELD_NUMBER = 4;
-    private int deliveryThrottleValue_;
-    public boolean hasDeliveryThrottleValue() {
+    // optional uint32 messageWindowSize = 4;
+    public static final int MESSAGEWINDOWSIZE_FIELD_NUMBER = 4;
+    private int messageWindowSize_;
+    public boolean hasMessageWindowSize() {
       return ((bitField0_ & 0x00000008) == 0x00000008);
     }
-    public int getDeliveryThrottleValue() {
-      return deliveryThrottleValue_;
+    public int getMessageWindowSize() {
+      return messageWindowSize_;
     }
     
     private void initFields() {
       options_ = org.apache.hedwig.protocol.PubSubProtocol.Map.getDefaultInstance();
       messageBound_ = 0;
       messageFilter_ = "";
-      deliveryThrottleValue_ = 0;
+      messageWindowSize_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -5831,7 +5831,7 @@ public final class PubSubProtocol {
         output.writeBytes(3, getMessageFilterBytes());
       }
       if (((bitField0_ & 0x00000008) == 0x00000008)) {
-        output.writeUInt32(4, deliveryThrottleValue_);
+        output.writeUInt32(4, messageWindowSize_);
       }
       getUnknownFields().writeTo(output);
     }
@@ -5856,7 +5856,7 @@ public final class PubSubProtocol {
       }
       if (((bitField0_ & 0x00000008) == 0x00000008)) {
         size += com.google.protobuf.CodedOutputStream
-          .computeUInt32Size(4, deliveryThrottleValue_);
+          .computeUInt32Size(4, messageWindowSize_);
       }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
@@ -5993,7 +5993,7 @@ public final class PubSubProtocol {
         bitField0_ = (bitField0_ & ~0x00000002);
         messageFilter_ = "";
         bitField0_ = (bitField0_ & ~0x00000004);
-        deliveryThrottleValue_ = 0;
+        messageWindowSize_ = 0;
         bitField0_ = (bitField0_ & ~0x00000008);
         return this;
       }
@@ -6052,7 +6052,7 @@ public final class PubSubProtocol {
         if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
           to_bitField0_ |= 0x00000008;
         }
-        result.deliveryThrottleValue_ = deliveryThrottleValue_;
+        result.messageWindowSize_ = messageWindowSize_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -6078,8 +6078,8 @@ public final class PubSubProtocol {
         if (other.hasMessageFilter()) {
           setMessageFilter(other.getMessageFilter());
         }
-        if (other.hasDeliveryThrottleValue()) {
-          setDeliveryThrottleValue(other.getDeliveryThrottleValue());
+        if (other.hasMessageWindowSize()) {
+          setMessageWindowSize(other.getMessageWindowSize());
         }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
@@ -6133,7 +6133,7 @@ public final class PubSubProtocol {
             }
             case 32: {
               bitField0_ |= 0x00000008;
-              deliveryThrottleValue_ = input.readUInt32();
+              messageWindowSize_ = input.readUInt32();
               break;
             }
           }
@@ -6289,23 +6289,23 @@ public final class PubSubProtocol {
         onChanged();
       }
       
-      // optional uint32 deliveryThrottleValue = 4;
-      private int deliveryThrottleValue_ ;
-      public boolean hasDeliveryThrottleValue() {
+      // optional uint32 messageWindowSize = 4;
+      private int messageWindowSize_ ;
+      public boolean hasMessageWindowSize() {
         return ((bitField0_ & 0x00000008) == 0x00000008);
       }
-      public int getDeliveryThrottleValue() {
-        return deliveryThrottleValue_;
+      public int getMessageWindowSize() {
+        return messageWindowSize_;
       }
-      public Builder setDeliveryThrottleValue(int value) {
+      public Builder setMessageWindowSize(int value) {
         bitField0_ |= 0x00000008;
-        deliveryThrottleValue_ = value;
+        messageWindowSize_ = value;
         onChanged();
         return this;
       }
-      public Builder clearDeliveryThrottleValue() {
+      public Builder clearMessageWindowSize() {
         bitField0_ = (bitField0_ & ~0x00000008);
-        deliveryThrottleValue_ = 0;
+        messageWindowSize_ = 0;
         onChanged();
         return this;
       }
@@ -7146,9 +7146,9 @@ public final class PubSubProtocol {
     boolean hasMessageFilter();
     String getMessageFilter();
     
-    // optional uint32 deliveryThrottleValue = 6;
-    boolean hasDeliveryThrottleValue();
-    int getDeliveryThrottleValue();
+    // optional uint32 messageWindowSize = 6;
+    boolean hasMessageWindowSize();
+    int getMessageWindowSize();
     
     // optional bool enableResubscribe = 7 [default = true];
     boolean hasEnableResubscribe();
@@ -7258,14 +7258,14 @@ public final class PubSubProtocol {
       }
     }
     
-    // optional uint32 deliveryThrottleValue = 6;
-    public static final int DELIVERYTHROTTLEVALUE_FIELD_NUMBER = 6;
-    private int deliveryThrottleValue_;
-    public boolean hasDeliveryThrottleValue() {
+    // optional uint32 messageWindowSize = 6;
+    public static final int MESSAGEWINDOWSIZE_FIELD_NUMBER = 6;
+    private int messageWindowSize_;
+    public boolean hasMessageWindowSize() {
       return ((bitField0_ & 0x00000020) == 0x00000020);
     }
-    public int getDeliveryThrottleValue() {
-      return deliveryThrottleValue_;
+    public int getMessageWindowSize() {
+      return messageWindowSize_;
     }
     
     // optional bool enableResubscribe = 7 [default = true];
@@ -7284,7 +7284,7 @@ public final class PubSubProtocol {
       messageBound_ = 0;
       options_ = org.apache.hedwig.protocol.PubSubProtocol.Map.getDefaultInstance();
       messageFilter_ = "";
-      deliveryThrottleValue_ = 0;
+      messageWindowSize_ = 0;
       enableResubscribe_ = true;
     }
     private byte memoizedIsInitialized = -1;
@@ -7315,7 +7315,7 @@ public final class PubSubProtocol {
         output.writeBytes(5, getMessageFilterBytes());
       }
       if (((bitField0_ & 0x00000020) == 0x00000020)) {
-        output.writeUInt32(6, deliveryThrottleValue_);
+        output.writeUInt32(6, messageWindowSize_);
       }
       if (((bitField0_ & 0x00000040) == 0x00000040)) {
         output.writeBool(7, enableResubscribe_);
@@ -7351,7 +7351,7 @@ public final class PubSubProtocol {
       }
       if (((bitField0_ & 0x00000020) == 0x00000020)) {
         size += com.google.protobuf.CodedOutputStream
-          .computeUInt32Size(6, deliveryThrottleValue_);
+          .computeUInt32Size(6, messageWindowSize_);
       }
       if (((bitField0_ & 0x00000040) == 0x00000040)) {
         size += com.google.protobuf.CodedOutputStream
@@ -7496,7 +7496,7 @@ public final class PubSubProtocol {
         bitField0_ = (bitField0_ & ~0x00000008);
         messageFilter_ = "";
         bitField0_ = (bitField0_ & ~0x00000010);
-        deliveryThrottleValue_ = 0;
+        messageWindowSize_ = 0;
         bitField0_ = (bitField0_ & ~0x00000020);
         enableResubscribe_ = true;
         bitField0_ = (bitField0_ & ~0x00000040);
@@ -7565,7 +7565,7 @@ public final class PubSubProtocol {
         if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
           to_bitField0_ |= 0x00000020;
         }
-        result.deliveryThrottleValue_ = deliveryThrottleValue_;
+        result.messageWindowSize_ = messageWindowSize_;
         if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
           to_bitField0_ |= 0x00000040;
         }
@@ -7601,8 +7601,8 @@ public final class PubSubProtocol {
         if (other.hasMessageFilter()) {
           setMessageFilter(other.getMessageFilter());
         }
-        if (other.hasDeliveryThrottleValue()) {
-          setDeliveryThrottleValue(other.getDeliveryThrottleValue());
+        if (other.hasMessageWindowSize()) {
+          setMessageWindowSize(other.getMessageWindowSize());
         }
         if (other.hasEnableResubscribe()) {
           setEnableResubscribe(other.getEnableResubscribe());
@@ -7675,7 +7675,7 @@ public final class PubSubProtocol {
             }
             case 48: {
               bitField0_ |= 0x00000020;
-              deliveryThrottleValue_ = input.readUInt32();
+              messageWindowSize_ = input.readUInt32();
               break;
             }
             case 56: {
@@ -7881,23 +7881,23 @@ public final class PubSubProtocol {
         onChanged();
       }
       
-      // optional uint32 deliveryThrottleValue = 6;
-      private int deliveryThrottleValue_ ;
-      public boolean hasDeliveryThrottleValue() {
+      // optional uint32 messageWindowSize = 6;
+      private int messageWindowSize_ ;
+      public boolean hasMessageWindowSize() {
         return ((bitField0_ & 0x00000020) == 0x00000020);
       }
-      public int getDeliveryThrottleValue() {
-        return deliveryThrottleValue_;
+      public int getMessageWindowSize() {
+        return messageWindowSize_;
       }
-      public Builder setDeliveryThrottleValue(int value) {
+      public Builder setMessageWindowSize(int value) {
         bitField0_ |= 0x00000020;
-        deliveryThrottleValue_ = value;
+        messageWindowSize_ = value;
         onChanged();
         return this;
       }
-      public Builder clearDeliveryThrottleValue() {
+      public Builder clearMessageWindowSize() {
         bitField0_ = (bitField0_ & ~0x00000020);
-        deliveryThrottleValue_ = 0;
+        messageWindowSize_ = 0;
         onChanged();
         return this;
       }
@@ -15496,75 +15496,74 @@ public final class PubSubProtocol {
       "\033.Hedwig.StopDeliveryRequest\022:\n\024startDel" +
       "iveryRequest\0309 \001(\0132\034.Hedwig.StartDeliver" +
       "yRequest\".\n\016PublishRequest\022\034\n\003msg\030\002 \002(\0132" +
-      "\017.Hedwig.Message\"\203\001\n\027SubscriptionPrefere" +
-      "nces\022\034\n\007options\030\001 \001(\0132\013.Hedwig.Map\022\024\n\014me" +
-      "ssageBound\030\002 \001(\r\022\025\n\rmessageFilter\030\003 \001(\t\022" +
-      "\035\n\025deliveryThrottleValue\030\004 \001(\r\"\277\002\n\020Subsc" +
-      "ribeRequest\022\024\n\014subscriberId\030\002 \002(\014\022Q\n\016cre",
-      "ateOrAttach\030\003 \001(\0162\'.Hedwig.SubscribeRequ" +
-      "est.CreateOrAttach:\020CREATE_OR_ATTACH\022\032\n\013" +
-      "synchronous\030\004 \001(\010:\005false\022\024\n\014messageBound" +
-      "\030\005 \001(\r\0224\n\013preferences\030\006 \001(\0132\037.Hedwig.Sub" +
-      "scriptionPreferences\022\032\n\013forceAttach\030\007 \001(" +
-      "\010:\005false\">\n\016CreateOrAttach\022\n\n\006CREATE\020\000\022\n" +
-      "\n\006ATTACH\020\001\022\024\n\020CREATE_OR_ATTACH\020\002\"\222\002\n\023Sub" +
-      "scriptionOptions\022\032\n\013forceAttach\030\001 \001(\010:\005f" +
-      "alse\022Q\n\016createOrAttach\030\002 \001(\0162\'.Hedwig.Su" +
-      "bscribeRequest.CreateOrAttach:\020CREATE_OR",
-      "_ATTACH\022\027\n\014messageBound\030\003 \001(\r:\0010\022\034\n\007opti" +
-      "ons\030\004 \001(\0132\013.Hedwig.Map\022\025\n\rmessageFilter\030" +
-      "\005 \001(\t\022\035\n\025deliveryThrottleValue\030\006 \001(\r\022\037\n\021" +
-      "enableResubscribe\030\007 \001(\010:\004true\"K\n\016Consume" +
-      "Request\022\024\n\014subscriberId\030\002 \002(\014\022#\n\005msgId\030\003" +
-      " \002(\0132\024.Hedwig.MessageSeqId\"*\n\022Unsubscrib" +
-      "eRequest\022\024\n\014subscriberId\030\002 \002(\014\"+\n\023StopDe" +
-      "liveryRequest\022\024\n\014subscriberId\030\002 \002(\014\",\n\024S" +
-      "tartDeliveryRequest\022\024\n\014subscriberId\030\002 \002(" +
-      "\014\"\377\001\n\016PubSubResponse\0220\n\017protocolVersion\030",
-      "\001 \002(\0162\027.Hedwig.ProtocolVersion\022&\n\nstatus" +
-      "Code\030\002 \002(\0162\022.Hedwig.StatusCode\022\r\n\005txnId\030" +
-      "\003 \002(\004\022\021\n\tstatusMsg\030\004 \001(\t\022 \n\007message\030\005 \001(" +
-      "\0132\017.Hedwig.Message\022\r\n\005topic\030\006 \001(\014\022\024\n\014sub" +
-      "scriberId\030\007 \001(\014\022*\n\014responseBody\030\010 \001(\0132\024." +
-      "Hedwig.ResponseBody\"?\n\017PublishResponse\022," +
-      "\n\016publishedMsgId\030\001 \002(\0132\024.Hedwig.MessageS" +
-      "eqId\"I\n\021SubscribeResponse\0224\n\013preferences" +
-      "\030\002 \001(\0132\037.Hedwig.SubscriptionPreferences\"" +
-      "v\n\014ResponseBody\0220\n\017publishResponse\030\001 \001(\013",
-      "2\027.Hedwig.PublishResponse\0224\n\021subscribeRe" +
-      "sponse\030\002 \001(\0132\031.Hedwig.SubscribeResponse\"" +
-      "N\n\021SubscriptionState\022#\n\005msgId\030\001 \002(\0132\024.He" +
-      "dwig.MessageSeqId\022\024\n\014messageBound\030\002 \001(\r\"" +
-      "r\n\020SubscriptionData\022(\n\005state\030\001 \001(\0132\031.Hed" +
-      "wig.SubscriptionState\0224\n\013preferences\030\002 \001" +
-      "(\0132\037.Hedwig.SubscriptionPreferences\"O\n\013L" +
-      "edgerRange\022\020\n\010ledgerId\030\001 \002(\004\022.\n\020endSeqId" +
-      "Included\030\002 \001(\0132\024.Hedwig.MessageSeqId\"3\n\014" +
-      "LedgerRanges\022#\n\006ranges\030\001 \003(\0132\023.Hedwig.Le",
-      "dgerRange\":\n\013ManagerMeta\022\023\n\013managerImpl\030" +
-      "\002 \002(\t\022\026\n\016managerVersion\030\003 \002(\r\".\n\013HubInfo" +
-      "Data\022\020\n\010hostname\030\002 \002(\t\022\r\n\005czxid\030\003 \002(\004\" \n" +
-      "\013HubLoadData\022\021\n\tnumTopics\030\002 \002(\004*\"\n\017Proto" +
-      "colVersion\022\017\n\013VERSION_ONE\020\001*p\n\rOperation" +
-      "Type\022\013\n\007PUBLISH\020\000\022\r\n\tSUBSCRIBE\020\001\022\013\n\007CONS" +
-      "UME\020\002\022\017\n\013UNSUBSCRIBE\020\003\022\022\n\016START_DELIVERY" +
-      "\020\004\022\021\n\rSTOP_DELIVERY\020\005*D\n\021SubscriptionEve" +
-      "nt\022\017\n\013TOPIC_MOVED\020\001\022\036\n\032SUBSCRIPTION_FORC" +
-      "ED_CLOSED\020\002*\205\004\n\nStatusCode\022\013\n\007SUCCESS\020\000\022",
-      "\026\n\021MALFORMED_REQUEST\020\221\003\022\022\n\rNO_SUCH_TOPIC" +
-      "\020\222\003\022\036\n\031CLIENT_ALREADY_SUBSCRIBED\020\223\003\022\032\n\025C" +
-      "LIENT_NOT_SUBSCRIBED\020\224\003\022\026\n\021COULD_NOT_CON" +
-      "NECT\020\225\003\022\017\n\nTOPIC_BUSY\020\226\003\022\036\n\031NOT_RESPONSI" +
-      "BLE_FOR_TOPIC\020\365\003\022\021\n\014SERVICE_DOWN\020\366\003\022\024\n\017U" +
-      "NCERTAIN_STATE\020\367\003\022\033\n\026INVALID_MESSAGE_FIL" +
-      "TER\020\370\003\022\020\n\013BAD_VERSION\020\210\004\022\036\n\031NO_TOPIC_PER" +
-      "SISTENCE_INFO\020\211\004\022\"\n\035TOPIC_PERSISTENCE_IN" +
-      "FO_EXISTS\020\212\004\022\032\n\025NO_SUBSCRIPTION_STATE\020\213\004" +
-      "\022\036\n\031SUBSCRIPTION_STATE_EXISTS\020\214\004\022\030\n\023NO_T",
-      "OPIC_OWNER_INFO\020\215\004\022\034\n\027TOPIC_OWNER_INFO_E" +
-      "XISTS\020\216\004\022\031\n\024UNEXPECTED_CONDITION\020\330\004\022\016\n\tC" +
-      "OMPOSITE\020\274\005B\036\n\032org.apache.hedwig.protoco" +
-      "lH\001"
+      "\017.Hedwig.Message\"\177\n\027SubscriptionPreferen" +
+      "ces\022\034\n\007options\030\001 \001(\0132\013.Hedwig.Map\022\024\n\014mes" +
+      "sageBound\030\002 \001(\r\022\025\n\rmessageFilter\030\003 \001(\t\022\031" +
+      "\n\021messageWindowSize\030\004 \001(\r\"\277\002\n\020SubscribeR" +
+      "equest\022\024\n\014subscriberId\030\002 \002(\014\022Q\n\016createOr",
+      "Attach\030\003 \001(\0162\'.Hedwig.SubscribeRequest.C" +
+      "reateOrAttach:\020CREATE_OR_ATTACH\022\032\n\013synch" +
+      "ronous\030\004 \001(\010:\005false\022\024\n\014messageBound\030\005 \001(" +
+      "\r\0224\n\013preferences\030\006 \001(\0132\037.Hedwig.Subscrip" +
+      "tionPreferences\022\032\n\013forceAttach\030\007 \001(\010:\005fa" +
+      "lse\">\n\016CreateOrAttach\022\n\n\006CREATE\020\000\022\n\n\006ATT" +
+      "ACH\020\001\022\024\n\020CREATE_OR_ATTACH\020\002\"\216\002\n\023Subscrip" +
+      "tionOptions\022\032\n\013forceAttach\030\001 \001(\010:\005false\022" +
+      "Q\n\016createOrAttach\030\002 \001(\0162\'.Hedwig.Subscri" +
+      "beRequest.CreateOrAttach:\020CREATE_OR_ATTA",
+      "CH\022\027\n\014messageBound\030\003 \001(\r:\0010\022\034\n\007options\030\004" +
+      " \001(\0132\013.Hedwig.Map\022\025\n\rmessageFilter\030\005 \001(\t" +
+      "\022\031\n\021messageWindowSize\030\006 \001(\r\022\037\n\021enableRes" +
+      "ubscribe\030\007 \001(\010:\004true\"K\n\016ConsumeRequest\022\024" +
+      "\n\014subscriberId\030\002 \002(\014\022#\n\005msgId\030\003 \002(\0132\024.He" +
+      "dwig.MessageSeqId\"*\n\022UnsubscribeRequest\022" +
+      "\024\n\014subscriberId\030\002 \002(\014\"+\n\023StopDeliveryReq" +
+      "uest\022\024\n\014subscriberId\030\002 \002(\014\",\n\024StartDeliv" +
+      "eryRequest\022\024\n\014subscriberId\030\002 \002(\014\"\377\001\n\016Pub" +
+      "SubResponse\0220\n\017protocolVersion\030\001 \002(\0162\027.H",
+      "edwig.ProtocolVersion\022&\n\nstatusCode\030\002 \002(" +
+      "\0162\022.Hedwig.StatusCode\022\r\n\005txnId\030\003 \002(\004\022\021\n\t" +
+      "statusMsg\030\004 \001(\t\022 \n\007message\030\005 \001(\0132\017.Hedwi" +
+      "g.Message\022\r\n\005topic\030\006 \001(\014\022\024\n\014subscriberId" +
+      "\030\007 \001(\014\022*\n\014responseBody\030\010 \001(\0132\024.Hedwig.Re" +
+      "sponseBody\"?\n\017PublishResponse\022,\n\016publish" +
+      "edMsgId\030\001 \002(\0132\024.Hedwig.MessageSeqId\"I\n\021S" +
+      "ubscribeResponse\0224\n\013preferences\030\002 \001(\0132\037." +
+      "Hedwig.SubscriptionPreferences\"v\n\014Respon" +
+      "seBody\0220\n\017publishResponse\030\001 \001(\0132\027.Hedwig",
+      ".PublishResponse\0224\n\021subscribeResponse\030\002 " +
+      "\001(\0132\031.Hedwig.SubscribeResponse\"N\n\021Subscr" +
+      "iptionState\022#\n\005msgId\030\001 \002(\0132\024.Hedwig.Mess" +
+      "ageSeqId\022\024\n\014messageBound\030\002 \001(\r\"r\n\020Subscr" +
+      "iptionData\022(\n\005state\030\001 \001(\0132\031.Hedwig.Subsc" +
+      "riptionState\0224\n\013preferences\030\002 \001(\0132\037.Hedw" +
+      "ig.SubscriptionPreferences\"O\n\013LedgerRang" +
+      "e\022\020\n\010ledgerId\030\001 \002(\004\022.\n\020endSeqIdIncluded\030" +
+      "\002 \001(\0132\024.Hedwig.MessageSeqId\"3\n\014LedgerRan" +
+      "ges\022#\n\006ranges\030\001 \003(\0132\023.Hedwig.LedgerRange",
+      "\":\n\013ManagerMeta\022\023\n\013managerImpl\030\002 \002(\t\022\026\n\016" +
+      "managerVersion\030\003 \002(\r\".\n\013HubInfoData\022\020\n\010h" +
+      "ostname\030\002 \002(\t\022\r\n\005czxid\030\003 \002(\004\" \n\013HubLoadD" +
+      "ata\022\021\n\tnumTopics\030\002 \002(\004*\"\n\017ProtocolVersio" +
+      "n\022\017\n\013VERSION_ONE\020\001*p\n\rOperationType\022\013\n\007P" +
+      "UBLISH\020\000\022\r\n\tSUBSCRIBE\020\001\022\013\n\007CONSUME\020\002\022\017\n\013" +
+      "UNSUBSCRIBE\020\003\022\022\n\016START_DELIVERY\020\004\022\021\n\rSTO" +
+      "P_DELIVERY\020\005*D\n\021SubscriptionEvent\022\017\n\013TOP" +
+      "IC_MOVED\020\001\022\036\n\032SUBSCRIPTION_FORCED_CLOSED" +
+      "\020\002*\205\004\n\nStatusCode\022\013\n\007SUCCESS\020\000\022\026\n\021MALFOR",
+      "MED_REQUEST\020\221\003\022\022\n\rNO_SUCH_TOPIC\020\222\003\022\036\n\031CL" +
+      "IENT_ALREADY_SUBSCRIBED\020\223\003\022\032\n\025CLIENT_NOT" +
+      "_SUBSCRIBED\020\224\003\022\026\n\021COULD_NOT_CONNECT\020\225\003\022\017" +
+      "\n\nTOPIC_BUSY\020\226\003\022\036\n\031NOT_RESPONSIBLE_FOR_T" +
+      "OPIC\020\365\003\022\021\n\014SERVICE_DOWN\020\366\003\022\024\n\017UNCERTAIN_" +
+      "STATE\020\367\003\022\033\n\026INVALID_MESSAGE_FILTER\020\370\003\022\020\n" +
+      "\013BAD_VERSION\020\210\004\022\036\n\031NO_TOPIC_PERSISTENCE_" +
+      "INFO\020\211\004\022\"\n\035TOPIC_PERSISTENCE_INFO_EXISTS" +
+      "\020\212\004\022\032\n\025NO_SUBSCRIPTION_STATE\020\213\004\022\036\n\031SUBSC" +
+      "RIPTION_STATE_EXISTS\020\214\004\022\030\n\023NO_TOPIC_OWNE",
+      "R_INFO\020\215\004\022\034\n\027TOPIC_OWNER_INFO_EXISTS\020\216\004\022" +
+      "\031\n\024UNEXPECTED_CONDITION\020\330\004\022\016\n\tCOMPOSITE\020" +
+      "\274\005B\036\n\032org.apache.hedwig.protocolH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -15640,7 +15639,7 @@ public final class PubSubProtocol {
           internal_static_Hedwig_SubscriptionPreferences_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Hedwig_SubscriptionPreferences_descriptor,
-              new java.lang.String[] { "Options", "MessageBound", "MessageFilter", "DeliveryThrottleValue", },
+              new java.lang.String[] { "Options", "MessageBound", "MessageFilter", "MessageWindowSize", },
               org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.class,
               org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.Builder.class);
           internal_static_Hedwig_SubscribeRequest_descriptor =
@@ -15656,7 +15655,7 @@ public final class PubSubProtocol {
           internal_static_Hedwig_SubscriptionOptions_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Hedwig_SubscriptionOptions_descriptor,
-              new java.lang.String[] { "ForceAttach", "CreateOrAttach", "MessageBound", "Options", "MessageFilter", "DeliveryThrottleValue", "EnableResubscribe", },
+              new java.lang.String[] { "ForceAttach", "CreateOrAttach", "MessageBound", "Options", "MessageFilter", "MessageWindowSize", "EnableResubscribe", },
               org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.class,
               org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.Builder.class);
           internal_static_Hedwig_ConsumeRequest_descriptor =

Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto?rev=1396524&r1=1396523&r2=1396524&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto Wed Oct 10 09:45:29 2012
@@ -117,8 +117,9 @@ message SubscriptionPreferences {
     optional uint32 messageBound = 2;
     // server-side message filter
     optional string messageFilter = 3;
-    // server-side delivery throttle value
-    optional uint32 deliveryThrottleValue = 4;
+    // message window size, this is the maximum number of messages 
+    // which will be delivered without being consumed
+    optional uint32 messageWindowSize = 4;
 }
 
 message SubscribeRequest{
@@ -156,8 +157,9 @@ message SubscriptionOptions {
     optional Map options = 4;
     // server-side message filter
     optional string messageFilter = 5;
-    // server-side delivery throttle value
-    optional uint32 deliveryThrottleValue = 6;
+    // message window size, this is the maximum number of messages 
+    // which will be delivered without being consumed
+    optional uint32 messageWindowSize = 6;
     // enable resubscribe
     optional bool enableResubscribe = 7 [default = true];
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java?rev=1396524&r1=1396523&r2=1396524&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java Wed Oct 10 09:45:29 2012
@@ -61,8 +61,8 @@ public class ServerConfiguration extends
     protected final static String BK_ENSEMBLE_SIZE = "bk_ensemble_size";
     protected final static String BK_QUORUM_SIZE = "bk_quorum_size";
     protected final static String RETRY_REMOTE_SUBSCRIBE_THREAD_RUN_INTERVAL = "retry_remote_subscribe_thread_run_interval";
-    protected final static String DEFAULT_DELIVERY_THROTTLE_VALUE =
-        "default_delivery_throttle_value";
+    protected final static String DEFAULT_MESSAGE_WINDOW_SIZE =
+        "default_message_window_size";
 
     protected final static String MAX_ENTRIES_PER_LEDGER = "max_entries_per_ledger";
 
@@ -285,11 +285,11 @@ public class ServerConfiguration extends
         return conf.getInt(RETRY_REMOTE_SUBSCRIBE_THREAD_RUN_INTERVAL, 120000);
     }
 
-    // This parameter is for setting the default maximum number of messages delivered
-    // to a subscriber without being consumed.
-    // we throttle delivery messages to a subscriber when reaching the threshold.
-    public int getDefaultDeliveryThrottleValue() {
-        return conf.getInt(DEFAULT_DELIVERY_THROTTLE_VALUE, 0);
+    // This parameter is for setting the default maximum number of messages which
+    // can be delivered to a subscriber without being consumed.
+    // we pause messages delivery to a subscriber when reaching the window size
+    public int getDefaultMessageWindowSize() {
+        return conf.getInt(DEFAULT_MESSAGE_WINDOW_SIZE, 0);
     }
 
     // This parameter is used when Bookkeeper is the persistence store

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java?rev=1396524&r1=1396523&r2=1396524&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java Wed Oct 10 09:45:29 2012
@@ -347,7 +347,7 @@ public class FIFODeliveryManager impleme
         long lastSeqIdCommunicatedExternally;
         long lastSeqIdConsumedUtil;
         boolean isThrottled = false;
-        final int throttleThreshold;
+        final int messageWindowSize;
         ServerMessageFilter filter;
         // TODO make use of these variables
 
@@ -364,14 +364,14 @@ public class FIFODeliveryManager impleme
             this.lastSeqIdConsumedUtil = lastLocalSeqIdDelivered;
             this.deliveryEndPoint = deliveryEndPoint;
             this.filter = filter;
-            if (preferences.hasDeliveryThrottleValue()) {
-                throttleThreshold = preferences.getDeliveryThrottleValue();
+            if (preferences.hasMessageWindowSize()) {
+                messageWindowSize = preferences.getMessageWindowSize();
             } else {
-                if (FIFODeliveryManager.this.cfg.getDefaultDeliveryThrottleValue() > 0) {
-                    throttleThreshold =
-                        FIFODeliveryManager.this.cfg.getDefaultDeliveryThrottleValue();
+                if (FIFODeliveryManager.this.cfg.getDefaultMessageWindowSize() > 0) {
+                    messageWindowSize =
+                        FIFODeliveryManager.this.cfg.getDefaultMessageWindowSize();
                 } else {
-                    throttleThreshold = UNLIMITED;
+                    messageWindowSize = UNLIMITED;
                 }
             }
         }
@@ -443,10 +443,10 @@ public class FIFODeliveryManager impleme
         }
 
         protected boolean msgLimitExceeded() {
-            if (throttleThreshold == UNLIMITED) {
+            if (messageWindowSize == UNLIMITED) {
                 return false;
             }
-            if (lastLocalSeqIdDelivered - lastSeqIdConsumedUtil >= throttleThreshold) {
+            if (lastLocalSeqIdDelivered - lastSeqIdConsumedUtil >= messageWindowSize) {
                 return true;
             }
             return false;

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java?rev=1396524&r1=1396523&r2=1396524&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java Wed Oct 10 09:45:29 2012
@@ -136,11 +136,11 @@ public class InMemorySubscriptionState {
                 changed = true;
             }
         }
-        if (preferences.hasDeliveryThrottleValue()) {
-            if (!subscriptionPreferences.hasDeliveryThrottleValue() ||
-                subscriptionPreferences.getDeliveryThrottleValue() !=
-                preferences.getDeliveryThrottleValue()) {
-                newPreferencesBuilder.setDeliveryThrottleValue(preferences.getDeliveryThrottleValue());
+        if (preferences.hasMessageWindowSize()) {
+            if (!subscriptionPreferences.hasMessageWindowSize() ||
+                subscriptionPreferences.getMessageWindowSize() !=
+                preferences.getMessageWindowSize()) {
+                newPreferencesBuilder.setMessageWindowSize(preferences.getMessageWindowSize());
                 changed = true;
             }
         }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java?rev=1396524&r1=1396523&r2=1396524&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java Wed Oct 10 09:45:29 2012
@@ -766,7 +766,7 @@ public class TestBackwardCompat extends 
             org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions options =
                 org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder()
                 .setCreateOrAttach(org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH)
-                .setDeliveryThrottleValue(X) .build();
+                .setMessageWindowSize(X) .build();
             subscribe(topic, subid, options);
             closeSubscription(topic, subid);
             publishInts(topic, 1, 3*X);

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java?rev=1396524&r1=1396523&r2=1396524&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java Wed Oct 10 09:45:29 2012
@@ -25,7 +25,9 @@ import java.util.concurrent.atomic.Atomi
 
 import org.junit.Before;
 import org.junit.Test;
-import static org.junit.Assert.*;
+import static org.junit.Assert.assertFalse;
+import static org.junit.Assert.assertTrue;
+import static org.junit.Assert.assertEquals;
 
 import com.google.protobuf.ByteString;
 
@@ -44,7 +46,7 @@ import org.apache.hedwig.util.Callback;
 
 public class TestThrottlingDelivery extends HedwigHubTestBase {
 
-    private static final int DEFAULT_THROTTLE_VALUE = 10;
+    private static final int DEFAULT_MESSAGE_WINDOW_SIZE = 10;
 
     protected class ThrottleDeliveryServerConfiguration extends HubServerConfiguration {
 
@@ -53,30 +55,30 @@ public class TestThrottlingDelivery exte
         }
 
         @Override
-        public int getDefaultDeliveryThrottleValue() {
-            return DEFAULT_THROTTLE_VALUE;
+        public int getDefaultMessageWindowSize() {
+            return TestThrottlingDelivery.this.DEFAULT_MESSAGE_WINDOW_SIZE;
         }
     }
 
     protected class ThrottleDeliveryClientConfiguration extends ClientConfiguration {
 
-        int throttleValue;
+        int messageWindowSize;
 
         ThrottleDeliveryClientConfiguration() {
-            this(DEFAULT_THROTTLE_VALUE);
+            this(TestThrottlingDelivery.this.DEFAULT_MESSAGE_WINDOW_SIZE);
         }
 
-        ThrottleDeliveryClientConfiguration(int throttleValue) {
-            this.throttleValue = throttleValue;
+        ThrottleDeliveryClientConfiguration(int messageWindowSize) {
+            this.messageWindowSize = messageWindowSize;
         }
 
         @Override
         public int getMaximumOutstandingMessages() {
-            return throttleValue;
+            return messageWindowSize;
         }
 
-        void setThrottleValue(int throttleValue) {
-            this.throttleValue = throttleValue;
+        void setMessageWindowSize(int messageWindowSize) {
+            this.messageWindowSize = messageWindowSize;
         }
 
         @Override
@@ -166,7 +168,7 @@ public class TestThrottlingDelivery exte
 
     @Test
     public void testServerSideThrottle() throws Exception {
-        int throttleValue = DEFAULT_THROTTLE_VALUE;
+        int messageWindowSize = DEFAULT_MESSAGE_WINDOW_SIZE;
         ThrottleDeliveryClientConfiguration conf =
             new ThrottleDeliveryClientConfiguration();
         HedwigClient client = new HedwigClient(conf);
@@ -179,27 +181,27 @@ public class TestThrottlingDelivery exte
         sub.closeSubscription(topic, subid);
 
         // throttle with hub server's setting
-        throttleX(pub, sub, topic, subid, DEFAULT_THROTTLE_VALUE);
+        throttleX(pub, sub, topic, subid, DEFAULT_MESSAGE_WINDOW_SIZE);
 
-        throttleValue = DEFAULT_THROTTLE_VALUE / 2;
+        messageWindowSize = DEFAULT_MESSAGE_WINDOW_SIZE / 2;
         // throttle with a lower value than hub server's setting
         SubscriptionOptions.Builder optionsBuilder = SubscriptionOptions.newBuilder()
             .setCreateOrAttach(CreateOrAttach.CREATE)
-            .setDeliveryThrottleValue(throttleValue);
+            .setMessageWindowSize(messageWindowSize);
         topic = ByteString.copyFromUtf8("testServerSideThrottleWithLowerValue");
         sub.subscribe(topic, subid, optionsBuilder.build());
         sub.closeSubscription(topic, subid);
-        throttleX(pub, sub, topic, subid, throttleValue);
+        throttleX(pub, sub, topic, subid, messageWindowSize);
 
-        throttleValue = DEFAULT_THROTTLE_VALUE + 5;
+        messageWindowSize = DEFAULT_MESSAGE_WINDOW_SIZE + 5;
         // throttle with a higher value than hub server's setting
         optionsBuilder = SubscriptionOptions.newBuilder()
                          .setCreateOrAttach(CreateOrAttach.CREATE)
-                         .setDeliveryThrottleValue(throttleValue);
+                         .setMessageWindowSize(messageWindowSize);
         topic = ByteString.copyFromUtf8("testServerSideThrottleWithHigherValue");
         sub.subscribe(topic, subid, optionsBuilder.build());
         sub.closeSubscription(topic, subid);
-        throttleX(pub, sub, topic, subid, throttleValue);
+        throttleX(pub, sub, topic, subid, messageWindowSize);
 
         client.close();
     }