You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by iv...@apache.org on 2012/10/22 13:04:48 UTC
svn commit: r1400836 [1/2] - in /zookeeper/bookkeeper/trunk: ./
hedwig-client/src/main/java/org/apache/hedwig/client/handlers/
hedwig-client/src/main/java/org/apache/hedwig/client/netty/
hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/s...
Author: ivank
Date: Mon Oct 22 11:04:47 2012
New Revision: 1400836
URL: http://svn.apache.org/viewvc?rev=1400836&view=rev
Log:
BOOKKEEPER-411: Add CloseSubscription Request for multiplexing support (sijie via ivank)
Added:
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/CloseSubscriptionResponseHandler.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/CloseSubscriptionHandler.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/netty/
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/client/netty/TestCloseSubscription.java
Modified:
zookeeper/bookkeeper/trunk/CHANGES.txt
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java
zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscriptionChannelPipelineFactory.java
zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java
zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/delivery/StubDeliveryManager.java
zookeeper/bookkeeper/trunk/hedwig-server/src/test/java/org/apache/hedwig/server/handlers/TestSubUnsubHandler.java
Modified: zookeeper/bookkeeper/trunk/CHANGES.txt
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/CHANGES.txt?rev=1400836&r1=1400835&r2=1400836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/CHANGES.txt (original)
+++ zookeeper/bookkeeper/trunk/CHANGES.txt Mon Oct 22 11:04:47 2012
@@ -188,6 +188,8 @@ Trunk (unreleased changes)
BOOKKEEPER-435: Create SubscriptionChannelManager to manage all subscription channel (sijie via ivank)
+ BOOKKEEPER-411: Add CloseSubscription Request for multiplexing support (sijie via ivank)
+
hedwig-client:
BOOKKEEPER-306: Change C++ client to use gtest for testing (ivank via sijie)
Added: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/CloseSubscriptionResponseHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/CloseSubscriptionResponseHandler.java?rev=1400836&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/CloseSubscriptionResponseHandler.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/handlers/CloseSubscriptionResponseHandler.java Mon Oct 22 11:04:47 2012
@@ -0,0 +1,84 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.client.handlers;
+
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+import org.jboss.netty.channel.Channel;
+
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.data.PubSubData;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.client.netty.HChannelManager;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
+import org.apache.hedwig.util.Callback;
+import static org.apache.hedwig.util.VarArgs.va;
+
+public class CloseSubscriptionResponseHandler extends AbstractResponseHandler {
+ // Handles server responses to CLOSESUBSCRIPTION requests by completing or failing the request's callback.
+ private static Logger logger =
+ LoggerFactory.getLogger(CloseSubscriptionResponseHandler.class);
+ // Passes the client configuration and channel manager straight through to the superclass constructor.
+ public CloseSubscriptionResponseHandler(ClientConfiguration cfg,
+ HChannelManager channelManager) {
+ super(cfg, channelManager);
+ }
+ /** Dispatches on {@code response.getStatusCode()}: finishes or fails the callback held by {@code pubSubData}, or hands a redirect to {@code handleRedirectResponse}. */
+ @Override
+ public void handleResponse(final PubSubResponse response, final PubSubData pubSubData,
+ final Channel channel)
+ throws Exception {
+ switch (response.getStatusCode()) {
+ case SUCCESS: // report success to the callback; the result value passed is null
+ pubSubData.getCallback().operationFinished(pubSubData.context, null);
+ break;
+ case CLIENT_NOT_SUBSCRIBED:
+ // For CloseSubscription requests, the server says that the client was
+ // never subscribed to the topic; fail the callback with topic and subscriberId in the message.
+ pubSubData.getCallback().operationFailed(pubSubData.context, new ClientNotSubscribedException(
+ "Client was never subscribed to topic: " +
+ pubSubData.topic.toStringUtf8() + ", subscriberId: " +
+ pubSubData.subscriberId.toStringUtf8()));
+ break;
+ case SERVICE_DOWN:
+ // Response was a service-down failure, so just invoke the callback's
+ // operationFailed method with a ServiceDownException.
+ pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException(
+ "Server responded with a SERVICE_DOWN status"));
+ break;
+ case NOT_RESPONSIBLE_FOR_TOPIC:
+ // Redirect response, so the original CloseSubscription request
+ // needs to be reposted; handleRedirectResponse is given the response, request data and channel.
+ handleRedirectResponse(response, pubSubData, channel);
+ break;
+ default:
+ // Treat every other status code as a failure: log the response and fail the
+ // callback with a ServiceDownException whose message carries the status code.
+ logger.error("Unexpected error response from server for PubSubResponse: " + response);
+ pubSubData.getCallback().operationFailed(pubSubData.context, new ServiceDownException(
+ "Server responded with a status code of: " +
+ response.getStatusCode()));
+ break;
+ }
+ }
+
+}
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java?rev=1400836&r1=1400835&r2=1400836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/NetUtils.java Mon Oct 22 11:04:47 2012
@@ -23,6 +23,7 @@ import org.jboss.netty.channel.Channel;
import org.apache.hedwig.client.data.PubSubData;
import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest;
import org.apache.hedwig.protocol.PubSubProtocol.ConsumeRequest;
import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
@@ -90,6 +91,11 @@ public class NetUtils {
// Set the UnsubscribeRequest into the outer PubSubRequest
pubsubRequestBuilder.setUnsubscribeRequest(buildUnsubscribeRequest(pubSubData));
break;
+ case CLOSESUBSCRIPTION:
+ // Set the CloseSubscriptionRequest into the outer PubSubRequest
+ pubsubRequestBuilder.setCloseSubscriptionRequest(
+ buildCloseSubscriptionRequest(pubSubData));
+ break;
}
// Update the PubSubData with the txnId and the requestWriteTime
@@ -133,6 +139,16 @@ public class NetUtils {
return unsubscribeRequestBuilder;
}
+ // Build the CloseSubscriptionRequest builder for the given PubSubData; the caller sets it on the outer PubSubRequest.
+ private static CloseSubscriptionRequest.Builder
+ buildCloseSubscriptionRequest(PubSubData pubSubData) {
+ // Create the CloseSubscriptionRequest; only the subscriberId is copied from pubSubData.
+ CloseSubscriptionRequest.Builder closeSubscriptionRequestBuilder =
+ CloseSubscriptionRequest.newBuilder();
+ closeSubscriptionRequestBuilder.setSubscriberId(pubSubData.subscriberId);
+ return closeSubscriptionRequestBuilder;
+ }
+
/**
* Build consume request
*
Modified: zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscriptionChannelPipelineFactory.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscriptionChannelPipelineFactory.java?rev=1400836&r1=1400835&r2=1400836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscriptionChannelPipelineFactory.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-client/src/main/java/org/apache/hedwig/client/netty/impl/simple/SimpleSubscriptionChannelPipelineFactory.java Mon Oct 22 11:04:47 2012
@@ -22,6 +22,7 @@ import java.util.Map;
import org.apache.hedwig.client.conf.ClientConfiguration;
import org.apache.hedwig.client.handlers.AbstractResponseHandler;
+import org.apache.hedwig.client.handlers.CloseSubscriptionResponseHandler;
import org.apache.hedwig.client.netty.impl.AbstractHChannelManager;
import org.apache.hedwig.client.netty.impl.ClientChannelPipelineFactory;
import org.apache.hedwig.client.netty.impl.HChannelHandler;
@@ -40,6 +41,8 @@ public class SimpleSubscriptionChannelPi
new HashMap<OperationType, AbstractResponseHandler>();
handlers.put(OperationType.SUBSCRIBE,
new SimpleSubscribeResponseHandler(cfg, channelManager));
+ handlers.put(OperationType.CLOSESUBSCRIPTION,
+ new CloseSubscriptionResponseHandler(cfg, channelManager));
return handlers;
}
Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java?rev=1400836&r1=1400835&r2=1400836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/java/org/apache/hedwig/protocol/PubSubProtocol.java Mon Oct 22 11:04:47 2012
@@ -82,6 +82,7 @@ public final class PubSubProtocol {
UNSUBSCRIBE(3, 3),
START_DELIVERY(4, 4),
STOP_DELIVERY(5, 5),
+ CLOSESUBSCRIPTION(6, 6),
;
public static final int PUBLISH_VALUE = 0;
@@ -90,6 +91,7 @@ public final class PubSubProtocol {
public static final int UNSUBSCRIBE_VALUE = 3;
public static final int START_DELIVERY_VALUE = 4;
public static final int STOP_DELIVERY_VALUE = 5;
+ public static final int CLOSESUBSCRIPTION_VALUE = 6;
public final int getNumber() { return value; }
@@ -102,6 +104,7 @@ public final class PubSubProtocol {
case 3: return UNSUBSCRIBE;
case 4: return START_DELIVERY;
case 5: return STOP_DELIVERY;
+ case 6: return CLOSESUBSCRIPTION;
default: return null;
}
}
@@ -132,7 +135,7 @@ public final class PubSubProtocol {
}
private static final OperationType[] VALUES = {
- PUBLISH, SUBSCRIBE, CONSUME, UNSUBSCRIBE, START_DELIVERY, STOP_DELIVERY,
+ PUBLISH, SUBSCRIBE, CONSUME, UNSUBSCRIBE, START_DELIVERY, STOP_DELIVERY, CLOSESUBSCRIPTION,
};
public static OperationType valueOf(
@@ -3652,6 +3655,11 @@ public final class PubSubProtocol {
boolean hasStartDeliveryRequest();
org.apache.hedwig.protocol.PubSubProtocol.StartDeliveryRequest getStartDeliveryRequest();
org.apache.hedwig.protocol.PubSubProtocol.StartDeliveryRequestOrBuilder getStartDeliveryRequestOrBuilder();
+
+ // optional .Hedwig.CloseSubscriptionRequest closeSubscriptionRequest = 58;
+ boolean hasCloseSubscriptionRequest();
+ org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest getCloseSubscriptionRequest();
+ org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequestOrBuilder getCloseSubscriptionRequestOrBuilder();
}
public static final class PubSubRequest extends
com.google.protobuf.GeneratedMessage
@@ -3824,6 +3832,19 @@ public final class PubSubProtocol {
return startDeliveryRequest_;
}
+ // optional .Hedwig.CloseSubscriptionRequest closeSubscriptionRequest = 58;
+ public static final int CLOSESUBSCRIPTIONREQUEST_FIELD_NUMBER = 58;
+ private org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest closeSubscriptionRequest_;
+ public boolean hasCloseSubscriptionRequest() {
+ return ((bitField0_ & 0x00000800) == 0x00000800);
+ }
+ public org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest getCloseSubscriptionRequest() {
+ return closeSubscriptionRequest_;
+ }
+ public org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequestOrBuilder getCloseSubscriptionRequestOrBuilder() {
+ return closeSubscriptionRequest_;
+ }
+
private void initFields() {
protocolVersion_ = org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion.VERSION_ONE;
type_ = org.apache.hedwig.protocol.PubSubProtocol.OperationType.PUBLISH;
@@ -3837,6 +3858,7 @@ public final class PubSubProtocol {
unsubscribeRequest_ = org.apache.hedwig.protocol.PubSubProtocol.UnsubscribeRequest.getDefaultInstance();
stopDeliveryRequest_ = org.apache.hedwig.protocol.PubSubProtocol.StopDeliveryRequest.getDefaultInstance();
startDeliveryRequest_ = org.apache.hedwig.protocol.PubSubProtocol.StartDeliveryRequest.getDefaultInstance();
+ closeSubscriptionRequest_ = org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.getDefaultInstance();
}
private byte memoizedIsInitialized = -1;
public final boolean isInitialized() {
@@ -3895,6 +3917,12 @@ public final class PubSubProtocol {
return false;
}
}
+ if (hasCloseSubscriptionRequest()) {
+ if (!getCloseSubscriptionRequest().isInitialized()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ }
memoizedIsInitialized = 1;
return true;
}
@@ -3938,6 +3966,9 @@ public final class PubSubProtocol {
if (((bitField0_ & 0x00000400) == 0x00000400)) {
output.writeMessage(57, startDeliveryRequest_);
}
+ if (((bitField0_ & 0x00000800) == 0x00000800)) {
+ output.writeMessage(58, closeSubscriptionRequest_);
+ }
getUnknownFields().writeTo(output);
}
@@ -4000,6 +4031,10 @@ public final class PubSubProtocol {
size += com.google.protobuf.CodedOutputStream
.computeMessageSize(57, startDeliveryRequest_);
}
+ if (((bitField0_ & 0x00000800) == 0x00000800)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeMessageSize(58, closeSubscriptionRequest_);
+ }
size += getUnknownFields().getSerializedSize();
memoizedSerializedSize = size;
return size;
@@ -4122,6 +4157,7 @@ public final class PubSubProtocol {
getUnsubscribeRequestFieldBuilder();
getStopDeliveryRequestFieldBuilder();
getStartDeliveryRequestFieldBuilder();
+ getCloseSubscriptionRequestFieldBuilder();
}
}
private static Builder create() {
@@ -4178,6 +4214,12 @@ public final class PubSubProtocol {
startDeliveryRequestBuilder_.clear();
}
bitField0_ = (bitField0_ & ~0x00000800);
+ if (closeSubscriptionRequestBuilder_ == null) {
+ closeSubscriptionRequest_ = org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.getDefaultInstance();
+ } else {
+ closeSubscriptionRequestBuilder_.clear();
+ }
+ bitField0_ = (bitField0_ & ~0x00001000);
return this;
}
@@ -4289,6 +4331,14 @@ public final class PubSubProtocol {
} else {
result.startDeliveryRequest_ = startDeliveryRequestBuilder_.build();
}
+ if (((from_bitField0_ & 0x00001000) == 0x00001000)) {
+ to_bitField0_ |= 0x00000800;
+ }
+ if (closeSubscriptionRequestBuilder_ == null) {
+ result.closeSubscriptionRequest_ = closeSubscriptionRequest_;
+ } else {
+ result.closeSubscriptionRequest_ = closeSubscriptionRequestBuilder_.build();
+ }
result.bitField0_ = to_bitField0_;
onBuilt();
return result;
@@ -4348,6 +4398,9 @@ public final class PubSubProtocol {
if (other.hasStartDeliveryRequest()) {
mergeStartDeliveryRequest(other.getStartDeliveryRequest());
}
+ if (other.hasCloseSubscriptionRequest()) {
+ mergeCloseSubscriptionRequest(other.getCloseSubscriptionRequest());
+ }
this.mergeUnknownFields(other.getUnknownFields());
return this;
}
@@ -4405,6 +4458,12 @@ public final class PubSubProtocol {
return false;
}
}
+ if (hasCloseSubscriptionRequest()) {
+ if (!getCloseSubscriptionRequest().isInitialized()) {
+
+ return false;
+ }
+ }
return true;
}
@@ -4527,6 +4586,15 @@ public final class PubSubProtocol {
setStartDeliveryRequest(subBuilder.buildPartial());
break;
}
+ case 466: {
+ org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.Builder subBuilder = org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.newBuilder();
+ if (hasCloseSubscriptionRequest()) {
+ subBuilder.mergeFrom(getCloseSubscriptionRequest());
+ }
+ input.readMessage(subBuilder, extensionRegistry);
+ setCloseSubscriptionRequest(subBuilder.buildPartial());
+ break;
+ }
}
}
}
@@ -5238,6 +5306,96 @@ public final class PubSubProtocol {
return startDeliveryRequestBuilder_;
}
+ // optional .Hedwig.CloseSubscriptionRequest closeSubscriptionRequest = 58;
+ private org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest closeSubscriptionRequest_ = org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.getDefaultInstance();
+ private com.google.protobuf.SingleFieldBuilder<
+ org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest, org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.Builder, org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequestOrBuilder> closeSubscriptionRequestBuilder_;
+ public boolean hasCloseSubscriptionRequest() {
+ return ((bitField0_ & 0x00001000) == 0x00001000);
+ }
+ public org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest getCloseSubscriptionRequest() {
+ if (closeSubscriptionRequestBuilder_ == null) {
+ return closeSubscriptionRequest_;
+ } else {
+ return closeSubscriptionRequestBuilder_.getMessage();
+ }
+ }
+ public Builder setCloseSubscriptionRequest(org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest value) {
+ if (closeSubscriptionRequestBuilder_ == null) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ closeSubscriptionRequest_ = value;
+ onChanged();
+ } else {
+ closeSubscriptionRequestBuilder_.setMessage(value);
+ }
+ bitField0_ |= 0x00001000;
+ return this;
+ }
+ public Builder setCloseSubscriptionRequest(
+ org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.Builder builderForValue) {
+ if (closeSubscriptionRequestBuilder_ == null) {
+ closeSubscriptionRequest_ = builderForValue.build();
+ onChanged();
+ } else {
+ closeSubscriptionRequestBuilder_.setMessage(builderForValue.build());
+ }
+ bitField0_ |= 0x00001000;
+ return this;
+ }
+ public Builder mergeCloseSubscriptionRequest(org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest value) {
+ if (closeSubscriptionRequestBuilder_ == null) {
+ if (((bitField0_ & 0x00001000) == 0x00001000) &&
+ closeSubscriptionRequest_ != org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.getDefaultInstance()) {
+ closeSubscriptionRequest_ =
+ org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.newBuilder(closeSubscriptionRequest_).mergeFrom(value).buildPartial();
+ } else {
+ closeSubscriptionRequest_ = value;
+ }
+ onChanged();
+ } else {
+ closeSubscriptionRequestBuilder_.mergeFrom(value);
+ }
+ bitField0_ |= 0x00001000;
+ return this;
+ }
+ public Builder clearCloseSubscriptionRequest() {
+ if (closeSubscriptionRequestBuilder_ == null) {
+ closeSubscriptionRequest_ = org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.getDefaultInstance();
+ onChanged();
+ } else {
+ closeSubscriptionRequestBuilder_.clear();
+ }
+ bitField0_ = (bitField0_ & ~0x00001000);
+ return this;
+ }
+ public org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.Builder getCloseSubscriptionRequestBuilder() {
+ bitField0_ |= 0x00001000;
+ onChanged();
+ return getCloseSubscriptionRequestFieldBuilder().getBuilder();
+ }
+ public org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequestOrBuilder getCloseSubscriptionRequestOrBuilder() {
+ if (closeSubscriptionRequestBuilder_ != null) {
+ return closeSubscriptionRequestBuilder_.getMessageOrBuilder();
+ } else {
+ return closeSubscriptionRequest_;
+ }
+ }
+ private com.google.protobuf.SingleFieldBuilder<
+ org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest, org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.Builder, org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequestOrBuilder>
+ getCloseSubscriptionRequestFieldBuilder() {
+ if (closeSubscriptionRequestBuilder_ == null) {
+ closeSubscriptionRequestBuilder_ = new com.google.protobuf.SingleFieldBuilder<
+ org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest, org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.Builder, org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequestOrBuilder>(
+ closeSubscriptionRequest_,
+ getParentForChildren(),
+ isClean());
+ closeSubscriptionRequest_ = null;
+ }
+ return closeSubscriptionRequestBuilder_;
+ }
+
// @@protoc_insertion_point(builder_scope:Hedwig.PubSubRequest)
}
@@ -9489,6 +9647,355 @@ public final class PubSubProtocol {
// @@protoc_insertion_point(class_scope:Hedwig.StartDeliveryRequest)
}
+ public interface CloseSubscriptionRequestOrBuilder
+ extends com.google.protobuf.MessageOrBuilder {
+
+ // required bytes subscriberId = 2;
+ boolean hasSubscriberId();
+ com.google.protobuf.ByteString getSubscriberId();
+ }
+ public static final class CloseSubscriptionRequest extends
+ com.google.protobuf.GeneratedMessage
+ implements CloseSubscriptionRequestOrBuilder {
+ // Use CloseSubscriptionRequest.newBuilder() to construct.
+ private CloseSubscriptionRequest(Builder builder) {
+ super(builder);
+ }
+ private CloseSubscriptionRequest(boolean noInit) {}
+
+ private static final CloseSubscriptionRequest defaultInstance;
+ public static CloseSubscriptionRequest getDefaultInstance() {
+ return defaultInstance;
+ }
+
+ public CloseSubscriptionRequest getDefaultInstanceForType() {
+ return defaultInstance;
+ }
+
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.hedwig.protocol.PubSubProtocol.internal_static_Hedwig_CloseSubscriptionRequest_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.hedwig.protocol.PubSubProtocol.internal_static_Hedwig_CloseSubscriptionRequest_fieldAccessorTable;
+ }
+
+ private int bitField0_;
+ // required bytes subscriberId = 2;
+ public static final int SUBSCRIBERID_FIELD_NUMBER = 2;
+ private com.google.protobuf.ByteString subscriberId_;
+ public boolean hasSubscriberId() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public com.google.protobuf.ByteString getSubscriberId() {
+ return subscriberId_;
+ }
+
+ private void initFields() {
+ subscriberId_ = com.google.protobuf.ByteString.EMPTY;
+ }
+ private byte memoizedIsInitialized = -1;
+ public final boolean isInitialized() {
+ byte isInitialized = memoizedIsInitialized;
+ if (isInitialized != -1) return isInitialized == 1;
+
+ if (!hasSubscriberId()) {
+ memoizedIsInitialized = 0;
+ return false;
+ }
+ memoizedIsInitialized = 1;
+ return true;
+ }
+
+ public void writeTo(com.google.protobuf.CodedOutputStream output)
+ throws java.io.IOException {
+ getSerializedSize();
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ output.writeBytes(2, subscriberId_);
+ }
+ getUnknownFields().writeTo(output);
+ }
+
+ private int memoizedSerializedSize = -1;
+ public int getSerializedSize() {
+ int size = memoizedSerializedSize;
+ if (size != -1) return size;
+
+ size = 0;
+ if (((bitField0_ & 0x00000001) == 0x00000001)) {
+ size += com.google.protobuf.CodedOutputStream
+ .computeBytesSize(2, subscriberId_);
+ }
+ size += getUnknownFields().getSerializedSize();
+ memoizedSerializedSize = size;
+ return size;
+ }
+
+ private static final long serialVersionUID = 0L;
+ @java.lang.Override
+ protected java.lang.Object writeReplace()
+ throws java.io.ObjectStreamException {
+ return super.writeReplace();
+ }
+
+ public static org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest parseFrom(
+ com.google.protobuf.ByteString data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data).buildParsed();
+ }
+ public static org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest parseFrom(
+ com.google.protobuf.ByteString data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest parseFrom(byte[] data)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data).buildParsed();
+ }
+ public static org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest parseFrom(
+ byte[] data,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ return newBuilder().mergeFrom(data, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest parseFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest parseFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+ public static org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest parseDelimitedFrom(java.io.InputStream input)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest parseDelimitedFrom(
+ java.io.InputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ Builder builder = newBuilder();
+ if (builder.mergeDelimitedFrom(input, extensionRegistry)) {
+ return builder.buildParsed();
+ } else {
+ return null;
+ }
+ }
+ public static org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest parseFrom(
+ com.google.protobuf.CodedInputStream input)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input).buildParsed();
+ }
+ public static org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest parseFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ return newBuilder().mergeFrom(input, extensionRegistry)
+ .buildParsed();
+ }
+
+ public static Builder newBuilder() { return Builder.create(); }
+ public Builder newBuilderForType() { return newBuilder(); }
+ public static Builder newBuilder(org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest prototype) {
+ return newBuilder().mergeFrom(prototype);
+ }
+ public Builder toBuilder() { return newBuilder(this); }
+
+ @java.lang.Override
+ protected Builder newBuilderForType(
+ com.google.protobuf.GeneratedMessage.BuilderParent parent) {
+ Builder builder = new Builder(parent);
+ return builder;
+ }
+ public static final class Builder extends
+ com.google.protobuf.GeneratedMessage.Builder<Builder>
+ implements org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequestOrBuilder {
+ public static final com.google.protobuf.Descriptors.Descriptor
+ getDescriptor() {
+ return org.apache.hedwig.protocol.PubSubProtocol.internal_static_Hedwig_CloseSubscriptionRequest_descriptor;
+ }
+
+ protected com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internalGetFieldAccessorTable() {
+ return org.apache.hedwig.protocol.PubSubProtocol.internal_static_Hedwig_CloseSubscriptionRequest_fieldAccessorTable;
+ }
+
+ // Construct using org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.newBuilder()
+ private Builder() {
+ maybeForceBuilderInitialization();
+ }
+
+ private Builder(BuilderParent parent) {
+ super(parent);
+ maybeForceBuilderInitialization();
+ }
+ private void maybeForceBuilderInitialization() {
+ if (com.google.protobuf.GeneratedMessage.alwaysUseFieldBuilders) {
+ }
+ }
+ private static Builder create() {
+ return new Builder();
+ }
+
+ public Builder clear() {
+ super.clear();
+ subscriberId_ = com.google.protobuf.ByteString.EMPTY;
+ bitField0_ = (bitField0_ & ~0x00000001);
+ return this;
+ }
+
+ public Builder clone() {
+ return create().mergeFrom(buildPartial());
+ }
+
+ public com.google.protobuf.Descriptors.Descriptor
+ getDescriptorForType() {
+ return org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.getDescriptor();
+ }
+
+ public org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest getDefaultInstanceForType() {
+ return org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.getDefaultInstance();
+ }
+
+ public org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest build() {
+ org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(result);
+ }
+ return result;
+ }
+
+ private org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest buildParsed()
+ throws com.google.protobuf.InvalidProtocolBufferException {
+ org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest result = buildPartial();
+ if (!result.isInitialized()) {
+ throw newUninitializedMessageException(
+ result).asInvalidProtocolBufferException();
+ }
+ return result;
+ }
+
+ public org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest buildPartial() {
+ org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest result = new org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest(this);
+ int from_bitField0_ = bitField0_;
+ int to_bitField0_ = 0;
+ if (((from_bitField0_ & 0x00000001) == 0x00000001)) {
+ to_bitField0_ |= 0x00000001;
+ }
+ result.subscriberId_ = subscriberId_;
+ result.bitField0_ = to_bitField0_;
+ onBuilt();
+ return result;
+ }
+
+ public Builder mergeFrom(com.google.protobuf.Message other) {
+ if (other instanceof org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest) {
+ return mergeFrom((org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest)other);
+ } else {
+ super.mergeFrom(other);
+ return this;
+ }
+ }
+
+ public Builder mergeFrom(org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest other) {
+ if (other == org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.getDefaultInstance()) return this;
+ if (other.hasSubscriberId()) {
+ setSubscriberId(other.getSubscriberId());
+ }
+ this.mergeUnknownFields(other.getUnknownFields());
+ return this;
+ }
+
+ public final boolean isInitialized() {
+ if (!hasSubscriberId()) {
+
+ return false;
+ }
+ return true;
+ }
+
+ public Builder mergeFrom(
+ com.google.protobuf.CodedInputStream input,
+ com.google.protobuf.ExtensionRegistryLite extensionRegistry)
+ throws java.io.IOException {
+ com.google.protobuf.UnknownFieldSet.Builder unknownFields =
+ com.google.protobuf.UnknownFieldSet.newBuilder(
+ this.getUnknownFields());
+ while (true) {
+ int tag = input.readTag();
+ switch (tag) {
+ case 0:
+ this.setUnknownFields(unknownFields.build());
+ onChanged();
+ return this;
+ default: {
+ if (!parseUnknownField(input, unknownFields,
+ extensionRegistry, tag)) {
+ this.setUnknownFields(unknownFields.build());
+ onChanged();
+ return this;
+ }
+ break;
+ }
+ case 18: {
+ bitField0_ |= 0x00000001;
+ subscriberId_ = input.readBytes();
+ break;
+ }
+ }
+ }
+ }
+
+ private int bitField0_;
+
+ // required bytes subscriberId = 2;
+ private com.google.protobuf.ByteString subscriberId_ = com.google.protobuf.ByteString.EMPTY;
+ public boolean hasSubscriberId() {
+ return ((bitField0_ & 0x00000001) == 0x00000001);
+ }
+ public com.google.protobuf.ByteString getSubscriberId() {
+ return subscriberId_;
+ }
+ public Builder setSubscriberId(com.google.protobuf.ByteString value) {
+ if (value == null) {
+ throw new NullPointerException();
+ }
+ bitField0_ |= 0x00000001;
+ subscriberId_ = value;
+ onChanged();
+ return this;
+ }
+ public Builder clearSubscriberId() {
+ bitField0_ = (bitField0_ & ~0x00000001);
+ subscriberId_ = getDefaultInstance().getSubscriberId();
+ onChanged();
+ return this;
+ }
+
+ // @@protoc_insertion_point(builder_scope:Hedwig.CloseSubscriptionRequest)
+ }
+
+ static {
+ defaultInstance = new CloseSubscriptionRequest(true);
+ defaultInstance.initFields();
+ }
+
+ // @@protoc_insertion_point(class_scope:Hedwig.CloseSubscriptionRequest)
+ }
+
public interface PubSubResponseOrBuilder
extends com.google.protobuf.MessageOrBuilder {
@@ -15408,6 +15915,11 @@ public final class PubSubProtocol {
com.google.protobuf.GeneratedMessage.FieldAccessorTable
internal_static_Hedwig_StartDeliveryRequest_fieldAccessorTable;
private static com.google.protobuf.Descriptors.Descriptor
+ internal_static_Hedwig_CloseSubscriptionRequest_descriptor;
+ private static
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable
+ internal_static_Hedwig_CloseSubscriptionRequest_fieldAccessorTable;
+ private static com.google.protobuf.Descriptors.Descriptor
internal_static_Hedwig_PubSubResponse_descriptor;
private static
com.google.protobuf.GeneratedMessage.FieldAccessorTable
@@ -15482,7 +15994,7 @@ public final class PubSubProtocol {
"4\n\023RegionSpecificSeqId\022\016\n\006region\030\001 \002(\014\022\r" +
"\n\005seqId\030\002 \002(\004\"]\n\014MessageSeqId\022\026\n\016localCo",
"mponent\030\001 \001(\004\0225\n\020remoteComponents\030\002 \003(\0132" +
- "\033.Hedwig.RegionSpecificSeqId\"\361\003\n\rPubSubR" +
+ "\033.Hedwig.RegionSpecificSeqId\"\265\004\n\rPubSubR" +
"equest\0220\n\017protocolVersion\030\001 \002(\0162\027.Hedwig" +
".ProtocolVersion\022#\n\004type\030\002 \002(\0162\025.Hedwig." +
"OperationType\022\024\n\014triedServers\030\003 \003(\014\022\r\n\005t" +
@@ -15495,75 +16007,79 @@ public final class PubSubProtocol {
"ibeRequest\0228\n\023stopDeliveryRequest\0308 \001(\0132" +
"\033.Hedwig.StopDeliveryRequest\022:\n\024startDel" +
"iveryRequest\0309 \001(\0132\034.Hedwig.StartDeliver" +
- "yRequest\".\n\016PublishRequest\022\034\n\003msg\030\002 \002(\0132" +
- "\017.Hedwig.Message\"\177\n\027SubscriptionPreferen" +
- "ces\022\034\n\007options\030\001 \001(\0132\013.Hedwig.Map\022\024\n\014mes" +
- "sageBound\030\002 \001(\r\022\025\n\rmessageFilter\030\003 \001(\t\022\031" +
- "\n\021messageWindowSize\030\004 \001(\r\"\277\002\n\020SubscribeR" +
- "equest\022\024\n\014subscriberId\030\002 \002(\014\022Q\n\016createOr",
- "Attach\030\003 \001(\0162\'.Hedwig.SubscribeRequest.C" +
- "reateOrAttach:\020CREATE_OR_ATTACH\022\032\n\013synch" +
- "ronous\030\004 \001(\010:\005false\022\024\n\014messageBound\030\005 \001(" +
- "\r\0224\n\013preferences\030\006 \001(\0132\037.Hedwig.Subscrip" +
- "tionPreferences\022\032\n\013forceAttach\030\007 \001(\010:\005fa" +
- "lse\">\n\016CreateOrAttach\022\n\n\006CREATE\020\000\022\n\n\006ATT" +
- "ACH\020\001\022\024\n\020CREATE_OR_ATTACH\020\002\"\216\002\n\023Subscrip" +
- "tionOptions\022\032\n\013forceAttach\030\001 \001(\010:\005false\022" +
- "Q\n\016createOrAttach\030\002 \001(\0162\'.Hedwig.Subscri" +
- "beRequest.CreateOrAttach:\020CREATE_OR_ATTA",
- "CH\022\027\n\014messageBound\030\003 \001(\r:\0010\022\034\n\007options\030\004" +
- " \001(\0132\013.Hedwig.Map\022\025\n\rmessageFilter\030\005 \001(\t" +
- "\022\031\n\021messageWindowSize\030\006 \001(\r\022\037\n\021enableRes" +
- "ubscribe\030\007 \001(\010:\004true\"K\n\016ConsumeRequest\022\024" +
- "\n\014subscriberId\030\002 \002(\014\022#\n\005msgId\030\003 \002(\0132\024.He" +
- "dwig.MessageSeqId\"*\n\022UnsubscribeRequest\022" +
- "\024\n\014subscriberId\030\002 \002(\014\"+\n\023StopDeliveryReq" +
- "uest\022\024\n\014subscriberId\030\002 \002(\014\",\n\024StartDeliv" +
- "eryRequest\022\024\n\014subscriberId\030\002 \002(\014\"\377\001\n\016Pub" +
- "SubResponse\0220\n\017protocolVersion\030\001 \002(\0162\027.H",
- "edwig.ProtocolVersion\022&\n\nstatusCode\030\002 \002(" +
- "\0162\022.Hedwig.StatusCode\022\r\n\005txnId\030\003 \002(\004\022\021\n\t" +
- "statusMsg\030\004 \001(\t\022 \n\007message\030\005 \001(\0132\017.Hedwi" +
- "g.Message\022\r\n\005topic\030\006 \001(\014\022\024\n\014subscriberId" +
- "\030\007 \001(\014\022*\n\014responseBody\030\010 \001(\0132\024.Hedwig.Re" +
- "sponseBody\"?\n\017PublishResponse\022,\n\016publish" +
- "edMsgId\030\001 \002(\0132\024.Hedwig.MessageSeqId\"I\n\021S" +
- "ubscribeResponse\0224\n\013preferences\030\002 \001(\0132\037." +
- "Hedwig.SubscriptionPreferences\"v\n\014Respon" +
- "seBody\0220\n\017publishResponse\030\001 \001(\0132\027.Hedwig",
- ".PublishResponse\0224\n\021subscribeResponse\030\002 " +
- "\001(\0132\031.Hedwig.SubscribeResponse\"N\n\021Subscr" +
- "iptionState\022#\n\005msgId\030\001 \002(\0132\024.Hedwig.Mess" +
- "ageSeqId\022\024\n\014messageBound\030\002 \001(\r\"r\n\020Subscr" +
- "iptionData\022(\n\005state\030\001 \001(\0132\031.Hedwig.Subsc" +
- "riptionState\0224\n\013preferences\030\002 \001(\0132\037.Hedw" +
- "ig.SubscriptionPreferences\"O\n\013LedgerRang" +
- "e\022\020\n\010ledgerId\030\001 \002(\004\022.\n\020endSeqIdIncluded\030" +
- "\002 \001(\0132\024.Hedwig.MessageSeqId\"3\n\014LedgerRan" +
- "ges\022#\n\006ranges\030\001 \003(\0132\023.Hedwig.LedgerRange",
- "\":\n\013ManagerMeta\022\023\n\013managerImpl\030\002 \002(\t\022\026\n\016" +
- "managerVersion\030\003 \002(\r\".\n\013HubInfoData\022\020\n\010h" +
- "ostname\030\002 \002(\t\022\r\n\005czxid\030\003 \002(\004\" \n\013HubLoadD" +
- "ata\022\021\n\tnumTopics\030\002 \002(\004*\"\n\017ProtocolVersio" +
- "n\022\017\n\013VERSION_ONE\020\001*p\n\rOperationType\022\013\n\007P" +
- "UBLISH\020\000\022\r\n\tSUBSCRIBE\020\001\022\013\n\007CONSUME\020\002\022\017\n\013" +
- "UNSUBSCRIBE\020\003\022\022\n\016START_DELIVERY\020\004\022\021\n\rSTO" +
- "P_DELIVERY\020\005*D\n\021SubscriptionEvent\022\017\n\013TOP" +
- "IC_MOVED\020\001\022\036\n\032SUBSCRIPTION_FORCED_CLOSED" +
- "\020\002*\205\004\n\nStatusCode\022\013\n\007SUCCESS\020\000\022\026\n\021MALFOR",
- "MED_REQUEST\020\221\003\022\022\n\rNO_SUCH_TOPIC\020\222\003\022\036\n\031CL" +
- "IENT_ALREADY_SUBSCRIBED\020\223\003\022\032\n\025CLIENT_NOT" +
- "_SUBSCRIBED\020\224\003\022\026\n\021COULD_NOT_CONNECT\020\225\003\022\017" +
- "\n\nTOPIC_BUSY\020\226\003\022\036\n\031NOT_RESPONSIBLE_FOR_T" +
- "OPIC\020\365\003\022\021\n\014SERVICE_DOWN\020\366\003\022\024\n\017UNCERTAIN_" +
- "STATE\020\367\003\022\033\n\026INVALID_MESSAGE_FILTER\020\370\003\022\020\n" +
- "\013BAD_VERSION\020\210\004\022\036\n\031NO_TOPIC_PERSISTENCE_" +
- "INFO\020\211\004\022\"\n\035TOPIC_PERSISTENCE_INFO_EXISTS" +
- "\020\212\004\022\032\n\025NO_SUBSCRIPTION_STATE\020\213\004\022\036\n\031SUBSC" +
- "RIPTION_STATE_EXISTS\020\214\004\022\030\n\023NO_TOPIC_OWNE",
- "R_INFO\020\215\004\022\034\n\027TOPIC_OWNER_INFO_EXISTS\020\216\004\022" +
- "\031\n\024UNEXPECTED_CONDITION\020\330\004\022\016\n\tCOMPOSITE\020" +
- "\274\005B\036\n\032org.apache.hedwig.protocolH\001"
+ "yRequest\022B\n\030closeSubscriptionRequest\030: \001" +
+ "(\0132 .Hedwig.CloseSubscriptionRequest\".\n\016" +
+ "PublishRequest\022\034\n\003msg\030\002 \002(\0132\017.Hedwig.Mes" +
+ "sage\"\177\n\027SubscriptionPreferences\022\034\n\007optio" +
+ "ns\030\001 \001(\0132\013.Hedwig.Map\022\024\n\014messageBound\030\002 " +
+ "\001(\r\022\025\n\rmessageFilter\030\003 \001(\t\022\031\n\021messageWin",
+ "dowSize\030\004 \001(\r\"\277\002\n\020SubscribeRequest\022\024\n\014su" +
+ "bscriberId\030\002 \002(\014\022Q\n\016createOrAttach\030\003 \001(\016" +
+ "2\'.Hedwig.SubscribeRequest.CreateOrAttac" +
+ "h:\020CREATE_OR_ATTACH\022\032\n\013synchronous\030\004 \001(\010" +
+ ":\005false\022\024\n\014messageBound\030\005 \001(\r\0224\n\013prefere" +
+ "nces\030\006 \001(\0132\037.Hedwig.SubscriptionPreferen" +
+ "ces\022\032\n\013forceAttach\030\007 \001(\010:\005false\">\n\016Creat" +
+ "eOrAttach\022\n\n\006CREATE\020\000\022\n\n\006ATTACH\020\001\022\024\n\020CRE" +
+ "ATE_OR_ATTACH\020\002\"\216\002\n\023SubscriptionOptions\022" +
+ "\032\n\013forceAttach\030\001 \001(\010:\005false\022Q\n\016createOrA",
+ "ttach\030\002 \001(\0162\'.Hedwig.SubscribeRequest.Cr" +
+ "eateOrAttach:\020CREATE_OR_ATTACH\022\027\n\014messag" +
+ "eBound\030\003 \001(\r:\0010\022\034\n\007options\030\004 \001(\0132\013.Hedwi" +
+ "g.Map\022\025\n\rmessageFilter\030\005 \001(\t\022\031\n\021messageW" +
+ "indowSize\030\006 \001(\r\022\037\n\021enableResubscribe\030\007 \001" +
+ "(\010:\004true\"K\n\016ConsumeRequest\022\024\n\014subscriber" +
+ "Id\030\002 \002(\014\022#\n\005msgId\030\003 \002(\0132\024.Hedwig.Message" +
+ "SeqId\"*\n\022UnsubscribeRequest\022\024\n\014subscribe" +
+ "rId\030\002 \002(\014\"+\n\023StopDeliveryRequest\022\024\n\014subs" +
+ "criberId\030\002 \002(\014\",\n\024StartDeliveryRequest\022\024",
+ "\n\014subscriberId\030\002 \002(\014\"0\n\030CloseSubscriptio" +
+ "nRequest\022\024\n\014subscriberId\030\002 \002(\014\"\377\001\n\016PubSu" +
+ "bResponse\0220\n\017protocolVersion\030\001 \002(\0162\027.Hed" +
+ "wig.ProtocolVersion\022&\n\nstatusCode\030\002 \002(\0162" +
+ "\022.Hedwig.StatusCode\022\r\n\005txnId\030\003 \002(\004\022\021\n\tst" +
+ "atusMsg\030\004 \001(\t\022 \n\007message\030\005 \001(\0132\017.Hedwig." +
+ "Message\022\r\n\005topic\030\006 \001(\014\022\024\n\014subscriberId\030\007" +
+ " \001(\014\022*\n\014responseBody\030\010 \001(\0132\024.Hedwig.Resp" +
+ "onseBody\"?\n\017PublishResponse\022,\n\016published" +
+ "MsgId\030\001 \002(\0132\024.Hedwig.MessageSeqId\"I\n\021Sub",
+ "scribeResponse\0224\n\013preferences\030\002 \001(\0132\037.He" +
+ "dwig.SubscriptionPreferences\"v\n\014Response" +
+ "Body\0220\n\017publishResponse\030\001 \001(\0132\027.Hedwig.P" +
+ "ublishResponse\0224\n\021subscribeResponse\030\002 \001(" +
+ "\0132\031.Hedwig.SubscribeResponse\"N\n\021Subscrip" +
+ "tionState\022#\n\005msgId\030\001 \002(\0132\024.Hedwig.Messag" +
+ "eSeqId\022\024\n\014messageBound\030\002 \001(\r\"r\n\020Subscrip" +
+ "tionData\022(\n\005state\030\001 \001(\0132\031.Hedwig.Subscri" +
+ "ptionState\0224\n\013preferences\030\002 \001(\0132\037.Hedwig" +
+ ".SubscriptionPreferences\"O\n\013LedgerRange\022",
+ "\020\n\010ledgerId\030\001 \002(\004\022.\n\020endSeqIdIncluded\030\002 " +
+ "\001(\0132\024.Hedwig.MessageSeqId\"3\n\014LedgerRange" +
+ "s\022#\n\006ranges\030\001 \003(\0132\023.Hedwig.LedgerRange\":" +
+ "\n\013ManagerMeta\022\023\n\013managerImpl\030\002 \002(\t\022\026\n\016ma" +
+ "nagerVersion\030\003 \002(\r\".\n\013HubInfoData\022\020\n\010hos" +
+ "tname\030\002 \002(\t\022\r\n\005czxid\030\003 \002(\004\" \n\013HubLoadDat" +
+ "a\022\021\n\tnumTopics\030\002 \002(\004*\"\n\017ProtocolVersion\022" +
+ "\017\n\013VERSION_ONE\020\001*\207\001\n\rOperationType\022\013\n\007PU" +
+ "BLISH\020\000\022\r\n\tSUBSCRIBE\020\001\022\013\n\007CONSUME\020\002\022\017\n\013U" +
+ "NSUBSCRIBE\020\003\022\022\n\016START_DELIVERY\020\004\022\021\n\rSTOP",
+ "_DELIVERY\020\005\022\025\n\021CLOSESUBSCRIPTION\020\006*D\n\021Su" +
+ "bscriptionEvent\022\017\n\013TOPIC_MOVED\020\001\022\036\n\032SUBS" +
+ "CRIPTION_FORCED_CLOSED\020\002*\205\004\n\nStatusCode\022" +
+ "\013\n\007SUCCESS\020\000\022\026\n\021MALFORMED_REQUEST\020\221\003\022\022\n\r" +
+ "NO_SUCH_TOPIC\020\222\003\022\036\n\031CLIENT_ALREADY_SUBSC" +
+ "RIBED\020\223\003\022\032\n\025CLIENT_NOT_SUBSCRIBED\020\224\003\022\026\n\021" +
+ "COULD_NOT_CONNECT\020\225\003\022\017\n\nTOPIC_BUSY\020\226\003\022\036\n" +
+ "\031NOT_RESPONSIBLE_FOR_TOPIC\020\365\003\022\021\n\014SERVICE" +
+ "_DOWN\020\366\003\022\024\n\017UNCERTAIN_STATE\020\367\003\022\033\n\026INVALI" +
+ "D_MESSAGE_FILTER\020\370\003\022\020\n\013BAD_VERSION\020\210\004\022\036\n",
+ "\031NO_TOPIC_PERSISTENCE_INFO\020\211\004\022\"\n\035TOPIC_P" +
+ "ERSISTENCE_INFO_EXISTS\020\212\004\022\032\n\025NO_SUBSCRIP" +
+ "TION_STATE\020\213\004\022\036\n\031SUBSCRIPTION_STATE_EXIS" +
+ "TS\020\214\004\022\030\n\023NO_TOPIC_OWNER_INFO\020\215\004\022\034\n\027TOPIC" +
+ "_OWNER_INFO_EXISTS\020\216\004\022\031\n\024UNEXPECTED_COND" +
+ "ITION\020\330\004\022\016\n\tCOMPOSITE\020\274\005B\036\n\032org.apache.h" +
+ "edwig.protocolH\001"
};
com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner assigner =
new com.google.protobuf.Descriptors.FileDescriptor.InternalDescriptorAssigner() {
@@ -15623,7 +16139,7 @@ public final class PubSubProtocol {
internal_static_Hedwig_PubSubRequest_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Hedwig_PubSubRequest_descriptor,
- new java.lang.String[] { "ProtocolVersion", "Type", "TriedServers", "TxnId", "ShouldClaim", "Topic", "PublishRequest", "SubscribeRequest", "ConsumeRequest", "UnsubscribeRequest", "StopDeliveryRequest", "StartDeliveryRequest", },
+ new java.lang.String[] { "ProtocolVersion", "Type", "TriedServers", "TxnId", "ShouldClaim", "Topic", "PublishRequest", "SubscribeRequest", "ConsumeRequest", "UnsubscribeRequest", "StopDeliveryRequest", "StartDeliveryRequest", "CloseSubscriptionRequest", },
org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest.class,
org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest.Builder.class);
internal_static_Hedwig_PublishRequest_descriptor =
@@ -15690,8 +16206,16 @@ public final class PubSubProtocol {
new java.lang.String[] { "SubscriberId", },
org.apache.hedwig.protocol.PubSubProtocol.StartDeliveryRequest.class,
org.apache.hedwig.protocol.PubSubProtocol.StartDeliveryRequest.Builder.class);
- internal_static_Hedwig_PubSubResponse_descriptor =
+ internal_static_Hedwig_CloseSubscriptionRequest_descriptor =
getDescriptor().getMessageTypes().get(14);
+ internal_static_Hedwig_CloseSubscriptionRequest_fieldAccessorTable = new
+ com.google.protobuf.GeneratedMessage.FieldAccessorTable(
+ internal_static_Hedwig_CloseSubscriptionRequest_descriptor,
+ new java.lang.String[] { "SubscriberId", },
+ org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.class,
+ org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest.Builder.class);
+ internal_static_Hedwig_PubSubResponse_descriptor =
+ getDescriptor().getMessageTypes().get(15);
internal_static_Hedwig_PubSubResponse_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Hedwig_PubSubResponse_descriptor,
@@ -15699,7 +16223,7 @@ public final class PubSubProtocol {
org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse.class,
org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse.Builder.class);
internal_static_Hedwig_PublishResponse_descriptor =
- getDescriptor().getMessageTypes().get(15);
+ getDescriptor().getMessageTypes().get(16);
internal_static_Hedwig_PublishResponse_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Hedwig_PublishResponse_descriptor,
@@ -15707,7 +16231,7 @@ public final class PubSubProtocol {
org.apache.hedwig.protocol.PubSubProtocol.PublishResponse.class,
org.apache.hedwig.protocol.PubSubProtocol.PublishResponse.Builder.class);
internal_static_Hedwig_SubscribeResponse_descriptor =
- getDescriptor().getMessageTypes().get(16);
+ getDescriptor().getMessageTypes().get(17);
internal_static_Hedwig_SubscribeResponse_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Hedwig_SubscribeResponse_descriptor,
@@ -15715,7 +16239,7 @@ public final class PubSubProtocol {
org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.class,
org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse.Builder.class);
internal_static_Hedwig_ResponseBody_descriptor =
- getDescriptor().getMessageTypes().get(17);
+ getDescriptor().getMessageTypes().get(18);
internal_static_Hedwig_ResponseBody_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Hedwig_ResponseBody_descriptor,
@@ -15723,7 +16247,7 @@ public final class PubSubProtocol {
org.apache.hedwig.protocol.PubSubProtocol.ResponseBody.class,
org.apache.hedwig.protocol.PubSubProtocol.ResponseBody.Builder.class);
internal_static_Hedwig_SubscriptionState_descriptor =
- getDescriptor().getMessageTypes().get(18);
+ getDescriptor().getMessageTypes().get(19);
internal_static_Hedwig_SubscriptionState_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Hedwig_SubscriptionState_descriptor,
@@ -15731,7 +16255,7 @@ public final class PubSubProtocol {
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState.class,
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState.Builder.class);
internal_static_Hedwig_SubscriptionData_descriptor =
- getDescriptor().getMessageTypes().get(19);
+ getDescriptor().getMessageTypes().get(20);
internal_static_Hedwig_SubscriptionData_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Hedwig_SubscriptionData_descriptor,
@@ -15739,7 +16263,7 @@ public final class PubSubProtocol {
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData.class,
org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData.Builder.class);
internal_static_Hedwig_LedgerRange_descriptor =
- getDescriptor().getMessageTypes().get(20);
+ getDescriptor().getMessageTypes().get(21);
internal_static_Hedwig_LedgerRange_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Hedwig_LedgerRange_descriptor,
@@ -15747,7 +16271,7 @@ public final class PubSubProtocol {
org.apache.hedwig.protocol.PubSubProtocol.LedgerRange.class,
org.apache.hedwig.protocol.PubSubProtocol.LedgerRange.Builder.class);
internal_static_Hedwig_LedgerRanges_descriptor =
- getDescriptor().getMessageTypes().get(21);
+ getDescriptor().getMessageTypes().get(22);
internal_static_Hedwig_LedgerRanges_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Hedwig_LedgerRanges_descriptor,
@@ -15755,7 +16279,7 @@ public final class PubSubProtocol {
org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges.class,
org.apache.hedwig.protocol.PubSubProtocol.LedgerRanges.Builder.class);
internal_static_Hedwig_ManagerMeta_descriptor =
- getDescriptor().getMessageTypes().get(22);
+ getDescriptor().getMessageTypes().get(23);
internal_static_Hedwig_ManagerMeta_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Hedwig_ManagerMeta_descriptor,
@@ -15763,7 +16287,7 @@ public final class PubSubProtocol {
org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta.class,
org.apache.hedwig.protocol.PubSubProtocol.ManagerMeta.Builder.class);
internal_static_Hedwig_HubInfoData_descriptor =
- getDescriptor().getMessageTypes().get(23);
+ getDescriptor().getMessageTypes().get(24);
internal_static_Hedwig_HubInfoData_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Hedwig_HubInfoData_descriptor,
@@ -15771,7 +16295,7 @@ public final class PubSubProtocol {
org.apache.hedwig.protocol.PubSubProtocol.HubInfoData.class,
org.apache.hedwig.protocol.PubSubProtocol.HubInfoData.Builder.class);
internal_static_Hedwig_HubLoadData_descriptor =
- getDescriptor().getMessageTypes().get(24);
+ getDescriptor().getMessageTypes().get(25);
internal_static_Hedwig_HubLoadData_fieldAccessorTable = new
com.google.protobuf.GeneratedMessage.FieldAccessorTable(
internal_static_Hedwig_HubLoadData_descriptor,
Modified: zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto?rev=1400836&r1=1400835&r2=1400836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto (original)
+++ zookeeper/bookkeeper/trunk/hedwig-protocol/src/main/protobuf/PubSubProtocol.proto Mon Oct 22 11:04:47 2012
@@ -70,6 +70,9 @@ enum OperationType{
//the following two are only used for the hedwig proxy
START_DELIVERY = 4;
STOP_DELIVERY = 5;
+ // end for requests only used for hedwig proxy
+
+ CLOSESUBSCRIPTION = 6;
}
/* A PubSubRequest is just a union of the various request types, with
@@ -95,6 +98,7 @@ message PubSubRequest{
optional UnsubscribeRequest unsubscribeRequest = 55;
optional StopDeliveryRequest stopDeliveryRequest = 56;
optional StartDeliveryRequest startDeliveryRequest = 57;
+ optional CloseSubscriptionRequest closeSubscriptionRequest = 58;
}
@@ -191,6 +195,10 @@ enum SubscriptionEvent {
SUBSCRIPTION_FORCED_CLOSED = 2;
}
+message CloseSubscriptionRequest { // carried in PubSubRequest.closeSubscriptionRequest (field 58)
+ required bytes subscriberId = 2; // id of the subscriber whose subscription is to be closed
+}
+
message PubSubResponse{
required ProtocolVersion protocolVersion = 1;
required StatusCode statusCode = 2;
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java?rev=1400836&r1=1400835&r2=1400836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/DeliveryManager.java Mon Oct 22 11:04:47 2012
@@ -19,8 +19,10 @@ package org.apache.hedwig.server.deliver
import com.google.protobuf.ByteString;
import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
import org.apache.hedwig.filter.ServerMessageFilter;
+import org.apache.hedwig.util.Callback;
public interface DeliveryManager {
public void start();
@@ -31,7 +33,17 @@ public interface DeliveryManager {
DeliveryEndPoint endPoint,
ServerMessageFilter filter);
- public void stopServingSubscriber(ByteString topic, ByteString subscriberId);
+ /**
+ * Stop serving a given subscription.
+ *
+ * @param topic
+ * Topic Name
+ * @param subscriberId
+ * Subscriber Id
+ * @param event
+ * Subscription event indicating why the subscriber is stopped.
+ * @param callback
+ * Callback invoked after the subscriber is stopped.
+ * @param ctx
+ * Callback context
+ */
+ public void stopServingSubscriber(ByteString topic, ByteString subscriberId,
+ SubscriptionEvent event,
+ Callback<Void> callback, Object ctx);
/**
* Tell the delivery manager where that a subscriber has consumed
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java?rev=1400836&r1=1400835&r2=1400836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java Mon Oct 22 11:04:47 2012
@@ -37,12 +37,14 @@ import com.google.protobuf.ByteString;
import org.apache.bookkeeper.util.MathUtils;
import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.filter.ServerMessageFilter;
import org.apache.hedwig.protocol.PubSubProtocol.Message;
import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
import org.apache.hedwig.server.common.ServerConfiguration;
import org.apache.hedwig.server.common.UnexpectedError;
@@ -52,12 +54,22 @@ import org.apache.hedwig.server.persiste
import org.apache.hedwig.server.persistence.PersistenceManager;
import org.apache.hedwig.server.persistence.ScanCallback;
import org.apache.hedwig.server.persistence.ScanRequest;
+import org.apache.hedwig.util.Callback;
import static org.apache.hedwig.util.VarArgs.va;
public class FIFODeliveryManager implements Runnable, DeliveryManager {
protected static final Logger logger = LoggerFactory.getLogger(FIFODeliveryManager.class);
+ private static Callback<Void> NOP_CALLBACK = new Callback<Void>() {
+ @Override
+ public void operationFinished(Object ctx, Void result) {
+ }
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception) {
+ }
+ };
+
protected interface DeliveryManagerRequest {
public void performRequest();
}
@@ -160,11 +172,16 @@ public class FIFODeliveryManager impleme
enqueueWithoutFailure(subscriber);
}
- public void stopServingSubscriber(ByteString topic, ByteString subscriberId) {
- ActiveSubscriberState subState = subscriberStates.get(new TopicSubscriber(topic, subscriberId));
+ public void stopServingSubscriber(ByteString topic, ByteString subscriberId,
+ SubscriptionEvent event,
+ Callback<Void> cb, Object ctx) {
+ ActiveSubscriberState subState =
+ subscriberStates.get(new TopicSubscriber(topic, subscriberId));
if (subState != null) {
- stopServingSubscriber(subState);
+ stopServingSubscriber(subState, event, cb, ctx);
+ } else {
+ cb.operationFinished(ctx, null);
}
}
@@ -172,10 +189,19 @@ public class FIFODeliveryManager impleme
* Due to some error or disconnection or unsubscribe, someone asks us to
* stop serving a particular endpoint
*
- * @param endPoint
- */
- protected void stopServingSubscriber(ActiveSubscriberState subscriber) {
- enqueueWithoutFailure(new StopServingSubscriber(subscriber));
+ * @param subscriber
+ * Subscriber instance
+ * @param event
+ * Subscription event indicates why to stop subscriber.
+ * @param cb
+ * Callback after the subscriber is stopped.
+ * @param ctx
+ * Callback context
+ */
+ protected void stopServingSubscriber(ActiveSubscriberState subscriber,
+ SubscriptionEvent event,
+ Callback<Void> cb, Object ctx) {
+ enqueueWithoutFailure(new StopServingSubscriber(subscriber, event, cb, ctx)); // hands the stop off as a StopServingSubscriber request; that request's performRequest() calls cb.operationFinished
}
/**
@@ -376,13 +402,19 @@ public class FIFODeliveryManager impleme
}
}
- public void setNotConnected() {
+ public void setNotConnected(SubscriptionEvent event) {
// have closed it.
if (!isConnected()) {
return;
}
this.connected = false;
- deliveryEndPoint.close();
+ if (null != event &&
+ (SubscriptionEvent.TOPIC_MOVED == event ||
+ SubscriptionEvent.SUBSCRIPTION_FORCED_CLOSED == event)) {
+ // we only need to close the underlying delivery endpoint when the
+ // topic moved or the subscription was forcibly closed.
+ deliveryEndPoint.close();
+ }
// uninitialize filter
this.filter.uninitialize();
}
@@ -549,7 +581,11 @@ public class FIFODeliveryManager impleme
public void permanentErrorOnSend() {
- stopServingSubscriber(this);
+ // the underlying channel is broken, the channel will
+ // be closed in UmbrellaHandler when exception happened.
+ // so we don't need to close the channel again
+ stopServingSubscriber(this, null,
+ NOP_CALLBACK, null);
}
public void transientErrorOnSend() {
@@ -563,10 +599,12 @@ public class FIFODeliveryManager impleme
public void performRequest() {
// Put this subscriber in the channel to subscriber mapping
- ActiveSubscriberState prevSubscriber = subscriberStates.put(new TopicSubscriber(topic, subscriberId), this);
+ ActiveSubscriberState prevSubscriber =
+ subscriberStates.put(new TopicSubscriber(topic, subscriberId), this);
if (prevSubscriber != null) {
- stopServingSubscriber(prevSubscriber);
+ stopServingSubscriber(prevSubscriber, SubscriptionEvent.SUBSCRIPTION_FORCED_CLOSED,
+ NOP_CALLBACK, null);
}
lastSeqIdCommunicatedExternally = lastLocalSeqIdDelivered;
@@ -589,16 +627,24 @@ public class FIFODeliveryManager impleme
protected class StopServingSubscriber implements DeliveryManagerRequest {
ActiveSubscriberState subscriber;
-
- public StopServingSubscriber(ActiveSubscriberState subscriber) {
+ SubscriptionEvent event;
+ final Callback<Void> cb;
+ final Object ctx;
+
+ public StopServingSubscriber(ActiveSubscriberState subscriber,
+ SubscriptionEvent event,
+ Callback<Void> callback, Object ctx) {
this.subscriber = subscriber;
+ this.event = event;
+ this.cb = callback;
+ this.ctx = ctx;
}
@Override
public void performRequest() {
// This will automatically stop delivery, and disconnect the channel
- subscriber.setNotConnected();
+ subscriber.setNotConnected(event);
// if the subscriber has moved on, a move request for its delivery
// pointer must be pending in the request queue. Note that the
@@ -609,6 +655,7 @@ public class FIFODeliveryManager impleme
true,
// pruneTopic=
true);
+ cb.operationFinished(ctx, null);
}
}
Added: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/CloseSubscriptionHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/CloseSubscriptionHandler.java?rev=1400836&view=auto
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/CloseSubscriptionHandler.java (added)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/CloseSubscriptionHandler.java Mon Oct 22 11:04:47 2012
@@ -0,0 +1,105 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.handlers;
+
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+import com.google.protobuf.ByteString;
+
+import org.apache.bookkeeper.util.MathUtils;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
+import org.apache.hedwig.protoextensions.PubSubResponseUtils;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.delivery.DeliveryManager;
+import org.apache.hedwig.server.netty.ServerStats;
+import org.apache.hedwig.server.netty.ServerStats.OpStats;
+import org.apache.hedwig.server.netty.UmbrellaHandler;
+import org.apache.hedwig.server.subscriptions.SubscriptionManager;
+import org.apache.hedwig.server.topics.TopicManager;
+import org.apache.hedwig.util.Callback;
+
+/**
+ * Server-side handler for CloseSubscription requests.
+ *
+ * <p>Closes the subscription in the {@link SubscriptionManager}, then asks the
+ * {@link DeliveryManager} to stop serving the subscriber, and finally removes
+ * the topic subscription from the subscription channels and answers on the
+ * channel the request arrived on. Outcomes are recorded in the
+ * CLOSESUBSCRIPTION op stats.
+ */
+public class CloseSubscriptionHandler extends BaseHandler {
+    SubscriptionManager subMgr;
+    DeliveryManager deliveryMgr;
+    SubscriptionChannelManager subChannelMgr;
+    // op stats
+    final OpStats closesubStats;
+
+    public CloseSubscriptionHandler(ServerConfiguration cfg, TopicManager tm,
+                                    SubscriptionManager subMgr,
+                                    DeliveryManager deliveryMgr,
+                                    SubscriptionChannelManager subChannelMgr) {
+        super(tm, cfg);
+        this.subMgr = subMgr;
+        this.deliveryMgr = deliveryMgr;
+        this.subChannelMgr = subChannelMgr;
+        closesubStats = ServerStats.getInstance().getOpStats(OperationType.CLOSESUBSCRIPTION);
+    }
+
+    /**
+     * Processes one CloseSubscription request.
+     *
+     * @param request
+     *          the request; rejected as malformed when it carries no
+     *          closeSubscriptionRequest body
+     * @param channel
+     *          channel the request arrived on; the response is written to it
+     */
+    @Override
+    public void handleRequestAtOwner(final PubSubRequest request, final Channel channel) {
+        if (!request.hasCloseSubscriptionRequest()) {
+            UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
+                    "Missing closesubscription request data");
+            closesubStats.incrementFailedOps();
+            return;
+        }
+
+        final CloseSubscriptionRequest closesubRequest =
+                request.getCloseSubscriptionRequest();
+        final ByteString topic = request.getTopic();
+        final ByteString subscriberId = closesubRequest.getSubscriberId();
+
+        // use MathUtils.now(), as UnsubscribeHandler does, so that latency is
+        // measured the same way for every operation instead of relying on the
+        // wall clock, which may be adjusted while the request is in flight
+        final long requestTime = MathUtils.now();
+
+        subMgr.closeSubscription(topic, subscriberId, new Callback<Void>() {
+            @Override
+            public void operationFinished(Object ctx, Void result) {
+                // we should not close the channel in delivery manager
+                // since client waits the response for closeSubscription request
+                // client side would close the channel.
+                // Passing a null event is what keeps the delivery endpoint open.
+                deliveryMgr.stopServingSubscriber(topic, subscriberId, null,
+                new Callback<Void>() {
+                    @Override
+                    public void operationFailed(Object ctx, PubSubException exception) {
+                        channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
+                        closesubStats.incrementFailedOps();
+                    }
+                    @Override
+                    public void operationFinished(Object ctx, Void resultOfOperation) {
+                        // remove the topic subscription from subscription channels
+                        subChannelMgr.remove(new TopicSubscriber(topic, subscriberId),
+                                             channel);
+                        channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
+                        closesubStats.updateLatency(MathUtils.now() - requestTime);
+                    }
+                }, null);
+            }
+            @Override
+            public void operationFailed(Object ctx, PubSubException exception) {
+                channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
+                closesubStats.incrementFailedOps();
+            }
+        }, null);
+    }
+}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java?rev=1400836&r1=1400835&r2=1400836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java Mon Oct 22 11:04:47 2012
@@ -21,9 +21,11 @@ import org.jboss.netty.channel.Channel;
import com.google.protobuf.ByteString;
import org.apache.bookkeeper.util.MathUtils;
+import org.apache.hedwig.client.data.TopicSubscriber;
import org.apache.hedwig.exceptions.PubSubException;
import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
import org.apache.hedwig.protocol.PubSubProtocol.UnsubscribeRequest;
import org.apache.hedwig.protoextensions.PubSubResponseUtils;
import org.apache.hedwig.server.common.ServerConfiguration;
@@ -34,18 +36,24 @@ import org.apache.hedwig.server.netty.Um
import org.apache.hedwig.server.subscriptions.SubscriptionManager;
import org.apache.hedwig.server.topics.TopicManager;
import org.apache.hedwig.util.Callback;
+import static org.apache.hedwig.util.VarArgs.va;
public class UnsubscribeHandler extends BaseHandler {
SubscriptionManager subMgr;
DeliveryManager deliveryMgr;
+ SubscriptionChannelManager subChannelMgr;
// op stats
final OpStats unsubStats;
- public UnsubscribeHandler(TopicManager tm, ServerConfiguration cfg, SubscriptionManager subMgr,
- DeliveryManager deliveryMgr) {
+ public UnsubscribeHandler(ServerConfiguration cfg,
+ TopicManager tm,
+ SubscriptionManager subMgr,
+ DeliveryManager deliveryMgr,
+ SubscriptionChannelManager subChannelMgr) {
super(tm, cfg);
this.subMgr = subMgr;
this.deliveryMgr = deliveryMgr;
+ this.subChannelMgr = subChannelMgr;
unsubStats = ServerStats.getInstance().getOpStats(OperationType.UNSUBSCRIBE);
}
@@ -72,9 +80,25 @@ public class UnsubscribeHandler extends
@Override
public void operationFinished(Object ctx, Void resultOfOperation) {
- deliveryMgr.stopServingSubscriber(topic, subscriberId);
- channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
- unsubStats.updateLatency(MathUtils.now() - requestTime);
+ // we should not close the channel in the delivery manager,
+ // since the client is waiting for the response to its unsubscribe
+ // request; the client side will close the channel
+ deliveryMgr.stopServingSubscriber(topic, subscriberId, null,
+ new Callback<Void>() {
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception) {
+ channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
+ unsubStats.incrementFailedOps();
+ }
+ @Override
+ public void operationFinished(Object ctx, Void resultOfOperation) {
+ // remove the topic subscription from subscription channels
+ // before acknowledging the request to the client
+ subChannelMgr.remove(new TopicSubscriber(topic, subscriberId),
+ channel);
+ channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
+ // NOTE(review): requestTime is assumed to be taken with MathUtils.now(),
+ // as the previous latency computation in this handler was; the end
+ // timestamp must come from the same clock, so do not mix it with
+ // System.currentTimeMillis() -- confirm against requestTime's declaration
+ unsubStats.updateLatency(MathUtils.now() - requestTime);
+ }
+ }, ctx);
}
}, null);
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java?rev=1400836&r1=1400835&r2=1400836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java Mon Oct 22 11:04:47 2012
@@ -56,6 +56,7 @@ import org.apache.hedwig.server.common.S
import org.apache.hedwig.server.common.TerminateJVMExceptionHandler;
import org.apache.hedwig.server.delivery.DeliveryManager;
import org.apache.hedwig.server.delivery.FIFODeliveryManager;
+import org.apache.hedwig.server.handlers.CloseSubscriptionHandler;
import org.apache.hedwig.server.handlers.ConsumeHandler;
import org.apache.hedwig.server.handlers.Handler;
import org.apache.hedwig.server.handlers.NettyHandlerBean;
@@ -227,8 +228,11 @@ public class PubSubServer {
handlers.put(OperationType.PUBLISH, new PublishHandler(tm, pm, conf));
handlers.put(OperationType.SUBSCRIBE,
new SubscribeHandler(conf, tm, dm, pm, sm, subChannelMgr));
- handlers.put(OperationType.UNSUBSCRIBE, new UnsubscribeHandler(tm, conf, sm, dm));
+ handlers.put(OperationType.UNSUBSCRIBE,
+ new UnsubscribeHandler(conf, tm, sm, dm, subChannelMgr));
handlers.put(OperationType.CONSUME, new ConsumeHandler(tm, sm, conf));
+ handlers.put(OperationType.CLOSESUBSCRIPTION,
+ new CloseSubscriptionHandler(conf, tm, sm, dm, subChannelMgr));
handlers = Collections.unmodifiableMap(handlers);
return handlers;
}
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java?rev=1400836&r1=1400835&r2=1400836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java Mon Oct 22 11:04:47 2012
@@ -38,6 +38,7 @@ import org.apache.hedwig.protocol.PubSub
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
import org.apache.hedwig.protoextensions.MessageIdUtils;
import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
import org.apache.hedwig.server.common.ServerConfiguration;
@@ -276,7 +277,8 @@ public abstract class AbstractSubscripti
+ subId.toStringUtf8() + ") when losing topic");
}
if (null != dm) {
- dm.stopServingSubscriber(topic, subId);
+ dm.stopServingSubscriber(topic, subId, SubscriptionEvent.TOPIC_MOVED,
+ noopCallback, null);
}
}
}
@@ -578,6 +580,28 @@ public abstract class AbstractSubscripti
queuer.pushAndMaybeRun(topic, new ConsumeOp(topic, subscriberId, consumeSeqId, callback, ctx));
}
+ /**
+ * Topic-queued operation backing closeSubscription. It currently performs
+ * no work and completes its callback straight away; the subscriber id is
+ * accepted by the constructor but not used.
+ */
+ private class CloseSubscriptionOp extends TopicOpQueuer.AsynchronousOp<Void> {
+
+ public CloseSubscriptionOp(ByteString topic, ByteString subscriberId,
+ Callback<Void> callback, Object ctx) {
+ queuer.super(topic, callback, ctx);
+ }
+
+ @Override
+ public void run() {
+ // TODO: BOOKKEEPER-412: the loaded subscription may eventually have
+ // to be released here to reclaim memory.
+ // Until then closing is a no-op that reports success immediately.
+ final Void noResult = null;
+ cb.operationFinished(ctx, noResult);
+ }
+ }
+
+ @Override
+ public void closeSubscription(ByteString topic, ByteString subscriberId,
+ Callback<Void> callback, Object ctx) {
+ // wrap the request in an op and hand it to the queuer under its topic
+ final CloseSubscriptionOp closeOp =
+ new CloseSubscriptionOp(topic, subscriberId, callback, ctx);
+ queuer.pushAndMaybeRun(topic, closeOp);
+ }
+
private class UnsubscribeOp extends TopicOpQueuer.AsynchronousOp<Void> {
ByteString subscriberId;
Modified: zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java
URL: http://svn.apache.org/viewvc/zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java?rev=1400836&r1=1400835&r2=1400836&view=diff
==============================================================================
--- zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java (original)
+++ zookeeper/bookkeeper/trunk/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java Mon Oct 22 11:04:47 2012
@@ -64,6 +64,21 @@ public interface SubscriptionManager {
Callback<Void> callback, Object ctx);
/**
+ * Close a particular subscription.
+ *
+ * @param topic
+ * Topic Name
+ * @param subscriberId
+ * Subscriber Id
+ * @param callback
+ * Callback invoked once the close operation has finished or failed
+ * @param ctx
+ * Callback context, handed back to the callback
+ */
+ public void closeSubscription(ByteString topic, ByteString subscriberId,
+ Callback<Void> callback, Object ctx);
+
+ /**
* Delete a particular subscription
*
* @param topic