Posted to commits@zookeeper.apache.org by ma...@apache.org on 2010/08/19 23:25:22 UTC

svn commit: r987314 [12/16] - in /hadoop/zookeeper/trunk: ./ src/contrib/hedwig/ src/contrib/hedwig/client/ src/contrib/hedwig/client/src/ src/contrib/hedwig/client/src/main/ src/contrib/hedwig/client/src/main/cpp/ src/contrib/hedwig/client/src/main/cp...

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,703 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.log4j.Logger;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.ServerNotResponsibleForTopicException;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protoextensions.MessageIdUtils;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.common.UnexpectedError;
+import org.apache.hedwig.util.Callback;
+
+public class ReadAheadCache implements PersistenceManager, Runnable {
+
+    static Logger logger = Logger.getLogger(ReadAheadCache.class);
+
+    protected interface CacheRequest {
+        public void performRequest();
+    }
+
+    /**
+     * The underlying persistence manager that will be used for persistence and
+     * scanning below the cache
+     */
+    protected PersistenceManagerWithRangeScan realPersistenceManager;
+
+    /**
+     * The structure for the cache
+     */
+    protected Map<CacheKey, CacheValue> cache = new HashMap<CacheKey, CacheValue>();
+
+    /**
+     * To simplify synchronization, the cache will be maintained by a single
+     * cache maintainer thread. This is the queue that will hold requests that
+     * need to be served by this thread
+     */
+    protected BlockingQueue<CacheRequest> requestQueue = new LinkedBlockingQueue<CacheRequest>();
+
+    /**
+     * We want to keep track of when entries were added to the cache, so that we
+     * can remove them in a FIFO fashion
+     */
+    protected SortedMap<Long, Set<CacheKey>> timeIndexOfAddition = new TreeMap<Long, Set<CacheKey>>();
+
+    /**
+     * We also want to track the entries in seq-id order so that we can clean up
+     * entries once every subscriber has moved past them
+     */
+    protected Map<ByteString, SortedSet<Long>> orderedIndexOnSeqId = new HashMap<ByteString, SortedSet<Long>>();
+
+    /**
+     * We maintain an estimate of the current size of the cache, so that we know
+     * when to evict entries.
+     */
+    protected long presentCacheSize = 0;
+
+    /**
+     * One instance of a callback that we will pass to the underlying
+     * persistence manager when asking it to persist messages
+     */
+    protected PersistCallback persistCallbackInstance = new PersistCallback();
+
+    /**
+     * Two kinds of exceptions that we will use to signal errors from readahead
+     */
+    protected NoSuchSeqIdException noSuchSeqIdExceptionInstance = new NoSuchSeqIdException();
+    protected ReadAheadException readAheadExceptionInstance = new ReadAheadException();
+
+    protected ServerConfiguration cfg;
+    protected Thread cacheThread;
+    // Boolean indicating if this thread should continue running. This is used
+    // when we want to stop the thread during a PubSubServer shutdown.
+    protected boolean keepRunning = true;
+
+    /**
+     * Constructor. Creates the cache maintainer thread; the thread does not
+     * run until {@link #start()} is called.
+     * 
+     * @param realPersistenceManager
+     */
+    public ReadAheadCache(PersistenceManagerWithRangeScan realPersistenceManager, ServerConfiguration cfg) {
+        this.realPersistenceManager = realPersistenceManager;
+        this.cfg = cfg;
+        cacheThread = new Thread(this, "CacheThread");
+    }
+
+    public ReadAheadCache start() {
+        cacheThread.start();
+        return this;
+    }
+
+    /**
+     * ========================================================================
+     * Methods of {@link PersistenceManager} that we will pass straight down to
+     * the real persistence manager.
+     */
+
+    public long getSeqIdAfterSkipping(ByteString topic, long seqId, int skipAmount) {
+        return realPersistenceManager.getSeqIdAfterSkipping(topic, seqId, skipAmount);
+    }
+
+    public MessageSeqId getCurrentSeqIdForTopic(ByteString topic) throws ServerNotResponsibleForTopicException {
+        return realPersistenceManager.getCurrentSeqIdForTopic(topic);
+    }
+
+    /**
+     * ========================================================================
+     * Other methods of {@link PersistenceManager} that the cache needs to take
+     * some action on.
+     * 
+     * 1. Persist: We pass it through to the real persistence manager but insert
+     * our callback on the return path
+     * 
+     */
+    public void persistMessage(PersistRequest request) {
+        // make a new PersistRequest object so that we can insert our own
+        // callback in the middle. Assign the original request as the context
+        // for the callback.
+
+        PersistRequest newRequest = new PersistRequest(request.getTopic(), request.getMessage(),
+                persistCallbackInstance, request);
+        realPersistenceManager.persistMessage(newRequest);
+    }
+
+    /**
+     * The callback that we insert on the persist request return path. The
+     * callback simply forms a {@link ScanResponse} object and inserts it in
+     * the request queue to be handled serially by the cache maintainer thread.
+     * 
+     */
+    public class PersistCallback implements Callback<Long> {
+
+        /**
+         * In case there is a failure in persisting, just pass it to the
+         * original callback
+         */
+        public void operationFailed(Object ctx, PubSubException exception) {
+            PersistRequest originalRequest = (PersistRequest) ctx;
+            Callback<Long> originalCallback = originalRequest.getCallback();
+            Object originalContext = originalRequest.getCtx();
+            originalCallback.operationFailed(originalContext, exception);
+        }
+
+        /**
+         * When the persist finishes, we first notify the original callback of
+         * success, and then opportunistically treat the message as if it just
+         * came in through a scan
+         */
+        public void operationFinished(Object ctx, Long resultOfOperation) {
+            PersistRequest originalRequest = (PersistRequest) ctx;
+
+            // Let's call the original callback first so that the publisher can
+            // hear success
+            originalRequest.getCallback().operationFinished(originalRequest.getCtx(), resultOfOperation);
+
+            // The message that was persisted didn't have the local seq-id;
+            // let's add that in.
+            Message messageWithLocalSeqId = MessageIdUtils.mergeLocalSeqId(originalRequest.getMessage(),
+                    resultOfOperation);
+
+            // Now enqueue a request to add this newly persisted message to our
+            // cache
+            CacheKey cacheKey = new CacheKey(originalRequest.getTopic(), resultOfOperation);
+
+            enqueueWithoutFailure(new ScanResponse(cacheKey, messageWithLocalSeqId));
+        }
+
+    }
+
+    /**
+     * Too complicated to deal with enqueue failures from the context of our
+     * callbacks. It's just simpler to quit and restart afresh. Moreover, this
+     * should not happen as the request queue for the cache maintainer is
+     * unbounded.
+     * 
+     * @param obj
+     */
+    protected void enqueueWithoutFailure(CacheRequest obj) {
+        if (!requestQueue.offer(obj)) {
+            throw new UnexpectedError("Could not enqueue object: " + obj.toString()
+                    + " to cache request queue. Exiting.");
+
+        }
+    }
+
+    /**
+     * Another method from {@link PersistenceManager}.
+     * 
+     * 2. Scan - Since the scan needs to touch the cache, we will just enqueue
+     * the scan request and let the cache maintainer thread handle it.
+     */
+    public void scanSingleMessage(ScanRequest request) {
+        // Let the scan requests be serialized through the queue
+        enqueueWithoutFailure(new ScanRequestWrapper(request));
+    }
+
+    /**
+     * Another method from {@link PersistenceManager}.
+     * 
+     * 3. Enqueue the request so that the cache maintainer thread can delete all
+     * cached messages with seq-ids up to and including the one specified
+     */
+    public void deliveredUntil(ByteString topic, Long seqId) {
+        enqueueWithoutFailure(new DeliveredUntil(topic, seqId));
+    }
+
+    /**
+     * Another method from {@link PersistenceManager}.
+     * 
+     * Since this is a cache layer on top of an underlying persistence manager,
+     * we can just call the consumedUntil method there. The messages older than
+     * the latest one passed here won't be accessed anymore so they should just
+     * get aged out of the cache eventually. For now, there is no need to
+     * proactively remove those entries from the cache.
+     */
+    public void consumedUntil(ByteString topic, Long seqId) {
+        realPersistenceManager.consumedUntil(topic, seqId);
+    }
+
+    /**
+     * ========================================================================
+     * BEGINNING OF CODE FOR THE CACHE MAINTAINER THREAD
+     * 
+     * 1. The run method. It simply dequeues from the request queue, checks the
+     * type of object and acts accordingly
+     */
+    public void run() {
+        while (keepRunning) {
+            CacheRequest obj;
+            try {
+                obj = requestQueue.take();
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+                return;
+            }
+            obj.performRequest();
+        }
+
+    }
+
+    /**
+     * Stop method which will enqueue a ShutdownCacheRequest.
+     */
+    public void stop() {
+        enqueueWithoutFailure(new ShutdownCacheRequest());
+    }
+
+    /**
+     * The readahead policy is simple: We check if an entry already exists for
+     * the message being requested. If an entry exists, it means that either
+     * that message is already in the cache, or a read for that message is
+     * outstanding. In that case, we look a little ahead (by readAheadCount/2)
+     * and issue a range read of readAheadCount/2 messages. The idea is to
+     * ensure that the next readAheadCount messages are always available.
+     * 
+     * @return the range scan that should be issued for read ahead
+     */
+    protected RangeScanRequest doReadAhead(ScanRequest request) {
+        ByteString topic = request.getTopic();
+        Long seqId = request.getStartSeqId();
+
+        int readAheadCount = cfg.getReadAheadCount();
+        // To prevent us from getting screwed by bad configuration
+        readAheadCount = Math.max(1, readAheadCount);
+
+        RangeScanRequest readAheadRequest = doReadAheadStartingFrom(topic, seqId, readAheadCount);
+
+        if (readAheadRequest != null) {
+            return readAheadRequest;
+        }
+
+        // The start key was already there in the cache, so no readahead
+        // happened; let's look a little beyond.
+        seqId = realPersistenceManager.getSeqIdAfterSkipping(topic, seqId, readAheadCount / 2);
+
+        readAheadRequest = doReadAheadStartingFrom(topic, seqId, readAheadCount / 2);
+
+        return readAheadRequest;
+    }
+
+    /**
+     * This method just checks if the provided seq-id already exists in the
+     * cache. If not, a range read of the specified amount is issued.
+     * 
+     * @param topic
+     * @param seqId
+     * @param readAheadCount
+     * @return The range read that should be issued
+     */
+    protected RangeScanRequest doReadAheadStartingFrom(ByteString topic, long seqId, int readAheadCount) {
+
+        long startSeqId = seqId;
+        Queue<CacheKey> installedStubs = new LinkedList<CacheKey>();
+
+        int i = 0;
+
+        for (; i < readAheadCount; i++) {
+            CacheKey cacheKey = new CacheKey(topic, seqId);
+
+            // Even if a stub exists, it means that a scan for that is
+            // outstanding
+            if (cache.containsKey(cacheKey)) {
+                break;
+            }
+            CacheValue cacheValue = new CacheValue();
+            cache.put(cacheKey, cacheValue);
+
+            if (logger.isDebugEnabled()) {
+                logger.debug("Adding stub for seq-id: " + seqId + " topic: " + topic.toStringUtf8());
+            }
+            installedStubs.add(cacheKey);
+
+            seqId = realPersistenceManager.getSeqIdAfterSkipping(topic, seqId, 1);
+        }
+
+        // So how many messages did we decide to read ahead?
+        if (i == 0) {
+            // no readahead happened, hence return null
+            return null;
+        }
+
+        long readAheadSizeLimit = cfg.getReadAheadSizeBytes();
+        ReadAheadScanCallback callback = new ReadAheadScanCallback(installedStubs, topic);
+        RangeScanRequest rangeScanRequest = new RangeScanRequest(topic, startSeqId, i, readAheadSizeLimit, callback,
+                null);
+
+        return rangeScanRequest;
+
+    }
+
+    /**
+     * This is the callback that is used for the range scans.
+     */
+    protected class ReadAheadScanCallback implements ScanCallback {
+        Queue<CacheKey> installedStubs;
+        ByteString topic;
+
+        /**
+         * Constructor
+         * 
+         * @param installedStubs
+         *            The list of stubs that were installed for this range scan
+         * @param topic
+         */
+        public ReadAheadScanCallback(Queue<CacheKey> installedStubs, ByteString topic) {
+            this.installedStubs = installedStubs;
+            this.topic = topic;
+        }
+
+        public void messageScanned(Object ctx, Message message) {
+
+            // Any message we read is potentially useful for us, so let's first
+            // enqueue it
+            CacheKey cacheKey = new CacheKey(topic, message.getMsgId().getLocalComponent());
+            enqueueWithoutFailure(new ScanResponse(cacheKey, message));
+
+            // Now let's see if this message is the one we were expecting
+            CacheKey expectedKey = installedStubs.peek();
+
+            if (expectedKey == null) {
+                // We were not expecting any more messages, but since they came
+                // in, we will keep them
+                return;
+            }
+
+            if (expectedKey.equals(cacheKey)) {
+                // what we got is what we expected, dequeue it so we get the
+                // next expected one
+                installedStubs.poll();
+                return;
+            }
+
+            // If we reached here, what we scanned was not what we were expecting.
+            // This means that we have wrong stubs installed in the cache. We
+            // should remove them, so that whoever is waiting on them can retry.
+            // This shouldn't usually happen.
+            logger.warn("Unexpected message seq-id: " + message.getMsgId().getLocalComponent() + " on topic: "
+                    + topic.toStringUtf8() + " from readahead scan, was expecting seq-id: " + expectedKey.seqId
+                    + " topic: " + expectedKey.topic.toStringUtf8() + " installedStubs: " + installedStubs);
+            enqueueDeleteOfRemainingStubs(noSuchSeqIdExceptionInstance);
+
+        }
+
+        public void scanFailed(Object ctx, Exception exception) {
+            enqueueDeleteOfRemainingStubs(exception);
+        }
+
+        public void scanFinished(Object ctx, ReasonForFinish reason) {
+            // If the scan finished because no more messages are present, it's OK
+            // to leave the stubs in place because they will get filled in as
+            // new publishes happen. However, if the scan finished due to some
+            // other reason, e.g., read ahead size limit was reached, we want to
+            // delete the stubs, so that when the time comes, we can schedule
+            // another readahead request.
+            if (reason != ReasonForFinish.NO_MORE_MESSAGES) {
+                enqueueDeleteOfRemainingStubs(readAheadExceptionInstance);
+            }
+        }
+
+        private void enqueueDeleteOfRemainingStubs(Exception reason) {
+            CacheKey installedStub;
+            while ((installedStub = installedStubs.poll()) != null) {
+                enqueueWithoutFailure(new ExceptionOnCacheKey(installedStub, reason));
+            }
+        }
+    }
+
+    protected static class HashSetCacheKeyFactory implements Factory<Set<CacheKey>> {
+        protected static HashSetCacheKeyFactory instance = new HashSetCacheKeyFactory();
+
+        public Set<CacheKey> newInstance() {
+            return new HashSet<CacheKey>();
+        }
+    }
+
+    protected static class TreeSetLongFactory implements Factory<SortedSet<Long>> {
+        protected static TreeSetLongFactory instance = new TreeSetLongFactory();
+
+        public SortedSet<Long> newInstance() {
+            return new TreeSet<Long>();
+        }
+    }
+
+    /**
+     * For adding the message to the cache, we do some bookkeeping, such as
+     * tracking the total size of the cache and the order in which entries were
+     * added. If the size of the cache has exceeded our budget, old entries are
+     * collected.
+     * 
+     * @param cacheKey
+     * @param message
+     */
+    protected void addMessageToCache(CacheKey cacheKey, Message message, long currTime) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Adding msg (topic: " + cacheKey.getTopic().toStringUtf8() + ", seq-id: "
+                    + message.getMsgId().getLocalComponent() + ") to readahead cache");
+        }
+
+        CacheValue cacheValue;
+
+        if ((cacheValue = cache.get(cacheKey)) == null) {
+            cacheValue = new CacheValue();
+            cache.put(cacheKey, cacheValue);
+        }
+
+        // update the cache size
+        presentCacheSize += message.getBody().size();
+
+        // maintain the time index of addition
+        MapMethods.addToMultiMap(timeIndexOfAddition, currTime, cacheKey, HashSetCacheKeyFactory.instance);
+
+        // maintain the index of seq-id
+        MapMethods.addToMultiMap(orderedIndexOnSeqId, cacheKey.getTopic(), cacheKey.getSeqId(),
+                TreeSetLongFactory.instance);
+
+        // finally add the message to the cache
+        cacheValue.setMessageAndInvokeCallbacks(message, currTime);
+
+        // if overgrown, collect old entries
+        collectOldCacheEntries();
+    }
+
+    protected void removeMessageFromCache(CacheKey cacheKey, Exception exception, boolean maintainTimeIndex,
+            boolean maintainSeqIdIndex) {
+        CacheValue cacheValue = cache.remove(cacheKey);
+
+        if (cacheValue == null) {
+            return;
+        }
+
+        if (cacheValue.isStub()) {
+            cacheValue.setErrorAndInvokeCallbacks(exception);
+            // Stubs are not present in the indexes, so we don't need to
+            // maintain the indexes here
+            return;
+        }
+
+        presentCacheSize -= cacheValue.getMessage().getBody().size();
+
+        // maintain the 2 indexes
+        // TODO: can we maintain these lazily?
+        if (maintainSeqIdIndex) {
+            MapMethods.removeFromMultiMap(orderedIndexOnSeqId, cacheKey.getTopic(), cacheKey.getSeqId());
+        }
+        if (maintainTimeIndex) {
+            MapMethods.removeFromMultiMap(timeIndexOfAddition, cacheValue.getTimeOfAddition(), cacheKey);
+        }
+    }
+
+    /**
+     * Collection of old entries is simple. Just collect in insert-time order,
+     * oldest to newest.
+     */
+    protected void collectOldCacheEntries() {
+        long maxCacheSize = cfg.getMaximumCacheSize();
+
+        while (presentCacheSize > maxCacheSize && !timeIndexOfAddition.isEmpty()) {
+            Long earliestTime = timeIndexOfAddition.firstKey();
+            Set<CacheKey> oldCacheEntries = timeIndexOfAddition.get(earliestTime);
+
+            // Note: only concrete cache entries, not stubs, are in the time
+            // index. Hence there can be no callbacks pending on these cache
+            // entries, and it is safe to remove them directly.
+            for (Iterator<CacheKey> iter = oldCacheEntries.iterator(); iter.hasNext();) {
+                CacheKey cacheKey = iter.next();
+
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Removing topic: " + cacheKey.getTopic() + "seq-id: " + cacheKey.getSeqId()
+                            + " from cache because its the oldest");
+                }
+                removeMessageFromCache(cacheKey, readAheadExceptionInstance, //
+                        // maintainTimeIndex=
+                        false,
+                        // maintainSeqIdIndex=
+                        true);
+            }
+
+            timeIndexOfAddition.remove(earliestTime);
+
+        }
+    }
+
+    /**
+     * ========================================================================
+     * The rest is just simple wrapper classes.
+     * 
+     */
+
+    protected class ExceptionOnCacheKey implements CacheRequest {
+        CacheKey cacheKey;
+        Exception exception;
+
+        public ExceptionOnCacheKey(CacheKey cacheKey, Exception exception) {
+            this.cacheKey = cacheKey;
+            this.exception = exception;
+        }
+
+        /**
+         * If for some reason an outstanding read on a cache stub fails, an
+         * exception for that key is enqueued by the
+         * {@link ReadAheadScanCallback}. To handle this, we simply signal an
+         * error on the callbacks registered for that stub, and delete the entry
+         * from the cache.
+         */
+        public void performRequest() {
+            removeMessageFromCache(cacheKey, exception,
+            // maintainTimeIndex=
+                    true,
+                    // maintainSeqIdIndex=
+                    true);
+        }
+
+    }
+
+    @SuppressWarnings("serial")
+    protected static class NoSuchSeqIdException extends Exception {
+
+        public NoSuchSeqIdException() {
+            super("No such seq-id");
+        }
+    }
+
+    @SuppressWarnings("serial")
+    protected static class ReadAheadException extends Exception {
+        public ReadAheadException() {
+            super("Readahead failed");
+        }
+    }
+
+    protected class ScanResponse implements CacheRequest {
+        CacheKey cacheKey;
+        Message message;
+
+        public ScanResponse(CacheKey cacheKey, Message message) {
+            this.cacheKey = cacheKey;
+            this.message = message;
+        }
+
+        public void performRequest() {
+            addMessageToCache(cacheKey, message, System.currentTimeMillis());
+        }
+
+    }
+
+    protected class DeliveredUntil implements CacheRequest {
+        ByteString topic;
+        Long seqId;
+
+        public DeliveredUntil(ByteString topic, Long seqId) {
+            this.topic = topic;
+            this.seqId = seqId;
+        }
+
+        public void performRequest() {
+            SortedSet<Long> orderedSeqIds = orderedIndexOnSeqId.get(topic);
+            if (orderedSeqIds == null) {
+                return;
+            }
+
+            // focus on the set of messages with seq-ids <= the one that
+            // has been delivered until
+            SortedSet<Long> headSet = orderedSeqIds.headSet(seqId + 1);
+
+            for (Iterator<Long> iter = headSet.iterator(); iter.hasNext();) {
+                Long seqIdToRemove = iter.next();
+                CacheKey cacheKey = new CacheKey(topic, seqIdToRemove);
+
+                if (logger.isDebugEnabled()) {
+                    logger.debug("Removing seq-id: " + cacheKey.getSeqId() + " topic: "
+                            + cacheKey.getTopic().toStringUtf8()
+                            + " from cache because every subscriber has moved past");
+                }
+
+                removeMessageFromCache(cacheKey, readAheadExceptionInstance, //
+                        // maintainTimeIndex=
+                        true,
+                        // maintainSeqIdIndex=
+                        false);
+                iter.remove();
+            }
+
+            if (orderedSeqIds.isEmpty()) {
+                orderedIndexOnSeqId.remove(topic);
+            }
+        }
+    }
+
+    protected class ScanRequestWrapper implements CacheRequest {
+        ScanRequest request;
+
+        public ScanRequestWrapper(ScanRequest request) {
+            this.request = request;
+        }
+
+        /**
+         * To handle a scan request, we first try to do readahead (which might
+         * cause a range read to be issued to the underlying persistence
+         * manager). The readahead will put a stub in the cache, if the message
+         * is not already present in the cache. The scan callback that is part
+         * of the scan request is added to this stub, and will be called later
+         * when the message arrives as a result of the range scan issued to the
+         * underlying persistence manager.
+         */
+
+        public void performRequest() {
+
+            RangeScanRequest readAheadRequest = doReadAhead(request);
+
+            // Read ahead must have installed at least a stub for us, so this
+            // can't be null
+            CacheValue cacheValue = cache.get(new CacheKey(request.getTopic(), request.getStartSeqId()));
+
+            // Add our callback to the stub. If the cache value was already a
+            // concrete message, the callback will be called right away
+            cacheValue.addCallback(request.getCallback(), request.getCtx());
+
+            if (readAheadRequest != null) {
+                realPersistenceManager.scanMessages(readAheadRequest);
+            }
+        }
+    }
+
+    protected class ShutdownCacheRequest implements CacheRequest {
+        // This is a simple type of CacheRequest we will enqueue when
+        // the PubSubServer is shut down and we want to stop the ReadAheadCache
+        // thread.
+        public void performRequest() {
+            keepRunning = false;
+        }
+    }
+
+}
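
A minimal, hypothetical wiring sketch of the cache above, using only the API visible in this commit (ScanRequest and ScanCallback are added further down in this mail; the PersistenceManagerWithRangeScan implementation and a usable ServerConfiguration are assumed to come from elsewhere in the tree):

    import com.google.protobuf.ByteString;
    import org.apache.hedwig.protocol.PubSubProtocol.Message;
    import org.apache.hedwig.server.common.ServerConfiguration;
    import org.apache.hedwig.server.persistence.PersistenceManagerWithRangeScan;
    import org.apache.hedwig.server.persistence.ReadAheadCache;
    import org.apache.hedwig.server.persistence.ScanCallback;
    import org.apache.hedwig.server.persistence.ScanRequest;

    public class ReadAheadCacheSketch {
        // realPM is assumed to come from the server's persistence setup
        static ReadAheadCache wire(PersistenceManagerWithRangeScan realPM,
                                   ServerConfiguration cfg) {
            // The constructor only creates the maintainer thread; start() runs it
            ReadAheadCache cache = new ReadAheadCache(realPM, cfg).start();

            // Scan one message: the callback fires from the cache if the message
            // is already there, or once the readahead range scan fills the stub
            ByteString topic = ByteString.copyFromUtf8("my-topic");
            cache.scanSingleMessage(new ScanRequest(topic, 1L, new ScanCallback() {
                public void messageScanned(Object ctx, Message message) {
                    System.out.println("got seq-id "
                            + message.getMsgId().getLocalComponent());
                }
                public void scanFinished(Object ctx, ReasonForFinish reason) {
                }
                public void scanFailed(Object ctx, Exception exception) {
                    exception.printStackTrace();
                }
            }, null));
            return cache; // callers should eventually invoke cache.stop()
        }
    }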

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/ScanCallback.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/ScanCallback.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/ScanCallback.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/ScanCallback.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+
+public interface ScanCallback {
+
+    enum ReasonForFinish {
+        NO_MORE_MESSAGES, SIZE_LIMIT_EXCEEDED, NUM_MESSAGES_LIMIT_EXCEEDED
+    };
+
+    /**
+     * This method is called when a message is read from the persistence layer
+     * as part of a scan. The message just read is handed to this listener,
+     * which can then take the desired action on it.
+     *
+     * @param ctx
+     *            The context for the callback
+     * @param message
+     *            The message just scanned from the log
+     */
+    public void messageScanned(Object ctx, Message message);
+
+    /**
+     * This method is called when the scan finishes.
+     *
+     * @param ctx
+     *            The context for the callback
+     * @param reason
+     *            The reason the scan finished
+     */
+    public abstract void scanFinished(Object ctx, ReasonForFinish reason);
+
+    /**
+     * This method is called when the operation failed due to some reason. The
+     * reason for failure is passed in.
+     * 
+     * @param ctx
+     *            The context for the callback
+     * @param exception
+     *            The reason for the failure of the scan
+     */
+    public abstract void scanFailed(Object ctx, Exception exception);
+
+}
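
A hypothetical illustration of this contract (not part of the commit): a collector that accumulates scanned messages and records how the scan ended, assuming it lives in the same package as ScanCallback.

    import java.util.ArrayList;
    import java.util.List;

    import org.apache.hedwig.protocol.PubSubProtocol.Message;

    // Illustrative ScanCallback: collects messages until the scan ends
    class CollectingScanCallback implements ScanCallback {
        final List<Message> messages = new ArrayList<Message>();
        ReasonForFinish reason;   // set if the scan ran to completion
        Exception failure;        // set if the scan failed

        public void messageScanned(Object ctx, Message message) {
            messages.add(message);
        }

        public void scanFinished(Object ctx, ReasonForFinish reason) {
            this.reason = reason;
        }

        public void scanFailed(Object ctx, Exception exception) {
            this.failure = exception;
        }
    }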

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/ScanCallbackWithContext.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/ScanCallbackWithContext.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/ScanCallbackWithContext.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/ScanCallbackWithContext.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+public class ScanCallbackWithContext {
+    ScanCallback scanCallback;
+    Object ctx;
+
+    public ScanCallbackWithContext(ScanCallback callback, Object ctx) {
+        this.scanCallback = callback;
+        this.ctx = ctx;
+    }
+
+    public ScanCallback getScanCallback() {
+        return scanCallback;
+    }
+
+    public Object getCtx() {
+        return ctx;
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/ScanRequest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/ScanRequest.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/ScanRequest.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/ScanRequest.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+
+/**
+ * Encapsulates a request for reading a single message. The message on the given
+ * topic <b>at</b> the given seqId is scanned. A call-back {@link ScanCallback}
+ * is provided. When the message is scanned, the
+ * {@link ScanCallback#messageScanned(Object, Message)} method is called. Since
+ * there is only one record to be scanned, the
+ * {@link ScanCallback#scanFinished(Object, ScanCallback.ReasonForFinish)}
+ * method may not be called since it's redundant. The
+ * {@link ScanCallback#scanFailed(Object, Exception)} method is called in case
+ * of error.
+ * 
+ */
+public class ScanRequest {
+    ByteString topic;
+    long startSeqId;
+    ScanCallback callback;
+    Object ctx;
+
+    public ScanRequest(ByteString topic, long startSeqId, ScanCallback callback, Object ctx) {
+        this.topic = topic;
+        this.startSeqId = startSeqId;
+        this.callback = callback;
+        this.ctx = ctx;
+    }
+
+    public ByteString getTopic() {
+        return topic;
+    }
+
+    public long getStartSeqId() {
+        return startSeqId;
+    }
+
+    public ScanCallback getCallback() {
+        return callback;
+    }
+
+    public Object getCtx() {
+        return ctx;
+    }
+
+}
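
Tying this to ScanCallback above, a hedged sketch of a single-message read; CollectingScanCallback is the hypothetical collector sketched after ScanCallback.java, and pm can be any PersistenceManager, e.g. the ReadAheadCache added in this commit:

    import com.google.protobuf.ByteString;

    class SingleMessageScanSketch {
        static void readOne(PersistenceManager pm) {
            ByteString topic = ByteString.copyFromUtf8("my-topic");
            CollectingScanCallback cb = new CollectingScanCallback();
            // Scans exactly the message at seq-id 42: messageScanned fires once,
            // scanFailed fires on error, and scanFinished may be skipped since
            // it would be redundant for a single record
            pm.scanSingleMessage(new ScanRequest(topic, 42L, cb, null));
        }
    }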

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ChannelTracker.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ChannelTracker.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ChannelTracker.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ChannelTracker.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.proxy;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.jboss.netty.channel.Channel;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.api.Subscriber;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.TopicBusyException;
+import org.apache.hedwig.server.handlers.ChannelDisconnectListener;
+import org.apache.hedwig.util.Callback;
+
+public class ChannelTracker implements ChannelDisconnectListener {
+    HashMap<TopicSubscriber, Channel> topicSub2Channel = new HashMap<TopicSubscriber, Channel>();
+    HashMap<Channel, List<TopicSubscriber>> channel2TopicSubs = new HashMap<Channel, List<TopicSubscriber>>();
+    Subscriber subscriber;
+
+    public ChannelTracker(Subscriber subscriber) {
+        this.subscriber = subscriber;
+    }
+
+    static Callback<Void> noOpCallback = new Callback<Void>() {
+        public void operationFailed(Object ctx, PubSubException exception) {
+        };
+
+        public void operationFinished(Object ctx, Void resultOfOperation) {
+        };
+    };
+
+    public synchronized void channelDisconnected(Channel channel) {
+        List<TopicSubscriber> topicSubs = channel2TopicSubs.remove(channel);
+
+        if (topicSubs == null) {
+            return;
+        }
+
+        for (TopicSubscriber topicSub : topicSubs) {
+            topicSub2Channel.remove(topicSub);
+            subscriber.asyncCloseSubscription(topicSub.getTopic(), topicSub.getSubscriberId(), noOpCallback, null);
+        }
+    }
+
+    public synchronized void subscribeSucceeded(TopicSubscriber topicSubscriber, Channel channel)
+            throws TopicBusyException {
+
+        if (!channel.isConnected()) {
+            // channel got disconnected while we were processing the
+            // subscribe request, nothing much we can do in this case
+            return;
+        }
+
+        if (topicSub2Channel.containsKey(topicSubscriber)) {
+            TopicBusyException pse = new PubSubException.TopicBusyException(
+                    "subscription for this topic, subscriberId is already being served on a different channel");
+            throw pse;
+        }
+
+        topicSub2Channel.put(topicSubscriber, channel);
+
+        List<TopicSubscriber> topicSubs = channel2TopicSubs.get(channel);
+
+        if (topicSubs == null) {
+            topicSubs = new LinkedList<TopicSubscriber>();
+            channel2TopicSubs.put(channel, topicSubs);
+        }
+        topicSubs.add(topicSubscriber);
+
+    }
+
+    public synchronized void aboutToUnsubscribe(ByteString topic, ByteString subscriberId) {
+        TopicSubscriber topicSub = new TopicSubscriber(topic, subscriberId);
+
+        Channel channel = topicSub2Channel.remove(topicSub);
+
+        if (channel != null) {
+            List<TopicSubscriber> topicSubs = channel2TopicSubs.get(channel);
+            if (topicSubs != null) {
+                topicSubs.remove(topicSub);
+            }
+        }
+    }
+
+    public synchronized void checkChannelMatches(ByteString topic, ByteString subscriberId, Channel channel)
+            throws PubSubException {
+        Channel subscribedChannel = getChannel(topic, subscriberId);
+
+        if (subscribedChannel == null) {
+            throw new PubSubException.ClientNotSubscribedException(
+                    "Can't start delivery since client is not subscribed");
+        }
+
+        if (subscribedChannel != channel) {
+            throw new PubSubException.TopicBusyException(
+                    "Can't start delivery since client is subscribed on a different channel");
+        }
+
+    }
+
+    public synchronized Channel getChannel(ByteString topic, ByteString subscriberId) {
+        return topicSub2Channel.get(new TopicSubscriber(topic, subscriberId));
+    }
+
+}
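
A hedged sketch (not part of the commit) of how the proxy handlers are expected to drive the tracker; the Channel comes from Netty, and only methods declared above are used:

    import com.google.protobuf.ByteString;
    import org.apache.hedwig.client.data.TopicSubscriber;
    import org.apache.hedwig.exceptions.PubSubException;
    import org.jboss.netty.channel.Channel;

    class TrackerLifecycleSketch {
        static void onSubscribeSuccess(ChannelTracker tracker, ByteString topic,
                ByteString subscriberId, Channel channel) throws PubSubException {
            // Record the (topic, subscriberId) -> channel binding; a second
            // subscribe for the same pair on another channel throws
            // TopicBusyException
            tracker.subscribeSucceeded(new TopicSubscriber(topic, subscriberId),
                    channel);

            // Before starting delivery, verify that the requesting channel
            // owns the subscription
            tracker.checkChannelMatches(topic, subscriberId, channel);

            // On disconnect, Netty triggers channelDisconnected(channel), which
            // closes all of the channel's subscriptions on the Subscriber
        }
    }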

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.proxy;
+
+import java.io.File;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.log4j.Logger;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.ServerSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.logging.InternalLoggerFactory;
+import org.jboss.netty.logging.Log4JLoggerFactory;
+
+import org.apache.hedwig.client.netty.HedwigClient;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.apache.hedwig.server.common.TerminateJVMExceptionHandler;
+import org.apache.hedwig.server.handlers.Handler;
+import org.apache.hedwig.server.netty.PubSubServer;
+import org.apache.hedwig.server.netty.PubSubServerPipelineFactory;
+import org.apache.hedwig.server.netty.UmbrellaHandler;
+
+public class HedwigProxy {
+    static final Logger logger = Logger.getLogger(HedwigProxy.class);
+
+    HedwigClient client;
+    ServerSocketChannelFactory serverSocketChannelFactory;
+    ChannelGroup allChannels; 
+    Map<OperationType, Handler> handlers;
+    ProxyConfiguration cfg;
+
+    public HedwigProxy(final ProxyConfiguration cfg, final UncaughtExceptionHandler exceptionHandler)
+            throws InterruptedException {
+        this.cfg = cfg;
+
+        ThreadGroup tg = new ThreadGroup("hedwigproxy") {
+            @Override
+            public void uncaughtException(Thread t, Throwable e) {
+                exceptionHandler.uncaughtException(t, e);
+            }
+        };
+
+        final LinkedBlockingQueue<Boolean> queue = new LinkedBlockingQueue<Boolean>();
+
+        new Thread(tg, new Runnable() {
+            @Override
+            public void run() {
+                client = new HedwigClient(cfg);
+                
+                serverSocketChannelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
+                        Executors.newCachedThreadPool());
+                initializeHandlers();
+                initializeNetty();
+
+                queue.offer(true);
+            }
+        }).start();
+
+        queue.take();
+    }
+
+    public HedwigProxy(ProxyConfiguration conf) throws InterruptedException {
+        this(conf, new TerminateJVMExceptionHandler());
+    }
+
+    protected void initializeHandlers() {
+        handlers = new HashMap<OperationType, Handler>();
+        ChannelTracker tracker = new ChannelTracker(client.getSubscriber());
+
+        handlers.put(OperationType.PUBLISH, new ProxyPublishHander(client.getPublisher()));
+        handlers.put(OperationType.SUBSCRIBE, new ProxySubscribeHandler(client.getSubscriber(), tracker));
+        handlers.put(OperationType.UNSUBSCRIBE, new ProxyUnsubscribeHandler(client.getSubscriber(), tracker));
+        handlers.put(OperationType.CONSUME, new ProxyConsumeHandler(client.getSubscriber()));
+        handlers.put(OperationType.STOP_DELIVERY, new ProxyStopDeliveryHandler(client.getSubscriber(), tracker));
+        handlers.put(OperationType.START_DELIVERY, new ProxyStartDeliveryHandler(client.getSubscriber(), tracker));
+
+    }
+
+    protected void initializeNetty() {
+        InternalLoggerFactory.setDefaultFactory(new Log4JLoggerFactory());
+        allChannels = new DefaultChannelGroup("hedwigproxy");
+        ServerBootstrap bootstrap = new ServerBootstrap(serverSocketChannelFactory);
+        UmbrellaHandler umbrellaHandler = new UmbrellaHandler(allChannels, handlers, false);
+        PubSubServerPipelineFactory pipeline = new PubSubServerPipelineFactory(umbrellaHandler, null, cfg
+                .getMaximumMessageSize());
+
+        bootstrap.setPipelineFactory(pipeline);
+        bootstrap.setOption("child.tcpNoDelay", true);
+        bootstrap.setOption("child.keepAlive", true);
+        bootstrap.setOption("reuseAddress", true);
+
+        // Bind and start to accept incoming connections.
+        allChannels.add(bootstrap.bind(new InetSocketAddress(cfg.getProxyPort())));
+        logger.info("Going into receive loop");
+    }
+
+    public void shutdown() {
+        allChannels.close().awaitUninterruptibly();
+        client.stop();
+        serverSocketChannelFactory.releaseExternalResources();
+    }
+
+    // The following method exists only for unit-testing purposes and should
+    // go away once we make start delivery totally server-side.
+    public Handler getStartDeliveryHandler() {
+        return handlers.get(OperationType.START_DELIVERY);
+    }
+
+    /**
+     * @param args
+     */
+    public static void main(String[] args) {
+
+        logger.info("Attempting to start Hedwig Proxy");
+        ProxyConfiguration conf = new ProxyConfiguration();
+        if (args.length > 0) {
+            String confFile = args[0];
+            try {
+                conf.loadConf(new File(confFile).toURI().toURL());
+            } catch (MalformedURLException e) {
+                String msg = "Could not open configuration file: " + confFile;
+                PubSubServer.errorMsgAndExit(msg, e, PubSubServer.RC_INVALID_CONF_FILE);
+            } catch (ConfigurationException e) {
+                String msg = "Malformed configuration file: " + confFile;
+                PubSubServer.errorMsgAndExit(msg, e, PubSubServer.RC_MISCONFIGURED);
+            }
+            logger.info("Using configuration file " + confFile);
+        }
+        try {
+            new HedwigProxy(conf);
+        } catch (Throwable t) {
+            PubSubServer.errorMsgAndExit("Error during startup", t, PubSubServer.RC_OTHER);
+        }
+    }
+
+}
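
Besides main(), the proxy can be embedded, e.g. in a test harness, roughly as follows; the port override is a hypothetical choice, not something this commit configures:

    public class ProxyEmbedSketch {
        public static void main(String[] args) throws InterruptedException {
            ProxyConfiguration cfg = new ProxyConfiguration() {
                @Override
                public int getProxyPort() {
                    return 4080; // hypothetical override of the 9099 default
                }
            };
            HedwigProxy proxy = new HedwigProxy(cfg); // binds and starts accepting
            // ... exercise the proxy ...
            proxy.shutdown(); // closes channels, stops client, frees Netty resources
        }
    }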

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.proxy;
+
+import org.apache.hedwig.client.conf.ClientConfiguration;
+
+public class ProxyConfiguration extends ClientConfiguration {
+
+    protected static String PROXY_PORT = "proxy_port";
+    protected static String MAX_MESSAGE_SIZE = "max_message_size";
+    
+    public int getProxyPort(){
+        return conf.getInt(PROXY_PORT, 9099);
+    }
+    
+    @Override
+    public int getMaximumMessageSize() {
+        return conf.getInt(MAX_MESSAGE_SIZE, 1258291); /* 1.2M */
+    }
+    
+}
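
Both keys can also be set in the configuration file passed to HedwigProxy's main. Assuming the properties-style format that commons-configuration reads, a sample would be (values illustrative):

    proxy_port=4080
    max_message_size=2097152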

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyConsumeHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyConsumeHandler.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyConsumeHandler.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyConsumeHandler.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.proxy;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.channel.Channel;
+
+import org.apache.hedwig.client.api.Subscriber;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.protocol.PubSubProtocol.ConsumeRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.server.handlers.Handler;
+import org.apache.hedwig.server.netty.UmbrellaHandler;
+
+public class ProxyConsumeHandler implements Handler {
+
+    static final Logger logger = Logger.getLogger(ProxyConsumeHandler.class);
+
+    Subscriber subscriber;
+
+    public ProxyConsumeHandler(Subscriber subscriber) {
+        this.subscriber = subscriber;
+    }
+
+    @Override
+    public void handleRequest(PubSubRequest request, Channel channel) {
+        if (!request.hasConsumeRequest()) {
+            UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
+                    "Missing consume request data");
+            return;
+        }
+
+        ConsumeRequest consumeRequest = request.getConsumeRequest();
+        try {
+            subscriber.consume(request.getTopic(), consumeRequest.getSubscriberId(), consumeRequest.getMsgId());
+        } catch (ClientNotSubscribedException e) {
+            // ignore
+            logger.warn("Unexpected consume request", e);
+        }
+
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyPublishHander.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyPublishHander.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyPublishHander.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyPublishHander.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.proxy;
+
+import org.jboss.netty.channel.Channel;
+
+import org.apache.hedwig.client.api.Publisher;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.PublishRequest;
+import org.apache.hedwig.protoextensions.PubSubResponseUtils;
+import org.apache.hedwig.server.handlers.Handler;
+import org.apache.hedwig.server.netty.UmbrellaHandler;
+import org.apache.hedwig.util.Callback;
+
+public class ProxyPublishHander implements Handler {
+    Publisher publisher;
+
+    public ProxyPublishHander(Publisher publisher) {
+        this.publisher = publisher;
+    }
+
+    @Override
+    public void handleRequest(final PubSubRequest request, final Channel channel) {
+        if (!request.hasPublishRequest()) {
+            UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
+                    "Missing publish request data");
+            return;
+        }
+
+        final PublishRequest publishRequest = request.getPublishRequest();
+
+        publisher.asyncPublish(request.getTopic(), publishRequest.getMsg(), new Callback<Void>() {
+            @Override
+            public void operationFailed(Object ctx, PubSubException exception) {
+                channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
+            }
+
+            @Override
+            public void operationFinished(Object ctx, Void resultOfOperation) {
+                channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
+            }
+        }, null);
+
+    }
+
+}

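Note: the proxy's publish path is a thin wrapper over Publisher.asyncPublish.
A minimal, self-contained sketch of the same call pattern (the topic and
payload literals are illustrative):

    import com.google.protobuf.ByteString;
    import org.apache.hedwig.client.api.Publisher;
    import org.apache.hedwig.exceptions.PubSubException;
    import org.apache.hedwig.protocol.PubSubProtocol.Message;
    import org.apache.hedwig.util.Callback;

    public class PublishSketch {
        static void publish(Publisher publisher) {
            Message msg = Message.newBuilder()
                    .setBody(ByteString.copyFromUtf8("hello")).build();
            publisher.asyncPublish(ByteString.copyFromUtf8("topic"), msg,
                    new Callback<Void>() {
                        @Override
                        public void operationFinished(Object ctx, Void result) {
                            // success: the proxy would write a success
                            // response to its own channel here
                        }

                        @Override
                        public void operationFailed(Object ctx, PubSubException e) {
                            // failure: the proxy maps the exception to an
                            // error response on its own channel
                        }
                    }, null);
        }
    }
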
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,118 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.proxy;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.api.Subscriber;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
+import org.apache.hedwig.protoextensions.PubSubResponseUtils;
+import org.apache.hedwig.server.handlers.Handler;
+import org.apache.hedwig.server.netty.UmbrellaHandler;
+import org.apache.hedwig.util.Callback;
+
+public class ProxyStartDeliveryHandler implements Handler {
+
+    static final Logger logger = Logger.getLogger(ProxyStartDeliveryHandler.class);
+
+    Subscriber subscriber;
+    ChannelTracker tracker;
+
+    public ProxyStartDeliveryHandler(Subscriber subscriber, ChannelTracker tracker) {
+        this.subscriber = subscriber;
+        this.tracker = tracker;
+    }
+
+    @Override
+    public void handleRequest(PubSubRequest request, Channel channel) {
+
+        if (!request.hasStartDeliveryRequest()) {
+            UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
+                    "Missing start delivery request data");
+            return;
+        }
+
+        final ByteString topic = request.getTopic();
+        final ByteString subscriberId = request.getStartDeliveryRequest().getSubscriberId();
+
+        synchronized (tracker) {
+            final Channel subscribedChannel = tracker.getChannel(topic, subscriberId);
+            
+            if (subscribedChannel == null) {
+                channel.write(PubSubResponseUtils.getResponseForException(
+                        new PubSubException.ClientNotSubscribedException("no subscription to start delivery on"),
+                        request.getTxnId()));
+                return;
+            }
+            
+            MessageHandler handler = new MessageHandler() {
+                @Override
+                public void consume(ByteString topic, ByteString subscriberId, Message msg,
+                        final Callback<Void> callback, final Object context) {
+
+                    PubSubResponse response = PubSubResponse.newBuilder().setProtocolVersion(
+                            ProtocolVersion.VERSION_ONE).setStatusCode(StatusCode.SUCCESS).setTxnId(0).setMessage(msg)
+                            .setTopic(topic).setSubscriberId(subscriberId).build();
+
+                    ChannelFuture future = subscribedChannel.write(response);
+
+                    future.addListener(new ChannelFutureListener() {
+                        @Override
+                        public void operationComplete(ChannelFuture future) throws Exception {
+                            if (!future.isSuccess()) {
+                                // Ignore this failure: a write only fails
+                                // on channel disconnect, and the disconnect
+                                // will itself stop delivery and, with it,
+                                // these errors.
+                                return;
+                            }
+
+                            // Tell the hedwig client that it can send
+                            // more messages
+                            callback.operationFinished(context, null);
+                        }
+                    });
+                }
+            };
+
+            channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
+
+            try {
+                subscriber.startDelivery(topic, subscriberId, handler);
+            } catch (ClientNotSubscribedException e) {
+                // This should not happen, since we have just verified
+                // that this channel holds the subscription
+                logger.fatal("Unexpected: No subscription when attempting to start delivery", e);
+                throw new RuntimeException(e);
+            }
+        }
+
+    }
+
+}

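Note: the anonymous MessageHandler above is the interesting part: it only acks
back to the embedded client (callback.operationFinished) once the write to the
subscriber's channel has completed, so delivery cannot run ahead of a slow
channel. A minimal sketch of the same back-pressure pattern against an
arbitrary asynchronous sink (the executor-based sink is illustrative):

    import java.util.concurrent.ExecutorService;
    import java.util.concurrent.Executors;

    import com.google.protobuf.ByteString;
    import org.apache.hedwig.client.api.MessageHandler;
    import org.apache.hedwig.protocol.PubSubProtocol.Message;
    import org.apache.hedwig.util.Callback;

    public class ThrottledHandler implements MessageHandler {
        private final ExecutorService sink = Executors.newSingleThreadExecutor();

        @Override
        public void consume(ByteString topic, ByteString subscriberId,
                final Message msg, final Callback<Void> callback,
                final Object context) {
            sink.submit(new Runnable() {
                @Override
                public void run() {
                    process(msg);
                    // Ack only after the message has been fully handled,
                    // so the client does not deliver faster than the sink
                    // can absorb.
                    callback.operationFinished(context, null);
                }
            });
        }

        private void process(Message msg) {
            // application-specific handling goes here
        }
    }
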
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyStopDeliveryHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyStopDeliveryHandler.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyStopDeliveryHandler.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyStopDeliveryHandler.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.proxy;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.channel.Channel;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.api.Subscriber;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.server.handlers.Handler;
+import org.apache.hedwig.server.netty.UmbrellaHandler;
+
+public class ProxyStopDeliveryHandler implements Handler {
+
+    static final Logger logger = Logger.getLogger(ProxyStopDeliveryHandler.class);
+
+    Subscriber subscriber;
+    ChannelTracker tracker;
+
+    public ProxyStopDeliveryHandler(Subscriber subscriber, ChannelTracker tracker) {
+        this.subscriber = subscriber;
+        this.tracker = tracker;
+    }
+
+    @Override
+    public void handleRequest(PubSubRequest request, Channel channel) {
+        if (!request.hasStopDeliveryRequest()) {
+            UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
+                    "Missing stop delivery request data");
+            return;
+        }
+
+        final ByteString topic = request.getTopic();
+        final ByteString subscriberId = request.getStopDeliveryRequest().getSubscriberId();
+
+        synchronized (tracker) {
+            try {
+                tracker.checkChannelMatches(topic, subscriberId, channel);
+            } catch (PubSubException e) {
+                // intentionally ignore this error, since stop delivery doesn't
+                // send back a response
+                return;
+            }
+
+            try {
+                subscriber.stopDelivery(topic, subscriberId);
+            } catch (ClientNotSubscribedException e) {
+                // This should not happen, since we have just verified
+                // that this channel holds the subscription
+                logger.warn("Unexpected: No subscription when attempting to stop delivery", e);
+            }
+        }
+
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxySubscribeHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxySubscribeHandler.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxySubscribeHandler.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxySubscribeHandler.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.proxy;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.channel.Channel;
+import org.apache.hedwig.client.api.Subscriber;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.TopicBusyException;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
+import org.apache.hedwig.protoextensions.PubSubResponseUtils;
+import org.apache.hedwig.server.handlers.ChannelDisconnectListener;
+import org.apache.hedwig.server.handlers.Handler;
+import org.apache.hedwig.server.netty.UmbrellaHandler;
+import org.apache.hedwig.util.Callback;
+
+public class ProxySubscribeHandler implements Handler, ChannelDisconnectListener {
+
+    static final Logger logger = Logger.getLogger(ProxySubscribeHandler.class);
+
+    Subscriber subscriber;
+    ChannelTracker tracker;
+
+    public ProxySubscribeHandler(Subscriber subscriber, ChannelTracker tracker) {
+        this.subscriber = subscriber;
+        this.tracker = tracker;
+    }
+
+    @Override
+    public void channelDisconnected(Channel channel) {
+        tracker.channelDisconnected(channel);
+    }
+
+    @Override
+    public void handleRequest(final PubSubRequest request, final Channel channel) {
+        if (!request.hasSubscribeRequest()) {
+            UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
+                    "Missing subscribe request data");
+            return;
+        }
+
+        SubscribeRequest subRequest = request.getSubscribeRequest();
+        final TopicSubscriber topicSubscriber = new TopicSubscriber(request.getTopic(), subRequest.getSubscriberId());
+
+        subscriber.asyncSubscribe(topicSubscriber.getTopic(), subRequest.getSubscriberId(), subRequest
+                .getCreateOrAttach(), new Callback<Void>() {
+            @Override
+            public void operationFailed(Object ctx, PubSubException exception) {
+                channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
+            }
+
+            @Override
+            public void operationFinished(Object ctx, Void resultOfOperation) {
+                try {
+                    tracker.subscribeSucceeded(topicSubscriber, channel);
+                } catch (TopicBusyException e) {
+                    channel.write(PubSubResponseUtils.getResponseForException(e, request.getTxnId()));
+                    return;
+                }
+                channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
+            }
+        }, null);
+    }
+
+}

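Note: ChannelTracker itself is not in this hunk. Pieced together from the
calls the proxy handlers make on it, its contract is roughly the following
(a descriptive sketch only, not the actual class):

    import com.google.protobuf.ByteString;
    import org.jboss.netty.channel.Channel;
    import org.apache.hedwig.client.data.TopicSubscriber;
    import org.apache.hedwig.exceptions.PubSubException;

    interface ChannelTrackerContract {
        // Record that the subscription is now served on this channel;
        // fails if another live channel already owns it.
        void subscribeSucceeded(TopicSubscriber topicSubscriber, Channel channel)
                throws PubSubException.TopicBusyException;

        // The channel currently serving the subscription, or null.
        Channel getChannel(ByteString topic, ByteString subscriberId);

        // Assert that the request arrived on the owning channel.
        void checkChannelMatches(ByteString topic, ByteString subscriberId,
                Channel channel) throws PubSubException;

        // Forget the subscription before an unsubscribe is attempted.
        void aboutToUnsubscribe(ByteString topic, ByteString subscriberId);

        // Drop all subscriptions served on a disconnected channel.
        void channelDisconnected(Channel channel);
    }
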
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyUnsubscribeHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyUnsubscribeHandler.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyUnsubscribeHandler.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyUnsubscribeHandler.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.proxy;
+
+import org.jboss.netty.channel.Channel;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.api.Subscriber;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protoextensions.PubSubResponseUtils;
+import org.apache.hedwig.server.handlers.Handler;
+import org.apache.hedwig.server.netty.UmbrellaHandler;
+import org.apache.hedwig.util.Callback;
+
+public class ProxyUnsubscribeHandler implements Handler {
+
+    Subscriber subscriber;
+    ChannelTracker tracker;
+
+    public ProxyUnsubscribeHandler(Subscriber subscriber, ChannelTracker tracker) {
+        this.subscriber = subscriber;
+        this.tracker = tracker;
+    }
+
+    @Override
+    public void handleRequest(final PubSubRequest request, final Channel channel) {
+        if (!request.hasUnsubscribeRequest()) {
+            UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
+                    "Missing unsubscribe request data");
+            return;
+        }
+
+        ByteString topic = request.getTopic();
+        ByteString subscriberId = request.getUnsubscribeRequest().getSubscriberId();
+
+        synchronized (tracker) {
+
+            // Even if the unsubscribe fails, the hedwig client closes the
+            // channel serving the subscription, so it is better to tell the
+            // tracker beforehand that this subscription is no longer served
+            tracker.aboutToUnsubscribe(topic, subscriberId);
+
+            subscriber.asyncUnsubscribe(topic, subscriberId, new Callback<Void>() {
+                @Override
+                public void operationFailed(Object ctx, PubSubException exception) {
+                    channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
+                }
+
+                @Override
+                public void operationFinished(Object ctx, Void resultOfOperation) {
+                    channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
+                }
+            }, null);
+        }
+
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClient.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClient.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClient.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClient.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.regions;
+
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.netty.HedwigClient;
+
+/**
+ * This is a hub specific implementation of the HedwigClient. All it does is
+ * override the HedwigSubscriber with the hub specific child class. The class
+ * exists so we can call the protected method in the parent to set the
+ * subscriber, since we don't want to expose that API publicly.
+ */
+public class HedwigHubClient extends HedwigClient {
+
+    // Constructor when we already have a ChannelFactory instantiated.
+    public HedwigHubClient(ClientConfiguration cfg, ClientSocketChannelFactory channelFactory) {
+        super(cfg, channelFactory);
+        // Override the type of HedwigSubscriber with the hub specific one.
+        setSubscriber(new HedwigHubSubscriber(this));
+    }
+
+    // Constructor when we don't have a ChannelFactory. The super constructor
+    // will create one for us.
+    public HedwigHubClient(ClientConfiguration cfg) {
+        super(cfg);
+        // Override the type of HedwigSubscriber with the hub specific one.
+        setSubscriber(new HedwigHubSubscriber(this));
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClientFactory.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClientFactory.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClientFactory.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClientFactory.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.regions;
+
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.util.HedwigSocketAddress;
+
+public class HedwigHubClientFactory {
+
+    private final ServerConfiguration cfg;
+    private final ClientSocketChannelFactory channelFactory;
+
+    // Constructor that takes in a ServerConfiguration and a ChannelFactory
+    // so we can reuse it for all Clients created here.
+    public HedwigHubClientFactory(ServerConfiguration cfg, ClientSocketChannelFactory channelFactory) {
+        this.cfg = cfg;
+        this.channelFactory = channelFactory;
+    }
+
+    /**
+     * Manufacture a hub client whose default server to connect to is the input
+     * HedwigSocketAddress hub.
+     * 
+     * @param hub
+     *            The hub in another region to connect to.
+     */
+    HedwigHubClient create(final HedwigSocketAddress hub) {
+        // Create a hub specific version of the client to use
+        return new HedwigHubClient(new ClientConfiguration() {
+            @Override
+            protected HedwigSocketAddress getDefaultServerHedwigSocketAddress() {
+                return hub;
+            }
+
+            @Override
+            public boolean isSSLEnabled() {
+                return cfg.isInterRegionSSLEnabled();
+            }
+        }, channelFactory);
+    }
+
+}

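Note: since create() is package-private, only code in the same package can use
the factory. A usage sketch, assuming HedwigSocketAddress's string constructor
("hostname:port:ssl-port"); the address literal and the connectRemote helper
are illustrative:

    import org.apache.hedwig.util.HedwigSocketAddress;

    class RegionWiringSketch {
        // Must live in org.apache.hedwig.server.regions to see create().
        static HedwigHubClient connectRemote(HedwigHubClientFactory factory) {
            return factory.create(
                    new HedwigSocketAddress("remote-hub.example.com:4080:9876"));
        }
    }
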
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.regions;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException;
+import org.apache.hedwig.client.netty.HedwigClient;
+import org.apache.hedwig.client.netty.HedwigSubscriber;
+import org.apache.hedwig.exceptions.PubSubException.ClientAlreadySubscribedException;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.exceptions.PubSubException.CouldNotConnectException;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.util.Callback;
+
+/**
+ * This is a hub specific child class of the HedwigSubscriber. The main thing
+ * it does is wrap the public subscribe/unsubscribe methods by calling the
+ * overloaded protected ones, passing in a true value for the boolean
+ * parameter isHub. That ensures the subscriberId passed is validated,
+ * checking that it is of the right format for either a local or hub
+ * subscriber.
+ */
+public class HedwigHubSubscriber extends HedwigSubscriber {
+
+    public HedwigHubSubscriber(HedwigClient client) {
+        super(client);
+    }
+
+    @Override
+    public void subscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode)
+            throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
+            InvalidSubscriberIdException {
+        subscribe(topic, subscriberId, mode, true);
+    }
+
+    @Override
+    public void asyncSubscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode, Callback<Void> callback,
+            Object context) {
+        asyncSubscribe(topic, subscriberId, mode, callback, context, true);
+    }
+
+    @Override
+    public void unsubscribe(ByteString topic, ByteString subscriberId) throws CouldNotConnectException,
+            ClientNotSubscribedException, ServiceDownException, InvalidSubscriberIdException {
+        unsubscribe(topic, subscriberId, true);
+    }
+
+    @Override
+    public void asyncUnsubscribe(final ByteString topic, final ByteString subscriberId, final Callback<Void> callback,
+            final Object context) {
+        asyncUnsubscribe(topic, subscriberId, callback, context, true);
+    }
+
+}
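
Note: with the hub subscriber, every public call funnels into the protected
overload with isHub=true, so the subscriberId is checked against the reserved
hub format before the request goes out (the exact format rules live in the
validation code, not in this class). A minimal sketch of a hub-side attach
(the helper class is illustrative):

    import com.google.protobuf.ByteString;
    import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;

    class HubSubscribeSketch {
        static void attach(HedwigHubSubscriber subscriber, ByteString topic,
                           ByteString hubSubscriberId) throws Exception {
            // Routes through subscribe(topic, id, mode, true), which
            // validates that hubSubscriberId is in the hub format.
            subscriber.subscribe(topic, hubSubscriberId,
                    CreateOrAttach.CREATE_OR_ATTACH);
        }
    }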