You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:20 UTC
[10/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
deleted file mode 100644
index eaed39d..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AbstractSubscriptionManager.java
+++ /dev/null
@@ -1,798 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.subscriptions;
-
-import java.util.ArrayList;
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.Timer;
-import java.util.TimerTask;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.ScheduledExecutorService;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import com.google.protobuf.ByteString;
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
-import org.apache.hedwig.protoextensions.MessageIdUtils;
-import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.common.TopicOpQueuer;
-import org.apache.hedwig.server.delivery.DeliveryManager;
-import org.apache.hedwig.server.persistence.PersistenceManager;
-import org.apache.hedwig.server.topics.TopicManager;
-import org.apache.hedwig.server.topics.TopicOwnershipChangeListener;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.CallbackUtils;
-import org.apache.hedwig.util.ConcurrencyUtils;
-
-public abstract class AbstractSubscriptionManager implements SubscriptionManager, TopicOwnershipChangeListener {
-
- private static final Logger logger = LoggerFactory.getLogger(AbstractSubscriptionManager.class);
-
- // Server configuration; this class reads the messages-consumed task interval
- // and the consume interval from it.
- protected final ServerConfiguration cfg;
- // topic -> (subscriber id -> in-memory subscription state) for the topics
- // currently loaded by this manager. AcquireOp adds a topic's map and
- // ReleaseOp removes it.
- protected final ConcurrentHashMap<ByteString, Map<ByteString, InMemorySubscriptionState>> top2sub2seq =
- new ConcurrentHashMap<ByteString, Map<ByteString, InMemorySubscriptionState>>();
- // Queue through which every per-topic operation of this class is submitted.
- protected final TopicOpQueuer queuer;
- // Listeners told about the first local subscribe / last local unsubscribe of
- // a topic. Plain ArrayList: registration is not synchronized.
- private final ArrayList<SubscriptionEventListener> listeners = new ArrayList<SubscriptionEventListener>();
-
- // Handle to the DeliveryManager for the server so we can stop serving subscribers
- // when losing topics
- private final DeliveryManager dm;
- // Handle to the PersistenceManager for the server so we can pass along the
- // message consume pointers for each topic.
- private final PersistenceManager pm;
- // Timer for running a recurring thread task to get the minimum message
- // sequence ID for each topic that all subscribers for it have consumed
- // already. With that information, we can call the PersistenceManager to
- // update it on the messages that are safe to be garbage collected.
- private final Timer timer = new Timer(true);
- // In memory mapping of topics to the minimum consumed message sequence ID
- // for all subscribers to the topic.
- private final ConcurrentHashMap<ByteString, Long> topic2MinConsumedMessagesMap = new ConcurrentHashMap<ByteString, Long>();
-
- // Shared callback for fire-and-forget calls: success is ignored, failure is logged.
- protected final Callback<Void> noopCallback = new NoopCallback<Void>();
-
-    /**
-     * Callback for fire-and-forget operations: a successful completion is
-     * ignored and a failure is only written to the log.
-     */
-    static class NoopCallback<T> implements Callback<T> {
-        @Override
-        public void operationFinished(Object ctx, T resultOfOperation) {
-            // Success needs no handling.
-        }
-
-        @Override
-        public void operationFailed(Object ctx, PubSubException exception) {
-            logger.warn("Exception found in AbstractSubscriptionManager : ", exception);
-        }
-    }
-
-    /**
-     * Creates the manager, registers it with the topic manager as an ownership
-     * listener and, when a PersistenceManager is supplied, starts the recurring
-     * task that reports consume positions.
-     *
-     * @param cfg       server configuration; supplies the task's run interval
-     * @param tm        topic manager this instance registers itself with
-     * @param pm        persistence manager; when null the recurring task is not scheduled
-     * @param dm        delivery manager; its uses in this class are null-guarded
-     * @param scheduler executor handed to the per-topic operation queue
-     */
-    public AbstractSubscriptionManager(ServerConfiguration cfg, TopicManager tm,
-                                       PersistenceManager pm, DeliveryManager dm,
-                                       ScheduledExecutorService scheduler) {
-        this.cfg = cfg;
-        this.queuer = new TopicOpQueuer(scheduler);
-        tm.addTopicOwnershipChangeListener(this);
-        this.pm = pm;
-        this.dm = dm;
-        if (this.pm == null) {
-            // Without a PersistenceManager there is nobody to report consume positions to.
-            return;
-        }
-        timer.schedule(new MessagesConsumedTask(), 0, cfg.getMessagesConsumedThreadRunInterval());
-    }
-
- /**
- * This is the Timer Task for finding out for each topic, what the minimum
- * consumed message by the subscribers are. This information is used to pass
- * along to the server's PersistenceManager so it can garbage collect older
- * topic messages that are no longer needed by the subscribers.
- */
- class MessagesConsumedTask extends TimerTask {
- /**
- * Implement the TimerTask's abstract run method.
- */
- @Override
- public void run() {
- // We are looping through relatively small in memory data structures
- // so it should be safe to run this fairly often.
- for (ByteString topic : top2sub2seq.keySet()) {
- final Map<ByteString, InMemorySubscriptionState> topicSubscriptions = top2sub2seq.get(topic);
- if (topicSubscriptions == null) {
- continue;
- }
-
- long minConsumedMessage = Long.MAX_VALUE;
- boolean hasBound = true;
- // Loop through all subscribers on the current topic to find the
- // minimum persisted message id. The reason not using in-memory
- // consumed message id is LedgerRangs and InMemorySubscriptionState
- // may be inconsistent in case of a server crash.
- for (InMemorySubscriptionState curSubscription : topicSubscriptions.values()) {
- if (curSubscription.getLastPersistedSeqId() < minConsumedMessage) {
- minConsumedMessage = curSubscription.getLastPersistedSeqId();
- }
- hasBound = hasBound && curSubscription.getSubscriptionPreferences().hasMessageBound();
- }
- boolean callPersistenceManager = true;
- // Call the PersistenceManager if nobody subscribes to the topic
- // yet, or the consume pointer has moved ahead since the last
- // time, or if this is the initial subscription.
- Long minConsumedFromMap = topic2MinConsumedMessagesMap.get(topic);
- if (topicSubscriptions.isEmpty()
- || (minConsumedFromMap != null && minConsumedFromMap < minConsumedMessage)
- || (minConsumedFromMap == null && minConsumedMessage != 0)) {
- // Replace or put the new min consumed value. If it has changed
- // do nothing, as another thread has updated the min consumed message
- if ((minConsumedFromMap != null
- && (topic2MinConsumedMessagesMap.replace(topic, minConsumedFromMap,
- minConsumedMessage)))
- || (topic2MinConsumedMessagesMap.putIfAbsent(topic, minConsumedMessage) == null)) {
- pm.consumedUntil(topic, minConsumedMessage);
- }
- } else if (hasBound) {
- pm.consumeToBound(topic);
- }
- }
- }
- }
-
-    /**
-     * Queued operation that loads the subscriptions of a newly acquired topic.
-     *
-     * If the topic is already loaded the operation finishes immediately.
-     * Otherwise the subscriptions are read via readSubscriptions; when any of
-     * them is local the listeners are notified first, and only after that
-     * succeeds is the topic recorded in top2sub2seq. Any failure is reported
-     * to the caller and the topic is not recorded.
-     */
-    private class AcquireOp extends TopicOpQueuer.AsynchronousOp<Void> {
-        public AcquireOp(ByteString topic, Callback<Void> callback, Object ctx) {
-            queuer.super(topic, callback, ctx);
-        }
-
-        @Override
-        public void run() {
-            if (top2sub2seq.containsKey(topic)) {
-                // Already acquired, nothing to load.
-                cb.operationFinished(ctx, null);
-                return;
-            }
-
-            readSubscriptions(topic, new Callback<Map<ByteString, InMemorySubscriptionState>>() {
-                @Override
-                public void operationFailed(Object ctx, PubSubException exception) {
-                    cb.operationFailed(ctx, exception);
-                }
-
-                @Override
-                public void operationFinished(final Object ctx,
-                                              final Map<ByteString, InMemorySubscriptionState> resultOfOperation) {
-                    // We have just inherited the subscribers of this topic, some of
-                    // which may be local. If so, the listeners have to be notified
-                    // before the topic counts as acquired.
-
-                    // The final "commit" (and "abort") operations.
-                    final Callback<Void> cb2 = new Callback<Void>() {
-
-                        @Override
-                        public void operationFailed(Object ctx, PubSubException exception) {
-                            logger.error("Subscription manager failed to acquired topic " + topic.toStringUtf8(),
-                                         exception);
-                            // Hand the real cause to the caller instead of null.
-                            cb.operationFailed(ctx, exception);
-                        }
-
-                        @Override
-                        public void operationFinished(Object ctx, Void voidObj) {
-                            top2sub2seq.put(topic, resultOfOperation);
-                            // The bound is computed from top2sub2seq, so it can only
-                            // be updated once the topic has been recorded. Doing it
-                            // here also covers listeners that complete asynchronously.
-                            updateMessageBound(topic);
-                            logger.info("Subscription manager successfully acquired topic: " + topic.toStringUtf8());
-                            cb.operationFinished(ctx, null);
-                        }
-
-                    };
-
-                    // Notify listeners if necessary.
-                    if (hasLocalSubscriptions(resultOfOperation)) {
-                        notifyFirstLocalSubscribe(topic, false, cb2, ctx);
-                    } else {
-                        cb2.operationFinished(ctx, null);
-                    }
-                }
-
-            }, ctx);
-
-        }
-
-    }
-
- private void notifyFirstLocalSubscribe(ByteString topic, boolean synchronous, final Callback<Void> cb, final Object ctx) {
- Callback<Void> mcb = CallbackUtils.multiCallback(listeners.size(), cb, ctx);
- for (SubscriptionEventListener listener : listeners) {
- listener.onFirstLocalSubscribe(topic, synchronous, mcb);
- }
- }
-
-    /**
-     * Loads the subscribers of a topic this server has just acquired by
-     * queueing an AcquireOp for it. The operation does nothing when the topic
-     * is already loaded; when the subscriptions cannot be read the failure is
-     * reported through the callback and the topic is not recorded as acquired.
-     *
-     * @param topic    topic that was acquired
-     * @param callback notified when the operation finishes or fails
-     * @param ctx      caller context passed back through the callback
-     */
-    @Override
-    public void acquiredTopic(final ByteString topic, final Callback<Void> callback, Object ctx) {
-        final AcquireOp acquireOp = new AcquireOp(topic, callback, ctx);
-        queuer.pushAndMaybeRun(topic, acquireOp);
-    }
-
-    /**
-     * Queued operation run when this server loses a topic. It first tries to
-     * persist the subscription states of the topic and then, whether or not
-     * that succeeded, drops the topic from memory, stops delivery for each of
-     * its subscribers and notifies the listeners.
-     */
-    class ReleaseOp extends TopicOpQueuer.AsynchronousOp<Void> {
-
-        public ReleaseOp(final ByteString topic, final Callback<Void> cb, Object ctx) {
-            queuer.super(topic, cb, ctx);
-        }
-
-        @Override
-        public void run() {
-            final Callback<Void> afterFlush = new Callback<Void>() {
-                @Override
-                public void operationFailed(Object ctx,
-                                            PubSubException exception) {
-                    logger.warn("Error when releasing topic : " + topic.toStringUtf8(), exception);
-                    release();
-                }
-
-                @Override
-                public void operationFinished(Object ctx, Void resultOfOperation) {
-                    logger.info("Finished update subscription states when losting topic "
-                                + topic.toStringUtf8());
-                    release();
-                }
-
-                private void release() {
-                    // Forget the topic; the map may already be gone if another
-                    // release operation removed it.
-                    final Map<ByteString, InMemorySubscriptionState> dropped = top2sub2seq.remove(topic);
-                    if (dropped != null) {
-                        // Ask the delivery manager to stop delivering to each subscriber.
-                        for (ByteString subId : dropped.keySet()) {
-                            if (logger.isDebugEnabled()) {
-                                logger.debug("Stop serving subscriber (" + topic.toStringUtf8() + ", "
-                                             + subId.toStringUtf8() + ") when losing topic");
-                            }
-                            if (dm != null) {
-                                dm.stopServingSubscriber(topic, subId, SubscriptionEvent.TOPIC_MOVED,
-                                                         noopCallback, null);
-                            }
-                        }
-                    }
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("Stop serving topic " + topic.toStringUtf8());
-                    }
-                    // The unsubscribe notification is sent unconditionally: the local
-                    // subscription count cannot be relied on, because it is decremented
-                    // when some remote subscriptions fail while the ones that succeeded
-                    // are not unsubscribed.
-                    notifyLastLocalUnsubscribe(topic);
-                    cb.operationFinished(ctx, null);
-                }
-            };
-            if (logger.isDebugEnabled()) {
-                logger.debug("Try to update subscription states when losing topic " + topic.toStringUtf8());
-            }
-            updateSubscriptionStates(topic, afterFlush, ctx);
-        }
-    }
-
- void updateSubscriptionStates(ByteString topic, Callback<Void> finalCb, Object ctx) {
- // Try to update subscription states of a specified topic
- Map<ByteString, InMemorySubscriptionState> states = top2sub2seq.get(topic);
- if (null == states) {
- finalCb.operationFinished(ctx, null);
- } else {
- Callback<Void> mcb = CallbackUtils.multiCallback(states.size(), finalCb, ctx);
- for (Entry<ByteString, InMemorySubscriptionState> entry : states.entrySet()) {
- InMemorySubscriptionState memState = entry.getValue();
- if (memState.setLastConsumeSeqIdImmediately()) {
- updateSubscriptionState(topic, entry.getKey(), memState, mcb, ctx);
- } else {
- mcb.operationFinished(ctx, null);
- }
- }
- }
- }
-
-    /**
-     * Reacts to the loss of a topic by queueing a ReleaseOp, which removes the
-     * local mapping for it. The outcome is only observed by the shared no-op
-     * callback.
-     */
-    @Override
-    public void lostTopic(ByteString topic) {
-        final ReleaseOp releaseOp = new ReleaseOp(topic, noopCallback, null);
-        queuer.pushAndMaybeRun(topic, releaseOp);
-    }
-
-    /**
-     * Informs every registered listener that the topic has no local
-     * subscribers left.
-     */
-    private void notifyLastLocalUnsubscribe(ByteString topic) {
-        for (SubscriptionEventListener eventListener : listeners) {
-            eventListener.onLastLocalUnsubscribe(topic);
-        }
-    }
-
- /**
- * Reads all subscriptions of a topic from the backing store chosen by the
- * subclass. On success the callback receives a map from subscriber id to
- * in-memory subscription state; AcquireOp stores that map as the topic's
- * entry in top2sub2seq.
- */
- protected abstract void readSubscriptions(final ByteString topic,
- final Callback<Map<ByteString, InMemorySubscriptionState>> cb, final Object ctx);
-
- /**
- * Reads the stored data of a single subscriber of a topic. This class calls
- * it after a BadVersionException to pick up the stored version before
- * retrying an update.
- */
- protected abstract void readSubscriptionData(final ByteString topic, final ByteString subscriberId,
- final Callback<InMemorySubscriptionState> cb, Object ctx);
-
- /**
- * Queued operation that serves one subscribe request for a topic.
- *
- * Outcomes, in the order they are checked:
- * - topic not loaded: fails with ServerNotResponsibleForTopicException;
- * - subscriber already known and CREATE requested: fails with
- * ClientAlreadySubscribedException;
- * - subscriber already known otherwise: attaches, first persisting the
- * preferences when the request carries preferences and
- * updatePreferences() reports a change;
- * - subscriber unknown and ATTACH requested: fails with
- * ClientNotSubscribedException;
- * - subscriber unknown otherwise: creates and stores new subscription data
- * starting at consumeSeqId, notifying listeners first when this is the
- * first local subscription of the topic.
- */
- private class SubscribeOp extends TopicOpQueuer.AsynchronousOp<SubscriptionData> {
- SubscribeRequest subRequest;
- MessageSeqId consumeSeqId;
-
- public SubscribeOp(ByteString topic, SubscribeRequest subRequest, MessageSeqId consumeSeqId,
- Callback<SubscriptionData> callback, Object ctx) {
- queuer.super(topic, callback, ctx);
- this.subRequest = subRequest;
- this.consumeSeqId = consumeSeqId;
- }
-
- @Override
- public void run() {
-
- final Map<ByteString, InMemorySubscriptionState> topicSubscriptions = top2sub2seq.get(topic);
- if (topicSubscriptions == null) {
- cb.operationFailed(ctx, new PubSubException.ServerNotResponsibleForTopicException(""));
- return;
- }
-
- final ByteString subscriberId = subRequest.getSubscriberId();
- final InMemorySubscriptionState subscriptionState = topicSubscriptions.get(subscriberId);
- CreateOrAttach createOrAttach = subRequest.getCreateOrAttach();
-
- if (subscriptionState != null) {
-
- if (createOrAttach.equals(CreateOrAttach.CREATE)) {
- String msg = "Topic: " + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8()
- + " requested creating a subscription but it is already subscribed with state: "
- + SubscriptionStateUtils.toString(subscriptionState.getSubscriptionState());
- logger.error(msg);
- cb.operationFailed(ctx, new PubSubException.ClientAlreadySubscribedException(msg));
- return;
- }
-
- // Subscription existed before, check whether new preferences provided
- // if new preferences provided, merged the subscription data and updated them
- // TODO: needs ACL mechanism when changing preferences
- if (subRequest.hasPreferences() &&
- subscriptionState.updatePreferences(subRequest.getPreferences())) {
- updateSubscriptionPreferences(topic, subscriberId, subscriptionState, new Callback<Void>() {
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- cb.operationFailed(ctx, exception);
- }
-
- @Override
- public void operationFinished(Object ctx, Void resultOfOperation) {
- if (logger.isDebugEnabled()) {
- logger.debug("Topic: " + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8()
- + " attaching to subscription with state: "
- + SubscriptionStateUtils.toString(subscriptionState.getSubscriptionState())
- + ", with preferences: "
- + SubscriptionStateUtils.toString(subscriptionState.getSubscriptionPreferences()));
- }
- // update message bound if necessary
- updateMessageBound(topic);
- cb.operationFinished(ctx, subscriptionState.toSubscriptionData());
- }
- }, ctx);
- return;
- }
-
- // otherwise just attach
- if (logger.isDebugEnabled()) {
- logger.debug("Topic: " + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8()
- + " attaching to subscription with state: "
- + SubscriptionStateUtils.toString(subscriptionState.getSubscriptionState())
- + ", with preferences: "
- + SubscriptionStateUtils.toString(subscriptionState.getSubscriptionPreferences()));
- }
-
- cb.operationFinished(ctx, subscriptionState.toSubscriptionData());
- return;
- }
-
- // we don't have a mapping for this subscriber
- if (createOrAttach.equals(CreateOrAttach.ATTACH)) {
- String msg = "Topic: " + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8()
- + " requested attaching to an existing subscription but it is not subscribed";
- logger.error(msg);
- cb.operationFailed(ctx, new PubSubException.ClientNotSubscribedException(msg));
- return;
- }
-
- // now the hard case, this is a brand new subscription, must record
- SubscriptionState.Builder stateBuilder = SubscriptionState.newBuilder().setMsgId(consumeSeqId);
-
- SubscriptionPreferences.Builder preferencesBuilder;
- if (subRequest.hasPreferences()) {
- preferencesBuilder = SubscriptionPreferences.newBuilder(subRequest.getPreferences());
- } else {
- preferencesBuilder = SubscriptionPreferences.newBuilder();
- }
-
- // backward compatibility: a message bound set directly on the request
- // overrides the one carried in the preferences
- if (subRequest.hasMessageBound()) {
- preferencesBuilder = preferencesBuilder.setMessageBound(subRequest.getMessageBound());
- }
-
- SubscriptionData.Builder subDataBuilder =
- SubscriptionData.newBuilder().setState(stateBuilder).setPreferences(preferencesBuilder);
- final SubscriptionData subData = subDataBuilder.build();
-
- // Store the new subscription first; the in-memory map is only updated
- // once the listeners (if any need notifying) have accepted it.
- createSubscriptionData(topic, subscriberId, subData, new Callback<Version>() {
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- cb.operationFailed(ctx, exception);
- }
-
- @Override
- public void operationFinished(Object ctx, final Version version) {
- Callback<Void> cb2 = new Callback<Void>() {
- @Override
- public void operationFailed(final Object ctx, final PubSubException exception) {
- logger.error("subscription for subscriber " + subscriberId.toStringUtf8() + " to topic "
- + topic.toStringUtf8() + " failed due to failed listener callback", exception);
- // should remove subscription when synchronized cross-region subscription failed
- deleteSubscriptionData(topic, subscriberId, version, new Callback<Void>() {
- @Override
- public void operationFinished(Object context,
- Void resultOfOperation) {
- finish();
- }
- @Override
- public void operationFailed(Object context,
- PubSubException ex) {
- logger.error("Remove subscription for subscriber " + subscriberId.toStringUtf8() + " to topic "
- + topic.toStringUtf8() + " failed : ", ex);
- finish();
- }
- // Whether or not the cleanup succeeded, the caller is told
- // about the original listener failure.
- private void finish() {
- cb.operationFailed(ctx, exception);
- }
- }, ctx);
- }
-
- @Override
- public void operationFinished(Object ctx, Void resultOfOperation) {
- topicSubscriptions.put(subscriberId, new InMemorySubscriptionState(subData, version));
-
- updateMessageBound(topic);
-
- cb.operationFinished(ctx, subData);
- }
-
- };
-
- // if this will be the first local subscription, notifyFirstLocalSubscribe
- if (!SubscriptionStateUtils.isHubSubscriber(subRequest.getSubscriberId())
- && !hasLocalSubscriptions(topicSubscriptions))
- notifyFirstLocalSubscribe(topic, subRequest.getSynchronous(), cb2, ctx);
- else
- cb2.operationFinished(ctx, null);
- }
- }, ctx);
- }
- }
-
-    /**
-     * Checks whether the given subscriber map holds at least one local
-     * subscription, i.e. a subscriber id that is not a hub subscriber. Most
-     * subscriptions are local, so the scan normally stops very early.
-     *
-     * @return true if a local subscription is present, false otherwise
-     */
-    private static boolean hasLocalSubscriptions(Map<ByteString, InMemorySubscriptionState> topicSubscriptions) {
-        boolean foundLocal = false;
-        for (ByteString subscriberId : topicSubscriptions.keySet()) {
-            if (!SubscriptionStateUtils.isHubSubscriber(subscriberId)) {
-                foundLocal = true;
-                break;
-            }
-        }
-        return foundLocal;
-    }
-
-    /**
-     * Recomputes the message bound of a loaded topic and passes it to the
-     * PersistenceManager. The bound is the largest bound among the topic's
-     * subscribers; it is cleared as soon as one subscriber has no bound, and
-     * also when the topic has no subscribers. Nothing happens when the topic
-     * is not loaded.
-     */
-    public void updateMessageBound(ByteString topic) {
-        final Map<ByteString, InMemorySubscriptionState> subscriptions = top2sub2seq.get(topic);
-        if (subscriptions == null) {
-            return;
-        }
-        // Integer.MIN_VALUE serves as the "no bound" marker.
-        int largestBound = Integer.MIN_VALUE;
-        for (InMemorySubscriptionState state : subscriptions.values()) {
-            if (state.getSubscriptionPreferences().hasMessageBound()) {
-                largestBound = Math.max(largestBound, state.getSubscriptionPreferences().getMessageBound());
-                continue;
-            }
-            // One unbounded subscriber makes the whole topic unbounded.
-            largestBound = Integer.MIN_VALUE;
-            break;
-        }
-        if (largestBound != Integer.MIN_VALUE) {
-            pm.setMessageBound(topic, largestBound);
-        } else {
-            pm.clearMessageBound(topic);
-        }
-    }
-
-    /**
-     * Queues a SubscribeOp that serves the given subscribe request for the topic.
-     */
-    @Override
-    public void serveSubscribeRequest(ByteString topic, SubscribeRequest subRequest, MessageSeqId consumeSeqId,
-                                      Callback<SubscriptionData> callback, Object ctx) {
-        final SubscribeOp subscribeOp = new SubscribeOp(topic, subRequest, consumeSeqId, callback, ctx);
-        queuer.pushAndMaybeRun(topic, subscribeOp);
-    }
-
-    /**
-     * Queued operation that records a subscriber's new consume position.
-     *
-     * The operation finishes successfully without doing anything when the
-     * topic is not loaded or the subscriber is unknown. Otherwise the position
-     * is set in memory and, when setLastConsumeSeqId() asks for it, also
-     * persisted. The delivery manager, if present, is told about the consume
-     * event in both cases.
-     */
-    private class ConsumeOp extends TopicOpQueuer.AsynchronousOp<Void> {
-        ByteString subscriberId;
-        MessageSeqId consumeSeqId;
-
-        public ConsumeOp(ByteString topic, ByteString subscriberId, MessageSeqId consumeSeqId, Callback<Void> callback,
-                         Object ctx) {
-            queuer.super(topic, callback, ctx);
-            this.subscriberId = subscriberId;
-            this.consumeSeqId = consumeSeqId;
-        }
-
-        @Override
-        public void run() {
-            final Map<ByteString, InMemorySubscriptionState> topicSubs = top2sub2seq.get(topic);
-            final InMemorySubscriptionState subState =
-                (topicSubs == null) ? null : topicSubs.get(subscriberId);
-            if (subState == null) {
-                // Unknown topic or subscriber: there is nothing to record.
-                cb.operationFinished(ctx, null);
-                return;
-            }
-
-            final boolean persistNow = subState.setLastConsumeSeqId(consumeSeqId, cfg.getConsumeInterval());
-            if (!persistNow) {
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Only advanced consume pointer in memory, will persist later, topic: "
-                                 + topic.toStringUtf8() + " subscriberId: " + subscriberId.toStringUtf8()
-                                 + " persistentState: " + SubscriptionStateUtils.toString(subState.getSubscriptionState())
-                                 + " in-memory consume-id: "
-                                 + MessageIdUtils.msgIdToReadableString(subState.getLastConsumeSeqId()));
-                }
-                cb.operationFinished(ctx, null);
-            } else {
-                updateSubscriptionState(topic, subscriberId, subState, new Callback<Void>() {
-                    @Override
-                    public void operationFailed(Object ctx, PubSubException exception) {
-                        cb.operationFailed(ctx, exception);
-                    }
-
-                    @Override
-                    public void operationFinished(Object ctx, Void resultOfOperation) {
-                        // Remember what has actually reached the store.
-                        subState.setLastPersistedSeqId(consumeSeqId.getLocalComponent());
-                        cb.operationFinished(ctx, resultOfOperation);
-                    }
-                }, ctx);
-            }
-
-            // Let the delivery manager know about the consume event.
-            if (dm != null) {
-                dm.messageConsumed(topic, subscriberId, consumeSeqId);
-            }
-        }
-    }
-
-    /**
-     * Queues a ConsumeOp that records the given consume position for the
-     * subscriber of the topic.
-     */
-    @Override
-    public void setConsumeSeqIdForSubscriber(ByteString topic, ByteString subscriberId, MessageSeqId consumeSeqId,
-                                             Callback<Void> callback, Object ctx) {
-        final ConsumeOp consumeOp = new ConsumeOp(topic, subscriberId, consumeSeqId, callback, ctx);
-        queuer.pushAndMaybeRun(topic, consumeOp);
-    }
-
-    /**
-     * Queued operation for closing a subscription. It currently completes
-     * successfully without touching any state; the subscriber id passed to the
-     * constructor is not used.
-     */
-    private class CloseSubscriptionOp extends TopicOpQueuer.AsynchronousOp<Void> {
-
-        public CloseSubscriptionOp(ByteString topic, ByteString subscriberId,
-                                   Callback<Void> callback, Object ctx) {
-            queuer.super(topic, callback, ctx);
-        }
-
-        @Override
-        public void run() {
-            // TODO: BOOKKEEPER-412: the loaded subscription might have to be
-            // removed to reclaim memory. For now nothing is done.
-            cb.operationFinished(ctx, null);
-        }
-    }
-
-    /**
-     * Queues a CloseSubscriptionOp for the given subscriber of the topic.
-     */
-    @Override
-    public void closeSubscription(ByteString topic, ByteString subscriberId,
-                                  Callback<Void> callback, Object ctx) {
-        final CloseSubscriptionOp closeOp = new CloseSubscriptionOp(topic, subscriberId, callback, ctx);
-        queuer.pushAndMaybeRun(topic, closeOp);
-    }
-
-    /**
-     * Queued operation that removes a subscription.
-     *
-     * Fails with ServerNotResponsibleForTopicException when the topic is not
-     * loaded and with ClientNotSubscribedException when the subscriber is
-     * unknown. Otherwise the stored subscription data is deleted and, once
-     * that succeeded, the subscriber is dropped from memory, listeners are
-     * notified if no local subscription is left, and the message bound is
-     * recomputed.
-     */
-    private class UnsubscribeOp extends TopicOpQueuer.AsynchronousOp<Void> {
-        ByteString subscriberId;
-
-        public UnsubscribeOp(ByteString topic, ByteString subscriberId, Callback<Void> callback, Object ctx) {
-            queuer.super(topic, callback, ctx);
-            this.subscriberId = subscriberId;
-        }
-
-        @Override
-        public void run() {
-            final Map<ByteString, InMemorySubscriptionState> subscriptions = top2sub2seq.get(topic);
-            if (subscriptions == null) {
-                cb.operationFailed(ctx, new PubSubException.ServerNotResponsibleForTopicException(""));
-                return;
-            }
-            if (!subscriptions.containsKey(subscriberId)) {
-                cb.operationFailed(ctx, new PubSubException.ClientNotSubscribedException(""));
-                return;
-            }
-
-            final Version storedVersion = subscriptions.get(subscriberId).getVersion();
-            final Callback<Void> onDeleted = new Callback<Void>() {
-                @Override
-                public void operationFinished(Object ctx, Void resultOfOperation) {
-                    subscriptions.remove(subscriberId);
-                    // Listeners only care when the last local subscriber is gone.
-                    final boolean removedLocal = !SubscriptionStateUtils.isHubSubscriber(subscriberId);
-                    if (removedLocal && !hasLocalSubscriptions(subscriptions)) {
-                        notifyLastLocalUnsubscribe(topic);
-                    }
-
-                    updateMessageBound(topic);
-                    cb.operationFinished(ctx, null);
-                }
-
-                @Override
-                public void operationFailed(Object ctx, PubSubException exception) {
-                    cb.operationFailed(ctx, exception);
-                }
-            };
-            deleteSubscriptionData(topic, subscriberId, storedVersion, onDeleted, ctx);
-        }
-
-    }
-
-    /**
-     * Queues an UnsubscribeOp that removes the subscriber from the topic.
-     */
-    @Override
-    public void unsubscribe(ByteString topic, ByteString subscriberId, Callback<Void> callback, Object ctx) {
-        final UnsubscribeOp unsubscribeOp = new UnsubscribeOp(topic, subscriberId, callback, ctx);
-        queuer.pushAndMaybeRun(topic, unsubscribeOp);
-    }
-
- /**
- * Registers a listener that is notified about the first local subscribe
- * and the last local unsubscribe of a topic.
- *
- * Not thread-safe: the listener is appended to a plain ArrayList without
- * any synchronization.
- */
- @Override
- public void addListener(SubscriptionEventListener listener) {
- listeners.add(listener);
- }
-
-    /**
-     * Stops this manager gracefully: cancels the recurring consume-position
-     * task and then tries to persist the subscription states of every loaded
-     * topic, one topic at a time, waiting for each to complete.
-     *
-     * A topic whose states could not be written is logged and skipped. If the
-     * calling thread is interrupted while waiting, the remaining topics are
-     * not processed and the thread's interrupt status is restored so that the
-     * caller can still observe the interruption.
-     */
-    public void stop() {
-        timer.cancel();
-        final LinkedBlockingQueue<Boolean> queue = new LinkedBlockingQueue<Boolean>();
-        try {
-            // update dirty subscriptions
-            for (ByteString topic : top2sub2seq.keySet()) {
-                Callback<Void> finalCb = new Callback<Void>() {
-                    @Override
-                    public void operationFinished(Object ctx, Void resultOfOperation) {
-                        ConcurrencyUtils.put(queue, true);
-                    }
-                    @Override
-                    public void operationFailed(Object ctx,
-                                                PubSubException exception) {
-                        ConcurrencyUtils.put(queue, false);
-                    }
-                };
-                updateSubscriptionStates(topic, finalCb, null);
-                // Wait for this topic before moving on to the next one.
-                final boolean updated = queue.take();
-                if (!updated) {
-                    logger.warn("Failed to update subscription states of topic " + topic.toStringUtf8()
-                                + " while stopping");
-                }
-            }
-        } catch (InterruptedException ie) {
-            logger.warn("Error during updating subscription states : ", ie);
-            // Do not swallow the interruption: restore the flag for the caller.
-            Thread.currentThread().interrupt();
-        }
-    }
-
-    /**
-     * Writes the subscription state of one subscriber to the store.
-     *
-     * When partial updates are supported only the state part is sent through
-     * updateSubscriptionData; otherwise the complete subscription data is sent
-     * through replaceSubscriptionData. On success the returned version is
-     * stored on {@code state}. On a BadVersionException the stored data is
-     * read again to pick up its version and the write is retried; any other
-     * failure is passed to {@code callback}.
-     */
-    private void updateSubscriptionState(final ByteString topic, final ByteString subscriberId,
-                                         final InMemorySubscriptionState state,
-                                         final Callback<Void> callback, Object ctx) {
-        final Callback<Version> onWritten = new Callback<Version>() {
-            @Override
-            public void operationFinished(Object ctx, Version version) {
-                state.setVersion(version);
-                callback.operationFinished(ctx, null);
-            }
-
-            @Override
-            public void operationFailed(Object ctx, PubSubException exception) {
-                if (!(exception instanceof PubSubException.BadVersionException)) {
-                    callback.operationFailed(ctx, exception);
-                    return;
-                }
-                // Our version is stale: fetch the stored one and try again.
-                readSubscriptionData(topic, subscriberId, new Callback<InMemorySubscriptionState>() {
-                    @Override
-                    public void operationFinished(Object ctx,
-                                                  InMemorySubscriptionState resultOfOperation) {
-                        state.setVersion(resultOfOperation.getVersion());
-                        updateSubscriptionState(topic, subscriberId, state, callback, ctx);
-                    }
-
-                    @Override
-                    public void operationFailed(Object ctx,
-                                                PubSubException exception) {
-                        callback.operationFailed(ctx, exception);
-                    }
-                }, ctx);
-            }
-        };
-
-        if (!isPartialUpdateSupported()) {
-            final SubscriptionData fullData = state.toSubscriptionData();
-            replaceSubscriptionData(topic, subscriberId, fullData, state.getVersion(), onWritten, ctx);
-            return;
-        }
-        final SubscriptionData stateOnly =
-            SubscriptionData.newBuilder().setState(state.getSubscriptionState()).build();
-        updateSubscriptionData(topic, subscriberId, stateOnly, state.getVersion(), onWritten, ctx);
-    }
-
-    /**
-     * Writes the subscription preferences of one subscriber to the store.
-     *
-     * When partial updates are supported only the preferences part is sent
-     * through updateSubscriptionData; otherwise the complete subscription data
-     * is sent through replaceSubscriptionData. On success the returned version
-     * is stored on {@code state}. On a BadVersionException the stored data is
-     * read again to pick up its version and the write is retried; any other
-     * failure is passed to {@code callback}.
-     */
-    private void updateSubscriptionPreferences(final ByteString topic, final ByteString subscriberId,
-                                               final InMemorySubscriptionState state,
-                                               final Callback<Void> callback, Object ctx) {
-        final Callback<Version> onWritten = new Callback<Version>() {
-            @Override
-            public void operationFinished(Object ctx, Version version) {
-                state.setVersion(version);
-                callback.operationFinished(ctx, null);
-            }
-
-            @Override
-            public void operationFailed(Object ctx, PubSubException exception) {
-                if (!(exception instanceof PubSubException.BadVersionException)) {
-                    callback.operationFailed(ctx, exception);
-                    return;
-                }
-                // Our version is stale: fetch the stored one and try again.
-                readSubscriptionData(topic, subscriberId, new Callback<InMemorySubscriptionState>() {
-                    @Override
-                    public void operationFinished(Object ctx,
-                                                  InMemorySubscriptionState resultOfOperation) {
-                        state.setVersion(resultOfOperation.getVersion());
-                        updateSubscriptionPreferences(topic, subscriberId, state, callback, ctx);
-                    }
-
-                    @Override
-                    public void operationFailed(Object ctx,
-                                                PubSubException exception) {
-                        callback.operationFailed(ctx, exception);
-                    }
-                }, ctx);
-            }
-        };
-
-        if (!isPartialUpdateSupported()) {
-            final SubscriptionData fullData = state.toSubscriptionData();
-            replaceSubscriptionData(topic, subscriberId, fullData, state.getVersion(), onWritten, ctx);
-            return;
-        }
-        final SubscriptionData preferencesOnly =
-            SubscriptionData.newBuilder().setPreferences(state.getSubscriptionPreferences()).build();
-        updateSubscriptionData(topic, subscriberId, preferencesOnly, state.getVersion(), onWritten, ctx);
-    }
-
- /**
- * Tells whether the store can update part of a subscription. When true,
- * this class writes only the changed part through updateSubscriptionData;
- * when false it writes the complete data through replaceSubscriptionData.
- */
- protected abstract boolean isPartialUpdateSupported();
-
- /**
- * Stores the data of a new subscription. On success the callback receives
- * the version, which this class keeps with the in-memory state.
- */
- protected abstract void createSubscriptionData(final ByteString topic, ByteString subscriberId,
- SubscriptionData data, Callback<Version> callback, Object ctx);
-
- /**
- * Updates a subscription with the given data, which this class fills with
- * only the changed part (state or preferences). The version passed is the
- * one held in memory; the callback receives the new version.
- */
- protected abstract void updateSubscriptionData(ByteString topic, ByteString subscriberId, SubscriptionData data,
- Version version, Callback<Version> callback, Object ctx);
-
- /**
- * Replaces a subscription with the given complete data. The version passed
- * is the one held in memory; the callback receives the new version.
- */
- protected abstract void replaceSubscriptionData(ByteString topic, ByteString subscriberId, SubscriptionData data,
- Version version, Callback<Version> callback, Object ctx);
-
- /**
- * Deletes the stored data of a subscription. The version passed is the one
- * this class holds for the subscription.
- */
- protected abstract void deleteSubscriptionData(ByteString topic, ByteString subscriberId, Version version, Callback<Void> callback,
- Object ctx);
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AllToAllTopologyFilter.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AllToAllTopologyFilter.java b/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AllToAllTopologyFilter.java
deleted file mode 100644
index 389ccc9..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/AllToAllTopologyFilter.java
+++ /dev/null
@@ -1,75 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.subscriptions;
-
-import java.io.IOException;
-
-import com.google.protobuf.ByteString;
-import org.apache.commons.configuration.Configuration;
-import org.apache.commons.configuration.ConfigurationException;
-import org.apache.hedwig.filter.MessageFilterBase;
-import org.apache.hedwig.filter.ServerMessageFilter;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
-import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
-import org.apache.hedwig.server.common.ServerConfiguration;
-
-public class AllToAllTopologyFilter implements ServerMessageFilter {
-
- ByteString subscriberRegion;
- boolean isHubSubscriber;
-
- @Override
- public ServerMessageFilter initialize(Configuration conf)
- throws ConfigurationException, IOException {
- String region = conf.getString(ServerConfiguration.REGION, "standalone");
- if (null == region) {
- throw new IOException("No region found to run " + getClass().getName());
- }
- subscriberRegion = ByteString.copyFromUtf8(region);
- return this;
- }
-
- @Override
- public void uninitialize() {
- // do nothing now
- }
-
- @Override
- public MessageFilterBase setSubscriptionPreferences(ByteString topic, ByteString subscriberId,
- SubscriptionPreferences preferences) {
- isHubSubscriber = SubscriptionStateUtils.isHubSubscriber(subscriberId);
- return this;
- }
-
- @Override
- public boolean testMessage(Message message) {
- // We're using a simple all-to-all network topology, so no region
- // should ever need to forward messages to any other region.
- // Otherwise, with the current logic, messages will end up
- // ping-pong-ing back and forth between regions with subscriptions
- // to each other without termination (or in any other cyclic
- // configuration).
- if (isHubSubscriber && !message.getSrcRegion().equals(subscriberRegion)) {
- return false;
- } else {
- return true;
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java
deleted file mode 100644
index 4adbf1c..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionManager.java
+++ /dev/null
@@ -1,128 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.subscriptions;
-
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.delivery.DeliveryManager;
-import org.apache.hedwig.server.persistence.PersistenceManager;
-import org.apache.hedwig.server.topics.TopicManager;
-import org.apache.hedwig.util.Callback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-public class InMemorySubscriptionManager extends AbstractSubscriptionManager {
- private static final Logger logger = LoggerFactory.getLogger(InMemorySubscriptionManager.class);
- // Backup for top2sub2seq
- final ConcurrentHashMap<ByteString, Map<ByteString, InMemorySubscriptionState>> top2sub2seqBackup =
- new ConcurrentHashMap<ByteString, Map<ByteString, InMemorySubscriptionState>>();
-
- public InMemorySubscriptionManager(ServerConfiguration conf,
- TopicManager tm, PersistenceManager pm,
- DeliveryManager dm,
- ScheduledExecutorService scheduler) {
- super(conf, tm, pm, dm, scheduler);
- }
-
- @Override
- protected void createSubscriptionData(ByteString topic, ByteString subscriberId, SubscriptionData subData,
- Callback<Version> callback, Object ctx) {
- // nothing to do, in-memory info is already recorded by base class
- callback.operationFinished(ctx, null);
- }
-
- @Override
- protected void deleteSubscriptionData(ByteString topic, ByteString subscriberId, Version version, Callback<Void> callback,
- Object ctx) {
- // nothing to do, in-memory info is already deleted by base class
- callback.operationFinished(ctx, null);
- }
-
- @Override
- protected boolean isPartialUpdateSupported() {
- return false;
- }
-
- @Override
- protected void updateSubscriptionData(ByteString topic, ByteString subscriberId, SubscriptionData data,
- Version version, Callback<Version> callback, Object ctx) {
- throw new UnsupportedOperationException("Doesn't support partial update");
- }
-
- @Override
- protected void replaceSubscriptionData(ByteString topic, ByteString subscriberId, SubscriptionData data,
- Version version, Callback<Version> callback, Object ctx) {
- // nothing to do, in-memory info is already updated by base class
- callback.operationFinished(ctx, null);
- }
-
- @Override
- public void lostTopic(ByteString topic) {
- // Backup topic-sub2seq map for readSubscriptions
- final Map<ByteString, InMemorySubscriptionState> sub2seq = top2sub2seq.get(topic);
- if (null != sub2seq)
- top2sub2seqBackup.put(topic, sub2seq);
-
- if (logger.isDebugEnabled()) {
- logger.debug("InMemorySubscriptionManager is losing topic " + topic.toStringUtf8());
- }
- queuer.pushAndMaybeRun(topic, new ReleaseOp(topic, noopCallback, null));
- }
-
- @Override
- protected void readSubscriptions(ByteString topic,
- Callback<Map<ByteString, InMemorySubscriptionState>> cb, Object ctx) {
- // Since we backed up in-memory information on lostTopic, we can just return that back
- Map<ByteString, InMemorySubscriptionState> topicSubs = top2sub2seqBackup.remove(topic);
-
- if (topicSubs != null) {
- cb.operationFinished(ctx, topicSubs);
- } else {
- cb.operationFinished(ctx, new ConcurrentHashMap<ByteString, InMemorySubscriptionState>());
- }
-
- }
-
- @Override
- protected void readSubscriptionData(ByteString topic,
- ByteString subscriberId, Callback<InMemorySubscriptionState> cb, Object ctx) {
- // Since we backed up in-memory information on lostTopic, we can just return that back
- Map<ByteString, InMemorySubscriptionState> sub2seqBackup = top2sub2seqBackup.get(topic);
- if (sub2seqBackup == null) {
- cb.operationFinished(ctx, new InMemorySubscriptionState(
- SubscriptionData.getDefaultInstance(), Version.NEW));
- return;
- }
- InMemorySubscriptionState subState = sub2seqBackup.remove(subscriberId);
-
- if (subState != null) {
- cb.operationFinished(ctx, subState);
- } else {
- cb.operationFinished(ctx, new InMemorySubscriptionState(
- SubscriptionData.getDefaultInstance(), Version.NEW));
- }
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java b/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
deleted file mode 100644
index ea74599..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/InMemorySubscriptionState.java
+++ /dev/null
@@ -1,198 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.subscriptions;
-
-
-import java.util.Map;
-import com.google.protobuf.ByteString;
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionState;
-import org.apache.hedwig.protoextensions.MapUtils;
-import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
-
-public class InMemorySubscriptionState {
- SubscriptionState subscriptionState;
- SubscriptionPreferences subscriptionPreferences;
- MessageSeqId lastConsumeSeqId;
- Version version;
- long lastPersistedSeqId;
-
- public InMemorySubscriptionState(SubscriptionData subscriptionData, Version version, MessageSeqId lastConsumeSeqId) {
- this.subscriptionState = subscriptionData.getState();
- if (subscriptionData.hasPreferences()) {
- this.subscriptionPreferences = subscriptionData.getPreferences();
- } else {
- // set initial subscription preferences
- SubscriptionPreferences.Builder prefsBuilder = SubscriptionPreferences.newBuilder();
- // progate the old system preferences from subscription state to preferences
- prefsBuilder.setMessageBound(subscriptionState.getMessageBound());
- this.subscriptionPreferences = prefsBuilder.build();
-
- }
- this.lastConsumeSeqId = lastConsumeSeqId;
- this.version = version;
- this.lastPersistedSeqId = subscriptionState.getMsgId().getLocalComponent();
- }
-
- public InMemorySubscriptionState(SubscriptionData subscriptionData, Version version) {
- this(subscriptionData, version, subscriptionData.getState().getMsgId());
- }
-
- public SubscriptionData toSubscriptionData() {
- SubscriptionState.Builder stateBuilder =
- SubscriptionState.newBuilder(subscriptionState).setMsgId(lastConsumeSeqId);
- return SubscriptionData.newBuilder().setState(stateBuilder)
- .setPreferences(subscriptionPreferences)
- .build();
- }
-
- public SubscriptionState getSubscriptionState() {
- return subscriptionState;
- }
-
- public SubscriptionPreferences getSubscriptionPreferences() {
- return subscriptionPreferences;
- }
-
- public MessageSeqId getLastConsumeSeqId() {
- return lastConsumeSeqId;
- }
-
- public Version getVersion() {
- return version;
- }
-
- public void setVersion(Version version) {
- this.version = version;
- }
-
- /**
- *
- * @param lastConsumeSeqId
- * @param consumeInterval
- * The amount of laziness we want in persisting the consume
- * pointers
- * @return true if the resulting structure needs to be persisted, false
- * otherwise
- */
- public boolean setLastConsumeSeqId(MessageSeqId lastConsumeSeqId, int consumeInterval) {
- long interval = lastConsumeSeqId.getLocalComponent() - subscriptionState.getMsgId().getLocalComponent();
- if (interval <= 0) {
- return false;
- }
-
- // set consume seq id when it is larger
- this.lastConsumeSeqId = lastConsumeSeqId;
- if (interval < consumeInterval) {
- return false;
- }
-
- // subscription state will be updated, marked it as clean
- subscriptionState = SubscriptionState.newBuilder(subscriptionState).setMsgId(lastConsumeSeqId).build();
- return true;
- }
-
- /**
- * Set lastConsumeSeqId Immediately
- *
- * @return true if the resulting structure needs to be persisted, false otherwise
- */
- public boolean setLastConsumeSeqIdImmediately() {
- long interval = lastConsumeSeqId.getLocalComponent() - subscriptionState.getMsgId().getLocalComponent();
- // no need to set
- if (interval <= 0) {
- return false;
- }
- subscriptionState = SubscriptionState.newBuilder(subscriptionState).setMsgId(lastConsumeSeqId).build();
- return true;
- }
-
- public long getLastPersistedSeqId() {
- return lastPersistedSeqId;
- }
-
- public void setLastPersistedSeqId(long lastPersistedSeqId) {
- this.lastPersistedSeqId = lastPersistedSeqId;
- }
-
- /**
- * Update preferences.
- *
- * @return true if preferences is updated, which needs to be persisted, false otherwise.
- */
- public boolean updatePreferences(SubscriptionPreferences preferences) {
- boolean changed = false;
- SubscriptionPreferences.Builder newPreferencesBuilder = SubscriptionPreferences.newBuilder(subscriptionPreferences);
- if (preferences.hasMessageBound()) {
- if (!subscriptionPreferences.hasMessageBound() ||
- subscriptionPreferences.getMessageBound() != preferences.getMessageBound()) {
- newPreferencesBuilder.setMessageBound(preferences.getMessageBound());
- changed = true;
- }
- }
- if (preferences.hasMessageFilter()) {
- if (!subscriptionPreferences.hasMessageFilter() ||
- !subscriptionPreferences.getMessageFilter().equals(preferences.getMessageFilter())) {
- newPreferencesBuilder.setMessageFilter(preferences.getMessageFilter());
- changed = true;
- }
- }
- if (preferences.hasMessageWindowSize()) {
- if (!subscriptionPreferences.hasMessageWindowSize() ||
- subscriptionPreferences.getMessageWindowSize() !=
- preferences.getMessageWindowSize()) {
- newPreferencesBuilder.setMessageWindowSize(preferences.getMessageWindowSize());
- changed = true;
- }
- }
- if (preferences.hasOptions()) {
- Map<String, ByteString> userOptions = SubscriptionStateUtils.buildUserOptions(subscriptionPreferences);
- Map<String, ByteString> optUpdates = SubscriptionStateUtils.buildUserOptions(preferences);
- boolean optChanged = false;
- for (Map.Entry<String, ByteString> entry : optUpdates.entrySet()) {
- String key = entry.getKey();
- if (userOptions.containsKey(key)) {
- if (null == entry.getValue()) {
- userOptions.remove(key);
- optChanged = true;
- } else {
- if (!entry.getValue().equals(userOptions.get(key))) {
- userOptions.put(key, entry.getValue());
- optChanged = true;
- }
- }
- } else {
- userOptions.put(key, entry.getValue());
- optChanged = true;
- }
- }
- if (optChanged) {
- changed = true;
- newPreferencesBuilder.setOptions(MapUtils.buildMapBuilder(userOptions));
- }
- }
- if (changed) {
- subscriptionPreferences = newPreferencesBuilder.build();
- }
- return changed;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java
deleted file mode 100644
index 47fdfd2..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/MMSubscriptionManager.java
+++ /dev/null
@@ -1,138 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.subscriptions;
-
-import java.io.IOException;
-import java.util.Map;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ScheduledExecutorService;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.delivery.DeliveryManager;
-import org.apache.hedwig.server.meta.MetadataManagerFactory;
-import org.apache.hedwig.server.meta.SubscriptionDataManager;
-import org.apache.hedwig.server.persistence.PersistenceManager;
-import org.apache.hedwig.server.topics.TopicManager;
-import org.apache.hedwig.util.Callback;
-import org.apache.bookkeeper.versioning.Version;
-import org.apache.bookkeeper.versioning.Versioned;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * MetaManager-based subscription manager.
- */
-public class MMSubscriptionManager extends AbstractSubscriptionManager {
-
- private static final Logger logger = LoggerFactory.getLogger(MMSubscriptionManager.class);
- SubscriptionDataManager subManager;
-
- public MMSubscriptionManager(ServerConfiguration cfg,
- MetadataManagerFactory metaManagerFactory,
- TopicManager topicMgr, PersistenceManager pm,
- DeliveryManager dm,
- ScheduledExecutorService scheduler) {
- super(cfg, topicMgr, pm, dm, scheduler);
- this.subManager = metaManagerFactory.newSubscriptionDataManager();
- }
-
- @Override
- protected void readSubscriptions(final ByteString topic,
- final Callback<Map<ByteString, InMemorySubscriptionState>> cb, final Object ctx) {
- subManager.readSubscriptions(topic, new Callback<Map<ByteString, Versioned<SubscriptionData>>>() {
- @Override
- public void operationFailed(Object ctx, PubSubException pse) {
- cb.operationFailed(ctx, pse);
- }
- @Override
- public void operationFinished(Object ctx, Map<ByteString, Versioned<SubscriptionData>> subs) {
- Map<ByteString, InMemorySubscriptionState> results = new ConcurrentHashMap<ByteString, InMemorySubscriptionState>();
- for (Map.Entry<ByteString, Versioned<SubscriptionData>> subEntry : subs.entrySet()) {
- Versioned<SubscriptionData> vv = subEntry.getValue();
- results.put(subEntry.getKey(), new InMemorySubscriptionState(vv.getValue(), vv.getVersion()));
- }
- cb.operationFinished(ctx, results);
- }
- }, ctx);
- }
-
- @Override
- protected void readSubscriptionData(final ByteString topic, final ByteString subscriberId,
- final Callback<InMemorySubscriptionState> cb, final Object ctx) {
- subManager.readSubscriptionData(topic, subscriberId, new Callback<Versioned<SubscriptionData>>() {
- @Override
- public void operationFinished(Object ctx,
- Versioned<SubscriptionData> subData) {
- if (null != subData) {
- cb.operationFinished(ctx,
- new InMemorySubscriptionState(subData.getValue(), subData.getVersion()));
- } else {
- cb.operationFinished(ctx, new InMemorySubscriptionState(
- SubscriptionData.getDefaultInstance(), Version.NEW));
- }
- }
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- cb.operationFailed(ctx, exception);
- }
- }, ctx);
- }
-
- @Override
- protected boolean isPartialUpdateSupported() {
- return subManager.isPartialUpdateSupported();
- }
-
- @Override
- protected void createSubscriptionData(final ByteString topic, final ByteString subscriberId,
- final SubscriptionData subData, final Callback<Version> callback, final Object ctx) {
- subManager.createSubscriptionData(topic, subscriberId, subData, callback, ctx);
- }
-
- @Override
- protected void replaceSubscriptionData(final ByteString topic, final ByteString subscriberId, final SubscriptionData subData,
- final Version version, final Callback<Version> callback, final Object ctx) {
- subManager.replaceSubscriptionData(topic, subscriberId, subData, version, callback, ctx);
- }
-
- @Override
- protected void updateSubscriptionData(final ByteString topic, final ByteString subscriberId, final SubscriptionData subData,
- final Version version, final Callback<Version> callback, final Object ctx) {
- subManager.updateSubscriptionData(topic, subscriberId, subData, version, callback, ctx);
- }
-
- @Override
- protected void deleteSubscriptionData(final ByteString topic, final ByteString subscriberId, Version version,
- final Callback<Void> callback, final Object ctx) {
- subManager.deleteSubscriptionData(topic, subscriberId, version, callback, ctx);
- }
-
- @Override
- public void stop() {
- super.stop();
- try {
- subManager.close();
- } catch (IOException ioe) {
- logger.warn("Exception closing subscription data manager : ", ioe);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionEventListener.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionEventListener.java b/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionEventListener.java
deleted file mode 100644
index 6c6e626..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionEventListener.java
+++ /dev/null
@@ -1,57 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.subscriptions;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.util.Callback;
-
-/**
- * For listening to events that are issued by a SubscriptionManager.
- *
- */
-public interface SubscriptionEventListener {
-
- /**
- * Called by the subscription manager when it previously had zero local
- * subscribers for a topic and is currently accepting its first local
- * subscriber.
- *
- * @param topic
- * The topic of interest.
- * @param synchronous
- * Whether this request was actually initiated by a new local
- * subscriber, or whether it was an existing subscription
- * inherited by the hub (e.g. when recovering the state from ZK).
- * @param cb
- * The subscription will not complete until success is called on
- * this callback. An error on cb will result in a subscription
- * error.
- */
- public void onFirstLocalSubscribe(ByteString topic, boolean synchronous, Callback<Void> cb);
-
- /**
- * Called by the SubscriptionManager when it previously had non-zero local
- * subscribers for a topic and is currently dropping its last local
- * subscriber. This is fully asynchronous so there is no callback.
- *
- * @param topic
- * The topic of interest.
- */
- public void onLastLocalUnsubscribe(ByteString topic);
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java
deleted file mode 100644
index eadebcb..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/subscriptions/SubscriptionManager.java
+++ /dev/null
@@ -1,123 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.subscriptions;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
-import org.apache.hedwig.util.Callback;
-
-/**
- * All methods are thread-safe.
- */
-public interface SubscriptionManager {
-
- /**
- *
- * Register a new subscription for the given subscriber for the given topic.
- * This method should reliably persist the existence of the subscription in
- * a way that it can't be lost. If the subscription already exists,
- * depending on the create or attach flag in the subscribe request, an
- * exception may be returned.
- *
- * This is an asynchronous method.
- *
- * @param topic
- * @param subRequest
- * @param consumeSeqId
- * The seqId to start serving the subscription from, if this is a
- * brand new subscription
- * @param callback
- * The subscription data returned by the callback.
- * @param ctx
- */
- public void serveSubscribeRequest(ByteString topic, SubscribeRequest subRequest, MessageSeqId consumeSeqId,
- Callback<SubscriptionData> callback, Object ctx);
-
- /**
- * Set the consume position of a given subscriber on a given topic. Note
- * that this method need not persist the consume position immediately but
- * can be lazy and persist it later asynchronously, if that is more
- * efficient.
- *
- * @param topic
- * @param subscriberId
- * @param consumeSeqId
- */
- public void setConsumeSeqIdForSubscriber(ByteString topic, ByteString subscriberId, MessageSeqId consumeSeqId,
- Callback<Void> callback, Object ctx);
-
- /**
- * Close a particular subscription
- *
- * @param topic
- * Topic Name
- * @param subscriberId
- * Subscriber Id
- * @param callback
- * Callback
- * @param ctx
- * Callback context
- */
- public void closeSubscription(ByteString topic, ByteString subscriberId,
- Callback<Void> callback, Object ctx);
-
- /**
- * Delete a particular subscription
- *
- * @param topic
- * @param subscriberId
- */
- public void unsubscribe(ByteString topic, ByteString subscriberId, Callback<Void> callback, Object ctx);
-
- // Management API methods that we will fill in later
- // /**
- // * Get the ids of all subscribers for a given topic
- // *
- // * @param topic
- // * @return A list of subscriber ids that are currently subscribed to the
- // * given topic
- // */
- // public List<ByteString> getSubscriptionsForTopic(ByteString topic);
- //
- // /**
- // * Get the topics to which a given subscriber is subscribed to
- // *
- // * @param subscriberId
- // * @return A list of the topics to which the given subscriber is
- // subscribed
- // * to
- // * @throws ServiceDownException
- // * If there is an error in looking up the subscription
- // * information
- // */
- // public List<ByteString> getTopicsForSubscriber(ByteString subscriberId)
- // throws ServiceDownException;
-
- /**
- * Add a listener that is notified when topic-subscription pairs are added
- * or removed.
- */
- public void addListener(SubscriptionEventListener listener);
-
- /**
- * Stop Subscription Manager
- */
- public void stop();
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java
deleted file mode 100644
index 2d9aba2..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/topics/AbstractTopicManager.java
+++ /dev/null
@@ -1,314 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.topics;
-
-import java.net.UnknownHostException;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.List;
-import java.util.concurrent.ScheduledExecutorService;
-import java.util.concurrent.TimeUnit;
-
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.common.TopicOpQueuer;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.CallbackUtils;
-import org.apache.hedwig.util.HedwigSocketAddress;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.cache.Cache;
-import com.google.common.cache.CacheBuilder;
-import com.google.common.cache.RemovalListener;
-import com.google.common.cache.RemovalNotification;
-import com.google.common.collect.Lists;
-import com.google.protobuf.ByteString;
-
-/**
- * Base {@link TopicManager} implementation that tracks the topics this hub
- * believes it owns in a Guava cache and submits get-owner and release
- * operations through a {@link TopicOpQueuer}, keyed by topic.
- *
- * <p>Subclasses decide ownership in
- * {@link #realGetOwner(ByteString, boolean, Callback, Object)} and perform any
- * hub-visible cleanup in
- * {@link #postReleaseCleanup(ByteString, Callback, Object)}. This class takes
- * care of notifying {@link TopicOwnershipChangeListener}s, maintaining the
- * owned-topics cache, and releasing topics on eviction or after the configured
- * retention period.
- */
-public abstract class AbstractTopicManager implements TopicManager {
-
-    /**
-     * Stats for a topic. For now it is just an empty stub class.
-     */
-    static class TopicStats {
-    }
-
-    /** Shared placeholder value stored in the cache for every owned topic. */
-    static final TopicStats STUB_TOPIC_STATS = new TopicStats();
-
-    /**
-     * My name.
-     */
-    protected HedwigSocketAddress addr;
-
-    /**
-     * Topic change listeners.
-     */
-    protected ArrayList<TopicOwnershipChangeListener> listeners = new ArrayList<TopicOwnershipChangeListener>();
-
-    /**
-     * List of topics I believe I am responsible for.
-     */
-    protected Cache<ByteString, TopicStats> topics;
-
-    protected TopicOpQueuer queuer;
-    protected ServerConfiguration cfg;
-    protected ScheduledExecutorService scheduler;
-
-    private static final Logger logger = LoggerFactory.getLogger(AbstractTopicManager.class);
-
-    /**
-     * Queued operation that delegates to
-     * {@link #realGetOwner(ByteString, boolean, Callback, Object)}.
-     */
-    private class GetOwnerOp extends TopicOpQueuer.AsynchronousOp<HedwigSocketAddress> {
-        final boolean shouldClaim;
-
-        public GetOwnerOp(final ByteString topic, boolean shouldClaim,
-                          final Callback<HedwigSocketAddress> cb, Object ctx) {
-            queuer.super(topic, cb, ctx);
-            this.shouldClaim = shouldClaim;
-        }
-
-        @Override
-        public void run() {
-            realGetOwner(topic, shouldClaim, cb, ctx);
-        }
-    }
-
-    /**
-     * Queued operation that releases a topic. When {@code checkExistence} is
-     * set, a topic that is no longer in the cache is reported as released
-     * without doing any further work.
-     */
-    private class ReleaseOp extends TopicOpQueuer.AsynchronousOp<Void> {
-        final boolean checkExistence;
-
-        public ReleaseOp(ByteString topic, Callback<Void> cb, Object ctx) {
-            this(topic, cb, ctx, true);
-        }
-
-        ReleaseOp(ByteString topic, Callback<Void> cb, Object ctx,
-                  boolean checkExistence) {
-            queuer.super(topic, cb, ctx);
-            this.checkExistence = checkExistence;
-        }
-
-        @Override
-        public void run() {
-            if (checkExistence && null == topics.getIfPresent(topic)) {
-                // Not (or no longer) owned: releasing is a successful no-op.
-                cb.operationFinished(ctx, null);
-                return;
-            }
-            realReleaseTopic(topic, cb, ctx);
-        }
-    }
-
-    /**
-     * Release topic when the topic is removed from topics cache.
-     */
-    class ReleaseTopicListener implements RemovalListener<ByteString, TopicStats> {
-        @Override
-        public void onRemoval(RemovalNotification<ByteString, TopicStats> notification) {
-            if (!notification.wasEvicted()) {
-                // Explicit invalidation: the release path is already running.
-                return;
-            }
-            logger.info("topic {} is evicted", notification.getKey().toStringUtf8());
-            // The entry is already gone from the cache, so the release must
-            // not be skipped by the existence check.
-            releaseTopicInternally(notification.getKey(), false);
-        }
-    }
-
-    /**
-     * Creates the manager and builds the owned-topics cache from the
-     * configuration.
-     *
-     * @param cfg
-     *            server configuration (cache sizing, retention, server address)
-     * @param scheduler
-     *            executor used by the operation queuer and for
-     *            retention-based releases
-     * @throws UnknownHostException
-     *             declared for {@code cfg.getServerAddr()}
-     */
-    public AbstractTopicManager(ServerConfiguration cfg, ScheduledExecutorService scheduler)
-            throws UnknownHostException {
-        this.cfg = cfg;
-        this.queuer = new TopicOpQueuer(scheduler);
-        this.scheduler = scheduler;
-        addr = cfg.getServerAddr();
-
-        // build the topic cache
-        CacheBuilder<ByteString, TopicStats> cacheBuilder = CacheBuilder.newBuilder()
-            .maximumSize(cfg.getMaxNumTopics())
-            .initialCapacity(cfg.getInitNumTopics())
-            // TODO: change to same number as topic op queuer threads
-            .concurrencyLevel(Runtime.getRuntime().availableProcessors())
-            .removalListener(new ReleaseTopicListener());
-        if (cfg.getRetentionSecsAfterAccess() > 0) {
-            cacheBuilder.expireAfterAccess(cfg.getRetentionSecsAfterAccess(), TimeUnit.SECONDS);
-        }
-        topics = cacheBuilder.build();
-    }
-
-    @Override
-    public void incrementTopicAccessTimes(ByteString topic) {
-        // let guava cache handle hits counting
-        topics.getIfPresent(topic);
-    }
-
-    @Override
-    public synchronized void addTopicOwnershipChangeListener(TopicOwnershipChangeListener listener) {
-        listeners.add(listener);
-    }
-
-    /**
-     * Enqueues a release of {@code topic} whose outcome is only logged.
-     *
-     * @param topic
-     *            topic to release
-     * @param checkExistence
-     *            whether the release should be skipped when the topic is not
-     *            in the cache
-     */
-    private void releaseTopicInternally(final ByteString topic, boolean checkExistence) {
-        // Enqueue a release operation. (Recall that release
-        // doesn't "fail" even if the topic is missing.)
-        queuer.pushAndMaybeRun(topic, new ReleaseOp(topic, new Callback<Void>() {
-
-            @Override
-            public void operationFailed(Object ctx, PubSubException exception) {
-                logger.error("failure that should never happen when releasing topic "
-                             + topic.toStringUtf8(), exception);
-            }
-
-            @Override
-            public void operationFinished(Object ctx, Void resultOfOperation) {
-                logger.info("successfully release of topic {}", topic.toStringUtf8());
-            }
-
-        }, null, checkExistence));
-    }
-
-    /**
-     * Notifies every registered listener that {@code topic} was acquired.
-     * Once all of them have answered, the topic is recorded as owned and the
-     * original callback is completed with this hub's address. If any listener
-     * fails, the topic is released again and the original callback is failed.
-     *
-     * @param topic
-     *            topic that this hub has just claimed
-     * @param originalCallback
-     *            callback of the get-owner request being served
-     * @param originalContext
-     *            context to hand back to {@code originalCallback}
-     */
-    protected final synchronized void notifyListenersAndAddToOwnedTopics(final ByteString topic,
-            final Callback<HedwigSocketAddress> originalCallback, final Object originalContext) {
-
-        Callback<Void> postCb = new Callback<Void>() {
-
-            @Override
-            public void operationFinished(Object ctx, Void resultOfOperation) {
-                topics.put(topic, STUB_TOPIC_STATS);
-                if (cfg.getRetentionSecs() > 0) {
-                    scheduler.schedule(new Runnable() {
-                        @Override
-                        public void run() {
-                            releaseTopicInternally(topic, true);
-                        }
-                    }, cfg.getRetentionSecs(), TimeUnit.SECONDS);
-                }
-                originalCallback.operationFinished(originalContext, addr);
-            }
-
-            @Override
-            public void operationFailed(final Object ctx, final PubSubException exception) {
-                // TODO: optimization: we can release this as soon as we experience the first error.
-                // The failure is reported with originalContext, exactly like
-                // the success path. The ctx argument of this callback belongs
-                // to the multi-callback below, which is created with a null
-                // context, so it must not be forwarded to the caller.
-                Callback<Void> cb = new Callback<Void>() {
-                    @Override
-                    public void operationFinished(Object releaseCtx, Void releaseResult) {
-                        originalCallback.operationFailed(originalContext, exception);
-                    }
-                    @Override
-                    public void operationFailed(Object releaseCtx, PubSubException releaseException) {
-                        logger.error("Exception releasing topic", releaseException);
-                        originalCallback.operationFailed(originalContext, exception);
-                    }
-                };
-
-                realReleaseTopic(topic, cb, originalContext);
-            }
-        };
-
-        Callback<Void> mcb = CallbackUtils.multiCallback(listeners.size(), postCb, null);
-        for (TopicOwnershipChangeListener listener : listeners) {
-            listener.acquiredTopic(topic, mcb, null);
-        }
-    }
-
-    /**
-     * Tells every listener the topic was lost, drops it from the cache and
-     * hands over to {@link #postReleaseCleanup(ByteString, Callback, Object)}.
-     */
-    private void realReleaseTopic(ByteString topic, Callback<Void> callback, Object ctx) {
-        // NOTE(review): listeners is iterated here without the monitor that
-        // addTopicOwnershipChangeListener holds while adding -- confirm that
-        // listeners are only registered before operations start.
-        for (TopicOwnershipChangeListener listener : listeners) {
-            listener.lostTopic(topic);
-        }
-        topics.invalidate(topic);
-        postReleaseCleanup(topic, callback, ctx);
-    }
-
-    @Override
-    public final void getOwner(ByteString topic, boolean shouldClaim,
-                               Callback<HedwigSocketAddress> cb, Object ctx) {
-        queuer.pushAndMaybeRun(topic, new GetOwnerOp(topic, shouldClaim, cb, ctx));
-    }
-
-    @Override
-    public final void releaseTopic(ByteString topic, Callback<Void> cb, Object ctx) {
-        queuer.pushAndMaybeRun(topic, new ReleaseOp(topic, cb, ctx));
-    }
-
-    /**
-     * Releases up to {@code numTopics} randomly chosen owned topics. The
-     * callback always completes through {@code operationFinished}, carrying
-     * the number of topics that were released.
-     *
-     * @param numTopics
-     *            maximum number of topics to release; values below zero are
-     *            treated as zero
-     * @param callback
-     *            receives the number of topics released
-     * @param ctx
-     *            context handed back to {@code callback}
-     */
-    @Override
-    public final void releaseTopics(int numTopics, final Callback<Long> callback, final Object ctx) {
-        // This is a best effort function. We sacrifice accuracy to not hold a lock on the topics set.
-        List<ByteString> topicList = getTopicList();
-        // Release only as many topics as we own, and never a negative number.
-        final long numTopicsToRelease = Math.max(0, Math.min(topicList.size(), numTopics));
-
-        // It's okay if we're not able to release some topics. We signal that
-        // we tried by invoking the callback's operationFinished() with the
-        // actual number of topics released.
-        logger.info("This hub is releasing {} topics", numTopicsToRelease);
-        if (numTopicsToRelease == 0) {
-            // Nothing to enqueue. Answer directly instead of depending on how
-            // the multi-callback treats a zero count, which is not visible
-            // from this class.
-            callback.operationFinished(ctx, numTopicsToRelease);
-            return;
-        }
-
-        // Shuffle the list of topics we own, so that we release a random subset.
-        Collections.shuffle(topicList);
-        Callback<Void> mcb = CallbackUtils.multiCallback((int) numTopicsToRelease, new Callback<Void>() {
-            @Override
-            public void operationFinished(Object ctx, Void ignoreVal) {
-                callback.operationFinished(ctx, numTopicsToRelease);
-            }
-
-            @Override
-            public void operationFailed(Object ctx, PubSubException e) {
-                long notReleased = 0;
-                if (e instanceof PubSubException.CompositeException) {
-                    notReleased = ((PubSubException.CompositeException) e).getExceptions().size();
-                }
-                callback.operationFinished(ctx, numTopicsToRelease - notReleased);
-            }
-        }, ctx);
-
-        for (ByteString topic : topicList.subList(0, (int) numTopicsToRelease)) {
-            releaseTopic(topic, mcb, ctx);
-        }
-    }
-
-    /**
-     * Returns a snapshot copy of the keys currently in the owned-topics cache.
-     */
-    @Override
-    public List<ByteString> getTopicList() {
-        synchronized (this.topics) {
-            return Lists.newArrayList(this.topics.asMap().keySet());
-        }
-    }
-
-    /**
-     * This method should "return" the owner of the topic if one has been chosen
-     * already. If there is no pre-chosen owner, either this hub or some other
-     * should be chosen based on the shouldClaim parameter. If it ends up
-     * choosing this hub as the owner, the
-     * {@link #notifyListenersAndAddToOwnedTopics(ByteString, Callback, Object)}
-     * method must be called.
-     *
-     */
-    protected abstract void realGetOwner(ByteString topic, boolean shouldClaim,
-            Callback<HedwigSocketAddress> cb, Object ctx);
-
-    /**
-     * The method should do any cleanup necessary to indicate to other hubs that
-     * this topic has been released
-     */
-    protected abstract void postReleaseCleanup(ByteString topic, Callback<Void> cb, Object ctx);
-
-    @Override
-    public void stop() {
-        // do nothing now
-    }
-}