You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:26 UTC
[16/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
deleted file mode 100644
index 0480b22..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
+++ /dev/null
@@ -1,978 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.delivery;
-
-import static org.apache.hedwig.util.VarArgs.va;
-
-import java.util.Comparator;
-import java.util.HashSet;
-import java.util.Queue;
-import java.util.Set;
-import java.util.SortedMap;
-import java.util.TreeMap;
-import java.util.concurrent.BlockingQueue;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.ConcurrentMap;
-import java.util.concurrent.PriorityBlockingQueue;
-import java.util.concurrent.LinkedBlockingQueue;
-import java.util.concurrent.TimeUnit;
-import java.util.concurrent.locks.ReentrantReadWriteLock;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.annotations.VisibleForTesting;
-import com.google.protobuf.ByteString;
-
-import org.apache.bookkeeper.util.MathUtils;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.filter.ServerMessageFilter;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionPreferences;
-import org.apache.hedwig.protoextensions.PubSubResponseUtils;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.common.UnexpectedError;
-import org.apache.hedwig.server.handlers.SubscriptionChannelManager.SubChannelDisconnectedListener;
-import org.apache.hedwig.server.netty.ServerStats;
-import org.apache.hedwig.server.persistence.CancelScanRequest;
-import org.apache.hedwig.server.persistence.Factory;
-import org.apache.hedwig.server.persistence.MapMethods;
-import org.apache.hedwig.server.persistence.PersistenceManager;
-import org.apache.hedwig.server.persistence.ReadAheadCache;
-import org.apache.hedwig.server.persistence.ScanCallback;
-import org.apache.hedwig.server.persistence.ScanRequest;
-import org.apache.hedwig.server.topics.TopicManager;
-import org.apache.hedwig.util.Callback;
-
-public class FIFODeliveryManager implements DeliveryManager, SubChannelDisconnectedListener {
-
- protected static final Logger logger = LoggerFactory.getLogger(FIFODeliveryManager.class);
-
- private static Callback<Void> NOP_CALLBACK = new Callback<Void>() {
- @Override
- public void operationFinished(Object ctx, Void result) {
- }
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- }
- };
-
- protected interface DeliveryManagerRequest {
- public void performRequest();
- }
-
- /**
- * Stores a mapping from topic to the delivery pointers on the topic. The
- * delivery pointers are stored in a sorted map from seq-id to the set of
- * subscribers at that seq-id
- */
- ConcurrentMap<ByteString, SortedMap<Long, Set<ActiveSubscriberState>>> perTopicDeliveryPtrs;
-
- /**
- * Mapping from delivery end point to the subscriber state that we are
- * serving at that end point. This prevents us e.g., from serving two
- * subscriptions to the same endpoint
- */
- ConcurrentMap<TopicSubscriber, ActiveSubscriberState> subscriberStates;
-
- private final ReadAheadCache cache;
- private final PersistenceManager persistenceMgr;
- private TopicManager tm;
- private ServerConfiguration cfg;
-
- private final int numDeliveryWorkers;
- private final DeliveryWorker[] deliveryWorkers;
-
- private class DeliveryWorker implements Runnable {
-
- BlockingQueue<DeliveryManagerRequest> requestQueue =
- new LinkedBlockingQueue<DeliveryManagerRequest>();;
-
- /**
- * The queue of all subscriptions that are facing a transient error either
- * in scanning from the persistence manager, or in sending to the consumer
- */
- Queue<ActiveSubscriberState> retryQueue =
- new PriorityBlockingQueue<ActiveSubscriberState>(32, new Comparator<ActiveSubscriberState>() {
- @Override
- public int compare(ActiveSubscriberState as1, ActiveSubscriberState as2) {
- long s = as1.lastScanErrorTime - as2.lastScanErrorTime;
- return s > 0 ? 1 : (s < 0 ? -1 : 0);
- }
- });
-
- // Boolean indicating if this thread should continue running. This is used
- // when we want to stop the thread during a PubSubServer shutdown.
- protected volatile boolean keepRunning = true;
- private final Thread workerThread;
- private final int idx;
-
- private final Object suspensionLock = new Object();
- private boolean suspended = false;
-
- DeliveryWorker(int index) {
- this.idx = index;
- workerThread = new Thread(this, "DeliveryManagerThread-" + index);
- }
-
- void start() {
- workerThread.start();
- }
-
- /**
- * Stop method which will enqueue a ShutdownDeliveryManagerRequest.
- */
- void stop() {
- enqueueWithoutFailure(new ShutdownDeliveryManagerRequest());
- }
-
- /**
- * Stop FIFO delivery worker from processing requests. (for testing)
- */
- void suspendProcessing() {
- synchronized(suspensionLock) {
- suspended = true;
- }
- }
-
- /**
- * Resume FIFO delivery worker. (for testing)
- */
- void resumeProcessing() {
- synchronized(suspensionLock) {
- suspended = false;
- suspensionLock.notify();
- }
- }
-
- @Override
- public void run() {
- while (keepRunning) {
- DeliveryManagerRequest request = null;
-
- try {
- // We use a timeout of 1 second, so that we can wake up once in
- // a while to check if there is something in the retry queue.
- request = requestQueue.poll(1, TimeUnit.SECONDS);
- synchronized(suspensionLock) {
- while (suspended) {
- suspensionLock.wait();
- }
- }
- } catch (InterruptedException e) {
- Thread.currentThread().interrupt();
- }
-
- // First retry any subscriptions that had failed and need a retry
- retryErroredSubscribers();
-
- if (request == null) {
- continue;
- }
-
- request.performRequest();
-
- }
- }
-
- protected void enqueueWithoutFailure(DeliveryManagerRequest request) {
- if (!requestQueue.offer(request)) {
- throw new UnexpectedError("Could not enqueue object: " + request
- + " to request queue for delivery worker ." + idx);
- }
- }
-
- public void retryErroredSubscriberAfterDelay(ActiveSubscriberState subscriber) {
- subscriber.setLastScanErrorTime(MathUtils.now());
-
- if (!retryQueue.offer(subscriber)) {
- throw new UnexpectedError("Could not enqueue to retry queue for delivery worker " + idx);
- }
- }
-
- public void clearRetryDelayForSubscriber(ActiveSubscriberState subscriber) {
- subscriber.clearLastScanErrorTime();
- if (!retryQueue.offer(subscriber)) {
- throw new UnexpectedError("Could not enqueue to delivery manager retry queue");
- }
- // no request in request queue now
- // issue a empty delivery request to not waiting for polling requests queue
- if (requestQueue.isEmpty()) {
- enqueueWithoutFailure(new DeliveryManagerRequest() {
- @Override
- public void performRequest() {
- // do nothing
- }
- });
- }
- }
-
- protected void retryErroredSubscribers() {
- long lastInterestingFailureTime = MathUtils.now() - cfg.getScanBackoffPeriodMs();
- ActiveSubscriberState subscriber;
-
- while ((subscriber = retryQueue.peek()) != null) {
- if (subscriber.getLastScanErrorTime() > lastInterestingFailureTime) {
- // Not enough time has elapsed yet, will retry later
- // Since the queue is fifo, no need to check later items
- return;
- }
-
- // retry now
- subscriber.deliverNextMessage();
- retryQueue.poll();
- }
- }
-
- protected class ShutdownDeliveryManagerRequest implements DeliveryManagerRequest {
- // This is a simple type of Request we will enqueue when the
- // PubSubServer is shut down and we want to stop the DeliveryManager
- // thread.
- @Override
- public void performRequest() {
- keepRunning = false;
- }
- }
-
- }
-
-
-
- public FIFODeliveryManager(TopicManager tm, PersistenceManager persistenceMgr,
- ServerConfiguration cfg) {
- this.tm = tm;
- this.persistenceMgr = persistenceMgr;
- if (persistenceMgr instanceof ReadAheadCache) {
- this.cache = (ReadAheadCache) persistenceMgr;
- } else {
- this.cache = null;
- }
- perTopicDeliveryPtrs =
- new ConcurrentHashMap<ByteString, SortedMap<Long, Set<ActiveSubscriberState>>>();
- subscriberStates =
- new ConcurrentHashMap<TopicSubscriber, ActiveSubscriberState>();
- this.cfg = cfg;
- // initialize the delivery workers
- this.numDeliveryWorkers = cfg.getNumDeliveryThreads();
- this.deliveryWorkers = new DeliveryWorker[numDeliveryWorkers];
- for (int i=0; i<numDeliveryWorkers; i++) {
- deliveryWorkers[i] = new DeliveryWorker(i);
- }
- }
-
- @Override
- public void start() {
- for (int i=0; i<numDeliveryWorkers; i++) {
- deliveryWorkers[i].start();
- }
- }
-
- /**
- * Stop FIFO delivery manager from processing requests. (for testing)
- */
- @VisibleForTesting
- public void suspendProcessing() {
- for (int i=0; i<numDeliveryWorkers; i++) {
- deliveryWorkers[i].suspendProcessing();
- }
- }
-
- /**
- * Resume FIFO delivery manager. (for testing)
- */
- @VisibleForTesting
- public void resumeProcessing() {
- for (int i=0; i<numDeliveryWorkers; i++) {
- deliveryWorkers[i].resumeProcessing();
- }
- }
-
- /**
- * Stop the FIFO delivery manager.
- */
- @Override
- public void stop() {
- for (int i=0; i<numDeliveryWorkers; i++) {
- deliveryWorkers[i].stop();
- }
- }
-
- private DeliveryWorker getDeliveryWorker(ByteString topic) {
- return deliveryWorkers[MathUtils.signSafeMod(topic.hashCode(), numDeliveryWorkers)];
- }
-
- /**
- * ===================================================================== Our
- * usual enqueue function, stop if error because of unbounded queue, should
- * never happen
- *
- */
- protected void enqueueWithoutFailure(ByteString topic, DeliveryManagerRequest request) {
- getDeliveryWorker(topic).enqueueWithoutFailure(request);
- }
-
- /**
- * Tells the delivery manager to start sending out messages for a particular
- * subscription
- *
- * @param topic
- * @param subscriberId
- * @param seqIdToStartFrom
- * Message sequence-id from where delivery should be started
- * @param endPoint
- * The delivery end point to which send messages to
- * @param filter
- * Only messages passing this filter should be sent to this
- * subscriber
- * @param callback
- * Callback instance
- * @param ctx
- * Callback context
- */
- @Override
- public void startServingSubscription(ByteString topic, ByteString subscriberId,
- SubscriptionPreferences preferences,
- MessageSeqId seqIdToStartFrom,
- DeliveryEndPoint endPoint, ServerMessageFilter filter,
- Callback<Void> callback, Object ctx) {
- ActiveSubscriberState subscriber =
- new ActiveSubscriberState(topic, subscriberId,
- preferences,
- seqIdToStartFrom.getLocalComponent() - 1,
- endPoint, filter, callback, ctx);
-
- enqueueWithoutFailure(topic, subscriber);
- }
-
- public void stopServingSubscriber(ByteString topic, ByteString subscriberId,
- SubscriptionEvent event,
- Callback<Void> cb, Object ctx) {
- enqueueWithoutFailure(topic, new StopServingSubscriber(topic, subscriberId, event, cb, ctx));
- }
-
- /**
- * Instructs the delivery manager to backoff on the given subscriber and
- * retry sending after some time
- *
- * @param subscriber
- */
- public void retryErroredSubscriberAfterDelay(ActiveSubscriberState subscriber) {
- getDeliveryWorker(subscriber.getTopic()).retryErroredSubscriberAfterDelay(subscriber);
- }
-
- public void clearRetryDelayForSubscriber(ActiveSubscriberState subscriber) {
- getDeliveryWorker(subscriber.getTopic()).clearRetryDelayForSubscriber(subscriber);
- }
-
- // TODO: for now, I don't move messageConsumed request to delivery manager thread,
- // which is supposed to be fixed in {@link https://issues.apache.org/jira/browse/BOOKKEEPER-503}
- @Override
- public void messageConsumed(ByteString topic, ByteString subscriberId,
- MessageSeqId consumedSeqId) {
- ActiveSubscriberState subState =
- subscriberStates.get(new TopicSubscriber(topic, subscriberId));
- if (null == subState) {
- return;
- }
- subState.messageConsumed(consumedSeqId.getLocalComponent());
- }
-
- /**
- * Instructs the delivery manager to move the delivery pointer for a given
- * subscriber
- *
- * @param subscriber
- * @param prevSeqId
- * @param newSeqId
- */
- public void moveDeliveryPtrForward(ActiveSubscriberState subscriber, long prevSeqId, long newSeqId) {
- enqueueWithoutFailure(subscriber.getTopic(),
- new DeliveryPtrMove(subscriber, prevSeqId, newSeqId));
- }
-
- protected void removeDeliveryPtr(ActiveSubscriberState subscriber, Long seqId, boolean isAbsenceOk,
- boolean pruneTopic) {
-
- assert seqId != null;
-
- // remove this subscriber from the delivery pointers data structure
- ByteString topic = subscriber.getTopic();
- SortedMap<Long, Set<ActiveSubscriberState>> deliveryPtrs = perTopicDeliveryPtrs.get(topic);
-
- if (deliveryPtrs == null && !isAbsenceOk) {
- throw new UnexpectedError("No delivery pointers found while disconnecting " + "channel for topic:" + topic);
- }
-
- if(null == deliveryPtrs) {
- return;
- }
-
- if (!MapMethods.removeFromMultiMap(deliveryPtrs, seqId, subscriber) && !isAbsenceOk) {
-
- throw new UnexpectedError("Could not find subscriber:" + subscriber + " at the expected delivery pointer");
- }
-
- if (pruneTopic && deliveryPtrs.isEmpty()) {
- perTopicDeliveryPtrs.remove(topic);
- }
-
- }
-
- protected long getMinimumSeqId(ByteString topic) {
- SortedMap<Long, Set<ActiveSubscriberState>> deliveryPtrs = perTopicDeliveryPtrs.get(topic);
-
- if (deliveryPtrs == null || deliveryPtrs.isEmpty()) {
- return Long.MAX_VALUE - 1;
- }
- return deliveryPtrs.firstKey();
- }
-
- protected void addDeliveryPtr(ActiveSubscriberState subscriber, Long seqId) {
-
- // If this topic doesn't exist in the per-topic delivery pointers table,
- // create an entry for it
- SortedMap<Long, Set<ActiveSubscriberState>> deliveryPtrs = MapMethods.getAfterInsertingIfAbsent(
- perTopicDeliveryPtrs, subscriber.getTopic(), TreeMapLongToSetSubscriberFactory.instance);
-
- MapMethods.addToMultiMap(deliveryPtrs, seqId, subscriber, HashMapSubscriberFactory.instance);
- }
-
- public class ActiveSubscriberState
- implements ScanCallback, DeliveryCallback, DeliveryManagerRequest, CancelScanRequest {
-
- static final int UNLIMITED = 0;
-
- ByteString topic;
- ByteString subscriberId;
- long lastLocalSeqIdDelivered;
- boolean connected = true;
- ReentrantReadWriteLock connectedLock = new ReentrantReadWriteLock();
- DeliveryEndPoint deliveryEndPoint;
- long lastScanErrorTime = -1;
- long localSeqIdDeliveringNow;
- long lastSeqIdCommunicatedExternally;
- long lastSeqIdConsumedUtil;
- boolean isThrottled = false;
- final int messageWindowSize;
- ServerMessageFilter filter;
- Callback<Void> cb;
- Object ctx;
-
- // track the outstanding scan request
- // so we could cancel it
- ScanRequest outstandingScanRequest;
-
- final static int SEQ_ID_SLACK = 10;
-
- public ActiveSubscriberState(ByteString topic, ByteString subscriberId,
- SubscriptionPreferences preferences,
- long lastLocalSeqIdDelivered,
- DeliveryEndPoint deliveryEndPoint,
- ServerMessageFilter filter,
- Callback<Void> cb, Object ctx) {
- this.topic = topic;
- this.subscriberId = subscriberId;
- this.lastLocalSeqIdDelivered = lastLocalSeqIdDelivered;
- this.lastSeqIdConsumedUtil = lastLocalSeqIdDelivered;
- this.deliveryEndPoint = deliveryEndPoint;
- this.filter = filter;
- if (preferences.hasMessageWindowSize()) {
- messageWindowSize = preferences.getMessageWindowSize();
- } else {
- if (FIFODeliveryManager.this.cfg.getDefaultMessageWindowSize() > 0) {
- messageWindowSize =
- FIFODeliveryManager.this.cfg.getDefaultMessageWindowSize();
- } else {
- messageWindowSize = UNLIMITED;
- }
- }
- this.cb = cb;
- this.ctx = ctx;
- }
-
- public void setNotConnected(SubscriptionEvent event) {
- this.connectedLock.writeLock().lock();
- try {
- // have closed it.
- if (!connected) {
- return;
- }
- this.connected = false;
- // put itself in ReadAhead queue to cancel outstanding scan request
- // if outstanding scan request callback before cancel op executed,
- // nothing it would cancel.
- if (null != cache && null != outstandingScanRequest) {
- cache.cancelScanRequest(topic, this);
- }
- } finally {
- this.connectedLock.writeLock().unlock();
- }
-
- if (null != event &&
- (SubscriptionEvent.TOPIC_MOVED == event ||
- SubscriptionEvent.SUBSCRIPTION_FORCED_CLOSED == event)) {
- // we should not close the channel now after enabling multiplexing
- PubSubResponse response = PubSubResponseUtils.getResponseForSubscriptionEvent(
- topic, subscriberId, event
- );
- deliveryEndPoint.send(response, new DeliveryCallback() {
- @Override
- public void sendingFinished() {
- // do nothing now
- }
- @Override
- public void transientErrorOnSend() {
- // do nothing now
- }
- @Override
- public void permanentErrorOnSend() {
- // if channel is broken, close the channel
- deliveryEndPoint.close();
- }
- });
- }
- // uninitialize filter
- this.filter.uninitialize();
- }
-
- public ByteString getTopic() {
- return topic;
- }
-
- public synchronized long getLastScanErrorTime() {
- return lastScanErrorTime;
- }
-
- public synchronized void setLastScanErrorTime(long lastScanErrorTime) {
- this.lastScanErrorTime = lastScanErrorTime;
- }
-
- /**
- * Clear the last scan error time so it could be retry immediately.
- */
- protected synchronized void clearLastScanErrorTime() {
- this.lastScanErrorTime = -1;
- }
-
- protected boolean isConnected() {
- connectedLock.readLock().lock();
- try {
- return connected;
- } finally {
- connectedLock.readLock().unlock();
- }
- }
-
- protected synchronized void messageConsumed(long newSeqIdConsumed) {
- if (newSeqIdConsumed <= lastSeqIdConsumedUtil) {
- return;
- }
- if (logger.isDebugEnabled()) {
- logger.debug("Subscriber ({}) moved consumed ptr from {} to {}.",
- va(this, lastSeqIdConsumedUtil, newSeqIdConsumed));
- }
- lastSeqIdConsumedUtil = newSeqIdConsumed;
- // after updated seq id check whether it still exceed msg limitation
- if (msgLimitExceeded()) {
- return;
- }
- if (isThrottled) {
- isThrottled = false;
- logger.info("Try to wake up subscriber ({}) to deliver messages again : last delivered {}, last consumed {}.",
- va(this, lastLocalSeqIdDelivered, lastSeqIdConsumedUtil));
-
- enqueueWithoutFailure(topic, new DeliveryManagerRequest() {
- @Override
- public void performRequest() {
- // enqueue
- clearRetryDelayForSubscriber(ActiveSubscriberState.this);
- }
- });
- }
- }
-
- protected boolean msgLimitExceeded() {
- if (messageWindowSize == UNLIMITED) {
- return false;
- }
- if (lastLocalSeqIdDelivered - lastSeqIdConsumedUtil >= messageWindowSize) {
- return true;
- }
- return false;
- }
-
- public void deliverNextMessage() {
- connectedLock.readLock().lock();
- try {
- doDeliverNextMessage();
- } finally {
- connectedLock.readLock().unlock();
- }
- }
-
- private void doDeliverNextMessage() {
- if (!connected) {
- return;
- }
-
- synchronized (this) {
- // check whether we have delivered enough messages without receiving their consumes
- if (msgLimitExceeded()) {
- logger.info("Subscriber ({}) is throttled : last delivered {}, last consumed {}.",
- va(this, lastLocalSeqIdDelivered, lastSeqIdConsumedUtil));
- isThrottled = true;
- // do nothing, since the delivery process would be throttled.
- // After message consumed, it would be added back to retry queue.
- return;
- }
-
- localSeqIdDeliveringNow = persistenceMgr.getSeqIdAfterSkipping(topic, lastLocalSeqIdDelivered, 1);
-
- outstandingScanRequest = new ScanRequest(topic, localSeqIdDeliveringNow,
- /* callback= */this, /* ctx= */null);
- }
-
- persistenceMgr.scanSingleMessage(outstandingScanRequest);
- }
-
- /**
- * ===============================================================
- * {@link CancelScanRequest} methods
- *
- * This method runs ins same threads with ScanCallback. When it runs,
- * it checked whether it is outstanding scan request. if there is one,
- * cancel it.
- */
- @Override
- public ScanRequest getScanRequest() {
- // no race between cancel request and scan callback
- // the only race is between stopServing and deliverNextMessage
- // deliverNextMessage would be executed in netty callback which is in netty thread
- // stopServing is run in delivery thread. if stopServing runs before deliverNextMessage
- // deliverNextMessage would have chance to put a stub in ReadAheadCache
- // then we don't have any chance to cancel it.
- // use connectedLock to avoid such race.
- return outstandingScanRequest;
- }
-
- private boolean checkConnected() {
- connectedLock.readLock().lock();
- try {
- // message scanned means the outstanding request is executed
- outstandingScanRequest = null;
- return connected;
- } finally {
- connectedLock.readLock().unlock();
- }
- }
-
- /**
- * ===============================================================
- * {@link ScanCallback} methods
- */
-
- public void messageScanned(Object ctx, Message message) {
- if (!checkConnected()) {
- return;
- }
-
- // only increment topic access times when tried to deliver a message
- // for those subscribers just waiting for a published for a long time
- // we don't increment topic access times, so the topic would be evicted
- // in future.
- tm.incrementTopicAccessTimes(topic);
-
- if (!filter.testMessage(message)) {
- // for filtered out messages, we don't deliver the message to client, so we would not
- // receive its consume request which moves the <i>lastSeqIdConsumedUtil</i> pointer.
- // we move the <i>lastSeqIdConsumedUtil</i> here for filtered out messages, which would
- // avoid a subscriber being throttled due to the message gap introduced by filtering.
- //
- // it is OK to move <i>lastSeqIdConsumedUtil</i> here, since this pointer is subscriber's
- // delivery state which to trottling deliver. changing <i>lastSeqIdConsumedUtil</i> would
- // not affect the subscriber's consume pointer in zookeeper which is managed in subscription
- // manager.
- //
- // And marking message consumed before calling sending finished, would avoid the subscriber
- // being throttled first and released from throttled state laster.
- messageConsumed(message.getMsgId().getLocalComponent());
- sendingFinished();
- return;
- }
-
- /**
- * The method below will invoke our sendingFinished() method when
- * done
- */
- PubSubResponse response = PubSubResponse.newBuilder()
- .setProtocolVersion(ProtocolVersion.VERSION_ONE)
- .setStatusCode(StatusCode.SUCCESS).setTxnId(0)
- .setMessage(message).setTopic(topic)
- .setSubscriberId(subscriberId).build();
-
- deliveryEndPoint.send(response, //
- // callback =
- this);
-
- }
-
- @Override
- public void scanFailed(Object ctx, Exception exception) {
- if (!checkConnected()) {
- return;
- }
-
- // wait for some time and then retry
- retryErroredSubscriberAfterDelay(this);
- }
-
- @Override
- public void scanFinished(Object ctx, ReasonForFinish reason) {
- checkConnected();
- }
-
- /**
- * ===============================================================
- * {@link DeliveryCallback} methods
- */
- @Override
- public void sendingFinished() {
- if (!isConnected()) {
- return;
- }
-
- synchronized (this) {
- lastLocalSeqIdDelivered = localSeqIdDeliveringNow;
-
- if (lastLocalSeqIdDelivered > lastSeqIdCommunicatedExternally + SEQ_ID_SLACK) {
- // Note: The order of the next 2 statements is important. We should
- // submit a request to change our delivery pointer only *after* we
- // have actually changed it. Otherwise, there is a race condition
- // with removal of this channel, w.r.t, maintaining the deliveryPtrs
- // tree map.
- long prevId = lastSeqIdCommunicatedExternally;
- lastSeqIdCommunicatedExternally = lastLocalSeqIdDelivered;
- moveDeliveryPtrForward(this, prevId, lastLocalSeqIdDelivered);
- }
- }
- // increment deliveried message
- ServerStats.getInstance().incrementMessagesDelivered();
- deliverNextMessage();
- }
-
- public synchronized long getLastSeqIdCommunicatedExternally() {
- return lastSeqIdCommunicatedExternally;
- }
-
-
- @Override
- public void permanentErrorOnSend() {
- // the underlying channel is broken, the channel will
- // be closed in UmbrellaHandler when exception happened.
- // so we don't need to close the channel again
- stopServingSubscriber(topic, subscriberId, null,
- NOP_CALLBACK, null);
- }
-
- @Override
- public void transientErrorOnSend() {
- retryErroredSubscriberAfterDelay(this);
- }
-
- /**
- * ===============================================================
- * {@link DeliveryManagerRequest} methods
- */
- @Override
- public void performRequest() {
- // Put this subscriber in the channel to subscriber mapping
- ActiveSubscriberState prevSubscriber =
- subscriberStates.put(new TopicSubscriber(topic, subscriberId), this);
-
- // after put the active subscriber in subscriber states mapping
- // trigger the callback to tell it started to deliver the message
- // should let subscriber response go first before first delivered message.
- cb.operationFinished(ctx, (Void)null);
-
- if (prevSubscriber != null) {
- // we already in the delivery thread, we don't need to equeue a stop request
- // just stop it now, since stop is not blocking operation.
- // and also it cleans the old state of the active subscriber immediately.
- SubscriptionEvent se;
- if (deliveryEndPoint.equals(prevSubscriber.deliveryEndPoint)) {
- logger.debug("Subscriber {} replaced a duplicated subscriber {} at same delivery point {}.",
- va(this, prevSubscriber, deliveryEndPoint));
- se = null;
- } else {
- logger.debug("Subscriber {} from delivery point {} forcelly closed delivery point {}.",
- va(this, deliveryEndPoint, prevSubscriber.deliveryEndPoint));
- se = SubscriptionEvent.SUBSCRIPTION_FORCED_CLOSED;
- }
- doStopServingSubscriber(prevSubscriber, se);
- }
-
- synchronized (this) {
- lastSeqIdCommunicatedExternally = lastLocalSeqIdDelivered;
- addDeliveryPtr(this, lastLocalSeqIdDelivered);
- }
-
- deliverNextMessage();
- };
-
- @Override
- public String toString() {
- StringBuilder sb = new StringBuilder();
- sb.append("Topic: ");
- sb.append(topic.toStringUtf8());
- sb.append("Subscriber: ");
- sb.append(subscriberId.toStringUtf8());
- sb.append(", DeliveryPtr: ");
- sb.append(lastLocalSeqIdDelivered);
- return sb.toString();
-
- }
- }
-
- protected class StopServingSubscriber implements DeliveryManagerRequest {
- TopicSubscriber ts;
- SubscriptionEvent event;
- final Callback<Void> cb;
- final Object ctx;
-
- public StopServingSubscriber(ByteString topic, ByteString subscriberId,
- SubscriptionEvent event,
- Callback<Void> callback, Object ctx) {
- this.ts = new TopicSubscriber(topic, subscriberId);
- this.event = event;
- this.cb = callback;
- this.ctx = ctx;
- }
-
- @Override
- public void performRequest() {
- ActiveSubscriberState subscriber = subscriberStates.remove(ts);
- if (null != subscriber) {
- doStopServingSubscriber(subscriber, event);
- }
- cb.operationFinished(ctx, null);
- }
-
- }
-
- /**
- * Stop serving a subscriber. This method should be called in a
- * {@link DeliveryManagerRequest}.
- *
- * @param subscriber
- * Active Subscriber to stop
- * @param event
- * Subscription Event for the stop reason
- */
- private void doStopServingSubscriber(ActiveSubscriberState subscriber, SubscriptionEvent event) {
- // This will automatically stop delivery, and disconnect the channel
- subscriber.setNotConnected(event);
-
- // if the subscriber has moved on, a move request for its delivery
- // pointer must be pending in the request queue. Note that the
- // subscriber first changes its delivery pointer and then submits a
- // request to move so this works.
- removeDeliveryPtr(subscriber, subscriber.getLastSeqIdCommunicatedExternally(), //
- // isAbsenceOk=
- true,
- // pruneTopic=
- true);
- }
-
- protected class DeliveryPtrMove implements DeliveryManagerRequest {
-
- ActiveSubscriberState subscriber;
- Long oldSeqId;
- Long newSeqId;
-
- public DeliveryPtrMove(ActiveSubscriberState subscriber, Long oldSeqId, Long newSeqId) {
- this.subscriber = subscriber;
- this.oldSeqId = oldSeqId;
- this.newSeqId = newSeqId;
- }
-
- @Override
- public void performRequest() {
- ByteString topic = subscriber.getTopic();
- long prevMinSeqId = getMinimumSeqId(topic);
-
- if (subscriber.isConnected()) {
- removeDeliveryPtr(subscriber, oldSeqId, //
- // isAbsenceOk=
- false,
- // pruneTopic=
- false);
-
- addDeliveryPtr(subscriber, newSeqId);
- } else {
- removeDeliveryPtr(subscriber, oldSeqId, //
- // isAbsenceOk=
- true,
- // pruneTopic=
- true);
- }
-
- long nowMinSeqId = getMinimumSeqId(topic);
-
- if (nowMinSeqId > prevMinSeqId) {
- persistenceMgr.deliveredUntil(topic, nowMinSeqId);
- }
- }
- }
-
- /**
- * ====================================================================
- *
- * Dumb factories for our map methods
- */
- protected static class TreeMapLongToSetSubscriberFactory implements
- Factory<SortedMap<Long, Set<ActiveSubscriberState>>> {
- static TreeMapLongToSetSubscriberFactory instance = new TreeMapLongToSetSubscriberFactory();
-
- @Override
- public SortedMap<Long, Set<ActiveSubscriberState>> newInstance() {
- return new TreeMap<Long, Set<ActiveSubscriberState>>();
- }
- }
-
- protected static class HashMapSubscriberFactory implements Factory<Set<ActiveSubscriberState>> {
- static HashMapSubscriberFactory instance = new HashMapSubscriberFactory();
-
- @Override
- public Set<ActiveSubscriberState> newInstance() {
- return new HashSet<ActiveSubscriberState>();
- }
- }
-
- @Override
- public void onSubChannelDisconnected(TopicSubscriber topicSubscriber) {
- stopServingSubscriber(topicSubscriber.getTopic(), topicSubscriber.getSubscriberId(),
- null, NOP_CALLBACK, null);
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/BaseHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/BaseHandler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/BaseHandler.java
deleted file mode 100644
index 4189eb6..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/BaseHandler.java
+++ /dev/null
@@ -1,67 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.handlers;
-
-import org.jboss.netty.channel.Channel;
-
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.exceptions.PubSubException.ServerNotResponsibleForTopicException;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protoextensions.PubSubResponseUtils;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.netty.ServerStats;
-import org.apache.hedwig.server.topics.TopicManager;
-import org.apache.hedwig.util.Callback;
-import org.apache.hedwig.util.HedwigSocketAddress;
-
-public abstract class BaseHandler implements Handler {
-
- protected TopicManager topicMgr;
- protected ServerConfiguration cfg;
-
- protected BaseHandler(TopicManager tm, ServerConfiguration cfg) {
- this.topicMgr = tm;
- this.cfg = cfg;
- }
-
-
- public void handleRequest(final PubSubRequest request, final Channel channel) {
- topicMgr.getOwner(request.getTopic(), request.getShouldClaim(),
- new Callback<HedwigSocketAddress>() {
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
- ServerStats.getInstance().getOpStats(request.getType()).incrementFailedOps();
- }
-
- @Override
- public void operationFinished(Object ctx, HedwigSocketAddress owner) {
- if (!owner.equals(cfg.getServerAddr())) {
- channel.write(PubSubResponseUtils.getResponseForException(
- new ServerNotResponsibleForTopicException(owner.toString()), request.getTxnId()));
- ServerStats.getInstance().incrementRequestsRedirect();
- return;
- }
- handleRequestAtOwner(request, channel);
- }
- }, null);
- }
-
- public abstract void handleRequestAtOwner(PubSubRequest request, Channel channel);
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/ChannelDisconnectListener.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/ChannelDisconnectListener.java b/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/ChannelDisconnectListener.java
deleted file mode 100644
index 458d301..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/ChannelDisconnectListener.java
+++ /dev/null
@@ -1,29 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.handlers;
-
-import org.jboss.netty.channel.Channel;
-
-public interface ChannelDisconnectListener {
-
- /**
- * Act on a particular channel being disconnected
- * @param channel
- */
- public void channelDisconnected(Channel channel);
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/CloseSubscriptionHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/CloseSubscriptionHandler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/CloseSubscriptionHandler.java
deleted file mode 100644
index a6ccb7e..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/CloseSubscriptionHandler.java
+++ /dev/null
@@ -1,105 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.handlers;
-
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFutureListener;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.CloseSubscriptionRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
-import org.apache.hedwig.protoextensions.PubSubResponseUtils;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.delivery.DeliveryManager;
-import org.apache.hedwig.server.netty.ServerStats;
-import org.apache.hedwig.server.netty.ServerStats.OpStats;
-import org.apache.hedwig.server.netty.UmbrellaHandler;
-import org.apache.hedwig.server.subscriptions.SubscriptionManager;
-import org.apache.hedwig.server.topics.TopicManager;
-import org.apache.hedwig.util.Callback;
-
-public class CloseSubscriptionHandler extends BaseHandler {
- SubscriptionManager subMgr;
- DeliveryManager deliveryMgr;
- SubscriptionChannelManager subChannelMgr;
- // op stats
- final OpStats closesubStats;
-
- public CloseSubscriptionHandler(ServerConfiguration cfg, TopicManager tm,
- SubscriptionManager subMgr,
- DeliveryManager deliveryMgr,
- SubscriptionChannelManager subChannelMgr) {
- super(tm, cfg);
- this.subMgr = subMgr;
- this.deliveryMgr = deliveryMgr;
- this.subChannelMgr = subChannelMgr;
- closesubStats = ServerStats.getInstance().getOpStats(OperationType.CLOSESUBSCRIPTION);
- }
-
- @Override
- public void handleRequestAtOwner(final PubSubRequest request, final Channel channel) {
- if (!request.hasCloseSubscriptionRequest()) {
- UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
- "Missing closesubscription request data");
- closesubStats.incrementFailedOps();
- return;
- }
-
- final CloseSubscriptionRequest closesubRequest =
- request.getCloseSubscriptionRequest();
- final ByteString topic = request.getTopic();
- final ByteString subscriberId = closesubRequest.getSubscriberId();
-
- final long requestTime = System.currentTimeMillis();
-
- subMgr.closeSubscription(topic, subscriberId, new Callback<Void>() {
- @Override
- public void operationFinished(Object ctx, Void result) {
- // we should not close the channel in delivery manager
- // since client waits the response for closeSubscription request
- // client side would close the channel
- deliveryMgr.stopServingSubscriber(topic, subscriberId, null,
- new Callback<Void>() {
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
- closesubStats.incrementFailedOps();
- }
- @Override
- public void operationFinished(Object ctx, Void resultOfOperation) {
- // remove the topic subscription from subscription channels
- subChannelMgr.remove(new TopicSubscriber(topic, subscriberId),
- channel);
- channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
- closesubStats.updateLatency(System.currentTimeMillis() - requestTime);
- }
- }, null);
- }
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
- closesubStats.incrementFailedOps();
- }
- }, null);
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/ConsumeHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/ConsumeHandler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/ConsumeHandler.java
deleted file mode 100644
index 5042a37..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/ConsumeHandler.java
+++ /dev/null
@@ -1,72 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.handlers;
-
-import org.jboss.netty.channel.Channel;
-
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.ConsumeRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.netty.ServerStats;
-import org.apache.hedwig.server.netty.UmbrellaHandler;
-import org.apache.hedwig.server.netty.ServerStats.OpStats;
-import org.apache.hedwig.server.subscriptions.SubscriptionManager;
-import org.apache.hedwig.server.topics.TopicManager;
-import org.apache.hedwig.util.Callback;
-
-public class ConsumeHandler extends BaseHandler {
-
- SubscriptionManager sm;
- Callback<Void> noopCallback = new NoopCallback<Void>();
- final OpStats consumeStats = ServerStats.getInstance().getOpStats(OperationType.CONSUME);
-
- class NoopCallback<T> implements Callback<T> {
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- consumeStats.incrementFailedOps();
- }
-
- public void operationFinished(Object ctx, T resultOfOperation) {
- // we don't collect consume process time
- consumeStats.updateLatency(0);
- };
- }
-
- @Override
- public void handleRequestAtOwner(PubSubRequest request, Channel channel) {
- if (!request.hasConsumeRequest()) {
- UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
- "Missing consume request data");
- consumeStats.incrementFailedOps();
- return;
- }
-
- ConsumeRequest consumeRequest = request.getConsumeRequest();
-
- sm.setConsumeSeqIdForSubscriber(request.getTopic(), consumeRequest.getSubscriberId(),
- consumeRequest.getMsgId(), noopCallback, null);
-
- }
-
- public ConsumeHandler(TopicManager tm, SubscriptionManager sm, ServerConfiguration cfg) {
- super(tm, cfg);
- this.sm = sm;
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/Handler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/Handler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/Handler.java
deleted file mode 100644
index c391f5c..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/Handler.java
+++ /dev/null
@@ -1,37 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.handlers;
-
-import org.jboss.netty.channel.Channel;
-
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-
-public interface Handler {
-
- /**
- * Handle a request synchronously or asynchronously. After handling the
- * request, the appropriate response should be written on the given channel
- *
- * @param request
- * The request to handle
- *
- * @param channel
- * The channel on which to write the response
- */
- public void handleRequest(final PubSubRequest request, final Channel channel);
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java b/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java
deleted file mode 100644
index e0f1487..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerBean.java
+++ /dev/null
@@ -1,47 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hedwig.server.handlers;
-
-import org.apache.hedwig.server.handlers.SubscriptionChannelManager;
-import org.apache.hedwig.server.jmx.HedwigMBeanInfo;
-
-public class NettyHandlerBean implements NettyHandlerMXBean, HedwigMBeanInfo {
-
- SubscriptionChannelManager subChannelMgr;
-
- public NettyHandlerBean(SubscriptionChannelManager subChannelMgr) {
- this.subChannelMgr = subChannelMgr;
- }
-
- @Override
- public String getName() {
- return "NettyHandlers";
- }
-
- @Override
- public boolean isHidden() {
- return false;
- }
-
- @Override
- public int getNumSubscriptionChannels() {
- return subChannelMgr.getNumSubscriptionChannels();
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerMXBean.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerMXBean.java b/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerMXBean.java
deleted file mode 100644
index ab8af29..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/NettyHandlerMXBean.java
+++ /dev/null
@@ -1,31 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hedwig.server.handlers;
-
-/**
- * Netty Handler MBean
- */
-public interface NettyHandlerMXBean {
-
- /**
- * @return number of subscription channels
- */
- public int getNumSubscriptionChannels();
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/PublishHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/PublishHandler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/PublishHandler.java
deleted file mode 100644
index 587f904..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/PublishHandler.java
+++ /dev/null
@@ -1,90 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.handlers;
-
-import org.apache.hedwig.protocol.PubSubProtocol;
-import org.jboss.netty.channel.Channel;
-import org.apache.bookkeeper.util.MathUtils;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.protocol.PubSubProtocol.Message;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protoextensions.PubSubResponseUtils;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.netty.ServerStats;
-import org.apache.hedwig.server.netty.ServerStats.OpStats;
-import org.apache.hedwig.server.netty.UmbrellaHandler;
-import org.apache.hedwig.server.persistence.PersistRequest;
-import org.apache.hedwig.server.persistence.PersistenceManager;
-import org.apache.hedwig.server.topics.TopicManager;
-import org.apache.hedwig.util.Callback;
-
-public class PublishHandler extends BaseHandler {
-
- private PersistenceManager persistenceMgr;
- private final OpStats pubStats;
-
- public PublishHandler(TopicManager topicMgr, PersistenceManager persistenceMgr, ServerConfiguration cfg) {
- super(topicMgr, cfg);
- this.persistenceMgr = persistenceMgr;
- this.pubStats = ServerStats.getInstance().getOpStats(OperationType.PUBLISH);
- }
-
- @Override
- public void handleRequestAtOwner(final PubSubRequest request, final Channel channel) {
- if (!request.hasPublishRequest()) {
- UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
- "Missing publish request data");
- pubStats.incrementFailedOps();
- return;
- }
-
- Message msgToSerialize = Message.newBuilder(request.getPublishRequest().getMsg()).setSrcRegion(
- cfg.getMyRegionByteString()).build();
-
- final long requestTime = MathUtils.now();
- PersistRequest persistRequest = new PersistRequest(request.getTopic(), msgToSerialize,
- new Callback<PubSubProtocol.MessageSeqId>() {
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
- pubStats.incrementFailedOps();
- }
-
- @Override
- public void operationFinished(Object ctx, PubSubProtocol.MessageSeqId resultOfOperation) {
- channel.write(getSuccessResponse(request.getTxnId(), resultOfOperation));
- pubStats.updateLatency(MathUtils.now() - requestTime);
- }
- }, null);
-
- persistenceMgr.persistMessage(persistRequest);
- }
-
- private static PubSubProtocol.PubSubResponse getSuccessResponse(long txnId, PubSubProtocol.MessageSeqId publishedMessageSeqId) {
- if (null == publishedMessageSeqId) {
- return PubSubResponseUtils.getSuccessResponse(txnId);
- }
- PubSubProtocol.PublishResponse publishResponse = PubSubProtocol.PublishResponse.newBuilder().setPublishedMsgId(publishedMessageSeqId).build();
- PubSubProtocol.ResponseBody responseBody = PubSubProtocol.ResponseBody.newBuilder().setPublishResponse(publishResponse).build();
- return PubSubProtocol.PubSubResponse.newBuilder().
- setProtocolVersion(PubSubResponseUtils.serverVersion).
- setStatusCode(PubSubProtocol.StatusCode.SUCCESS).setTxnId(txnId).
- setResponseBody(responseBody).build();
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java b/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java
deleted file mode 100644
index 6df70a3..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java
+++ /dev/null
@@ -1,221 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.handlers;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFutureListener;
-
-import com.google.protobuf.ByteString;
-
-import org.apache.bookkeeper.util.MathUtils;
-import org.apache.bookkeeper.util.ReflectionUtils;
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.exceptions.PubSubException.ServerNotResponsibleForTopicException;
-import org.apache.hedwig.filter.PipelineFilter;
-import org.apache.hedwig.filter.ServerMessageFilter;
-import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
-import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.ResponseBody;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeResponse;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionData;
-import org.apache.hedwig.protoextensions.PubSubResponseUtils;
-import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
-import org.apache.hedwig.server.common.ServerConfiguration;
-import org.apache.hedwig.server.delivery.ChannelEndPoint;
-import org.apache.hedwig.server.delivery.DeliveryManager;
-import org.apache.hedwig.server.netty.ServerStats;
-import org.apache.hedwig.server.netty.ServerStats.OpStats;
-import org.apache.hedwig.server.netty.UmbrellaHandler;
-import org.apache.hedwig.server.persistence.PersistenceManager;
-import org.apache.hedwig.server.subscriptions.SubscriptionManager;
-import org.apache.hedwig.server.subscriptions.AllToAllTopologyFilter;
-import org.apache.hedwig.server.topics.TopicManager;
-import org.apache.hedwig.util.Callback;
-
-public class SubscribeHandler extends BaseHandler {
- private static final Logger logger = LoggerFactory.getLogger(SubscribeHandler.class);
-
- private final DeliveryManager deliveryMgr;
- private final PersistenceManager persistenceMgr;
- private final SubscriptionManager subMgr;
- private final SubscriptionChannelManager subChannelMgr;
-
- // op stats
- private final OpStats subStats;
-
- public SubscribeHandler(ServerConfiguration cfg, TopicManager topicMgr,
- DeliveryManager deliveryManager,
- PersistenceManager persistenceMgr,
- SubscriptionManager subMgr,
- SubscriptionChannelManager subChannelMgr) {
- super(topicMgr, cfg);
- this.deliveryMgr = deliveryManager;
- this.persistenceMgr = persistenceMgr;
- this.subMgr = subMgr;
- this.subChannelMgr = subChannelMgr;
- subStats = ServerStats.getInstance().getOpStats(OperationType.SUBSCRIBE);
- }
-
- @Override
- public void handleRequestAtOwner(final PubSubRequest request, final Channel channel) {
-
- if (!request.hasSubscribeRequest()) {
- UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
- "Missing subscribe request data");
- subStats.incrementFailedOps();
- return;
- }
-
- final ByteString topic = request.getTopic();
-
- MessageSeqId seqId;
- try {
- seqId = persistenceMgr.getCurrentSeqIdForTopic(topic);
- } catch (ServerNotResponsibleForTopicException e) {
- channel.write(PubSubResponseUtils.getResponseForException(e, request.getTxnId())).addListener(
- ChannelFutureListener.CLOSE);
- logger.error("Error getting current seq id for topic " + topic.toStringUtf8()
- + " when processing subscribe request (txnid:" + request.getTxnId() + ") :", e);
- subStats.incrementFailedOps();
- ServerStats.getInstance().incrementRequestsRedirect();
- return;
- }
-
- final SubscribeRequest subRequest = request.getSubscribeRequest();
- final ByteString subscriberId = subRequest.getSubscriberId();
-
- MessageSeqId lastSeqIdPublished = MessageSeqId.newBuilder(seqId).setLocalComponent(seqId.getLocalComponent()).build();
-
- final long requestTime = MathUtils.now();
- subMgr.serveSubscribeRequest(topic, subRequest, lastSeqIdPublished, new Callback<SubscriptionData>() {
-
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId())).addListener(
- ChannelFutureListener.CLOSE);
- logger.error("Error serving subscribe request (" + request.getTxnId() + ") for (topic: "
- + topic.toStringUtf8() + " , subscriber: " + subscriberId.toStringUtf8() + ")", exception);
- subStats.incrementFailedOps();
- }
-
- @Override
- public void operationFinished(Object ctx, final SubscriptionData subData) {
-
- TopicSubscriber topicSub = new TopicSubscriber(topic, subscriberId);
- synchronized (channel) {
- if (!channel.isConnected()) {
- // channel got disconnected while we were processing the
- // subscribe request,
- // nothing much we can do in this case
- subStats.incrementFailedOps();
- return;
- }
- }
- // initialize the message filter
- PipelineFilter filter = new PipelineFilter();
- try {
- // the filter pipeline should be
- // 1) AllToAllTopologyFilter to filter cross-region messages
- filter.addLast(new AllToAllTopologyFilter());
- // 2) User-Customized MessageFilter
- if (subData.hasPreferences() &&
- subData.getPreferences().hasMessageFilter()) {
- String messageFilterName = subData.getPreferences().getMessageFilter();
- filter.addLast(ReflectionUtils.newInstance(messageFilterName, ServerMessageFilter.class));
- }
- // initialize the filter
- filter.initialize(cfg.getConf());
- filter.setSubscriptionPreferences(topic, subscriberId,
- subData.getPreferences());
- } catch (RuntimeException re) {
- String errMsg = "RuntimeException caught when instantiating message filter for (topic:"
- + topic.toStringUtf8() + ", subscriber:" + subscriberId.toStringUtf8() + ")."
- + "It might be introduced by programming error in message filter.";
- logger.error(errMsg, re);
- PubSubException pse = new PubSubException.InvalidMessageFilterException(errMsg, re);
- subStats.incrementFailedOps();
- // we should not close the subscription channel, just response error
- // client decide to close it or not.
- channel.write(PubSubResponseUtils.getResponseForException(pse, request.getTxnId()));
- return;
- } catch (Throwable t) {
- String errMsg = "Failed to instantiate message filter for (topic:" + topic.toStringUtf8()
- + ", subscriber:" + subscriberId.toStringUtf8() + ").";
- logger.error(errMsg, t);
- PubSubException pse = new PubSubException.InvalidMessageFilterException(errMsg, t);
- subStats.incrementFailedOps();
- channel.write(PubSubResponseUtils.getResponseForException(pse, request.getTxnId()))
- .addListener(ChannelFutureListener.CLOSE);
- return;
- }
- boolean forceAttach = false;
- if (subRequest.hasForceAttach()) {
- forceAttach = subRequest.getForceAttach();
- }
- // Try to store the subscription channel for the topic subscriber
- Channel oldChannel = subChannelMgr.put(topicSub, channel, forceAttach);
- if (null != oldChannel) {
- PubSubException pse = new PubSubException.TopicBusyException(
- "Subscriber " + subscriberId.toStringUtf8() + " for topic " + topic.toStringUtf8()
- + " is already being served on a different channel " + oldChannel + ".");
- subStats.incrementFailedOps();
- channel.write(PubSubResponseUtils.getResponseForException(pse, request.getTxnId()))
- .addListener(ChannelFutureListener.CLOSE);
- return;
- }
-
- // want to start 1 ahead of the consume ptr
- MessageSeqId lastConsumedSeqId = subData.getState().getMsgId();
- MessageSeqId seqIdToStartFrom = MessageSeqId.newBuilder(lastConsumedSeqId).setLocalComponent(
- lastConsumedSeqId.getLocalComponent() + 1).build();
- deliveryMgr.startServingSubscription(topic, subscriberId,
- subData.getPreferences(), seqIdToStartFrom, new ChannelEndPoint(channel), filter,
- new Callback<Void>() {
- @Override
- public void operationFinished(Object ctx, Void result) {
- // First write success and then tell the delivery manager,
- // otherwise the first message might go out before the response
- // to the subscribe
- SubscribeResponse.Builder subRespBuilder = SubscribeResponse.newBuilder()
- .setPreferences(subData.getPreferences());
- ResponseBody respBody = ResponseBody.newBuilder()
- .setSubscribeResponse(subRespBuilder).build();
- channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId(), respBody));
- logger.info("Subscribe request (" + request.getTxnId() + ") for (topic:"
- + topic.toStringUtf8() + ", subscriber:" + subscriberId.toStringUtf8()
- + ") from channel " + channel.getRemoteAddress()
- + " succeed - its subscription data is "
- + SubscriptionStateUtils.toString(subData));
- subStats.updateLatency(MathUtils.now() - requestTime);
- }
- @Override
- public void operationFailed(Object ctx, PubSubException exception) {
- // would not happened
- }
- }, null);
- }
- }, null);
-
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscriptionChannelManager.java
----------------------------------------------------------------------
diff --git a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscriptionChannelManager.java b/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscriptionChannelManager.java
deleted file mode 100644
index 3481d81..0000000
--- a/hedwig-server/src/main/java/org/apache/hedwig/server/handlers/SubscriptionChannelManager.java
+++ /dev/null
@@ -1,213 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.server.handlers;
-
-import java.util.ArrayList;
-import java.util.HashSet;
-import java.util.List;
-import java.util.Set;
-import java.util.concurrent.ConcurrentHashMap;
-
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-import org.jboss.netty.channel.Channel;
-import org.jboss.netty.channel.ChannelFuture;
-import org.jboss.netty.channel.ChannelFutureListener;
-
-import org.apache.hedwig.client.data.TopicSubscriber;
-import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionEvent;
-import org.apache.hedwig.protoextensions.PubSubResponseUtils;
-import static org.apache.hedwig.util.VarArgs.va;
-
-public class SubscriptionChannelManager implements ChannelDisconnectListener {
-
- private static final Logger logger = LoggerFactory.getLogger(SubscriptionChannelManager.class);
-
- static class CloseSubscriptionListener implements ChannelFutureListener {
-
- final TopicSubscriber ts;
-
- CloseSubscriptionListener(TopicSubscriber topicSubscriber) {
- this.ts = topicSubscriber;
- }
-
- @Override
- public void operationComplete(ChannelFuture future) throws Exception {
- if (!future.isSuccess()) {
- logger.warn("Failed to write response to close old subscription {}.", ts);
- } else {
- logger.debug("Close old subscription {} succeed.", ts);
- }
- }
- };
-
- final List<SubChannelDisconnectedListener> listeners;
-
- public interface SubChannelDisconnectedListener {
- /**
- * Act on a particular topicSubscriber being disconnected
- * @param topicSubscriber
- */
- public void onSubChannelDisconnected(TopicSubscriber topicSubscriber);
- }
-
- final ConcurrentHashMap<TopicSubscriber, Channel> sub2Channel;
- final ConcurrentHashMap<Channel, Set<TopicSubscriber>> channel2sub;
-
- public SubscriptionChannelManager() {
- sub2Channel = new ConcurrentHashMap<TopicSubscriber, Channel>();
- channel2sub = new ConcurrentHashMap<Channel, Set<TopicSubscriber>>();
- listeners = new ArrayList<SubChannelDisconnectedListener>();
- }
-
- public void addSubChannelDisconnectedListener(SubChannelDisconnectedListener listener) {
- if (null != listener) {
- listeners.add(listener);
- }
- }
-
- @Override
- public void channelDisconnected(Channel channel) {
- // Evils of synchronized programming: there is a race between a channel
- // getting disconnected, and us adding it to the maps when a subscribe
- // succeeds
- Set<TopicSubscriber> topicSubs;
- synchronized (channel) {
- topicSubs = channel2sub.remove(channel);
- }
- if (topicSubs != null) {
- for (TopicSubscriber topicSub : topicSubs) {
- logger.info("Subscription channel {} for {} is disconnected.",
- va(channel.getRemoteAddress(), topicSub));
- // remove entry only currently mapped to given value.
- sub2Channel.remove(topicSub, channel);
- for (SubChannelDisconnectedListener listener : listeners) {
- listener.onSubChannelDisconnected(topicSub);
- }
- }
- }
- }
-
- public int getNumSubscriptionChannels() {
- return channel2sub.size();
- }
-
- public int getNumSubscriptions() {
- return sub2Channel.size();
- }
-
- /**
- * Put <code>topicSub</code> on Channel <code>channel</code>.
- *
- * @param topicSub
- * Topic Subscription
- * @param channel
- * Netty channel
- * @param mode
- * Create or Attach mode
- * @return null succeed, otherwise the old existed channel.
- */
- public Channel put(TopicSubscriber topicSub, Channel channel, boolean forceAttach) {
- // race with channel getting disconnected while we are adding it
- // to the 2 maps
- synchronized (channel) {
- Channel oldChannel = sub2Channel.putIfAbsent(topicSub, channel);
- // if a subscribe request send from same channel,
- // we treated it a success action.
- if (null != oldChannel && !oldChannel.equals(channel)) {
- boolean subSuccess = false;
- if (forceAttach) {
- // it is safe to close old subscription here since the new subscription
- // has come from other channel succeed.
- synchronized (oldChannel) {
- Set<TopicSubscriber> oldTopicSubs = channel2sub.get(oldChannel);
- if (null != oldTopicSubs) {
- if (!oldTopicSubs.remove(topicSub)) {
- logger.warn("Failed to remove old subscription ({}) due to it isn't on channel ({}).",
- va(topicSub, oldChannel));
- } else if (oldTopicSubs.isEmpty()) {
- channel2sub.remove(oldChannel);
- }
- }
- }
- PubSubResponse resp = PubSubResponseUtils.getResponseForSubscriptionEvent(
- topicSub.getTopic(), topicSub.getSubscriberId(),
- SubscriptionEvent.SUBSCRIPTION_FORCED_CLOSED
- );
- oldChannel.write(resp).addListener(new CloseSubscriptionListener(topicSub));
- logger.info("Subscribe request for ({}) from channel ({}) closes old subscripiton on channel ({}).",
- va(topicSub, channel, oldChannel));
- // try replace the oldChannel
- // if replace failure, it migth caused because channelDisconnect callback
- // has removed the old channel.
- if (!sub2Channel.replace(topicSub, oldChannel, channel)) {
- // try to add it now.
- // if add failure, it means other one has obtained the channel
- oldChannel = sub2Channel.putIfAbsent(topicSub, channel);
- if (null == oldChannel) {
- subSuccess = true;
- }
- } else {
- subSuccess = true;
- }
- }
- if (!subSuccess) {
- logger.error("Error serving subscribe request for ({}) from ({}) since it already served on ({}).",
- va(topicSub, channel, oldChannel));
- return oldChannel;
- }
- }
- // channel2sub is just a cache, so we can add to it
- // without synchronization
- Set<TopicSubscriber> topicSubs = channel2sub.get(channel);
- if (null == topicSubs) {
- topicSubs = new HashSet<TopicSubscriber>();
- channel2sub.put(channel, topicSubs);
- }
- topicSubs.add(topicSub);
- return null;
- }
- }
-
- /**
- * Remove <code>topicSub</code> from Channel <code>channel</code>
- *
- * @param topicSub
- * Topic Subscription
- * @param channel
- * Netty channel
- */
- public void remove(TopicSubscriber topicSub, Channel channel) {
- synchronized (channel) {
- Set<TopicSubscriber> topicSubs = channel2sub.get(channel);
- if (null != topicSubs) {
- if (!topicSubs.remove(topicSub)) {
- logger.warn("Failed to remove subscription ({}) due to it isn't on channel ({}).",
- va(topicSub, channel));
- } else if (topicSubs.isEmpty()) {
- channel2sub.remove(channel);
- }
- }
- if (!sub2Channel.remove(topicSub, channel)) {
- logger.warn("Failed to remove channel ({}) due to it isn't ({})'s channel.",
- va(channel, topicSub));
- }
- }
- }
-}