You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/10/04 15:30:32 UTC

svn commit: r1394066 - in /zookeeper/bookkeeper/trunk: ./ hedwig-client/src/main/java/org/apache/hedwig/client/netty/ hedwig-protocol/src/main/java/org/apache/hedwig/protocol/ hedwig-protocol/src/main/protobuf/ hedwig-server/src/main/java/org/apache/he...

Author: ivank
Date: Thu Oct  4 13:30:31 2012
New Revision: 1394066

URL: http://svn.apache.org/viewvc?rev=1394066&view=rev
Log:
BOOKKEEPER-367: Server-Side Message Delivery Flow Control (sijie via ivank)

Added:
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java
    zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java
    zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1394066&r1=1394065&r2=1394066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Thu Oct  4 13:30:31 2012
@@ -168,6 +168,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-397: Make the hedwig client in RegionManager configurable. (Aniruddha via sijie)
 
+        BOOKKEEPER-367: Server-Side Message Delivery Flow Control (sijie via ivank)
+
       hedwig-client:
 
         BOOKKEEPER-306: Change C++ client to use gtest for testing (ivank via sijie)

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java?rev=1394066&r1=1394065&r2=1394066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java Thu Oct  4 13:30:31 2012
@@ -192,6 +192,11 @@ public class NetUtils {
             preferencesBuilder.setOptions(options.getOptions());
         }
 
+        // set delivery throttle value
+        if (options.getDeliveryThrottleValue() > 0) {
+            preferencesBuilder.setDeliveryThrottleValue(options.getDeliveryThrottleValue());
+        }
+
         return preferencesBuilder;
     }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java?rev=1394066&r1=1394065&r2=1394066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java Thu Oct  4 13:30:31 2012
@@ -5704,6 +5704,10 @@ public final class PubSubProtocol {
     // optional string messageFilter = 3;
     boolean hasMessageFilter();
     String getMessageFilter();
+    
+    // optional uint32 deliveryThrottleValue = 4;
+    boolean hasDeliveryThrottleValue();
+    int getDeliveryThrottleValue();
   }
   public static final class SubscriptionPreferences extends
       com.google.protobuf.GeneratedMessage
@@ -5789,10 +5793,21 @@ public final class PubSubProtocol {
       }
     }
     
+    // optional uint32 deliveryThrottleValue = 4;
+    public static final int DELIVERYTHROTTLEVALUE_FIELD_NUMBER = 4;
+    private int deliveryThrottleValue_;
+    public boolean hasDeliveryThrottleValue() {
+      return ((bitField0_ & 0x00000008) == 0x00000008);
+    }
+    public int getDeliveryThrottleValue() {
+      return deliveryThrottleValue_;
+    }
+    
     private void initFields() {
       options_ = org.apache.hedwig.protocol.PubSubProtocol.Map.getDefaultInstance();
       messageBound_ = 0;
       messageFilter_ = "";
+      deliveryThrottleValue_ = 0;
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -5815,6 +5830,9 @@ public final class PubSubProtocol {
       if (((bitField0_ & 0x00000004) == 0x00000004)) {
         output.writeBytes(3, getMessageFilterBytes());
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        output.writeUInt32(4, deliveryThrottleValue_);
+      }
       getUnknownFields().writeTo(output);
     }
     
@@ -5836,6 +5854,10 @@ public final class PubSubProtocol {
         size += com.google.protobuf.CodedOutputStream
           .computeBytesSize(3, getMessageFilterBytes());
       }
+      if (((bitField0_ & 0x00000008) == 0x00000008)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeUInt32Size(4, deliveryThrottleValue_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -5971,6 +5993,8 @@ public final class PubSubProtocol {
         bitField0_ = (bitField0_ & ~0x00000002);
         messageFilter_ = "";
         bitField0_ = (bitField0_ & ~0x00000004);
+        deliveryThrottleValue_ = 0;
+        bitField0_ = (bitField0_ & ~0x00000008);
         return this;
       }
       
@@ -6025,6 +6049,10 @@ public final class PubSubProtocol {
           to_bitField0_ |= 0x00000004;
         }
         result.messageFilter_ = messageFilter_;
+        if (((from_bitField0_ & 0x00000008) == 0x00000008)) {
+          to_bitField0_ |= 0x00000008;
+        }
+        result.deliveryThrottleValue_ = deliveryThrottleValue_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -6050,6 +6078,9 @@ public final class PubSubProtocol {
         if (other.hasMessageFilter()) {
           setMessageFilter(other.getMessageFilter());
         }
+        if (other.hasDeliveryThrottleValue()) {
+          setDeliveryThrottleValue(other.getDeliveryThrottleValue());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -6100,6 +6131,11 @@ public final class PubSubProtocol {
               messageFilter_ = input.readBytes();
               break;
             }
+            case 32: {
+              bitField0_ |= 0x00000008;
+              deliveryThrottleValue_ = input.readUInt32();
+              break;
+            }
           }
         }
       }
@@ -6253,6 +6289,27 @@ public final class PubSubProtocol {
         onChanged();
       }
       
+      // optional uint32 deliveryThrottleValue = 4;
+      private int deliveryThrottleValue_ ;
+      public boolean hasDeliveryThrottleValue() {
+        return ((bitField0_ & 0x00000008) == 0x00000008);
+      }
+      public int getDeliveryThrottleValue() {
+        return deliveryThrottleValue_;
+      }
+      public Builder setDeliveryThrottleValue(int value) {
+        bitField0_ |= 0x00000008;
+        deliveryThrottleValue_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearDeliveryThrottleValue() {
+        bitField0_ = (bitField0_ & ~0x00000008);
+        deliveryThrottleValue_ = 0;
+        onChanged();
+        return this;
+      }
+      
       // @@protoc_insertion_point(builder_scope:Hedwig.SubscriptionPreferences)
     }
     
@@ -7089,6 +7146,10 @@ public final class PubSubProtocol {
     boolean hasMessageFilter();
     String getMessageFilter();
     
+    // optional uint32 deliveryThrottleValue = 6;
+    boolean hasDeliveryThrottleValue();
+    int getDeliveryThrottleValue();
+    
     // optional bool enableResubscribe = 7 [default = true];
     boolean hasEnableResubscribe();
     boolean getEnableResubscribe();
@@ -7197,11 +7258,21 @@ public final class PubSubProtocol {
       }
     }
     
+    // optional uint32 deliveryThrottleValue = 6;
+    public static final int DELIVERYTHROTTLEVALUE_FIELD_NUMBER = 6;
+    private int deliveryThrottleValue_;
+    public boolean hasDeliveryThrottleValue() {
+      return ((bitField0_ & 0x00000020) == 0x00000020);
+    }
+    public int getDeliveryThrottleValue() {
+      return deliveryThrottleValue_;
+    }
+    
     // optional bool enableResubscribe = 7 [default = true];
     public static final int ENABLERESUBSCRIBE_FIELD_NUMBER = 7;
     private boolean enableResubscribe_;
     public boolean hasEnableResubscribe() {
-      return ((bitField0_ & 0x00000020) == 0x00000020);
+      return ((bitField0_ & 0x00000040) == 0x00000040);
     }
     public boolean getEnableResubscribe() {
       return enableResubscribe_;
@@ -7213,6 +7284,7 @@ public final class PubSubProtocol {
       messageBound_ = 0;
       options_ = org.apache.hedwig.protocol.PubSubProtocol.Map.getDefaultInstance();
       messageFilter_ = "";
+      deliveryThrottleValue_ = 0;
       enableResubscribe_ = true;
     }
     private byte memoizedIsInitialized = -1;
@@ -7243,6 +7315,9 @@ public final class PubSubProtocol {
         output.writeBytes(5, getMessageFilterBytes());
       }
       if (((bitField0_ & 0x00000020) == 0x00000020)) {
+        output.writeUInt32(6, deliveryThrottleValue_);
+      }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
         output.writeBool(7, enableResubscribe_);
       }
       getUnknownFields().writeTo(output);
@@ -7276,6 +7351,10 @@ public final class PubSubProtocol {
       }
       if (((bitField0_ & 0x00000020) == 0x00000020)) {
         size += com.google.protobuf.CodedOutputStream
+          .computeUInt32Size(6, deliveryThrottleValue_);
+      }
+      if (((bitField0_ & 0x00000040) == 0x00000040)) {
+        size += com.google.protobuf.CodedOutputStream
           .computeBoolSize(7, enableResubscribe_);
       }
       size += getUnknownFields().getSerializedSize();
@@ -7417,8 +7496,10 @@ public final class PubSubProtocol {
         bitField0_ = (bitField0_ & ~0x00000008);
         messageFilter_ = "";
         bitField0_ = (bitField0_ & ~0x00000010);
-        enableResubscribe_ = true;
+        deliveryThrottleValue_ = 0;
         bitField0_ = (bitField0_ & ~0x00000020);
+        enableResubscribe_ = true;
+        bitField0_ = (bitField0_ & ~0x00000040);
         return this;
       }
       
@@ -7484,6 +7565,10 @@ public final class PubSubProtocol {
         if (((from_bitField0_ & 0x00000020) == 0x00000020)) {
           to_bitField0_ |= 0x00000020;
         }
+        result.deliveryThrottleValue_ = deliveryThrottleValue_;
+        if (((from_bitField0_ & 0x00000040) == 0x00000040)) {
+          to_bitField0_ |= 0x00000040;
+        }
         result.enableResubscribe_ = enableResubscribe_;
         result.bitField0_ = to_bitField0_;
         onBuilt();
@@ -7516,6 +7601,9 @@ public final class PubSubProtocol {
         if (other.hasMessageFilter()) {
           setMessageFilter(other.getMessageFilter());
         }
+        if (other.hasDeliveryThrottleValue()) {
+          setDeliveryThrottleValue(other.getDeliveryThrottleValue());
+        }
         if (other.hasEnableResubscribe()) {
           setEnableResubscribe(other.getEnableResubscribe());
         }
@@ -7585,8 +7673,13 @@ public final class PubSubProtocol {
               messageFilter_ = input.readBytes();
               break;
             }
-            case 56: {
+            case 48: {
               bitField0_ |= 0x00000020;
+              deliveryThrottleValue_ = input.readUInt32();
+              break;
+            }
+            case 56: {
+              bitField0_ |= 0x00000040;
               enableResubscribe_ = input.readBool();
               break;
             }
@@ -7788,22 +7881,43 @@ public final class PubSubProtocol {
         onChanged();
       }
       
+      // optional uint32 deliveryThrottleValue = 6;
+      private int deliveryThrottleValue_ ;
+      public boolean hasDeliveryThrottleValue() {
+        return ((bitField0_ & 0x00000020) == 0x00000020);
+      }
+      public int getDeliveryThrottleValue() {
+        return deliveryThrottleValue_;
+      }
+      public Builder setDeliveryThrottleValue(int value) {
+        bitField0_ |= 0x00000020;
+        deliveryThrottleValue_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearDeliveryThrottleValue() {
+        bitField0_ = (bitField0_ & ~0x00000020);
+        deliveryThrottleValue_ = 0;
+        onChanged();
+        return this;
+      }
+      
       // optional bool enableResubscribe = 7 [default = true];
       private boolean enableResubscribe_ = true;
       public boolean hasEnableResubscribe() {
-        return ((bitField0_ & 0x00000020) == 0x00000020);
+        return ((bitField0_ & 0x00000040) == 0x00000040);
       }
       public boolean getEnableResubscribe() {
         return enableResubscribe_;
       }
       public Builder setEnableResubscribe(boolean value) {
-        bitField0_ |= 0x00000020;
+        bitField0_ |= 0x00000040;
         enableResubscribe_ = value;
         onChanged();
         return this;
       }
       public Builder clearEnableResubscribe() {
-        bitField0_ = (bitField0_ & ~0x00000020);
+        bitField0_ = (bitField0_ & ~0x00000040);
         enableResubscribe_ = true;
         onChanged();
         return this;
@@ -15382,73 +15496,75 @@ public final class PubSubProtocol {
       "\033.Hedwig.StopDeliveryRequest\022:\n\024startDel" +
       "iveryRequest\0309 \001(\0132\034.Hedwig.StartDeliver" +
       "yRequest\".\n\016PublishRequest\022\034\n\003msg\030\002 \002(\0132" +
-      "\017.Hedwig.Message\"d\n\027SubscriptionPreferen" +
-      "ces\022\034\n\007options\030\001 \001(\0132\013.Hedwig.Map\022\024\n\014mes" +
-      "sageBound\030\002 \001(\r\022\025\n\rmessageFilter\030\003 \001(\t\"\277" +
-      "\002\n\020SubscribeRequest\022\024\n\014subscriberId\030\002 \002(" +
-      "\014\022Q\n\016createOrAttach\030\003 \001(\0162\'.Hedwig.Subsc",
-      "ribeRequest.CreateOrAttach:\020CREATE_OR_AT" +
-      "TACH\022\032\n\013synchronous\030\004 \001(\010:\005false\022\024\n\014mess" +
-      "ageBound\030\005 \001(\r\0224\n\013preferences\030\006 \001(\0132\037.He" +
-      "dwig.SubscriptionPreferences\022\032\n\013forceAtt" +
-      "ach\030\007 \001(\010:\005false\">\n\016CreateOrAttach\022\n\n\006CR" +
-      "EATE\020\000\022\n\n\006ATTACH\020\001\022\024\n\020CREATE_OR_ATTACH\020\002" +
-      "\"\363\001\n\023SubscriptionOptions\022\032\n\013forceAttach\030" +
-      "\001 \001(\010:\005false\022Q\n\016createOrAttach\030\002 \001(\0162\'.H" +
-      "edwig.SubscribeRequest.CreateOrAttach:\020C" +
-      "REATE_OR_ATTACH\022\027\n\014messageBound\030\003 \001(\r:\0010",
-      "\022\034\n\007options\030\004 \001(\0132\013.Hedwig.Map\022\025\n\rmessag" +
-      "eFilter\030\005 \001(\t\022\037\n\021enableResubscribe\030\007 \001(\010" +
-      ":\004true\"K\n\016ConsumeRequest\022\024\n\014subscriberId" +
-      "\030\002 \002(\014\022#\n\005msgId\030\003 \002(\0132\024.Hedwig.MessageSe" +
-      "qId\"*\n\022UnsubscribeRequest\022\024\n\014subscriberI" +
-      "d\030\002 \002(\014\"+\n\023StopDeliveryRequest\022\024\n\014subscr" +
-      "iberId\030\002 \002(\014\",\n\024StartDeliveryRequest\022\024\n\014" +
-      "subscriberId\030\002 \002(\014\"\377\001\n\016PubSubResponse\0220\n" +
-      "\017protocolVersion\030\001 \002(\0162\027.Hedwig.Protocol" +
-      "Version\022&\n\nstatusCode\030\002 \002(\0162\022.Hedwig.Sta",
-      "tusCode\022\r\n\005txnId\030\003 \002(\004\022\021\n\tstatusMsg\030\004 \001(" +
-      "\t\022 \n\007message\030\005 \001(\0132\017.Hedwig.Message\022\r\n\005t" +
-      "opic\030\006 \001(\014\022\024\n\014subscriberId\030\007 \001(\014\022*\n\014resp" +
-      "onseBody\030\010 \001(\0132\024.Hedwig.ResponseBody\"?\n\017" +
-      "PublishResponse\022,\n\016publishedMsgId\030\001 \002(\0132" +
-      "\024.Hedwig.MessageSeqId\"I\n\021SubscribeRespon" +
-      "se\0224\n\013preferences\030\002 \001(\0132\037.Hedwig.Subscri" +
-      "ptionPreferences\"v\n\014ResponseBody\0220\n\017publ" +
-      "ishResponse\030\001 \001(\0132\027.Hedwig.PublishRespon" +
-      "se\0224\n\021subscribeResponse\030\002 \001(\0132\031.Hedwig.S",
-      "ubscribeResponse\"N\n\021SubscriptionState\022#\n" +
-      "\005msgId\030\001 \002(\0132\024.Hedwig.MessageSeqId\022\024\n\014me" +
-      "ssageBound\030\002 \001(\r\"r\n\020SubscriptionData\022(\n\005" +
-      "state\030\001 \001(\0132\031.Hedwig.SubscriptionState\0224" +
-      "\n\013preferences\030\002 \001(\0132\037.Hedwig.Subscriptio" +
-      "nPreferences\"O\n\013LedgerRange\022\020\n\010ledgerId\030" +
-      "\001 \002(\004\022.\n\020endSeqIdIncluded\030\002 \001(\0132\024.Hedwig" +
-      ".MessageSeqId\"3\n\014LedgerRanges\022#\n\006ranges\030" +
-      "\001 \003(\0132\023.Hedwig.LedgerRange\":\n\013ManagerMet" +
-      "a\022\023\n\013managerImpl\030\002 \002(\t\022\026\n\016managerVersion",
-      "\030\003 \002(\r\".\n\013HubInfoData\022\020\n\010hostname\030\002 \002(\t\022" +
-      "\r\n\005czxid\030\003 \002(\004\" \n\013HubLoadData\022\021\n\tnumTopi" +
-      "cs\030\002 \002(\004*\"\n\017ProtocolVersion\022\017\n\013VERSION_O" +
-      "NE\020\001*p\n\rOperationType\022\013\n\007PUBLISH\020\000\022\r\n\tSU" +
-      "BSCRIBE\020\001\022\013\n\007CONSUME\020\002\022\017\n\013UNSUBSCRIBE\020\003\022" +
-      "\022\n\016START_DELIVERY\020\004\022\021\n\rSTOP_DELIVERY\020\005*D" +
-      "\n\021SubscriptionEvent\022\017\n\013TOPIC_MOVED\020\001\022\036\n\032" +
-      "SUBSCRIPTION_FORCED_CLOSED\020\002*\205\004\n\nStatusC" +
-      "ode\022\013\n\007SUCCESS\020\000\022\026\n\021MALFORMED_REQUEST\020\221\003" +
-      "\022\022\n\rNO_SUCH_TOPIC\020\222\003\022\036\n\031CLIENT_ALREADY_S",
-      "UBSCRIBED\020\223\003\022\032\n\025CLIENT_NOT_SUBSCRIBED\020\224\003" +
-      "\022\026\n\021COULD_NOT_CONNECT\020\225\003\022\017\n\nTOPIC_BUSY\020\226" +
-      "\003\022\036\n\031NOT_RESPONSIBLE_FOR_TOPIC\020\365\003\022\021\n\014SER" +
-      "VICE_DOWN\020\366\003\022\024\n\017UNCERTAIN_STATE\020\367\003\022\033\n\026IN" +
-      "VALID_MESSAGE_FILTER\020\370\003\022\020\n\013BAD_VERSION\020\210" +
-      "\004\022\036\n\031NO_TOPIC_PERSISTENCE_INFO\020\211\004\022\"\n\035TOP" +
-      "IC_PERSISTENCE_INFO_EXISTS\020\212\004\022\032\n\025NO_SUBS" +
-      "CRIPTION_STATE\020\213\004\022\036\n\031SUBSCRIPTION_STATE_" +
-      "EXISTS\020\214\004\022\030\n\023NO_TOPIC_OWNER_INFO\020\215\004\022\034\n\027T" +
-      "OPIC_OWNER_INFO_EXISTS\020\216\004\022\031\n\024UNEXPECTED_",
-      "CONDITION\020\330\004\022\016\n\tCOMPOSITE\020\274\005B\036\n\032org.apac" +
-      "he.hedwig.protocolH\001"
+      "\017.Hedwig.Message\"\203\001\n\027SubscriptionPrefere" +
+      "nces\022\034\n\007options\030\001 \001(\0132\013.Hedwig.Map\022\024\n\014me" +
+      "ssageBound\030\002 \001(\r\022\025\n\rmessageFilter\030\003 \001(\t\022" +
+      "\035\n\025deliveryThrottleValue\030\004 \001(\r\"\277\002\n\020Subsc" +
+      "ribeRequest\022\024\n\014subscriberId\030\002 \002(\014\022Q\n\016cre",
+      "ateOrAttach\030\003 \001(\0162\'.Hedwig.SubscribeRequ" +
+      "est.CreateOrAttach:\020CREATE_OR_ATTACH\022\032\n\013" +
+      "synchronous\030\004 \001(\010:\005false\022\024\n\014messageBound" +
+      "\030\005 \001(\r\0224\n\013preferences\030\006 \001(\0132\037.Hedwig.Sub" +
+      "scriptionPreferences\022\032\n\013forceAttach\030\007 \001(" +
+      "\010:\005false\">\n\016CreateOrAttach\022\n\n\006CREATE\020\000\022\n" +
+      "\n\006ATTACH\020\001\022\024\n\020CREATE_OR_ATTACH\020\002\"\222\002\n\023Sub" +
+      "scriptionOptions\022\032\n\013forceAttach\030\001 \001(\010:\005f" +
+      "alse\022Q\n\016createOrAttach\030\002 \001(\0162\'.Hedwig.Su" +
+      "bscribeRequest.CreateOrAttach:\020CREATE_OR",
+      "_ATTACH\022\027\n\014messageBound\030\003 \001(\r:\0010\022\034\n\007opti" +
+      "ons\030\004 \001(\0132\013.Hedwig.Map\022\025\n\rmessageFilter\030" +
+      "\005 \001(\t\022\035\n\025deliveryThrottleValue\030\006 \001(\r\022\037\n\021" +
+      "enableResubscribe\030\007 \001(\010:\004true\"K\n\016Consume" +
+      "Request\022\024\n\014subscriberId\030\002 \002(\014\022#\n\005msgId\030\003" +
+      " \002(\0132\024.Hedwig.MessageSeqId\"*\n\022Unsubscrib" +
+      "eRequest\022\024\n\014subscriberId\030\002 \002(\014\"+\n\023StopDe" +
+      "liveryRequest\022\024\n\014subscriberId\030\002 \002(\014\",\n\024S" +
+      "tartDeliveryRequest\022\024\n\014subscriberId\030\002 \002(" +
+      "\014\"\377\001\n\016PubSubResponse\0220\n\017protocolVersion\030",
+      "\001 \002(\0162\027.Hedwig.ProtocolVersion\022&\n\nstatus" +
+      "Code\030\002 \002(\0162\022.Hedwig.StatusCode\022\r\n\005txnId\030" +
+      "\003 \002(\004\022\021\n\tstatusMsg\030\004 \001(\t\022 \n\007message\030\005 \001(" +
+      "\0132\017.Hedwig.Message\022\r\n\005topic\030\006 \001(\014\022\024\n\014sub" +
+      "scriberId\030\007 \001(\014\022*\n\014responseBody\030\010 \001(\0132\024." +
+      "Hedwig.ResponseBody\"?\n\017PublishResponse\022," +
+      "\n\016publishedMsgId\030\001 \002(\0132\024.Hedwig.MessageS" +
+      "eqId\"I\n\021SubscribeResponse\0224\n\013preferences" +
+      "\030\002 \001(\0132\037.Hedwig.SubscriptionPreferences\"" +
+      "v\n\014ResponseBody\0220\n\017publishResponse\030\001 \001(\013",
+      "2\027.Hedwig.PublishResponse\0224\n\021subscribeRe" +
+      "sponse\030\002 \001(\0132\031.Hedwig.SubscribeResponse\"" +
+      "N\n\021SubscriptionState\022#\n\005msgId\030\001 \002(\0132\024.He" +
+      "dwig.MessageSeqId\022\024\n\014messageBound\030\002 \001(\r\"" +
+      "r\n\020SubscriptionData\022(\n\005state\030\001 \001(\0132\031.Hed" +
+      "wig.SubscriptionState\0224\n\013preferences\030\002 \001" +
+      "(\0132\037.Hedwig.SubscriptionPreferences\"O\n\013L" +
+      "edgerRange\022\020\n\010ledgerId\030\001 \002(\004\022.\n\020endSeqId" +
+      "Included\030\002 \001(\0132\024.Hedwig.MessageSeqId\"3\n\014" +
+      "LedgerRanges\022#\n\006ranges\030\001 \003(\0132\023.Hedwig.Le",
+      "dgerRange\":\n\013ManagerMeta\022\023\n\013managerImpl\030" +
+      "\002 \002(\t\022\026\n\016managerVersion\030\003 \002(\r\".\n\013HubInfo" +
+      "Data\022\020\n\010hostname\030\002 \002(\t\022\r\n\005czxid\030\003 \002(\004\" \n" +
+      "\013HubLoadData\022\021\n\tnumTopics\030\002 \002(\004*\"\n\017Proto" +
+      "colVersion\022\017\n\013VERSION_ONE\020\001*p\n\rOperation" +
+      "Type\022\013\n\007PUBLISH\020\000\022\r\n\tSUBSCRIBE\020\001\022\013\n\007CONS" +
+      "UME\020\002\022\017\n\013UNSUBSCRIBE\020\003\022\022\n\016START_DELIVERY" +
+      "\020\004\022\021\n\rSTOP_DELIVERY\020\005*D\n\021SubscriptionEve" +
+      "nt\022\017\n\013TOPIC_MOVED\020\001\022\036\n\032SUBSCRIPTION_FORC" +
+      "ED_CLOSED\020\002*\205\004\n\nStatusCode\022\013\n\007SUCCESS\020\000\022",
+      "\026\n\021MALFORMED_REQUEST\020\221\003\022\022\n\rNO_SUCH_TOPIC" +
+      "\020\222\003\022\036\n\031CLIENT_ALREADY_SUBSCRIBED\020\223\003\022\032\n\025C" +
+      "LIENT_NOT_SUBSCRIBED\020\224\003\022\026\n\021COULD_NOT_CON" +
+      "NECT\020\225\003\022\017\n\nTOPIC_BUSY\020\226\003\022\036\n\031NOT_RESPONSI" +
+      "BLE_FOR_TOPIC\020\365\003\022\021\n\014SERVICE_DOWN\020\366\003\022\024\n\017U" +
+      "NCERTAIN_STATE\020\367\003\022\033\n\026INVALID_MESSAGE_FIL" +
+      "TER\020\370\003\022\020\n\013BAD_VERSION\020\210\004\022\036\n\031NO_TOPIC_PER" +
+      "SISTENCE_INFO\020\211\004\022\"\n\035TOPIC_PERSISTENCE_IN" +
+      "FO_EXISTS\020\212\004\022\032\n\025NO_SUBSCRIPTION_STATE\020\213\004" +
+      "\022\036\n\031SUBSCRIPTION_STATE_EXISTS\020\214\004\022\030\n\023NO_T",
+      "OPIC_OWNER_INFO\020\215\004\022\034\n\027TOPIC_OWNER_INFO_E" +
+      "XISTS\020\216\004\022\031\n\024UNEXPECTED_CONDITION\020\330\004\022\016\n\tC" +
+      "OMPOSITE\020\274\005B\036\n\032org.apache.hedwig.protoco" +
+      "lH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -15524,7 +15640,7 @@ public final class PubSubProtocol {
           internal_static_Hedwig_SubscriptionPreferences_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Hedwig_SubscriptionPreferences_descriptor,
-              new java.lang.String[] { "Options", "MessageBound", "MessageFilter", },
+              new java.lang.String[] { "Options", "MessageBound", "MessageFilter", "DeliveryThrottleValue", },
               org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.class,
               org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences.Builder.class);
           internal_static_Hedwig_SubscribeRequest_descriptor =
@@ -15540,7 +15656,7 @@ public final class PubSubProtocol {
           internal_static_Hedwig_SubscriptionOptions_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Hedwig_SubscriptionOptions_descriptor,
-              new java.lang.String[] { "ForceAttach", "CreateOrAttach", "MessageBound", "Options", "MessageFilter", "EnableResubscribe", },
+              new java.lang.String[] { "ForceAttach", "CreateOrAttach", "MessageBound", "Options", "MessageFilter", "DeliveryThrottleValue", "EnableResubscribe", },
               org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.class,
               org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.Builder.class);
           internal_static_Hedwig_ConsumeRequest_descriptor =

Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto?rev=1394066&r1=1394065&r2=1394066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto Thu Oct  4 13:30:31 2012
@@ -117,6 +117,8 @@ message SubscriptionPreferences {
     optional uint32 messageBound = 2;
     // server-side message filter
     optional string messageFilter = 3;
+    // server-side delivery throttle value
+    optional uint32 deliveryThrottleValue = 4;
 }
 
 message SubscribeRequest{
@@ -154,6 +156,8 @@ message SubscriptionOptions {
     optional Map options = 4;
     // server-side message filter
     optional string messageFilter = 5;
+    // server-side delivery throttle value
+    optional uint32 deliveryThrottleValue = 6;
     // enable resubscribe
     optional bool enableResubscribe = 7 [default = true];
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java?rev=1394066&r1=1394065&r2=1394066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/common/ServerConfiguration.java Thu Oct  4 13:30:31 2012
@@ -61,6 +61,9 @@ public class ServerConfiguration extends
     protected final static String BK_ENSEMBLE_SIZE = "bk_ensemble_size";
     protected final static String BK_QUORUM_SIZE = "bk_quorum_size";
     protected final static String RETRY_REMOTE_SUBSCRIBE_THREAD_RUN_INTERVAL = "retry_remote_subscribe_thread_run_interval";
+    protected final static String DEFAULT_DELIVERY_THROTTLE_VALUE =
+        "default_delivery_throttle_value";
+
     protected final static String MAX_ENTRIES_PER_LEDGER = "max_entries_per_ledger";
 
     // manager related settings
@@ -282,6 +285,13 @@ public class ServerConfiguration extends
         return conf.getInt(RETRY_REMOTE_SUBSCRIBE_THREAD_RUN_INTERVAL, 120000);
     }
 
+    // This parameter sets the default maximum number of messages that may be delivered
+    // to a subscriber without being consumed.
+    // Delivery to a subscriber is throttled once this threshold is reached.
+    public int getDefaultDeliveryThrottleValue() {
+        return conf.getInt(DEFAULT_DELIVERY_THROTTLE_VALUE, 0);
+    }
+
     // This parameter is used when Bookkeeper is the persistence store
     // and indicates what the ensemble size is (i.e. how many bookie
     // servers to stripe the ledger entries across).

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java?rev=1394066&r1=1394065&r2=1394066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java Thu Oct  4 13:30:31 2012
@@ -19,17 +19,34 @@ package org.apache.hedwig.server.deliver
 
 import com.google.protobuf.ByteString;
 import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
 import org.apache.hedwig.filter.ServerMessageFilter;
 
 public interface DeliveryManager {
     public void start();
 
-    public void startServingSubscription(ByteString topic, ByteString subscriberId, MessageSeqId seqIdToStartFrom,
-                                         DeliveryEndPoint endPoint, ServerMessageFilter filter);
+    public void startServingSubscription(ByteString topic, ByteString subscriberId,
+                                         SubscriptionPreferences preferences,
+                                         MessageSeqId seqIdToStartFrom,
+                                         DeliveryEndPoint endPoint,
+                                         ServerMessageFilter filter);
 
     public void stopServingSubscriber(ByteString topic, ByteString subscriberId);
 
     /**
+     * Tell the delivery manager that a subscriber has consumed messages.
+     *
+     * @param topic
+     *          Topic Name
+     * @param subscriberId
+     *          Subscriber Id
+     * @param consumedSeqId
+     *          Max consumed seq id.
+     */
+    public void messageConsumed(ByteString topic, ByteString subscriberId,
+                                MessageSeqId consumedSeqId);
+
+    /**
      * Stop delivery manager
      */
     public void stop();

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java?rev=1394066&r1=1394065&r2=1394066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java Thu Oct  4 13:30:31 2012
@@ -17,6 +17,7 @@
  */
 package org.apache.hedwig.server.delivery;
 
+import java.util.Comparator;
 import java.util.HashMap;
 import java.util.HashSet;
 import java.util.Map;
@@ -25,7 +26,7 @@ import java.util.Set;
 import java.util.SortedMap;
 import java.util.TreeMap;
 import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.PriorityBlockingQueue;
 import java.util.concurrent.LinkedBlockingQueue;
 import java.util.concurrent.TimeUnit;
 
@@ -42,6 +43,7 @@ import org.apache.hedwig.protocol.PubSub
 import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
 import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
 import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
 import org.apache.hedwig.server.common.ServerConfiguration;
 import org.apache.hedwig.server.common.UnexpectedError;
 import org.apache.hedwig.server.netty.ServerStats;
@@ -50,6 +52,7 @@ import org.apache.hedwig.server.persiste
 import org.apache.hedwig.server.persistence.PersistenceManager;
 import org.apache.hedwig.server.persistence.ScanCallback;
 import org.apache.hedwig.server.persistence.ScanRequest;
+import static org.apache.hedwig.util.VarArgs.va;
 
 public class FIFODeliveryManager implements Runnable, DeliveryManager {
 
@@ -68,7 +71,14 @@ public class FIFODeliveryManager impleme
      * The queue of all subscriptions that are facing a transient error either
      * in scanning from the persistence manager, or in sending to the consumer
      */
-    Queue<ActiveSubscriberState> retryQueue = new ConcurrentLinkedQueue<ActiveSubscriberState>();
+    Queue<ActiveSubscriberState> retryQueue =
+        new PriorityBlockingQueue<ActiveSubscriberState>(32, new Comparator<ActiveSubscriberState>() {
+            @Override
+            public int compare(ActiveSubscriberState as1, ActiveSubscriberState as2) {
+                long s = as1.lastScanErrorTime - as2.lastScanErrorTime;
+                return s > 0 ? 1 : (s < 0 ? -1 : 0);
+            }
+        });
 
     /**
      * Stores a mapping from topic to the delivery pointers on the topic. The
@@ -136,11 +146,16 @@ public class FIFODeliveryManager impleme
      *            Only messages passing this filter should be sent to this
      *            subscriber
      */
-    public void startServingSubscription(ByteString topic, ByteString subscriberId, MessageSeqId seqIdToStartFrom,
+    public void startServingSubscription(ByteString topic, ByteString subscriberId,
+                                         SubscriptionPreferences preferences,
+                                         MessageSeqId seqIdToStartFrom,
                                          DeliveryEndPoint endPoint, ServerMessageFilter filter) {
 
-        ActiveSubscriberState subscriber = new ActiveSubscriberState(topic, subscriberId, seqIdToStartFrom
-                .getLocalComponent() - 1, endPoint, filter);
+        ActiveSubscriberState subscriber = 
+            new ActiveSubscriberState(topic, subscriberId,
+                                      preferences,
+                                      seqIdToStartFrom.getLocalComponent() - 1,
+                                      endPoint, filter);
 
         enqueueWithoutFailure(subscriber);
     }
@@ -179,6 +194,34 @@ public class FIFODeliveryManager impleme
         }
     }
 
+    public void clearRetryDelayForSubscriber(ActiveSubscriberState subscriber) {
+        subscriber.clearLastScanErrorTime();
+        if (!retryQueue.offer(subscriber)) {
+            throw new UnexpectedError("Could not enqueue to delivery manager retry queue");
+        }
+        // there is no request in the request queue now, so
+        // issue an empty delivery request to avoid waiting on the request queue poll
+        if (requestQueue.isEmpty()) {
+            enqueueWithoutFailure(new DeliveryManagerRequest() {
+                    @Override
+                    public void performRequest() {
+                    // do nothing
+                    }
+                    });
+        }
+    }
+
+    @Override
+    public void messageConsumed(ByteString topic, ByteString subscriberId,
+                                MessageSeqId consumedSeqId) {
+        ActiveSubscriberState subState =
+            subscriberStates.get(new TopicSubscriber(topic, subscriberId));
+        if (null == subState) {
+            return;
+        }
+        subState.messageConsumed(consumedSeqId.getLocalComponent()); 
+    }
+
     /**
      * Instructs the delivery manager to move the delivery pointer for a given
      * subscriber
@@ -291,6 +334,9 @@ public class FIFODeliveryManager impleme
     }
 
     public class ActiveSubscriberState implements ScanCallback, DeliveryCallback, DeliveryManagerRequest {
+
+        static final int UNLIMITED = 0;
+
         ByteString topic;
         ByteString subscriberId;
         long lastLocalSeqIdDelivered;
@@ -299,18 +345,35 @@ public class FIFODeliveryManager impleme
         long lastScanErrorTime = -1;
         long localSeqIdDeliveringNow;
         long lastSeqIdCommunicatedExternally;
+        long lastSeqIdConsumedUtil;
+        boolean isThrottled = false;
+        final int throttleThreshold;
         ServerMessageFilter filter;
         // TODO make use of these variables
 
         final static int SEQ_ID_SLACK = 10;
 
-        public ActiveSubscriberState(ByteString topic, ByteString subscriberId, long lastLocalSeqIdDelivered,
-                                     DeliveryEndPoint deliveryEndPoint, ServerMessageFilter filter) {
+        public ActiveSubscriberState(ByteString topic, ByteString subscriberId,
+                                     SubscriptionPreferences preferences,
+                                     long lastLocalSeqIdDelivered,
+                                     DeliveryEndPoint deliveryEndPoint,
+                                     ServerMessageFilter filter) {
             this.topic = topic;
             this.subscriberId = subscriberId;
             this.lastLocalSeqIdDelivered = lastLocalSeqIdDelivered;
+            this.lastSeqIdConsumedUtil = lastLocalSeqIdDelivered;
             this.deliveryEndPoint = deliveryEndPoint;
             this.filter = filter;
+            if (preferences.hasDeliveryThrottleValue()) {
+                throttleThreshold = preferences.getDeliveryThrottleValue();
+            } else {
+                if (FIFODeliveryManager.this.cfg.getDefaultDeliveryThrottleValue() > 0) {
+                    throttleThreshold =
+                        FIFODeliveryManager.this.cfg.getDefaultDeliveryThrottleValue();
+                } else {
+                    throttleThreshold = UNLIMITED;
+                }
+            }
         }
 
         public void setNotConnected() {
@@ -340,16 +403,71 @@ public class FIFODeliveryManager impleme
             this.lastScanErrorTime = lastScanErrorTime;
         }
 
+        /**
+         * Clear the last scan error time so it can be retried immediately.
+         */
+        protected void clearLastScanErrorTime() {
+            this.lastScanErrorTime = -1;
+        }
+
         protected boolean isConnected() {
             return connected;
         }
 
+        protected void messageConsumed(long newSeqIdConsumed) {
+            if (newSeqIdConsumed <= lastSeqIdConsumedUtil) {
+                return;
+            }
+            if (logger.isDebugEnabled()) {
+                logger.debug("Subscriber ({}) moved consumed ptr from {} to {}.",
+                             va(this, lastSeqIdConsumedUtil, newSeqIdConsumed));
+            }
+            lastSeqIdConsumedUtil = newSeqIdConsumed;
+            // after updating the seq id, check whether it still exceeds the message limit
+            if (msgLimitExceeded()) {
+                return;
+            }
+            if (isThrottled) {
+                isThrottled = false;
+                logger.info("Try to wake up subscriber ({}) to deliver messages again : last delivered {}, last consumed {}.",
+                            va(this, lastLocalSeqIdDelivered, lastSeqIdConsumedUtil));
+
+                enqueueWithoutFailure(new DeliveryManagerRequest() {
+                    @Override
+                    public void performRequest() {
+                        // enqueue 
+                        clearRetryDelayForSubscriber(ActiveSubscriberState.this);            
+                    }
+                });
+            }
+        }
+
+        protected boolean msgLimitExceeded() {
+            if (throttleThreshold == UNLIMITED) {
+                return false;
+            }
+            if (lastLocalSeqIdDelivered - lastSeqIdConsumedUtil >= throttleThreshold) {
+                return true;
+            }
+            return false;
+        }
+
         public void deliverNextMessage() {
 
             if (!isConnected()) {
                 return;
             }
 
+            // check whether we have delivered enough messages without receiving their consumes
+            if (msgLimitExceeded()) {
+                logger.info("Subscriber ({}) is throttled : last delivered {}, last consumed {}.",
+                            va(this, lastLocalSeqIdDelivered, lastSeqIdConsumedUtil));
+                isThrottled = true;
+                // do nothing, since the delivery process would be throttled.
+                // After message consumed, it would be added back to retry queue.
+                return;
+            }
+
             localSeqIdDeliveringNow = persistenceMgr.getSeqIdAfterSkipping(topic, lastLocalSeqIdDelivered, 1);
 
             ScanRequest scanRequest = new ScanRequest(topic, localSeqIdDeliveringNow,

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java?rev=1394066&r1=1394065&r2=1394066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java Thu Oct  4 13:30:31 2012
@@ -260,7 +260,9 @@ public class SubscribeHandler extends Ba
                 MessageSeqId lastConsumedSeqId = subData.getState().getMsgId();
                 MessageSeqId seqIdToStartFrom = MessageSeqId.newBuilder(lastConsumedSeqId).setLocalComponent(
                                                     lastConsumedSeqId.getLocalComponent() + 1).build();
-                deliveryMgr.startServingSubscription(topic, subscriberId, seqIdToStartFrom,
+                deliveryMgr.startServingSubscription(topic, subscriberId,
+                                                     subData.getPreferences(),
+                                                     seqIdToStartFrom,
                                                      new ChannelEndPoint(channel), filter);
             }
         }, null);

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java?rev=1394066&r1=1394065&r2=1394066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java Thu Oct  4 13:30:31 2012
@@ -575,7 +575,10 @@ public abstract class AbstractSubscripti
                 }
                 cb.operationFinished(ctx, null);
             }
-
+            // tell the delivery manager about the consume event
+            if (null != dm) {
+                dm.messageConsumed(topic, subscriberId, consumeSeqId);
+            }
         }
     }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java?rev=1394066&r1=1394065&r2=1394066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java Thu Oct  4 13:30:31 2012
@@ -136,6 +136,14 @@ public class InMemorySubscriptionState {
                 changed = true;
             }
         }
+        if (preferences.hasDeliveryThrottleValue()) {
+            if (!subscriptionPreferences.hasDeliveryThrottleValue() ||
+                subscriptionPreferences.getDeliveryThrottleValue() !=
+                preferences.getDeliveryThrottleValue()) {
+                newPreferencesBuilder.setDeliveryThrottleValue(preferences.getDeliveryThrottleValue());
+                changed = true;
+            }
+        }
         if (preferences.hasOptions()) {
             Map<String, ByteString> userOptions = SubscriptionStateUtils.buildUserOptions(subscriptionPreferences);
             Map<String, ByteString> optUpdates = SubscriptionStateUtils.buildUserOptions(preferences);

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java?rev=1394066&r1=1394065&r2=1394066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/TestBackwardCompat.java Thu Oct  4 13:30:31 2012
@@ -577,10 +577,14 @@ public class TestBackwardCompat extends 
         }
 
         ClientCurrent() {
+            this(true);
+        }
+
+        ClientCurrent(final boolean autoConsumeEnabled) {
             conf = new org.apache.hedwig.client.conf.ClientConfiguration() {
                 @Override
                 public boolean isAutoSendConsumeMessageEnabled() {
-                    return true;
+                    return autoConsumeEnabled;
                 }
                 @Override
                 public int getConsumedMessagesBufferSize() {
@@ -756,6 +760,65 @@ public class TestBackwardCompat extends 
             subscriber.stopDelivery(topic, subscriberId);
         }
 
+        // throttling doesn't work when talking to a 4.1 server
+        void throttleX41(ByteString topic, ByteString subid, final int X)
+        throws Exception {
+            org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions options =
+                org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions.newBuilder()
+                .setCreateOrAttach(org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach.CREATE_OR_ATTACH)
+                .setDeliveryThrottleValue(X) .build();
+            subscribe(topic, subid, options);
+            closeSubscription(topic, subid);
+            publishInts(topic, 1, 3*X);
+            subscribe(topic, subid);
+
+            final AtomicInteger expected = new AtomicInteger(1);
+            final CountDownLatch throttleLatch = new CountDownLatch(1);
+            final CountDownLatch nonThrottleLatch = new CountDownLatch(1);
+            subscriber.startDelivery(topic, subid, new org.apache.hedwig.client.api.MessageHandler() {
+                @Override
+                public synchronized void deliver(ByteString topic, ByteString subscriberId,
+                                                 org.apache.hedwig.protocol.PubSubProtocol.Message msg,
+                                                 org.apache.hedwig.util.Callback<Void> callback, Object context) {
+                    try {
+                        int value = Integer.valueOf(msg.getBody().toStringUtf8());
+                        logger.debug("Received message {},", value);
+
+                        if (value == expected.get()) {
+                            expected.incrementAndGet();
+                        } else {
+                            // error condition
+                            logger.error("Did not receive expected value, expected {}, got {}",
+                                         expected.get(), value);
+                            expected.set(0);
+                            throttleLatch.countDown();
+                            nonThrottleLatch.countDown();
+                        }
+                        if (expected.get() > X+1) {
+                            throttleLatch.countDown();
+                        }
+                        if (expected.get() == (3 * X + 1)) {
+                            nonThrottleLatch.countDown();
+                        }
+                        callback.operationFinished(context, null);
+                    } catch (Exception e) {
+                        logger.error("Received bad message", e);
+                        throttleLatch.countDown();
+                        nonThrottleLatch.countDown();
+                    }
+                }
+            });
+            assertTrue("Should Receive more messages than throttle value " + X,
+                        throttleLatch.await(10, TimeUnit.SECONDS));
+
+            assertTrue("Timed out waiting for messages " + (3*X + 1),
+                       nonThrottleLatch.await(10, TimeUnit.SECONDS));
+            assertEquals("Should be expected message with " + (3*X + 1),
+                         3*X + 1, expected.get());
+
+            subscriber.stopDelivery(topic, subid);
+            closeSubscription(topic, subid);
+        }
     }
 
     /**
@@ -1029,4 +1092,34 @@ public class TestBackwardCompat extends 
         // stop bookkeeper cluster
         bkc410.stop();
     }
+
+    /**
+     * Test compatibility between version 4.1.0 and the current version.
+     *
+     * Server side throttling doesn't work when the current client connects to an
+     * old version server.
+     */
+    @Test
+    public void testServerSideThrottleCompat410() throws Exception {
+        ByteString topic = ByteString.copyFromUtf8("TestServerSideThrottleCompat410");
+        ByteString subid = ByteString.copyFromUtf8("mysub");
+
+        // start bookkeeper
+        BookKeeperCluster410 bkc410 = new BookKeeperCluster410(3);
+        bkc410.start();
+
+        // start hub server 410
+        Server410 s410 = new Server410(zkUtil.getZooKeeperConnectString());
+        s410.start();
+
+        ClientCurrent ccur = new ClientCurrent(false);
+        ccur.throttleX41(topic, subid, 10);
+
+        ccur.close();
+
+        // stop 410 server
+        s410.stop();
+        // stop bookkeeper cluster
+        bkc410.stop();
+    }
 }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java?rev=1394066&r1=1394065&r2=1394066&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java Thu Oct  4 13:30:31 2012
@@ -24,6 +24,7 @@ import com.google.protobuf.ByteString;
 import org.apache.hedwig.client.data.TopicSubscriber;
 import org.apache.hedwig.filter.ServerMessageFilter;
 import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
 
 public class StubDeliveryManager implements DeliveryManager {
 
@@ -34,8 +35,11 @@ public class StubDeliveryManager impleme
         public DeliveryEndPoint endPoint;
         public ServerMessageFilter filter;
 
-        public StartServingRequest(ByteString topic, ByteString subscriberId, MessageSeqId seqIdToStartFrom,
-                                   DeliveryEndPoint endPoint, ServerMessageFilter filter) {
+        public StartServingRequest(ByteString topic, ByteString subscriberId,
+                                   SubscriptionPreferences preferences,
+                                   MessageSeqId seqIdToStartFrom,
+                                   DeliveryEndPoint endPoint,
+                                   ServerMessageFilter filter) {
             this.topic = topic;
             this.subscriberId = subscriberId;
             this.seqIdToStartFrom = seqIdToStartFrom;
@@ -48,9 +52,13 @@ public class StubDeliveryManager impleme
     public Queue<Object> lastRequest = new LinkedList<Object>();
 
     @Override
-    public void startServingSubscription(ByteString topic, ByteString subscriberId, MessageSeqId seqIdToStartFrom,
-                                         DeliveryEndPoint endPoint, ServerMessageFilter filter) {
-        lastRequest.add(new StartServingRequest(topic, subscriberId, seqIdToStartFrom, endPoint, filter));
+    public void startServingSubscription(ByteString topic, ByteString subscriberId,
+                                         SubscriptionPreferences preferences,
+                                         MessageSeqId seqIdToStartFrom,
+                                         DeliveryEndPoint endPoint,
+                                         ServerMessageFilter filter) {
+        lastRequest.add(new StartServingRequest(topic, subscriberId, preferences,
+                                                seqIdToStartFrom, endPoint, filter));
     }
 
     @Override
@@ -59,6 +67,12 @@ public class StubDeliveryManager impleme
     }
 
     @Override
+    public void messageConsumed(ByteString topic, ByteString subscriberId,
+                                MessageSeqId seqId) {
+        // do nothing
+    }
+
+    @Override
     public void start() {
     }
 

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java?rev=1394066&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/TestThrottlingDelivery.java Thu Oct  4 13:30:31 2012
@@ -0,0 +1,207 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.delivery;
+
+import java.util.LinkedList;
+import java.util.List;
+import java.util.concurrent.CountDownLatch;
+import java.util.concurrent.TimeUnit;
+import java.util.concurrent.atomic.AtomicInteger;
+
+import org.junit.Before;
+import org.junit.Test;
+import static org.junit.Assert.*;
+
+import com.google.protobuf.ByteString;
+
+import org.apache.hedwig.client.HedwigClient;
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.api.Publisher;
+import org.apache.hedwig.client.api.Subscriber;
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
+import org.apache.hedwig.server.HedwigHubTestBase;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.util.Callback;
+
+public class TestThrottlingDelivery extends HedwigHubTestBase {
+
+    private static final int DEFAULT_THROTTLE_VALUE = 10;
+
+    protected class ThrottleDeliveryServerConfiguration extends HubServerConfiguration {
+
+        ThrottleDeliveryServerConfiguration(int serverPort, int sslServerPort) {
+            super(serverPort, sslServerPort);
+        }
+
+        @Override
+        public int getDefaultDeliveryThrottleValue() {
+            return DEFAULT_THROTTLE_VALUE;
+        }
+    }
+
+    protected class ThrottleDeliveryClientConfiguration extends ClientConfiguration {
+
+        int throttleValue;
+
+        ThrottleDeliveryClientConfiguration() {
+            this(DEFAULT_THROTTLE_VALUE);
+        }
+
+        ThrottleDeliveryClientConfiguration(int throttleValue) {
+            this.throttleValue = throttleValue;
+        }
+
+        @Override
+        public int getMaximumOutstandingMessages() {
+            return throttleValue;
+        }
+
+        void setThrottleValue(int throttleValue) {
+            this.throttleValue = throttleValue;
+        }
+
+        @Override
+        public boolean isAutoSendConsumeMessageEnabled() {
+            return false;
+        }
+    }
+
+    private void throttleX(Publisher pub, final Subscriber sub,
+                           ByteString topic, ByteString subid,
+                           final int X) throws Exception {
+        for (int i=1; i<=3*X; i++) {
+            pub.publish(topic, Message.newBuilder().setBody(
+                               ByteString.copyFromUtf8(String.valueOf(i))).build());
+        }
+        sub.subscribe(topic, subid, CreateOrAttach.ATTACH);
+
+        final AtomicInteger expected = new AtomicInteger(1);
+        final CountDownLatch throttleLatch = new CountDownLatch(1);
+        final CountDownLatch nonThrottleLatch = new CountDownLatch(1);
+        sub.startDelivery(topic, subid, new MessageHandler() {
+            @Override
+            public synchronized void deliver(ByteString topic, ByteString subscriberId,
+                                             Message msg,
+                                             Callback<Void> callback, Object context) {
+                try {
+                    int value = Integer.valueOf(msg.getBody().toStringUtf8());
+                    logger.debug("Received message {},", value);
+
+                    if (value == expected.get()) {
+                        expected.incrementAndGet();
+                    } else {
+                        // error condition
+                        logger.error("Did not receive expected value, expected {}, got {}",
+                                     expected.get(), value);
+                        expected.set(0);
+                        throttleLatch.countDown();
+                        nonThrottleLatch.countDown();
+                    }
+                    if (expected.get() > X+1) {
+                        throttleLatch.countDown();
+                    }
+                    if (expected.get() == (3 * X + 1)) {
+                        nonThrottleLatch.countDown();
+                    }
+                    callback.operationFinished(context, null);
+                    if (expected.get() > X + 1) {
+                        sub.consume(topic, subscriberId, msg.getMsgId());
+                    }      
+                } catch (Exception e) {
+                    logger.error("Received bad message", e);
+                    throttleLatch.countDown();
+                    nonThrottleLatch.countDown();
+                }
+            }
+        });
+        assertFalse("Received more messages than throttle value " + X,
+                    throttleLatch.await(3, TimeUnit.SECONDS));
+        assertEquals("Should be expected messages with only " + (X+1), X+1, expected.get());
+
+        // consume messages so that delivery is no longer throttled
+        for (int i=1; i<=X; i++) {
+            sub.consume(topic, subid,
+                        MessageSeqId.newBuilder().setLocalComponent(i).build());
+        }
+
+        assertTrue("Timed out waiting for messages " + (3*X + 1),
+                   nonThrottleLatch.await(10, TimeUnit.SECONDS));
+        assertEquals("Should be expected message with " + (3*X + 1),
+                     3*X + 1, expected.get());
+
+        sub.stopDelivery(topic, subid);
+        sub.closeSubscription(topic, subid);
+    }
+
+    @Override
+    @Before
+    public void setUp() throws Exception {
+        numServers = 1;
+        super.setUp();
+    }
+
+    @Override
+    protected ServerConfiguration getServerConfiguration(int port, int sslPort) {
+        return new ThrottleDeliveryServerConfiguration(port, sslPort);
+    }
+
+    @Test
+    public void testServerSideThrottle() throws Exception {
+        int throttleValue = DEFAULT_THROTTLE_VALUE;
+        ThrottleDeliveryClientConfiguration conf =
+            new ThrottleDeliveryClientConfiguration();
+        HedwigClient client = new HedwigClient(conf);
+        Publisher pub = client.getPublisher();
+        Subscriber sub = client.getSubscriber();
+
+        ByteString topic = ByteString.copyFromUtf8("testServerSideThrottle"); 
+        ByteString subid = ByteString.copyFromUtf8("serverThrottleSub");
+        sub.subscribe(topic, subid, CreateOrAttach.CREATE);
+        sub.closeSubscription(topic, subid);
+
+        // throttle with hub server's setting
+        throttleX(pub, sub, topic, subid, DEFAULT_THROTTLE_VALUE);
+
+        throttleValue = DEFAULT_THROTTLE_VALUE / 2;
+        // throttle with a lower value than hub server's setting
+        SubscriptionOptions.Builder optionsBuilder = SubscriptionOptions.newBuilder()
+            .setCreateOrAttach(CreateOrAttach.CREATE)
+            .setDeliveryThrottleValue(throttleValue);
+        topic = ByteString.copyFromUtf8("testServerSideThrottleWithLowerValue");
+        sub.subscribe(topic, subid, optionsBuilder.build());
+        sub.closeSubscription(topic, subid);
+        throttleX(pub, sub, topic, subid, throttleValue);
+
+        throttleValue = DEFAULT_THROTTLE_VALUE + 5;
+        // throttle with a higher value than hub server's setting
+        optionsBuilder = SubscriptionOptions.newBuilder()
+                         .setCreateOrAttach(CreateOrAttach.CREATE)
+                         .setDeliveryThrottleValue(throttleValue);
+        topic = ByteString.copyFromUtf8("testServerSideThrottleWithHigherValue");
+        sub.subscribe(topic, subid, optionsBuilder.build());
+        sub.closeSubscription(topic, subid);
+        throttleX(pub, sub, topic, subid, throttleValue);
+
+        client.close();
+    }
+
+}