You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/10/22 13:04:48 UTC

svn commit: r1400836 [1/2] - in /zookeeper/bookkeeper/trunk: ./ hedwig-client/src/main/java/org/apache/hedwig/client/handlers/ hedwig-client/src/main/java/org/apache/hedwig/client/netty/ hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/s...

Author: ivank
Date: Mon Oct 22 11:04:47 2012
New Revision: 1400836

URL: http://svn.apache.org/viewvc?rev=1400836&view=rev
Log:
BOOKKEEPER-411: Add CloseSubscription Request for multiplexing support (sijie via ivank)

Added:
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/CloseSubscriptionResponseHandler.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/CloseSubscriptionHandler.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/netty/
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestCloseSubscription.java
Modified:
    zookeeper/bookkeeper/trunk/CHANGES.txt
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java
    zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscriptionChannelPipelineFactory.java
    zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java
    zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java
    zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java

Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1400836&r1=1400835&r2=1400836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Oct 22 11:04:47 2012
@@ -188,6 +188,8 @@ Trunk (unreleased changes)
 
         BOOKKEEPER-435: Create SubscriptionChannelManager to manage all subscription channel (sijie via ivank)
 
+        BOOKKEEPER-411: Add CloseSubscription Request for multiplexing support (sijie via ivank)
+
       hedwig-client:
 
         BOOKKEEPER-306: Change C++ client to use gtest for testing (ivank via sijie)

Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/CloseSubscriptionResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/CloseSubscriptionResponseHandler.java?rev=1400836&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/CloseSubscriptionResponseHandler.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/CloseSubscriptionResponseHandler.java Mon Oct 22 11:04:47 2012
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.handlers;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.jboss.netty.channel.Channel;
+
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.client.netty.HChannelManager;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
+import org.apache.hedwig.util.Callback;
+import static org.apache.hedwig.util.VarArgs.va;
+
+public class CloseSubscriptionResponseHandler extends AbstractResponseHandler {
+
+    private static Logger logger =
+        LoggerFactory.getLogger(CloseSubscriptionResponseHandler.class);
+
+    public CloseSubscriptionResponseHandler(ClientConfiguration cfg,
+                                            HChannelManager channelManager) {
+        super(cfg, channelManager);
+    }
+
+    @Override
+    public void handleResponse(final PubSubResponse response, final PubSubData pubSubData,
+                               final Channel channel)
+            throws Exception {
+        switch (response.getStatusCode()) {
+        case SUCCESS:
+            pubSubData.getCallback().operationFinished(pubSubData.context, null);
+            break;
+        case CLIENT_NOT_SUBSCRIBED:
+            // For closesubscription requests, the server says that the client was
+            // never subscribed to the topic.
+            pubSubData.getCallback().operationFailed(pubSubData.context, new ClientNotSubscribedException(
+                                                    "Client was never subscribed to topic: " +
+                                                        pubSubData.topic.toStringUtf8() + ", subscriberId: " +
+                                                        pubSubData.subscriberId.toStringUtf8()));
+            break;
+        case SERVICE_DOWN:
+            // Response was service down failure so just invoke the callback's
+            // operationFailed method.
+            pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException(
+                                                    "Server responded with a SERVICE_DOWN status"));
+            break;
+        case NOT_RESPONSIBLE_FOR_TOPIC:
+            // Redirect response so we'll need to repost the original
+            // Unsubscribe Request
+            handleRedirectResponse(response, pubSubData, channel);
+            break;
+        default:
+            // Consider all other status codes as errors, operation failed
+            // cases.
+            logger.error("Unexpected error response from server for PubSubResponse: " + response);
+            pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException(
+                                                    "Server responded with a status code of: " +
+                                                        response.getStatusCode()));
+            break;
+        }
+    }
+
+}

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java?rev=1400836&r1=1400835&r2=1400836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java Mon Oct 22 11:04:47 2012
@@ -23,6 +23,7 @@ import org.jboss.netty.channel.Channel;
 
 import org.apache.hedwig.client.data.PubSubData;
 import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest;
 import org.apache.hedwig.protocol.PubSubProtocol.ConsumeRequest;
 import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
 import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
@@ -90,6 +91,11 @@ public class NetUtils {
             // Set the UnsubscribeRequest into the outer PubSubRequest
             pubsubRequestBuilder.setUnsubscribeRequest(buildUnsubscribeRequest(pubSubData));
             break;
+        case CLOSESUBSCRIPTION:
+            // Set the CloseSubscriptionRequest into the outer PubSubRequest
+            pubsubRequestBuilder.setCloseSubscriptionRequest(
+                buildCloseSubscriptionRequest(pubSubData));
+            break;
         }
 
         // Update the PubSubData with the txnId and the requestWriteTime
@@ -133,6 +139,16 @@ public class NetUtils {
         return unsubscribeRequestBuilder;
     }
 
+    // build closesubscription request
+    private static CloseSubscriptionRequest.Builder
+        buildCloseSubscriptionRequest(PubSubData pubSubData) {
+        // Create the CloseSubscriptionRequest
+        CloseSubscriptionRequest.Builder closeSubscriptionRequestBuilder =
+            CloseSubscriptionRequest.newBuilder();
+        closeSubscriptionRequestBuilder.setSubscriberId(pubSubData.subscriberId);
+        return closeSubscriptionRequestBuilder;
+    }
+
     /**
      * Build consume request
      *

Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscriptionChannelPipelineFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscriptionChannelPipelineFactory.java?rev=1400836&r1=1400835&r2=1400836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscriptionChannelPipelineFactory.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscriptionChannelPipelineFactory.java Mon Oct 22 11:04:47 2012
@@ -22,6 +22,7 @@ import java.util.Map;
 
 import org.apache.hedwig.client.conf.ClientConfiguration;
 import org.apache.hedwig.client.handlers.AbstractResponseHandler;
+import org.apache.hedwig.client.handlers.CloseSubscriptionResponseHandler;
 import org.apache.hedwig.client.netty.impl.AbstractHChannelManager;
 import org.apache.hedwig.client.netty.impl.ClientChannelPipelineFactory;
 import org.apache.hedwig.client.netty.impl.HChannelHandler;
@@ -40,6 +41,8 @@ public class SimpleSubscriptionChannelPi
             new HashMap<OperationType, AbstractResponseHandler>();
         handlers.put(OperationType.SUBSCRIBE,
                      new SimpleSubscribeResponseHandler(cfg, channelManager));
+        handlers.put(OperationType.CLOSESUBSCRIPTION,
+                     new CloseSubscriptionResponseHandler(cfg, channelManager));
         return handlers;
     }
 

Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java?rev=1400836&r1=1400835&r2=1400836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java Mon Oct 22 11:04:47 2012
@@ -82,6 +82,7 @@ public final class PubSubProtocol {
     UNSUBSCRIBE(3, 3),
     START_DELIVERY(4, 4),
     STOP_DELIVERY(5, 5),
+    CLOSESUBSCRIPTION(6, 6),
     ;
     
     public static final int PUBLISH_VALUE = 0;
@@ -90,6 +91,7 @@ public final class PubSubProtocol {
     public static final int UNSUBSCRIBE_VALUE = 3;
     public static final int START_DELIVERY_VALUE = 4;
     public static final int STOP_DELIVERY_VALUE = 5;
+    public static final int CLOSESUBSCRIPTION_VALUE = 6;
     
     
     public final int getNumber() { return value; }
@@ -102,6 +104,7 @@ public final class PubSubProtocol {
         case 3: return UNSUBSCRIBE;
         case 4: return START_DELIVERY;
         case 5: return STOP_DELIVERY;
+        case 6: return CLOSESUBSCRIPTION;
         default: return null;
       }
     }
@@ -132,7 +135,7 @@ public final class PubSubProtocol {
     }
     
     private static final OperationType[] VALUES = {
-      PUBLISH, SUBSCRIBE, CONSUME, UNSUBSCRIBE, START_DELIVERY, STOP_DELIVERY, 
+      PUBLISH, SUBSCRIBE, CONSUME, UNSUBSCRIBE, START_DELIVERY, STOP_DELIVERY, CLOSESUBSCRIPTION, 
     };
     
     public static OperationType valueOf(
@@ -3652,6 +3655,11 @@ public final class PubSubProtocol {
     boolean hasStartDeliveryRequest();
     org.apache.hedwig.protocol.PubSubProtocol.StartDeliveryRequest getStartDeliveryRequest();
     org.apache.hedwig.protocol.PubSubProtocol.StartDeliveryRequestOrBuilder getStartDeliveryRequestOrBuilder();
+    
+    // optional .Hedwig.CloseSubscriptionRequest closeSubscriptionRequest = 58;
+    boolean hasCloseSubscriptionRequest();
+    org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest getCloseSubscriptionRequest();
+    org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequestOrBuilder getCloseSubscriptionRequestOrBuilder();
   }
   public static final class PubSubRequest extends
       com.google.protobuf.GeneratedMessage
@@ -3824,6 +3832,19 @@ public final class PubSubProtocol {
       return startDeliveryRequest_;
     }
     
+    // optional .Hedwig.CloseSubscriptionRequest closeSubscriptionRequest = 58;
+    public static final int CLOSESUBSCRIPTIONREQUEST_FIELD_NUMBER = 58;
+    private org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest closeSubscriptionRequest_;
+    public boolean hasCloseSubscriptionRequest() {
+      return ((bitField0_ & 0x00000800) == 0x00000800);
+    }
+    public org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest getCloseSubscriptionRequest() {
+      return closeSubscriptionRequest_;
+    }
+    public org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequestOrBuilder getCloseSubscriptionRequestOrBuilder() {
+      return closeSubscriptionRequest_;
+    }
+    
     private void initFields() {
       protocolVersion_ = org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion.VERSION_ONE;
       type_ = org.apache.hedwig.protocol.PubSubProtocol.OperationType.PUBLISH;
@@ -3837,6 +3858,7 @@ public final class PubSubProtocol {
       unsubscribeRequest_ = org.apache.hedwig.protocol.PubSubProtocol.UnsubscribeRequest.getDefaultInstance();
       stopDeliveryRequest_ = org.apache.hedwig.protocol.PubSubProtocol.StopDeliveryRequest.getDefaultInstance();
       startDeliveryRequest_ = org.apache.hedwig.protocol.PubSubProtocol.StartDeliveryRequest.getDefaultInstance();
+      closeSubscriptionRequest_ = org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.getDefaultInstance();
     }
     private byte memoizedIsInitialized = -1;
     public final boolean isInitialized() {
@@ -3895,6 +3917,12 @@ public final class PubSubProtocol {
           return false;
         }
       }
+      if (hasCloseSubscriptionRequest()) {
+        if (!getCloseSubscriptionRequest().isInitialized()) {
+          memoizedIsInitialized = 0;
+          return false;
+        }
+      }
       memoizedIsInitialized = 1;
       return true;
     }
@@ -3938,6 +3966,9 @@ public final class PubSubProtocol {
       if (((bitField0_ & 0x00000400) == 0x00000400)) {
         output.writeMessage(57, startDeliveryRequest_);
       }
+      if (((bitField0_ & 0x00000800) == 0x00000800)) {
+        output.writeMessage(58, closeSubscriptionRequest_);
+      }
       getUnknownFields().writeTo(output);
     }
     
@@ -4000,6 +4031,10 @@ public final class PubSubProtocol {
         size += com.google.protobuf.CodedOutputStream
           .computeMessageSize(57, startDeliveryRequest_);
       }
+      if (((bitField0_ & 0x00000800) == 0x00000800)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeMessageSize(58, closeSubscriptionRequest_);
+      }
       size += getUnknownFields().getSerializedSize();
       memoizedSerializedSize = size;
       return size;
@@ -4122,6 +4157,7 @@ public final class PubSubProtocol {
           getUnsubscribeRequestFieldBuilder();
           getStopDeliveryRequestFieldBuilder();
           getStartDeliveryRequestFieldBuilder();
+          getCloseSubscriptionRequestFieldBuilder();
         }
       }
       private static Builder create() {
@@ -4178,6 +4214,12 @@ public final class PubSubProtocol {
           startDeliveryRequestBuilder_.clear();
         }
         bitField0_ = (bitField0_ & ~0x00000800);
+        if (closeSubscriptionRequestBuilder_ == null) {
+          closeSubscriptionRequest_ = org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.getDefaultInstance();
+        } else {
+          closeSubscriptionRequestBuilder_.clear();
+        }
+        bitField0_ = (bitField0_ & ~0x00001000);
         return this;
       }
       
@@ -4289,6 +4331,14 @@ public final class PubSubProtocol {
         } else {
           result.startDeliveryRequest_ = startDeliveryRequestBuilder_.build();
         }
+        if (((from_bitField0_ & 0x00001000) == 0x00001000)) {
+          to_bitField0_ |= 0x00000800;
+        }
+        if (closeSubscriptionRequestBuilder_ == null) {
+          result.closeSubscriptionRequest_ = closeSubscriptionRequest_;
+        } else {
+          result.closeSubscriptionRequest_ = closeSubscriptionRequestBuilder_.build();
+        }
         result.bitField0_ = to_bitField0_;
         onBuilt();
         return result;
@@ -4348,6 +4398,9 @@ public final class PubSubProtocol {
         if (other.hasStartDeliveryRequest()) {
           mergeStartDeliveryRequest(other.getStartDeliveryRequest());
         }
+        if (other.hasCloseSubscriptionRequest()) {
+          mergeCloseSubscriptionRequest(other.getCloseSubscriptionRequest());
+        }
         this.mergeUnknownFields(other.getUnknownFields());
         return this;
       }
@@ -4405,6 +4458,12 @@ public final class PubSubProtocol {
             return false;
           }
         }
+        if (hasCloseSubscriptionRequest()) {
+          if (!getCloseSubscriptionRequest().isInitialized()) {
+            
+            return false;
+          }
+        }
         return true;
       }
       
@@ -4527,6 +4586,15 @@ public final class PubSubProtocol {
               setStartDeliveryRequest(subBuilder.buildPartial());
               break;
             }
+            case 466: {
+              org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.Builder subBuilder = org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.newBuilder();
+              if (hasCloseSubscriptionRequest()) {
+                subBuilder.mergeFrom(getCloseSubscriptionRequest());
+              }
+              input.readMessage(subBuilder, extensionRegistry);
+              setCloseSubscriptionRequest(subBuilder.buildPartial());
+              break;
+            }
           }
         }
       }
@@ -5238,6 +5306,96 @@ public final class PubSubProtocol {
         return startDeliveryRequestBuilder_;
       }
       
+      // optional .Hedwig.CloseSubscriptionRequest closeSubscriptionRequest = 58;
+      private org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest closeSubscriptionRequest_ = org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.getDefaultInstance();
+      private com.google.protobuf.SingleFieldBuilder<
+          org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest, org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.Builder, org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequestOrBuilder> closeSubscriptionRequestBuilder_;
+      public boolean hasCloseSubscriptionRequest() {
+        return ((bitField0_ & 0x00001000) == 0x00001000);
+      }
+      public org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest getCloseSubscriptionRequest() {
+        if (closeSubscriptionRequestBuilder_ == null) {
+          return closeSubscriptionRequest_;
+        } else {
+          return closeSubscriptionRequestBuilder_.getMessage();
+        }
+      }
+      public Builder setCloseSubscriptionRequest(org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest value) {
+        if (closeSubscriptionRequestBuilder_ == null) {
+          if (value == null) {
+            throw new NullPointerException();
+          }
+          closeSubscriptionRequest_ = value;
+          onChanged();
+        } else {
+          closeSubscriptionRequestBuilder_.setMessage(value);
+        }
+        bitField0_ |= 0x00001000;
+        return this;
+      }
+      public Builder setCloseSubscriptionRequest(
+          org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.Builder builderForValue) {
+        if (closeSubscriptionRequestBuilder_ == null) {
+          closeSubscriptionRequest_ = builderForValue.build();
+          onChanged();
+        } else {
+          closeSubscriptionRequestBuilder_.setMessage(builderForValue.build());
+        }
+        bitField0_ |= 0x00001000;
+        return this;
+      }
+      public Builder mergeCloseSubscriptionRequest(org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest value) {
+        if (closeSubscriptionRequestBuilder_ == null) {
+          if (((bitField0_ & 0x00001000) == 0x00001000) &&
+              closeSubscriptionRequest_ != org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.getDefaultInstance()) {
+            closeSubscriptionRequest_ =
+              org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.newBuilder(closeSubscriptionRequest_).mergeFrom(value).buildPartial();
+          } else {
+            closeSubscriptionRequest_ = value;
+          }
+          onChanged();
+        } else {
+          closeSubscriptionRequestBuilder_.mergeFrom(value);
+        }
+        bitField0_ |= 0x00001000;
+        return this;
+      }
+      public Builder clearCloseSubscriptionRequest() {
+        if (closeSubscriptionRequestBuilder_ == null) {
+          closeSubscriptionRequest_ = org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.getDefaultInstance();
+          onChanged();
+        } else {
+          closeSubscriptionRequestBuilder_.clear();
+        }
+        bitField0_ = (bitField0_ & ~0x00001000);
+        return this;
+      }
+      public org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.Builder getCloseSubscriptionRequestBuilder() {
+        bitField0_ |= 0x00001000;
+        onChanged();
+        return getCloseSubscriptionRequestFieldBuilder().getBuilder();
+      }
+      public org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequestOrBuilder getCloseSubscriptionRequestOrBuilder() {
+        if (closeSubscriptionRequestBuilder_ != null) {
+          return closeSubscriptionRequestBuilder_.getMessageOrBuilder();
+        } else {
+          return closeSubscriptionRequest_;
+        }
+      }
+      private com.google.protobuf.SingleFieldBuilder<
+          org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest, org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.Builder, org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequestOrBuilder> 
+          getCloseSubscriptionRequestFieldBuilder() {
+        if (closeSubscriptionRequestBuilder_ == null) {
+          closeSubscriptionRequestBuilder_ = new com.google.protobuf.SingleFieldBuilder<
+              org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest, org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.Builder, org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequestOrBuilder>(
+                  closeSubscriptionRequest_,
+                  getParentForChildren(),
+                  isClean());
+          closeSubscriptionRequest_ = null;
+        }
+        return closeSubscriptionRequestBuilder_;
+      }
+      
       // @@protoc_insertion_point(builder_scope:Hedwig.PubSubRequest)
     }
     
@@ -9489,6 +9647,355 @@ public final class PubSubProtocol {
     // @@protoc_insertion_point(class_scope:Hedwig.StartDeliveryRequest)
   }
   
+  public interface CloseSubscriptionRequestOrBuilder
+      extends com.google.protobuf.MessageOrBuilder {
+    
+    // required bytes subscriberId = 2;
+    boolean hasSubscriberId();
+    com.google.protobuf.ByteString getSubscriberId();
+  }
+  public static final class CloseSubscriptionRequest extends
+      com.google.protobuf.GeneratedMessage
+      implements CloseSubscriptionRequestOrBuilder {
+    // Use CloseSubscriptionRequest.newBuilder() to construct.
+    private CloseSubscriptionRequest(Builder builder) {
+      super(builder);
+    }
+    private CloseSubscriptionRequest(boolean noInit) {}
+    
+    private static final CloseSubscriptionRequest defaultInstance;
+    public static CloseSubscriptionRequest getDefaultInstance() {
+      return defaultInstance;
+    }
+    
+    public CloseSubscriptionRequest getDefaultInstanceForType() {
+      return defaultInstance;
+    }
+    
+    public static final com.google.protobuf.Descriptors.Descriptor
+        getDescriptor() {
+      return org.apache.hedwig.protocol.PubSubProtocol.internal_static_Hedwig_CloseSubscriptionRequest_descriptor;
+    }
+    
+    protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+        internalGetFieldAccessorTable() {
+      return org.apache.hedwig.protocol.PubSubProtocol.internal_static_Hedwig_CloseSubscriptionRequest_fieldAccessorTable;
+    }
+    
+    private int bitField0_;
+    // required bytes subscriberId = 2;
+    public static final int SUBSCRIBERID_FIELD_NUMBER = 2;
+    private com.google.protobuf.ByteString subscriberId_;
+    public boolean hasSubscriberId() {
+      return ((bitField0_ & 0x00000001) == 0x00000001);
+    }
+    public com.google.protobuf.ByteString getSubscriberId() {
+      return subscriberId_;
+    }
+    
+    private void initFields() {
+      subscriberId_ = com.google.protobuf.ByteString.EMPTY;
+    }
+    private byte memoizedIsInitialized = -1;
+    public final boolean isInitialized() {
+      byte isInitialized = memoizedIsInitialized;
+      if (isInitialized != -1) return isInitialized == 1;
+      
+      if (!hasSubscriberId()) {
+        memoizedIsInitialized = 0;
+        return false;
+      }
+      memoizedIsInitialized = 1;
+      return true;
+    }
+    
+    public void writeTo(com.google.protobuf.CodedOutputStream output)
+                        throws java.io.IOException {
+      getSerializedSize();
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        output.writeBytes(2, subscriberId_);
+      }
+      getUnknownFields().writeTo(output);
+    }
+    
+    private int memoizedSerializedSize = -1;
+    public int getSerializedSize() {
+      int size = memoizedSerializedSize;
+      if (size != -1) return size;
+    
+      size = 0;
+      if (((bitField0_ & 0x00000001) == 0x00000001)) {
+        size += com.google.protobuf.CodedOutputStream
+          .computeBytesSize(2, subscriberId_);
+      }
+      size += getUnknownFields().getSerializedSize();
+      memoizedSerializedSize = size;
+      return size;
+    }
+    
+    private static final long serialVersionUID = 0L;
+    @java.lang.Override
+    protected java.lang.Object writeReplace()
+        throws java.io.ObjectStreamException {
+      return super.writeReplace();
+    }
+    
+    public static org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest parseFrom(
+        com.google.protobuf.ByteString data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest parseFrom(
+        com.google.protobuf.ByteString data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest parseFrom(byte[] data)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data).buildParsed();
+    }
+    public static org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest parseFrom(
+        byte[] data,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws com.google.protobuf.InvalidProtocolBufferException {
+      return newBuilder().mergeFrom(data, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest parseFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest parseFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    public static org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest parseDelimitedFrom(java.io.InputStream input)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest parseDelimitedFrom(
+        java.io.InputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      Builder builder = newBuilder();
+      if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+        return builder.buildParsed();
+      } else {
+        return null;
+      }
+    }
+    public static org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest parseFrom(
+        com.google.protobuf.CodedInputStream input)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input).buildParsed();
+    }
+    public static org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest parseFrom(
+        com.google.protobuf.CodedInputStream input,
+        com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+        throws java.io.IOException {
+      return newBuilder().mergeFrom(input, extensionRegistry)
+               .buildParsed();
+    }
+    
+    public static Builder newBuilder() { return Builder.create(); }
+    public Builder newBuilderForType() { return newBuilder(); }
+    public static Builder newBuilder(org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest prototype) {
+      return newBuilder().mergeFrom(prototype);
+    }
+    public Builder toBuilder() { return newBuilder(this); }
+    
+    @java.lang.Override
+    protected Builder newBuilderForType(
+        com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+      Builder builder = new Builder(parent);
+      return builder;
+    }
+    public static final class Builder extends
+        com.google.protobuf.GeneratedMessage.Builder<Builder>
+       implements org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequestOrBuilder {
+      public static final com.google.protobuf.Descriptors.Descriptor
+          getDescriptor() {
+        return org.apache.hedwig.protocol.PubSubProtocol.internal_static_Hedwig_CloseSubscriptionRequest_descriptor;
+      }
+      
+      protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+          internalGetFieldAccessorTable() {
+        return org.apache.hedwig.protocol.PubSubProtocol.internal_static_Hedwig_CloseSubscriptionRequest_fieldAccessorTable;
+      }
+      
+      // Construct using org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.newBuilder()
+      private Builder() {
+        maybeForceBuilderInitialization();
+      }
+      
+      private Builder(BuilderParent parent) {
+        super(parent);
+        maybeForceBuilderInitialization();
+      }
+      private void maybeForceBuilderInitialization() {
+        if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+        }
+      }
+      private static Builder create() {
+        return new Builder();
+      }
+      
+      public Builder clear() {
+        super.clear();
+        subscriberId_ = com.google.protobuf.ByteString.EMPTY;
+        bitField0_ = (bitField0_ & ~0x00000001);
+        return this;
+      }
+      
+      public Builder clone() {
+        return create().mergeFrom(buildPartial());
+      }
+      
+      public com.google.protobuf.Descriptors.Descriptor
+          getDescriptorForType() {
+        return org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.getDescriptor();
+      }
+      
+      public org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest getDefaultInstanceForType() {
+        return org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.getDefaultInstance();
+      }
+      
+      public org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest build() {
+        org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(result);
+        }
+        return result;
+      }
+      
+      private org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest buildParsed()
+          throws com.google.protobuf.InvalidProtocolBufferException {
+        org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest result = buildPartial();
+        if (!result.isInitialized()) {
+          throw newUninitializedMessageException(
+            result).asInvalidProtocolBufferException();
+        }
+        return result;
+      }
+      
+      public org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest buildPartial() {
+        org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest result = new org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest(this);
+        int from_bitField0_ = bitField0_;
+        int to_bitField0_ = 0;
+        if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+          to_bitField0_ |= 0x00000001;
+        }
+        result.subscriberId_ = subscriberId_;
+        result.bitField0_ = to_bitField0_;
+        onBuilt();
+        return result;
+      }
+      
+      public Builder mergeFrom(com.google.protobuf.Message other) {
+        if (other instanceof org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest) {
+          return mergeFrom((org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest)other);
+        } else {
+          super.mergeFrom(other);
+          return this;
+        }
+      }
+      
+      public Builder mergeFrom(org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest other) {
+        if (other == org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.getDefaultInstance()) return this;
+        if (other.hasSubscriberId()) {
+          setSubscriberId(other.getSubscriberId());
+        }
+        this.mergeUnknownFields(other.getUnknownFields());
+        return this;
+      }
+      
+      public final boolean isInitialized() {
+        if (!hasSubscriberId()) {
+          
+          return false;
+        }
+        return true;
+      }
+      
+      public Builder mergeFrom(
+          com.google.protobuf.CodedInputStream input,
+          com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+          throws java.io.IOException {
+        com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+          com.google.protobuf.UnknownFieldSet.newBuilder(
+            this.getUnknownFields());
+        while (true) {
+          int tag = input.readTag();
+          switch (tag) {
+            case 0:
+              this.setUnknownFields(unknownFields.build());
+              onChanged();
+              return this;
+            default: {
+              if (!parseUnknownField(input, unknownFields,
+                                     extensionRegistry, tag)) {
+                this.setUnknownFields(unknownFields.build());
+                onChanged();
+                return this;
+              }
+              break;
+            }
+            case 18: {
+              bitField0_ |= 0x00000001;
+              subscriberId_ = input.readBytes();
+              break;
+            }
+          }
+        }
+      }
+      
+      private int bitField0_;
+      
+      // required bytes subscriberId = 2;
+      private com.google.protobuf.ByteString subscriberId_ = com.google.protobuf.ByteString.EMPTY;
+      public boolean hasSubscriberId() {
+        return ((bitField0_ & 0x00000001) == 0x00000001);
+      }
+      public com.google.protobuf.ByteString getSubscriberId() {
+        return subscriberId_;
+      }
+      public Builder setSubscriberId(com.google.protobuf.ByteString value) {
+        if (value == null) {
+    throw new NullPointerException();
+  }
+  bitField0_ |= 0x00000001;
+        subscriberId_ = value;
+        onChanged();
+        return this;
+      }
+      public Builder clearSubscriberId() {
+        bitField0_ = (bitField0_ & ~0x00000001);
+        subscriberId_ = getDefaultInstance().getSubscriberId();
+        onChanged();
+        return this;
+      }
+      
+      // @@protoc_insertion_point(builder_scope:Hedwig.CloseSubscriptionRequest)
+    }
+    
+    static {
+      defaultInstance = new CloseSubscriptionRequest(true);
+      defaultInstance.initFields();
+    }
+    
+    // @@protoc_insertion_point(class_scope:Hedwig.CloseSubscriptionRequest)
+  }
+  
   public interface PubSubResponseOrBuilder
       extends com.google.protobuf.MessageOrBuilder {
     
@@ -15408,6 +15915,11 @@ public final class PubSubProtocol {
     com.google.protobuf.GeneratedMessage.FieldAccessorTable
       internal_static_Hedwig_StartDeliveryRequest_fieldAccessorTable;
   private static com.google.protobuf.Descriptors.Descriptor
+    internal_static_Hedwig_CloseSubscriptionRequest_descriptor;
+  private static
+    com.google.protobuf.GeneratedMessage.FieldAccessorTable
+      internal_static_Hedwig_CloseSubscriptionRequest_fieldAccessorTable;
+  private static com.google.protobuf.Descriptors.Descriptor
     internal_static_Hedwig_PubSubResponse_descriptor;
   private static
     com.google.protobuf.GeneratedMessage.FieldAccessorTable
@@ -15482,7 +15994,7 @@ public final class PubSubProtocol {
       "4\n\023RegionSpecificSeqId\022\016\n\006region\030\001 \002(\014\022\r" +
       "\n\005seqId\030\002 \002(\004\"]\n\014MessageSeqId\022\026\n\016localCo",
       "mponent\030\001 \001(\004\0225\n\020remoteComponents\030\002 \003(\0132" +
-      "\033.Hedwig.RegionSpecificSeqId\"\361\003\n\rPubSubR" +
+      "\033.Hedwig.RegionSpecificSeqId\"\265\004\n\rPubSubR" +
       "equest\0220\n\017protocolVersion\030\001 \002(\0162\027.Hedwig" +
       ".ProtocolVersion\022#\n\004type\030\002 \002(\0162\025.Hedwig." +
       "OperationType\022\024\n\014triedServers\030\003 \003(\014\022\r\n\005t" +
@@ -15495,75 +16007,79 @@ public final class PubSubProtocol {
       "ibeRequest\0228\n\023stopDeliveryRequest\0308 \001(\0132" +
       "\033.Hedwig.StopDeliveryRequest\022:\n\024startDel" +
       "iveryRequest\0309 \001(\0132\034.Hedwig.StartDeliver" +
-      "yRequest\".\n\016PublishRequest\022\034\n\003msg\030\002 \002(\0132" +
-      "\017.Hedwig.Message\"\177\n\027SubscriptionPreferen" +
-      "ces\022\034\n\007options\030\001 \001(\0132\013.Hedwig.Map\022\024\n\014mes" +
-      "sageBound\030\002 \001(\r\022\025\n\rmessageFilter\030\003 \001(\t\022\031" +
-      "\n\021messageWindowSize\030\004 \001(\r\"\277\002\n\020SubscribeR" +
-      "equest\022\024\n\014subscriberId\030\002 \002(\014\022Q\n\016createOr",
-      "Attach\030\003 \001(\0162\'.Hedwig.SubscribeRequest.C" +
-      "reateOrAttach:\020CREATE_OR_ATTACH\022\032\n\013synch" +
-      "ronous\030\004 \001(\010:\005false\022\024\n\014messageBound\030\005 \001(" +
-      "\r\0224\n\013preferences\030\006 \001(\0132\037.Hedwig.Subscrip" +
-      "tionPreferences\022\032\n\013forceAttach\030\007 \001(\010:\005fa" +
-      "lse\">\n\016CreateOrAttach\022\n\n\006CREATE\020\000\022\n\n\006ATT" +
-      "ACH\020\001\022\024\n\020CREATE_OR_ATTACH\020\002\"\216\002\n\023Subscrip" +
-      "tionOptions\022\032\n\013forceAttach\030\001 \001(\010:\005false\022" +
-      "Q\n\016createOrAttach\030\002 \001(\0162\'.Hedwig.Subscri" +
-      "beRequest.CreateOrAttach:\020CREATE_OR_ATTA",
-      "CH\022\027\n\014messageBound\030\003 \001(\r:\0010\022\034\n\007options\030\004" +
-      " \001(\0132\013.Hedwig.Map\022\025\n\rmessageFilter\030\005 \001(\t" +
-      "\022\031\n\021messageWindowSize\030\006 \001(\r\022\037\n\021enableRes" +
-      "ubscribe\030\007 \001(\010:\004true\"K\n\016ConsumeRequest\022\024" +
-      "\n\014subscriberId\030\002 \002(\014\022#\n\005msgId\030\003 \002(\0132\024.He" +
-      "dwig.MessageSeqId\"*\n\022UnsubscribeRequest\022" +
-      "\024\n\014subscriberId\030\002 \002(\014\"+\n\023StopDeliveryReq" +
-      "uest\022\024\n\014subscriberId\030\002 \002(\014\",\n\024StartDeliv" +
-      "eryRequest\022\024\n\014subscriberId\030\002 \002(\014\"\377\001\n\016Pub" +
-      "SubResponse\0220\n\017protocolVersion\030\001 \002(\0162\027.H",
-      "edwig.ProtocolVersion\022&\n\nstatusCode\030\002 \002(" +
-      "\0162\022.Hedwig.StatusCode\022\r\n\005txnId\030\003 \002(\004\022\021\n\t" +
-      "statusMsg\030\004 \001(\t\022 \n\007message\030\005 \001(\0132\017.Hedwi" +
-      "g.Message\022\r\n\005topic\030\006 \001(\014\022\024\n\014subscriberId" +
-      "\030\007 \001(\014\022*\n\014responseBody\030\010 \001(\0132\024.Hedwig.Re" +
-      "sponseBody\"?\n\017PublishResponse\022,\n\016publish" +
-      "edMsgId\030\001 \002(\0132\024.Hedwig.MessageSeqId\"I\n\021S" +
-      "ubscribeResponse\0224\n\013preferences\030\002 \001(\0132\037." +
-      "Hedwig.SubscriptionPreferences\"v\n\014Respon" +
-      "seBody\0220\n\017publishResponse\030\001 \001(\0132\027.Hedwig",
-      ".PublishResponse\0224\n\021subscribeResponse\030\002 " +
-      "\001(\0132\031.Hedwig.SubscribeResponse\"N\n\021Subscr" +
-      "iptionState\022#\n\005msgId\030\001 \002(\0132\024.Hedwig.Mess" +
-      "ageSeqId\022\024\n\014messageBound\030\002 \001(\r\"r\n\020Subscr" +
-      "iptionData\022(\n\005state\030\001 \001(\0132\031.Hedwig.Subsc" +
-      "riptionState\0224\n\013preferences\030\002 \001(\0132\037.Hedw" +
-      "ig.SubscriptionPreferences\"O\n\013LedgerRang" +
-      "e\022\020\n\010ledgerId\030\001 \002(\004\022.\n\020endSeqIdIncluded\030" +
-      "\002 \001(\0132\024.Hedwig.MessageSeqId\"3\n\014LedgerRan" +
-      "ges\022#\n\006ranges\030\001 \003(\0132\023.Hedwig.LedgerRange",
-      "\":\n\013ManagerMeta\022\023\n\013managerImpl\030\002 \002(\t\022\026\n\016" +
-      "managerVersion\030\003 \002(\r\".\n\013HubInfoData\022\020\n\010h" +
-      "ostname\030\002 \002(\t\022\r\n\005czxid\030\003 \002(\004\" \n\013HubLoadD" +
-      "ata\022\021\n\tnumTopics\030\002 \002(\004*\"\n\017ProtocolVersio" +
-      "n\022\017\n\013VERSION_ONE\020\001*p\n\rOperationType\022\013\n\007P" +
-      "UBLISH\020\000\022\r\n\tSUBSCRIBE\020\001\022\013\n\007CONSUME\020\002\022\017\n\013" +
-      "UNSUBSCRIBE\020\003\022\022\n\016START_DELIVERY\020\004\022\021\n\rSTO" +
-      "P_DELIVERY\020\005*D\n\021SubscriptionEvent\022\017\n\013TOP" +
-      "IC_MOVED\020\001\022\036\n\032SUBSCRIPTION_FORCED_CLOSED" +
-      "\020\002*\205\004\n\nStatusCode\022\013\n\007SUCCESS\020\000\022\026\n\021MALFOR",
-      "MED_REQUEST\020\221\003\022\022\n\rNO_SUCH_TOPIC\020\222\003\022\036\n\031CL" +
-      "IENT_ALREADY_SUBSCRIBED\020\223\003\022\032\n\025CLIENT_NOT" +
-      "_SUBSCRIBED\020\224\003\022\026\n\021COULD_NOT_CONNECT\020\225\003\022\017" +
-      "\n\nTOPIC_BUSY\020\226\003\022\036\n\031NOT_RESPONSIBLE_FOR_T" +
-      "OPIC\020\365\003\022\021\n\014SERVICE_DOWN\020\366\003\022\024\n\017UNCERTAIN_" +
-      "STATE\020\367\003\022\033\n\026INVALID_MESSAGE_FILTER\020\370\003\022\020\n" +
-      "\013BAD_VERSION\020\210\004\022\036\n\031NO_TOPIC_PERSISTENCE_" +
-      "INFO\020\211\004\022\"\n\035TOPIC_PERSISTENCE_INFO_EXISTS" +
-      "\020\212\004\022\032\n\025NO_SUBSCRIPTION_STATE\020\213\004\022\036\n\031SUBSC" +
-      "RIPTION_STATE_EXISTS\020\214\004\022\030\n\023NO_TOPIC_OWNE",
-      "R_INFO\020\215\004\022\034\n\027TOPIC_OWNER_INFO_EXISTS\020\216\004\022" +
-      "\031\n\024UNEXPECTED_CONDITION\020\330\004\022\016\n\tCOMPOSITE\020" +
-      "\274\005B\036\n\032org.apache.hedwig.protocolH\001"
+      "yRequest\022B\n\030closeSubscriptionRequest\030: \001" +
+      "(\0132 .Hedwig.CloseSubscriptionRequest\".\n\016" +
+      "PublishRequest\022\034\n\003msg\030\002 \002(\0132\017.Hedwig.Mes" +
+      "sage\"\177\n\027SubscriptionPreferences\022\034\n\007optio" +
+      "ns\030\001 \001(\0132\013.Hedwig.Map\022\024\n\014messageBound\030\002 " +
+      "\001(\r\022\025\n\rmessageFilter\030\003 \001(\t\022\031\n\021messageWin",
+      "dowSize\030\004 \001(\r\"\277\002\n\020SubscribeRequest\022\024\n\014su" +
+      "bscriberId\030\002 \002(\014\022Q\n\016createOrAttach\030\003 \001(\016" +
+      "2\'.Hedwig.SubscribeRequest.CreateOrAttac" +
+      "h:\020CREATE_OR_ATTACH\022\032\n\013synchronous\030\004 \001(\010" +
+      ":\005false\022\024\n\014messageBound\030\005 \001(\r\0224\n\013prefere" +
+      "nces\030\006 \001(\0132\037.Hedwig.SubscriptionPreferen" +
+      "ces\022\032\n\013forceAttach\030\007 \001(\010:\005false\">\n\016Creat" +
+      "eOrAttach\022\n\n\006CREATE\020\000\022\n\n\006ATTACH\020\001\022\024\n\020CRE" +
+      "ATE_OR_ATTACH\020\002\"\216\002\n\023SubscriptionOptions\022" +
+      "\032\n\013forceAttach\030\001 \001(\010:\005false\022Q\n\016createOrA",
+      "ttach\030\002 \001(\0162\'.Hedwig.SubscribeRequest.Cr" +
+      "eateOrAttach:\020CREATE_OR_ATTACH\022\027\n\014messag" +
+      "eBound\030\003 \001(\r:\0010\022\034\n\007options\030\004 \001(\0132\013.Hedwi" +
+      "g.Map\022\025\n\rmessageFilter\030\005 \001(\t\022\031\n\021messageW" +
+      "indowSize\030\006 \001(\r\022\037\n\021enableResubscribe\030\007 \001" +
+      "(\010:\004true\"K\n\016ConsumeRequest\022\024\n\014subscriber" +
+      "Id\030\002 \002(\014\022#\n\005msgId\030\003 \002(\0132\024.Hedwig.Message" +
+      "SeqId\"*\n\022UnsubscribeRequest\022\024\n\014subscribe" +
+      "rId\030\002 \002(\014\"+\n\023StopDeliveryRequest\022\024\n\014subs" +
+      "criberId\030\002 \002(\014\",\n\024StartDeliveryRequest\022\024",
+      "\n\014subscriberId\030\002 \002(\014\"0\n\030CloseSubscriptio" +
+      "nRequest\022\024\n\014subscriberId\030\002 \002(\014\"\377\001\n\016PubSu" +
+      "bResponse\0220\n\017protocolVersion\030\001 \002(\0162\027.Hed" +
+      "wig.ProtocolVersion\022&\n\nstatusCode\030\002 \002(\0162" +
+      "\022.Hedwig.StatusCode\022\r\n\005txnId\030\003 \002(\004\022\021\n\tst" +
+      "atusMsg\030\004 \001(\t\022 \n\007message\030\005 \001(\0132\017.Hedwig." +
+      "Message\022\r\n\005topic\030\006 \001(\014\022\024\n\014subscriberId\030\007" +
+      " \001(\014\022*\n\014responseBody\030\010 \001(\0132\024.Hedwig.Resp" +
+      "onseBody\"?\n\017PublishResponse\022,\n\016published" +
+      "MsgId\030\001 \002(\0132\024.Hedwig.MessageSeqId\"I\n\021Sub",
+      "scribeResponse\0224\n\013preferences\030\002 \001(\0132\037.He" +
+      "dwig.SubscriptionPreferences\"v\n\014Response" +
+      "Body\0220\n\017publishResponse\030\001 \001(\0132\027.Hedwig.P" +
+      "ublishResponse\0224\n\021subscribeResponse\030\002 \001(" +
+      "\0132\031.Hedwig.SubscribeResponse\"N\n\021Subscrip" +
+      "tionState\022#\n\005msgId\030\001 \002(\0132\024.Hedwig.Messag" +
+      "eSeqId\022\024\n\014messageBound\030\002 \001(\r\"r\n\020Subscrip" +
+      "tionData\022(\n\005state\030\001 \001(\0132\031.Hedwig.Subscri" +
+      "ptionState\0224\n\013preferences\030\002 \001(\0132\037.Hedwig" +
+      ".SubscriptionPreferences\"O\n\013LedgerRange\022",
+      "\020\n\010ledgerId\030\001 \002(\004\022.\n\020endSeqIdIncluded\030\002 " +
+      "\001(\0132\024.Hedwig.MessageSeqId\"3\n\014LedgerRange" +
+      "s\022#\n\006ranges\030\001 \003(\0132\023.Hedwig.LedgerRange\":" +
+      "\n\013ManagerMeta\022\023\n\013managerImpl\030\002 \002(\t\022\026\n\016ma" +
+      "nagerVersion\030\003 \002(\r\".\n\013HubInfoData\022\020\n\010hos" +
+      "tname\030\002 \002(\t\022\r\n\005czxid\030\003 \002(\004\" \n\013HubLoadDat" +
+      "a\022\021\n\tnumTopics\030\002 \002(\004*\"\n\017ProtocolVersion\022" +
+      "\017\n\013VERSION_ONE\020\001*\207\001\n\rOperationType\022\013\n\007PU" +
+      "BLISH\020\000\022\r\n\tSUBSCRIBE\020\001\022\013\n\007CONSUME\020\002\022\017\n\013U" +
+      "NSUBSCRIBE\020\003\022\022\n\016START_DELIVERY\020\004\022\021\n\rSTOP",
+      "_DELIVERY\020\005\022\025\n\021CLOSESUBSCRIPTION\020\006*D\n\021Su" +
+      "bscriptionEvent\022\017\n\013TOPIC_MOVED\020\001\022\036\n\032SUBS" +
+      "CRIPTION_FORCED_CLOSED\020\002*\205\004\n\nStatusCode\022" +
+      "\013\n\007SUCCESS\020\000\022\026\n\021MALFORMED_REQUEST\020\221\003\022\022\n\r" +
+      "NO_SUCH_TOPIC\020\222\003\022\036\n\031CLIENT_ALREADY_SUBSC" +
+      "RIBED\020\223\003\022\032\n\025CLIENT_NOT_SUBSCRIBED\020\224\003\022\026\n\021" +
+      "COULD_NOT_CONNECT\020\225\003\022\017\n\nTOPIC_BUSY\020\226\003\022\036\n" +
+      "\031NOT_RESPONSIBLE_FOR_TOPIC\020\365\003\022\021\n\014SERVICE" +
+      "_DOWN\020\366\003\022\024\n\017UNCERTAIN_STATE\020\367\003\022\033\n\026INVALI" +
+      "D_MESSAGE_FILTER\020\370\003\022\020\n\013BAD_VERSION\020\210\004\022\036\n",
+      "\031NO_TOPIC_PERSISTENCE_INFO\020\211\004\022\"\n\035TOPIC_P" +
+      "ERSISTENCE_INFO_EXISTS\020\212\004\022\032\n\025NO_SUBSCRIP" +
+      "TION_STATE\020\213\004\022\036\n\031SUBSCRIPTION_STATE_EXIS" +
+      "TS\020\214\004\022\030\n\023NO_TOPIC_OWNER_INFO\020\215\004\022\034\n\027TOPIC" +
+      "_OWNER_INFO_EXISTS\020\216\004\022\031\n\024UNEXPECTED_COND" +
+      "ITION\020\330\004\022\016\n\tCOMPOSITE\020\274\005B\036\n\032org.apache.h" +
+      "edwig.protocolH\001"
     };
     com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
       new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -15623,7 +16139,7 @@ public final class PubSubProtocol {
           internal_static_Hedwig_PubSubRequest_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Hedwig_PubSubRequest_descriptor,
-              new java.lang.String[] { "ProtocolVersion", "Type", "TriedServers", "TxnId", "ShouldClaim", "Topic", "PublishRequest", "SubscribeRequest", "ConsumeRequest", "UnsubscribeRequest", "StopDeliveryRequest", "StartDeliveryRequest", },
+              new java.lang.String[] { "ProtocolVersion", "Type", "TriedServers", "TxnId", "ShouldClaim", "Topic", "PublishRequest", "SubscribeRequest", "ConsumeRequest", "UnsubscribeRequest", "StopDeliveryRequest", "StartDeliveryRequest", "CloseSubscriptionRequest", },
               org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest.class,
               org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest.Builder.class);
           internal_static_Hedwig_PublishRequest_descriptor =
@@ -15690,8 +16206,16 @@ public final class PubSubProtocol {
               new java.lang.String[] { "SubscriberId", },
               org.apache.hedwig.protocol.PubSubProtocol.StartDeliveryRequest.class,
               org.apache.hedwig.protocol.PubSubProtocol.StartDeliveryRequest.Builder.class);
-          internal_static_Hedwig_PubSubResponse_descriptor =
+          internal_static_Hedwig_CloseSubscriptionRequest_descriptor =
             getDescriptor().getMessageTypes().get(14);
+          internal_static_Hedwig_CloseSubscriptionRequest_fieldAccessorTable = new
+            com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+              internal_static_Hedwig_CloseSubscriptionRequest_descriptor,
+              new java.lang.String[] { "SubscriberId", },
+              org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.class,
+              org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.Builder.class);
+          internal_static_Hedwig_PubSubResponse_descriptor =
+            getDescriptor().getMessageTypes().get(15);
           internal_static_Hedwig_PubSubResponse_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Hedwig_PubSubResponse_descriptor,
@@ -15699,7 +16223,7 @@ public final class PubSubProtocol {
               org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse.class,
               org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse.Builder.class);
           internal_static_Hedwig_PublishResponse_descriptor =
-            getDescriptor().getMessageTypes().get(15);
+            getDescriptor().getMessageTypes().get(16);
           internal_static_Hedwig_PublishResponse_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Hedwig_PublishResponse_descriptor,
@@ -15707,7 +16231,7 @@ public final class PubSubProtocol {
               org.apache.hedwig.protocol.PubSubProtocol.PublishResponse.class,
               org.apache.hedwig.protocol.PubSubProtocol.PublishResponse.Builder.class);
           internal_static_Hedwig_SubscribeResponse_descriptor =
-            getDescriptor().getMessageTypes().get(16);
+            getDescriptor().getMessageTypes().get(17);
           internal_static_Hedwig_SubscribeResponse_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Hedwig_SubscribeResponse_descriptor,
@@ -15715,7 +16239,7 @@ public final class PubSubProtocol {
               org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.class,
               org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.Builder.class);
           internal_static_Hedwig_ResponseBody_descriptor =
-            getDescriptor().getMessageTypes().get(17);
+            getDescriptor().getMessageTypes().get(18);
           internal_static_Hedwig_ResponseBody_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Hedwig_ResponseBody_descriptor,
@@ -15723,7 +16247,7 @@ public final class PubSubProtocol {
               org.apache.hedwig.protocol.PubSubProtocol.ResponseBody.class,
               org.apache.hedwig.protocol.PubSubProtocol.ResponseBody.Builder.class);
           internal_static_Hedwig_SubscriptionState_descriptor =
-            getDescriptor().getMessageTypes().get(18);
+            getDescriptor().getMessageTypes().get(19);
           internal_static_Hedwig_SubscriptionState_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Hedwig_SubscriptionState_descriptor,
@@ -15731,7 +16255,7 @@ public final class PubSubProtocol {
               org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState.class,
               org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState.Builder.class);
           internal_static_Hedwig_SubscriptionData_descriptor =
-            getDescriptor().getMessageTypes().get(19);
+            getDescriptor().getMessageTypes().get(20);
           internal_static_Hedwig_SubscriptionData_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Hedwig_SubscriptionData_descriptor,
@@ -15739,7 +16263,7 @@ public final class PubSubProtocol {
               org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData.class,
               org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData.Builder.class);
           internal_static_Hedwig_LedgerRange_descriptor =
-            getDescriptor().getMessageTypes().get(20);
+            getDescriptor().getMessageTypes().get(21);
           internal_static_Hedwig_LedgerRange_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Hedwig_LedgerRange_descriptor,
@@ -15747,7 +16271,7 @@ public final class PubSubProtocol {
               org.apache.hedwig.protocol.PubSubProtocol.LedgerRange.class,
               org.apache.hedwig.protocol.PubSubProtocol.LedgerRange.Builder.class);
           internal_static_Hedwig_LedgerRanges_descriptor =
-            getDescriptor().getMessageTypes().get(21);
+            getDescriptor().getMessageTypes().get(22);
           internal_static_Hedwig_LedgerRanges_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Hedwig_LedgerRanges_descriptor,
@@ -15755,7 +16279,7 @@ public final class PubSubProtocol {
               org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges.class,
               org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges.Builder.class);
           internal_static_Hedwig_ManagerMeta_descriptor =
-            getDescriptor().getMessageTypes().get(22);
+            getDescriptor().getMessageTypes().get(23);
           internal_static_Hedwig_ManagerMeta_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Hedwig_ManagerMeta_descriptor,
@@ -15763,7 +16287,7 @@ public final class PubSubProtocol {
               org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta.class,
               org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta.Builder.class);
           internal_static_Hedwig_HubInfoData_descriptor =
-            getDescriptor().getMessageTypes().get(23);
+            getDescriptor().getMessageTypes().get(24);
           internal_static_Hedwig_HubInfoData_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Hedwig_HubInfoData_descriptor,
@@ -15771,7 +16295,7 @@ public final class PubSubProtocol {
               org.apache.hedwig.protocol.PubSubProtocol.HubInfoData.class,
               org.apache.hedwig.protocol.PubSubProtocol.HubInfoData.Builder.class);
           internal_static_Hedwig_HubLoadData_descriptor =
-            getDescriptor().getMessageTypes().get(24);
+            getDescriptor().getMessageTypes().get(25);
           internal_static_Hedwig_HubLoadData_fieldAccessorTable = new
             com.google.protobuf.GeneratedMessage.FieldAccessorTable(
               internal_static_Hedwig_HubLoadData_descriptor,

Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto?rev=1400836&r1=1400835&r2=1400836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto Mon Oct 22 11:04:47 2012
@@ -70,6 +70,9 @@ enum OperationType{
     //the following two are only used for the hedwig proxy
     START_DELIVERY = 4;
     STOP_DELIVERY = 5;
+    // end for requests only used for hedwig proxy
+
+    CLOSESUBSCRIPTION = 6;
 }
 
 /* A PubSubRequest is just a union of the various request types, with
@@ -95,6 +98,7 @@ message PubSubRequest{
     optional UnsubscribeRequest unsubscribeRequest = 55;
     optional StopDeliveryRequest stopDeliveryRequest = 56;
     optional StartDeliveryRequest startDeliveryRequest = 57;
+    optional CloseSubscriptionRequest closeSubscriptionRequest = 58;
 }
 
 
@@ -191,6 +195,10 @@ enum SubscriptionEvent {
     SUBSCRIPTION_FORCED_CLOSED = 2;
 }
 
+message CloseSubscriptionRequest {
+    required bytes subscriberId = 2;
+}
+
 message PubSubResponse{
     required ProtocolVersion protocolVersion = 1;
     required StatusCode statusCode = 2;

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java?rev=1400836&r1=1400835&r2=1400836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java Mon Oct 22 11:04:47 2012
@@ -19,8 +19,10 @@ package org.apache.hedwig.server.deliver
 
 import com.google.protobuf.ByteString;
 import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
 import org.apache.hedwig.filter.ServerMessageFilter;
+import org.apache.hedwig.util.Callback;
 
 public interface DeliveryManager {
     public void start();
@@ -31,7 +33,17 @@ public interface DeliveryManager {
                                          DeliveryEndPoint endPoint,
                                          ServerMessageFilter filter);
 
-    public void stopServingSubscriber(ByteString topic, ByteString subscriberId);
+    /**
+     * Stop serving a given subscription.
+     *
+     * @param topic
+     *          Topic Name
+     * @param subscriberId
+     *          Subscriber Id
+     */
+    public void stopServingSubscriber(ByteString topic, ByteString subscriberId,
+                                      SubscriptionEvent event,
+                                      Callback<Void> callback, Object ctx);
 
     /**
      * Tell the delivery manager where that a subscriber has consumed

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java?rev=1400836&r1=1400835&r2=1400836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java Mon Oct 22 11:04:47 2012
@@ -37,12 +37,14 @@ import com.google.protobuf.ByteString;
 
 import org.apache.bookkeeper.util.MathUtils;
 import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.filter.ServerMessageFilter;
 import org.apache.hedwig.protocol.PubSubProtocol.Message;
 import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
 import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
 import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
 import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
 import org.apache.hedwig.server.common.ServerConfiguration;
 import org.apache.hedwig.server.common.UnexpectedError;
@@ -52,12 +54,22 @@ import org.apache.hedwig.server.persiste
 import org.apache.hedwig.server.persistence.PersistenceManager;
 import org.apache.hedwig.server.persistence.ScanCallback;
 import org.apache.hedwig.server.persistence.ScanRequest;
+import org.apache.hedwig.util.Callback;
 import static org.apache.hedwig.util.VarArgs.va;
 
 public class FIFODeliveryManager implements Runnable, DeliveryManager {
 
     protected static final Logger logger = LoggerFactory.getLogger(FIFODeliveryManager.class);
 
+    private static Callback<Void> NOP_CALLBACK = new Callback<Void>() {
+        @Override
+        public void operationFinished(Object ctx, Void result) {
+        }
+        @Override
+        public void operationFailed(Object ctx, PubSubException exception) {
+        }
+    };
+
     protected interface DeliveryManagerRequest {
         public void performRequest();
     }
@@ -160,11 +172,16 @@ public class FIFODeliveryManager impleme
         enqueueWithoutFailure(subscriber);
     }
 
-    public void stopServingSubscriber(ByteString topic, ByteString subscriberId) {
-        ActiveSubscriberState subState = subscriberStates.get(new TopicSubscriber(topic, subscriberId));
+    public void stopServingSubscriber(ByteString topic, ByteString subscriberId,
+                                      SubscriptionEvent event,
+                                      Callback<Void> cb, Object ctx) {
+        ActiveSubscriberState subState =
+            subscriberStates.get(new TopicSubscriber(topic, subscriberId));
 
         if (subState != null) {
-            stopServingSubscriber(subState);
+            stopServingSubscriber(subState, event, cb, ctx);
+        } else {
+            cb.operationFinished(ctx, null);
         }
     }
 
@@ -172,10 +189,19 @@ public class FIFODeliveryManager impleme
      * Due to some error or disconnection or unsusbcribe, someone asks us to
      * stop serving a particular endpoint
      *
-     * @param endPoint
-     */
-    protected void stopServingSubscriber(ActiveSubscriberState subscriber) {
-        enqueueWithoutFailure(new StopServingSubscriber(subscriber));
+     * @param subscriber
+     *          Subscriber instance
+     * @param event
+     *          Subscription event indicates why to stop subscriber.
+     * @param cb
+     *          Callback after the subscriber is stopped.
+     * @param ctx
+     *          Callback context
+     */
+    protected void stopServingSubscriber(ActiveSubscriberState subscriber,
+                                         SubscriptionEvent event,
+                                         Callback<Void> cb, Object ctx) {
+        enqueueWithoutFailure(new StopServingSubscriber(subscriber, event, cb, ctx));
     }
 
     /**
@@ -376,13 +402,19 @@ public class FIFODeliveryManager impleme
             }
         }
 
-        public void setNotConnected() {
+        public void setNotConnected(SubscriptionEvent event) {
             // have closed it.
             if (!isConnected()) {
                 return;
             }
             this.connected = false;
-            deliveryEndPoint.close();
+            if (null != event &&
+                (SubscriptionEvent.TOPIC_MOVED == event ||
+                 SubscriptionEvent.SUBSCRIPTION_FORCED_CLOSED == event)) {
+                // for we need to close the underlying when topic moved
+                // or subscription forced closed.
+                deliveryEndPoint.close();
+            }
             // uninitialize filter
             this.filter.uninitialize();
         }
@@ -549,7 +581,11 @@ public class FIFODeliveryManager impleme
 
 
         public void permanentErrorOnSend() {
-            stopServingSubscriber(this);
+            // the underlying channel is broken, the channel will
+            // be closed in UmbrellaHandler when exception happened.
+            // so we don't need to close the channel again
+            stopServingSubscriber(this, null,
+                                  NOP_CALLBACK, null);
         }
 
         public void transientErrorOnSend() {
@@ -563,10 +599,12 @@ public class FIFODeliveryManager impleme
         public void performRequest() {
 
             // Put this subscriber in the channel to subscriber mapping
-            ActiveSubscriberState prevSubscriber = subscriberStates.put(new TopicSubscriber(topic, subscriberId), this);
+            ActiveSubscriberState prevSubscriber =
+                subscriberStates.put(new TopicSubscriber(topic, subscriberId), this);
 
             if (prevSubscriber != null) {
-                stopServingSubscriber(prevSubscriber);
+                stopServingSubscriber(prevSubscriber, SubscriptionEvent.SUBSCRIPTION_FORCED_CLOSED,
+                                      NOP_CALLBACK, null);
             }
 
             lastSeqIdCommunicatedExternally = lastLocalSeqIdDelivered;
@@ -589,16 +627,24 @@ public class FIFODeliveryManager impleme
 
     protected class StopServingSubscriber implements DeliveryManagerRequest {
         ActiveSubscriberState subscriber;
-
-        public StopServingSubscriber(ActiveSubscriberState subscriber) {
+        SubscriptionEvent event;
+        final Callback<Void> cb;
+        final Object ctx;
+
+        public StopServingSubscriber(ActiveSubscriberState subscriber,
+                                     SubscriptionEvent event,
+                                     Callback<Void> callback, Object ctx) {
             this.subscriber = subscriber;
+            this.event = event;
+            this.cb = callback;
+            this.ctx = ctx;
         }
 
         @Override
         public void performRequest() {
 
             // This will automatically stop delivery, and disconnect the channel
-            subscriber.setNotConnected();
+            subscriber.setNotConnected(event);
 
             // if the subscriber has moved on, a move request for its delivery
             // pointer must be pending in the request queue. Note that the
@@ -609,6 +655,7 @@ public class FIFODeliveryManager impleme
                               true,
                               // pruneTopic=
                               true);
+            cb.operationFinished(ctx, null);
         }
 
     }

Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/CloseSubscriptionHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/CloseSubscriptionHandler.java?rev=1400836&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/CloseSubscriptionHandler.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/CloseSubscriptionHandler.java Mon Oct 22 11:04:47 2012
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.handlers;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+import com.google.protobuf.ByteString;
+
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
+import org.apache.hedwig.protoextensions.PubSubResponseUtils;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.delivery.DeliveryManager;
+import org.apache.hedwig.server.netty.ServerStats;
+import org.apache.hedwig.server.netty.ServerStats.OpStats;
+import org.apache.hedwig.server.netty.UmbrellaHandler;
+import org.apache.hedwig.server.subscriptions.SubscriptionManager;
+import org.apache.hedwig.server.topics.TopicManager;
+import org.apache.hedwig.util.Callback;
+
+public class CloseSubscriptionHandler extends BaseHandler {
+    SubscriptionManager subMgr;
+    DeliveryManager deliveryMgr;
+    SubscriptionChannelManager subChannelMgr;
+    // op stats
+    final OpStats closesubStats;
+
+    public CloseSubscriptionHandler(ServerConfiguration cfg, TopicManager tm,
+                                    SubscriptionManager subMgr,
+                                    DeliveryManager deliveryMgr,
+                                    SubscriptionChannelManager subChannelMgr) {
+        super(tm, cfg);
+        this.subMgr = subMgr;
+        this.deliveryMgr = deliveryMgr;
+        this.subChannelMgr = subChannelMgr;
+        closesubStats = ServerStats.getInstance().getOpStats(OperationType.CLOSESUBSCRIPTION);
+    }
+
+    @Override
+    public void handleRequestAtOwner(final PubSubRequest request, final Channel channel) {
+        if (!request.hasCloseSubscriptionRequest()) {
+            UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
+                    "Missing closesubscription request data");
+            closesubStats.incrementFailedOps();
+            return;
+        }
+
+        final CloseSubscriptionRequest closesubRequest =
+                request.getCloseSubscriptionRequest();
+        final ByteString topic = request.getTopic();
+        final ByteString subscriberId = closesubRequest.getSubscriberId();
+
+        final long requestTime = System.currentTimeMillis();
+
+        subMgr.closeSubscription(topic, subscriberId, new Callback<Void>() {
+            @Override
+            public void operationFinished(Object ctx, Void result) {
+                // we should not close the channel in delivery manager
+                // since client waits the response for closeSubscription request
+                // client side would close the channel
+                deliveryMgr.stopServingSubscriber(topic, subscriberId, null,
+                new Callback<Void>() {
+                    @Override
+                    public void operationFailed(Object ctx, PubSubException exception) {
+                        channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
+                        closesubStats.incrementFailedOps();
+                    }
+                    @Override
+                    public void operationFinished(Object ctx, Void resultOfOperation) {
+                        // remove the topic subscription from subscription channels
+                        subChannelMgr.remove(new TopicSubscriber(topic, subscriberId),
+                                             channel);
+                        channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
+                        closesubStats.updateLatency(System.currentTimeMillis() - requestTime);
+                    }
+                }, null);
+            }
+            @Override
+            public void operationFailed(Object ctx, PubSubException exception) {
+                channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
+                closesubStats.incrementFailedOps();
+            }
+        }, null);
+    }
+}

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java?rev=1400836&r1=1400835&r2=1400836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java Mon Oct 22 11:04:47 2012
@@ -21,9 +21,11 @@ import org.jboss.netty.channel.Channel;
 import com.google.protobuf.ByteString;
 
 import org.apache.bookkeeper.util.MathUtils;
+import org.apache.hedwig.client.data.TopicSubscriber;
 import org.apache.hedwig.exceptions.PubSubException;
 import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
 import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
 import org.apache.hedwig.protocol.PubSubProtocol.UnsubscribeRequest;
 import org.apache.hedwig.protoextensions.PubSubResponseUtils;
 import org.apache.hedwig.server.common.ServerConfiguration;
@@ -34,18 +36,24 @@ import org.apache.hedwig.server.netty.Um
 import org.apache.hedwig.server.subscriptions.SubscriptionManager;
 import org.apache.hedwig.server.topics.TopicManager;
 import org.apache.hedwig.util.Callback;
+import static org.apache.hedwig.util.VarArgs.va;
 
 public class UnsubscribeHandler extends BaseHandler {
     SubscriptionManager subMgr;
     DeliveryManager deliveryMgr;
+    SubscriptionChannelManager subChannelMgr;
     // op stats
     final OpStats unsubStats;
 
-    public UnsubscribeHandler(TopicManager tm, ServerConfiguration cfg, SubscriptionManager subMgr,
-                              DeliveryManager deliveryMgr) {
+    public UnsubscribeHandler(ServerConfiguration cfg,
+                              TopicManager tm,
+                              SubscriptionManager subMgr,
+                              DeliveryManager deliveryMgr,
+                              SubscriptionChannelManager subChannelMgr) {
         super(tm, cfg);
         this.subMgr = subMgr;
         this.deliveryMgr = deliveryMgr;
+        this.subChannelMgr = subChannelMgr;
         unsubStats = ServerStats.getInstance().getOpStats(OperationType.UNSUBSCRIBE);
     }
 
@@ -72,9 +80,25 @@ public class UnsubscribeHandler extends 
 
             @Override
             public void operationFinished(Object ctx, Void resultOfOperation) {
-                deliveryMgr.stopServingSubscriber(topic, subscriberId);
-                channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
-                unsubStats.updateLatency(MathUtils.now() - requestTime);
+                // we should not close the channel in delivery manager
+                // since client waits the response for closeSubscription request
+                // client side would close the channel
+                deliveryMgr.stopServingSubscriber(topic, subscriberId, null,
+                new Callback<Void>() {
+                    @Override
+                    public void operationFailed(Object ctx, PubSubException exception) {
+                        channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
+                        unsubStats.incrementFailedOps();
+                    }
+                    @Override
+                    public void operationFinished(Object ctx, Void resultOfOperation) {
+                        // remove the topic subscription from subscription channels
+                        subChannelMgr.remove(new TopicSubscriber(topic, subscriberId),
+                                             channel);
+                        channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
+                        unsubStats.updateLatency(System.currentTimeMillis() - requestTime);
+                    }
+                }, ctx);
             }
         }, null);
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java?rev=1400836&r1=1400835&r2=1400836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java Mon Oct 22 11:04:47 2012
@@ -56,6 +56,7 @@ import org.apache.hedwig.server.common.S
 import org.apache.hedwig.server.common.TerminateJVMExceptionHandler;
 import org.apache.hedwig.server.delivery.DeliveryManager;
 import org.apache.hedwig.server.delivery.FIFODeliveryManager;
+import org.apache.hedwig.server.handlers.CloseSubscriptionHandler;
 import org.apache.hedwig.server.handlers.ConsumeHandler;
 import org.apache.hedwig.server.handlers.Handler;
 import org.apache.hedwig.server.handlers.NettyHandlerBean;
@@ -227,8 +228,11 @@ public class PubSubServer {
         handlers.put(OperationType.PUBLISH, new PublishHandler(tm, pm, conf));
         handlers.put(OperationType.SUBSCRIBE,
                      new SubscribeHandler(conf, tm, dm, pm, sm, subChannelMgr));
-        handlers.put(OperationType.UNSUBSCRIBE, new UnsubscribeHandler(tm, conf, sm, dm));
+        handlers.put(OperationType.UNSUBSCRIBE,
+                     new UnsubscribeHandler(conf, tm, sm, dm, subChannelMgr));
         handlers.put(OperationType.CONSUME, new ConsumeHandler(tm, sm, conf));
+        handlers.put(OperationType.CLOSESUBSCRIPTION,
+                     new CloseSubscriptionHandler(conf, tm, sm, dm, subChannelMgr));
         handlers = Collections.unmodifiableMap(handlers);
         return handlers;
     }

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java?rev=1400836&r1=1400835&r2=1400836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java Mon Oct 22 11:04:47 2012
@@ -38,6 +38,7 @@ import org.apache.hedwig.protocol.PubSub
 import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
 import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
 import org.apache.hedwig.protoextensions.MessageIdUtils;
 import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
 import org.apache.hedwig.server.common.ServerConfiguration;
@@ -276,7 +277,8 @@ public abstract class AbstractSubscripti
                                            + subId.toStringUtf8() + ") when losing topic");
                             }
                             if (null != dm) {
-                                dm.stopServingSubscriber(topic, subId);
+                                dm.stopServingSubscriber(topic, subId, SubscriptionEvent.TOPIC_MOVED,
+                                                         noopCallback, null);
                             }
                         }
                     }
@@ -578,6 +580,28 @@ public abstract class AbstractSubscripti
         queuer.pushAndMaybeRun(topic, new ConsumeOp(topic, subscriberId, consumeSeqId, callback, ctx));
     }
 
+    private class CloseSubscriptionOp extends TopicOpQueuer.AsynchronousOp<Void> {
+
+        public CloseSubscriptionOp(ByteString topic, ByteString subscriberId,
+                                   Callback<Void> callback, Object ctx) {
+            queuer.super(topic, callback, ctx);
+        }
+
+        @Override
+        public void run() {
+            // TODO: BOOKKEEPER-412: we might need to move the loaded subscription
+            //                       to reclaim memory
+            // But for now we do nothing
+            cb.operationFinished(ctx, null);
+        }
+    }
+
+    @Override
+    public void closeSubscription(ByteString topic, ByteString subscriberId,
+                                  Callback<Void> callback, Object ctx) {
+        queuer.pushAndMaybeRun(topic, new CloseSubscriptionOp(topic, subscriberId, callback, ctx));
+    }
+
     private class UnsubscribeOp extends TopicOpQueuer.AsynchronousOp<Void> {
         ByteString subscriberId;
 

Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java?rev=1400836&r1=1400835&r2=1400836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java Mon Oct 22 11:04:47 2012
@@ -64,6 +64,21 @@ public interface SubscriptionManager {
             Callback<Void> callback, Object ctx);
 
     /**
+     * Close a particular subscription
+     *
+     * @param topic
+     *          Topic Name
+     * @param subscriberId
+     *          Subscriber Id
+     * @param callback
+     *          Callback
+     * @param ctx
+     *          Callback context
+     */
+    public void closeSubscription(ByteString topic, ByteString subscriberId,
+                                  Callback<Void> callback, Object ctx);
+
+    /**
      * Delete a particular subscription
      *
      * @param topic