You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ma...@apache.org on 2010/08/19 23:25:22 UTC
svn commit: r987314 [12/16] - in /hadoop/zookeeper/trunk: ./
src/contrib/hedwig/ src/contrib/hedwig/client/
src/contrib/hedwig/client/src/ src/contrib/hedwig/client/src/main/
src/contrib/hedwig/client/src/main/cpp/
src/contrib/hedwig/client/src/main/cp...
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/ReadAheadCache.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,703 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Iterator;
+import java.util.LinkedList;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.SortedSet;
+import java.util.TreeMap;
+import java.util.TreeSet;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+
+import org.apache.log4j.Logger;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.ServerNotResponsibleForTopicException;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protoextensions.MessageIdUtils;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.common.UnexpectedError;
+import org.apache.hedwig.util.Callback;
+
+public class ReadAheadCache implements PersistenceManager, Runnable {
+
+ static Logger logger = Logger.getLogger(ReadAheadCache.class);
+
+ protected interface CacheRequest {
+ public void performRequest();
+ }
+
+ /**
+ * The underlying persistence manager that will be used for persistence and
+ * scanning below the cache
+ */
+ protected PersistenceManagerWithRangeScan realPersistenceManager;
+
+ /**
+ * The structure for the cache
+ */
+ protected Map<CacheKey, CacheValue> cache = new HashMap<CacheKey, CacheValue>();
+
+ /**
+ * To simplify synchronization, the cache will be maintained by a single
+ * cache maintainer thread. This is the queue that will hold requests that
+ * need to be served by this thread
+ */
+ protected BlockingQueue<CacheRequest> requestQueue = new LinkedBlockingQueue<CacheRequest>();
+
+ /**
+ * We want to keep track of when entries were added in the cache, so that we
+ * can remove them in a FIFO fashion
+ */
+ protected SortedMap<Long, Set<CacheKey>> timeIndexOfAddition = new TreeMap<Long, Set<CacheKey>>();
+
+ /**
+ * We also want to track the entries in seq-id order so that we can clean up
+ * entries after the last subscriber
+ */
+ protected Map<ByteString, SortedSet<Long>> orderedIndexOnSeqId = new HashMap<ByteString, SortedSet<Long>>();
+
+ /**
+ * We maintain an estimate of the current size of the cache, so that we know
+ * when to evict entries.
+ */
+ protected long presentCacheSize = 0;
+
+ /**
+ * One instance of a callback that we will pass to the underlying
+ * persistence manager when asking it to persist messages
+ */
+ protected PersistCallback persistCallbackInstance = new PersistCallback();
+
+ /**
+ * 2 kinds of exceptions that we will use to signal error from readahead
+ */
+ protected NoSuchSeqIdException noSuchSeqIdExceptionInstance = new NoSuchSeqIdException();
+ protected ReadAheadException readAheadExceptionInstance = new ReadAheadException();
+
+ protected ServerConfiguration cfg;
+ protected Thread cacheThread;
+ // Boolean indicating if this thread should continue running. This is used
+ // when we want to stop the thread during a PubSubServer shutdown.
+ protected boolean keepRunning = true;
+
+ /**
+ * Constructor. Starts the cache maintainer thread
+ *
+ * @param realPersistenceManager
+ */
+ public ReadAheadCache(PersistenceManagerWithRangeScan realPersistenceManager, ServerConfiguration cfg) {
+ this.realPersistenceManager = realPersistenceManager;
+ this.cfg = cfg;
+ cacheThread = new Thread(this, "CacheThread");
+ }
+
+ public ReadAheadCache start() {
+ cacheThread.start();
+ return this;
+ }
+
+ /**
+ * ========================================================================
+ * Methods of {@link PersistenceManager} that we will pass straight down to
+ * the real persistence manager.
+ */
+
+ public long getSeqIdAfterSkipping(ByteString topic, long seqId, int skipAmount) {
+ return realPersistenceManager.getSeqIdAfterSkipping(topic, seqId, skipAmount);
+ }
+
+ public MessageSeqId getCurrentSeqIdForTopic(ByteString topic) throws ServerNotResponsibleForTopicException {
+ return realPersistenceManager.getCurrentSeqIdForTopic(topic);
+ }
+
+ /**
+ * ========================================================================
+ * Other methods of {@link PersistenceManager} that the cache needs to take
+ * some action on.
+ *
+ * 1. Persist: We pass it through to the real persistence manager but insert
+ * our callback on the return path
+ *
+ */
+ public void persistMessage(PersistRequest request) {
+ // make a new PersistRequest object so that we can insert our own
+ // callback in the middle. Assign the original request as the context
+ // for the callback.
+
+ PersistRequest newRequest = new PersistRequest(request.getTopic(), request.getMessage(),
+ persistCallbackInstance, request);
+ realPersistenceManager.persistMessage(newRequest);
+ }
+
+ /**
+ * The callback that we insert on the persist request return path. The
+ * callback simply forms a {@link PersistResponse} object and inserts it in
+ * the request queue to be handled serially by the cache maintainer thread.
+ *
+ */
+ public class PersistCallback implements Callback<Long> {
+
+ /**
+ * In case there is a failure in persisting, just pass it to the
+ * original callback
+ */
+ public void operationFailed(Object ctx, PubSubException exception) {
+ PersistRequest originalRequest = (PersistRequest) ctx;
+ Callback<Long> originalCallback = originalRequest.getCallback();
+ Object originalContext = originalRequest.getCtx();
+ originalCallback.operationFailed(originalContext, exception);
+ }
+
+ /**
+ * When the persist finishes, we first notify the original callback of
+ * success, and then opportunistically treat the message as if it just
+ * came in through a scan
+ */
+ public void operationFinished(Object ctx, Long resultOfOperation) {
+ PersistRequest originalRequest = (PersistRequest) ctx;
+
+ // Lets call the original callback first so that the publisher can
+ // hear success
+ originalRequest.getCallback().operationFinished(originalRequest.getCtx(), resultOfOperation);
+
+ // Original message that was persisted didn't have the local seq-id.
+ // Lets add that in
+ Message messageWithLocalSeqId = MessageIdUtils.mergeLocalSeqId(originalRequest.getMessage(),
+ resultOfOperation);
+
+ // Now enqueue a request to add this newly persisted message to our
+ // cache
+ CacheKey cacheKey = new CacheKey(originalRequest.getTopic(), resultOfOperation);
+
+ enqueueWithoutFailure(new ScanResponse(cacheKey, messageWithLocalSeqId));
+ }
+
+ }
+
+ /**
+ * Too complicated to deal with enqueue failures from the context of our
+ * callbacks. Its just simpler to quit and restart afresh. Moreover, this
+ * should not happen as the request queue for the cache maintainer is
+ * unbounded.
+ *
+ * @param obj
+ */
+ protected void enqueueWithoutFailure(CacheRequest obj) {
+ if (!requestQueue.offer(obj)) {
+ throw new UnexpectedError("Could not enqueue object: " + obj.toString()
+ + " to cache request queue. Exiting.");
+
+ }
+ }
+
+ /**
+ * Another method from {@link PersistenceManager}.
+ *
+ * 2. Scan - Since the scan needs to touch the cache, we will just enqueue
+ * the scan request and let the cache maintainer thread handle it.
+ */
+ public void scanSingleMessage(ScanRequest request) {
+ // Let the scan requests be serialized through the queue
+ enqueueWithoutFailure(new ScanRequestWrapper(request));
+ }
+
+ /**
+ * Another method from {@link PersistenceManager}.
+ *
+ * 3. Enqueue the request so that the cache maintainer thread can delete all
+ * message-ids older than the one specified
+ */
+ public void deliveredUntil(ByteString topic, Long seqId) {
+ enqueueWithoutFailure(new DeliveredUntil(topic, seqId));
+ }
+
+ /**
+ * Another method from {@link PersistenceManager}.
+ *
+ * Since this is a cache layer on top of an underlying persistence manager,
+ * we can just call the consumedUntil method there. The messages older than
+ * the latest one passed here won't be accessed anymore so they should just
+ * get aged out of the cache eventually. For now, there is no need to
+ * proactively remove those entries from the cache.
+ */
+ public void consumedUntil(ByteString topic, Long seqId) {
+ realPersistenceManager.consumedUntil(topic, seqId);
+ }
+
+ /**
+ * ========================================================================
+ * BEGINNING OF CODE FOR THE CACHE MAINTAINER THREAD
+ *
+ * 1. The run method. It simply dequeues from the request queue, checks the
+ * type of object and acts accordingly
+ */
+ public void run() {
+ while (keepRunning) {
+ CacheRequest obj;
+ try {
+ obj = requestQueue.take();
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ return;
+ }
+ obj.performRequest();
+ }
+
+ }
+
+ /**
+ * Stop method which will enqueue a ShutdownCacheRequest.
+ */
+ public void stop() {
+ enqueueWithoutFailure(new ShutdownCacheRequest());
+ }
+
+ /**
+ * The readahead policy is simple: We check if an entry already exists for
+ * the message being requested. If an entry exists, it means that either
+ * that message is already in the cache, or a read for that message is
+ * outstanding. In that case, we look a little ahead (by readAheadCount/2)
+ * and issue a range read of readAheadCount/2 messages. The idea is to
+ * ensure that the next readAheadCount messages are always available.
+ *
+ * @return the range scan that should be issued for read ahead
+ */
+ protected RangeScanRequest doReadAhead(ScanRequest request) {
+ ByteString topic = request.getTopic();
+ Long seqId = request.getStartSeqId();
+
+ int readAheadCount = cfg.getReadAheadCount();
+ // To prevent us from getting screwed by bad configuration
+ readAheadCount = Math.max(1, readAheadCount);
+
+ RangeScanRequest readAheadRequest = doReadAheadStartingFrom(topic, seqId, readAheadCount);
+
+ if (readAheadRequest != null) {
+ return readAheadRequest;
+ }
+
+ // start key was already there in the cache so no readahead happened,
+ // lets look a little beyond
+ seqId = realPersistenceManager.getSeqIdAfterSkipping(topic, seqId, readAheadCount / 2);
+
+ readAheadRequest = doReadAheadStartingFrom(topic, seqId, readAheadCount / 2);
+
+ return readAheadRequest;
+ }
+
+ /**
+ * This method just checks if the provided seq-id already exists in the
+ * cache. If not, a range read of the specified amount is issued.
+ *
+ * @param topic
+ * @param seqId
+ * @param readAheadCount
+ * @return The range read that should be issued
+ */
+ protected RangeScanRequest doReadAheadStartingFrom(ByteString topic, long seqId, int readAheadCount) {
+
+ long startSeqId = seqId;
+ Queue<CacheKey> installedStubs = new LinkedList<CacheKey>();
+
+ int i = 0;
+
+ for (; i < readAheadCount; i++) {
+ CacheKey cacheKey = new CacheKey(topic, seqId);
+
+ // Even if a stub exists, it means that a scan for that is
+ // outstanding
+ if (cache.containsKey(cacheKey)) {
+ break;
+ }
+ CacheValue cacheValue = new CacheValue();
+ cache.put(cacheKey, cacheValue);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Adding stub for seq-id: " + seqId + " topic: " + topic.toStringUtf8());
+ }
+ installedStubs.add(cacheKey);
+
+ seqId = realPersistenceManager.getSeqIdAfterSkipping(topic, seqId, 1);
+ }
+
+ // so how many did we decide to readahead
+ if (i == 0) {
+ // no readahead, hence return false
+ return null;
+ }
+
+ long readAheadSizeLimit = cfg.getReadAheadSizeBytes();
+ ReadAheadScanCallback callback = new ReadAheadScanCallback(installedStubs, topic);
+ RangeScanRequest rangeScanRequest = new RangeScanRequest(topic, startSeqId, i, readAheadSizeLimit, callback,
+ null);
+
+ return rangeScanRequest;
+
+ }
+
+ /**
+ * This is the callback that is used for the range scans.
+ */
+ protected class ReadAheadScanCallback implements ScanCallback {
+ Queue<CacheKey> installedStubs;
+ ByteString topic;
+
+ /**
+ * Constructor
+ *
+ * @param installedStubs
+ * The list of stubs that were installed for this range scan
+ * @param topic
+ */
+ public ReadAheadScanCallback(Queue<CacheKey> installedStubs, ByteString topic) {
+ this.installedStubs = installedStubs;
+ this.topic = topic;
+ }
+
+ public void messageScanned(Object ctx, Message message) {
+
+ // Any message we read is potentially useful for us, so lets first
+ // enqueue it
+ CacheKey cacheKey = new CacheKey(topic, message.getMsgId().getLocalComponent());
+ enqueueWithoutFailure(new ScanResponse(cacheKey, message));
+
+ // Now lets see if this message is the one we were expecting
+ CacheKey expectedKey = installedStubs.peek();
+
+ if (expectedKey == null) {
+ // Was not expecting any more messages to come in, but they came
+ // in so we will keep them
+ return;
+ }
+
+ if (expectedKey.equals(cacheKey)) {
+ // what we got is what we expected, dequeue it so we get the
+ // next expected one
+ installedStubs.poll();
+ return;
+ }
+
+ // If reached here, what we scanned was not what we were expecting.
+ // This means that we have wrong stubs installed in the cache. We
+ // should remove them, so that whoever is waiting on them can retry.
+ // This shouldn't be happening usually
+ logger.warn("Unexpected message seq-id: " + message.getMsgId().getLocalComponent() + " on topic: "
+ + topic.toStringUtf8() + " from readahead scan, was expecting seq-id: " + expectedKey.seqId
+ + " topic: " + expectedKey.topic.toStringUtf8() + " installedStubs: " + installedStubs);
+ enqueueDeleteOfRemainingStubs(noSuchSeqIdExceptionInstance);
+
+ }
+
+ public void scanFailed(Object ctx, Exception exception) {
+ enqueueDeleteOfRemainingStubs(exception);
+ }
+
+ public void scanFinished(Object ctx, ReasonForFinish reason) {
+ // If the scan finished because no more messages are present, its ok
+ // to leave the stubs in place because they will get filled in as
+ // new publishes happen. However, if the scan finished due to some
+ // other reason, e.g., read ahead size limit was reached, we want to
+ // delete the stubs, so that when the time comes, we can schedule
+ // another readahead request.
+ if (reason != ReasonForFinish.NO_MORE_MESSAGES) {
+ enqueueDeleteOfRemainingStubs(readAheadExceptionInstance);
+ }
+ }
+
+ private void enqueueDeleteOfRemainingStubs(Exception reason) {
+ CacheKey installedStub;
+ while ((installedStub = installedStubs.poll()) != null) {
+ enqueueWithoutFailure(new ExceptionOnCacheKey(installedStub, reason));
+ }
+ }
+ }
+
+ protected static class HashSetCacheKeyFactory implements Factory<Set<CacheKey>> {
+ protected static HashSetCacheKeyFactory instance = new HashSetCacheKeyFactory();
+
+ public Set<CacheKey> newInstance() {
+ return new HashSet<CacheKey>();
+ }
+ }
+
+ protected static class TreeSetLongFactory implements Factory<SortedSet<Long>> {
+ protected static TreeSetLongFactory instance = new TreeSetLongFactory();
+
+ public SortedSet<Long> newInstance() {
+ return new TreeSet<Long>();
+ }
+ }
+
+ /**
+ * For adding the message to the cache, we do some bookeeping such as the
+ * total size of cache, order in which entries were added etc. If the size
+ * of the cache has exceeded our budget, old entries are collected.
+ *
+ * @param cacheKey
+ * @param message
+ */
+ protected void addMessageToCache(CacheKey cacheKey, Message message, long currTime) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Adding msg (topic: " + cacheKey.getTopic().toStringUtf8() + ", seq-id: "
+ + message.getMsgId().getLocalComponent() + ") to readahead cache");
+ }
+
+ CacheValue cacheValue;
+
+ if ((cacheValue = cache.get(cacheKey)) == null) {
+ cacheValue = new CacheValue();
+ cache.put(cacheKey, cacheValue);
+ }
+
+ // update the cache size
+ presentCacheSize += message.getBody().size();
+
+ // maintain the time index of addition
+ MapMethods.addToMultiMap(timeIndexOfAddition, currTime, cacheKey, HashSetCacheKeyFactory.instance);
+
+ // maintain the index of seq-id
+ MapMethods.addToMultiMap(orderedIndexOnSeqId, cacheKey.getTopic(), cacheKey.getSeqId(),
+ TreeSetLongFactory.instance);
+
+ // finally add the message to the cache
+ cacheValue.setMessageAndInvokeCallbacks(message, currTime);
+
+ // if overgrown, collect old entries
+ collectOldCacheEntries();
+ }
+
+ protected void removeMessageFromCache(CacheKey cacheKey, Exception exception, boolean maintainTimeIndex,
+ boolean maintainSeqIdIndex) {
+ CacheValue cacheValue = cache.remove(cacheKey);
+
+ if (cacheValue == null) {
+ return;
+ }
+
+ if (cacheValue.isStub()) {
+ cacheValue.setErrorAndInvokeCallbacks(exception);
+ // Stubs are not present in the indexes, so dont need to maintain
+ // indexes here
+ return;
+ }
+
+ presentCacheSize -= cacheValue.getMessage().getBody().size();
+
+ // maintain the 2 indexes
+ // TODO: can we maintain these lazily?
+ if (maintainSeqIdIndex) {
+ MapMethods.removeFromMultiMap(orderedIndexOnSeqId, cacheKey.getTopic(), cacheKey.getSeqId());
+ }
+ if (maintainTimeIndex) {
+ MapMethods.removeFromMultiMap(timeIndexOfAddition, cacheValue.getTimeOfAddition(), cacheKey);
+ }
+ }
+
+ /**
+ * Collection of old entries is simple. Just collect in insert-time order,
+ * oldest to newest.
+ */
+ protected void collectOldCacheEntries() {
+ long maxCacheSize = cfg.getMaximumCacheSize();
+
+ while (presentCacheSize > maxCacheSize && !timeIndexOfAddition.isEmpty()) {
+ Long earliestTime = timeIndexOfAddition.firstKey();
+ Set<CacheKey> oldCacheEntries = timeIndexOfAddition.get(earliestTime);
+
+ // Note: only concrete cache entries, and not stubs are in the time
+ // index. Hence there can be no callbacks pending on these cache
+ // entries. Hence safe to remove them directly.
+ for (Iterator<CacheKey> iter = oldCacheEntries.iterator(); iter.hasNext();) {
+ CacheKey cacheKey = iter.next();
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Removing topic: " + cacheKey.getTopic() + "seq-id: " + cacheKey.getSeqId()
+ + " from cache because its the oldest");
+ }
+ removeMessageFromCache(cacheKey, readAheadExceptionInstance, //
+ // maintainTimeIndex=
+ false,
+ // maintainSeqIdIndex=
+ true);
+ }
+
+ timeIndexOfAddition.remove(earliestTime);
+
+ }
+ }
+
+ /**
+ * ========================================================================
+ * The rest is just simple wrapper classes.
+ *
+ */
+
+ protected class ExceptionOnCacheKey implements CacheRequest {
+ CacheKey cacheKey;
+ Exception exception;
+
+ public ExceptionOnCacheKey(CacheKey cacheKey, Exception exception) {
+ this.cacheKey = cacheKey;
+ this.exception = exception;
+ }
+
+ /**
+ * If for some reason, an outstanding read on a cache stub fails,
+ * exception for that key is enqueued by the
+ * {@link ReadAheadScanCallback}. To handle this, we simply send error
+ * on the callbacks registered for that stub, and delete the entry from
+ * the cache
+ */
+ public void performRequest() {
+ removeMessageFromCache(cacheKey, exception,
+ // maintainTimeIndex=
+ true,
+ // maintainSeqIdIndex=
+ true);
+ }
+
+ }
+
+ @SuppressWarnings("serial")
+ protected static class NoSuchSeqIdException extends Exception {
+
+ public NoSuchSeqIdException() {
+ super("No such seq-id");
+ }
+ }
+
+ @SuppressWarnings("serial")
+ protected static class ReadAheadException extends Exception {
+ public ReadAheadException() {
+ super("Readahead failed");
+ }
+ }
+
+ protected class ScanResponse implements CacheRequest {
+ CacheKey cacheKey;
+ Message message;
+
+ public ScanResponse(CacheKey cacheKey, Message message) {
+ this.cacheKey = cacheKey;
+ this.message = message;
+ }
+
+ public void performRequest() {
+ addMessageToCache(cacheKey, message, System.currentTimeMillis());
+ }
+
+ }
+
+ protected class DeliveredUntil implements CacheRequest {
+ ByteString topic;
+ Long seqId;
+
+ public DeliveredUntil(ByteString topic, Long seqId) {
+ this.topic = topic;
+ this.seqId = seqId;
+ }
+
+ public void performRequest() {
+ SortedSet<Long> orderedSeqIds = orderedIndexOnSeqId.get(topic);
+ if (orderedSeqIds == null) {
+ return;
+ }
+
+ // focus on the set of messages with seq-ids <= the one that
+ // has been delivered until
+ SortedSet<Long> headSet = orderedSeqIds.headSet(seqId + 1);
+
+ for (Iterator<Long> iter = headSet.iterator(); iter.hasNext();) {
+ Long seqId = iter.next();
+ CacheKey cacheKey = new CacheKey(topic, seqId);
+
+ if (logger.isDebugEnabled()) {
+ logger.debug("Removing seq-id: " + cacheKey.getSeqId() + " topic: "
+ + cacheKey.getTopic().toStringUtf8()
+ + " from cache because every subscriber has moved past");
+ }
+
+ removeMessageFromCache(cacheKey, readAheadExceptionInstance, //
+ // maintainTimeIndex=
+ true,
+ // maintainSeqIdIndex=
+ false);
+ iter.remove();
+ }
+
+ if (orderedSeqIds.isEmpty()) {
+ orderedIndexOnSeqId.remove(topic);
+ }
+ }
+ }
+
+ protected class ScanRequestWrapper implements CacheRequest {
+ ScanRequest request;
+
+ public ScanRequestWrapper(ScanRequest request) {
+ this.request = request;
+ }
+
+ /**
+ * To handle a scan request, we first try to do readahead (which might
+ * cause a range read to be issued to the underlying persistence
+ * manager). The readahead will put a stub in the cache, if the message
+ * is not already present in the cache. The scan callback that is part
+ * of the scan request is added to this stub, and will be called later
+ * when the message arrives as a result of the range scan issued to the
+ * underlying persistence manager.
+ */
+
+ public void performRequest() {
+
+ RangeScanRequest readAheadRequest = doReadAhead(request);
+
+ // Read ahead must have installed at least a stub for us, so this
+ // can't be null
+ CacheValue cacheValue = cache.get(new CacheKey(request.getTopic(), request.getStartSeqId()));
+
+ // Add our callback to the stub. If the cache value was already a
+ // concrete message, the callback will be called right away
+ cacheValue.addCallback(request.getCallback(), request.getCtx());
+
+ if (readAheadRequest != null) {
+ realPersistenceManager.scanMessages(readAheadRequest);
+ }
+ }
+ }
+
+ protected class ShutdownCacheRequest implements CacheRequest {
+ // This is a simple type of CacheRequest we will enqueue when
+ // the PubSubServer is shut down and we want to stop the ReadAheadCache
+ // thread.
+ public void performRequest() {
+ keepRunning = false;
+ }
+ }
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/ScanCallback.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/ScanCallback.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/ScanCallback.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/ScanCallback.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,63 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+
+public interface ScanCallback {
+
+ enum ReasonForFinish {
+ NO_MORE_MESSAGES, SIZE_LIMIT_EXCEEDED, NUM_MESSAGES_LIMIT_EXCEEDED
+ };
+
+ /**
+ * This method is called when a message is read from the persistence layer
+ * as part of a scan. The message just read is handed to this listener which
+ * can then take the desired action on it. The return value from the method
+ * indicates whether the scan should continue or not.
+ *
+ * @param ctx
+ * The context for the callback
+ * @param message
+ * The message just scanned from the log
+ * @return true if the scan should continue, false otherwise
+ */
+ public void messageScanned(Object ctx, Message message);
+
+ /**
+ * This method is called when the scan finishes
+ *
+ *
+ * @param ctx
+ * @param reason
+ */
+
+ public abstract void scanFinished(Object ctx, ReasonForFinish reason);
+
+ /**
+ * This method is called when the operation failed due to some reason. The
+ * reason for failure is passed in.
+ *
+ * @param ctx
+ * The context for the callback
+ * @param exception
+ * The reason for the failure of the scan
+ */
+ public abstract void scanFailed(Object ctx, Exception exception);
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/ScanCallbackWithContext.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/ScanCallbackWithContext.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/ScanCallbackWithContext.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/ScanCallbackWithContext.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+public class ScanCallbackWithContext {
+ ScanCallback scanCallback;
+ Object ctx;
+
+ public ScanCallbackWithContext(ScanCallback callback, Object ctx) {
+ this.scanCallback = callback;
+ this.ctx = ctx;
+ }
+
+ public ScanCallback getScanCallback() {
+ return scanCallback;
+ }
+
+ public Object getCtx() {
+ return ctx;
+ }
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/ScanRequest.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/ScanRequest.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/ScanRequest.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/persistence/ScanRequest.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.persistence;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+
+/**
+ * Encapsulates a request for reading a single message. The message on the given
+ * topic <b>at</b> the given seqId is scanned. A call-back {@link ScanCallback}
+ * is provided. When the message is scanned, the
+ * {@link ScanCallback#messageScanned(Object, Message)} method is called. Since
+ * there is only 1 record to be scanned the
+ * {@link ScanCallback#operationFinished(Object)} method may not be called since
+ * its redundant.
+ * {@link ScanCallback#scanFailed(Object, org.apache.hedwig.exceptions.PubSubException)}
+ * method is called in case of error.
+ *
+ */
+public class ScanRequest {
+ ByteString topic;
+ long startSeqId;
+ ScanCallback callback;
+ Object ctx;
+
+ public ScanRequest(ByteString topic, long startSeqId, ScanCallback callback, Object ctx) {
+ this.topic = topic;
+ this.startSeqId = startSeqId;
+ this.callback = callback;
+ this.ctx = ctx;
+ }
+
+ public ByteString getTopic() {
+ return topic;
+ }
+
+ public long getStartSeqId() {
+ return startSeqId;
+ }
+
+ public ScanCallback getCallback() {
+ return callback;
+ }
+
+ public Object getCtx() {
+ return ctx;
+ }
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ChannelTracker.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ChannelTracker.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ChannelTracker.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ChannelTracker.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,124 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.proxy;
+
+import java.util.HashMap;
+import java.util.LinkedList;
+import java.util.List;
+
+import org.jboss.netty.channel.Channel;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.api.Subscriber;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.TopicBusyException;
+import org.apache.hedwig.server.handlers.ChannelDisconnectListener;
+import org.apache.hedwig.util.Callback;
+
+public class ChannelTracker implements ChannelDisconnectListener {
+ HashMap<TopicSubscriber, Channel> topicSub2Channel = new HashMap<TopicSubscriber, Channel>();
+ HashMap<Channel, List<TopicSubscriber>> channel2TopicSubs = new HashMap<Channel, List<TopicSubscriber>>();
+ Subscriber subscriber;
+
+ public ChannelTracker(Subscriber subscriber) {
+ this.subscriber = subscriber;
+ }
+
+ static Callback<Void> noOpCallback = new Callback<Void>() {
+ public void operationFailed(Object ctx, PubSubException exception) {
+ };
+
+ public void operationFinished(Object ctx, Void resultOfOperation) {
+ };
+ };
+
+ public synchronized void channelDisconnected(Channel channel) {
+ List<TopicSubscriber> topicSubs = channel2TopicSubs.remove(channel);
+
+ if (topicSubs == null) {
+ return;
+ }
+
+ for (TopicSubscriber topicSub : topicSubs) {
+ topicSub2Channel.remove(topicSub);
+ subscriber.asyncCloseSubscription(topicSub.getTopic(), topicSub.getSubscriberId(), noOpCallback, null);
+ }
+ }
+
+ public synchronized void subscribeSucceeded(TopicSubscriber topicSubscriber, Channel channel)
+ throws TopicBusyException {
+
+ if (!channel.isConnected()) {
+ // channel got disconnected while we were processing the
+ // subscribe request, nothing much we can do in this case
+ return;
+ }
+
+ if (topicSub2Channel.containsKey(topicSubscriber)) {
+ TopicBusyException pse = new PubSubException.TopicBusyException(
+ "subscription for this topic, subscriberId is already being served on a different channel");
+ throw pse;
+ }
+
+ topicSub2Channel.put(topicSubscriber, channel);
+
+ List<TopicSubscriber> topicSubs = channel2TopicSubs.get(channel);
+
+ if (topicSubs == null) {
+ topicSubs = new LinkedList<TopicSubscriber>();
+ channel2TopicSubs.put(channel, topicSubs);
+ }
+ topicSubs.add(topicSubscriber);
+
+ }
+
+ public synchronized void aboutToUnsubscribe(ByteString topic, ByteString subscriberId) {
+ TopicSubscriber topicSub = new TopicSubscriber(topic, subscriberId);
+
+ Channel channel = topicSub2Channel.remove(topicSub);
+
+ if (channel != null) {
+ List<TopicSubscriber> topicSubs = channel2TopicSubs.get(channel);
+ if (topicSubs != null) {
+ topicSubs.remove(topicSub);
+ }
+ }
+ }
+
+ public synchronized void checkChannelMatches(ByteString topic, ByteString subscriberId, Channel channel)
+ throws PubSubException {
+ Channel subscribedChannel = getChannel(topic, subscriberId);
+
+ if (subscribedChannel == null) {
+ throw new PubSubException.ClientNotSubscribedException(
+ "Can't start delivery since client is not subscribed");
+ }
+
+ if (subscribedChannel != channel) {
+ throw new PubSubException.TopicBusyException(
+ "Can't start delivery since client is subscribed on a different channel");
+ }
+
+ }
+
+ public synchronized Channel getChannel(ByteString topic, ByteString subscriberId) {
+ return topicSub2Channel.get(new TopicSubscriber(topic, subscriberId));
+ }
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/HedwigProxy.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,159 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.proxy;
+
+import java.io.File;
+import java.lang.Thread.UncaughtExceptionHandler;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.LinkedBlockingQueue;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.log4j.Logger;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.ServerSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.logging.InternalLoggerFactory;
+import org.jboss.netty.logging.Log4JLoggerFactory;
+
+import org.apache.hedwig.client.netty.HedwigClient;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.apache.hedwig.server.common.TerminateJVMExceptionHandler;
+import org.apache.hedwig.server.handlers.Handler;
+import org.apache.hedwig.server.netty.PubSubServer;
+import org.apache.hedwig.server.netty.PubSubServerPipelineFactory;
+import org.apache.hedwig.server.netty.UmbrellaHandler;
+
+public class HedwigProxy {
+ static final Logger logger = Logger.getLogger(HedwigProxy.class);
+
+ HedwigClient client;
+ ServerSocketChannelFactory serverSocketChannelFactory;
+ ChannelGroup allChannels;
+ Map<OperationType, Handler> handlers;
+ ProxyConfiguration cfg;
+
+ public HedwigProxy(final ProxyConfiguration cfg, final UncaughtExceptionHandler exceptionHandler)
+ throws InterruptedException {
+ this.cfg = cfg;
+
+ ThreadGroup tg = new ThreadGroup("hedwigproxy") {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ exceptionHandler.uncaughtException(t, e);
+ }
+ };
+
+ final LinkedBlockingQueue<Boolean> queue = new LinkedBlockingQueue<Boolean>();
+
+ new Thread(tg, new Runnable() {
+ @Override
+ public void run() {
+ client = new HedwigClient(cfg);
+
+ serverSocketChannelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(),
+ Executors.newCachedThreadPool());
+ initializeHandlers();
+ initializeNetty();
+
+ queue.offer(true);
+ }
+ }).start();
+
+ queue.take();
+ }
+
+ public HedwigProxy(ProxyConfiguration conf) throws InterruptedException {
+ this(conf, new TerminateJVMExceptionHandler());
+ }
+
+ protected void initializeHandlers() {
+ handlers = new HashMap<OperationType, Handler>();
+ ChannelTracker tracker = new ChannelTracker(client.getSubscriber());
+
+ handlers.put(OperationType.PUBLISH, new ProxyPublishHander(client.getPublisher()));
+ handlers.put(OperationType.SUBSCRIBE, new ProxySubscribeHandler(client.getSubscriber(), tracker));
+ handlers.put(OperationType.UNSUBSCRIBE, new ProxyUnsubscribeHandler(client.getSubscriber(), tracker));
+ handlers.put(OperationType.CONSUME, new ProxyConsumeHandler(client.getSubscriber()));
+ handlers.put(OperationType.STOP_DELIVERY, new ProxyStopDeliveryHandler(client.getSubscriber(), tracker));
+ handlers.put(OperationType.START_DELIVERY, new ProxyStartDeliveryHandler(client.getSubscriber(), tracker));
+
+ }
+
+ protected void initializeNetty() {
+ InternalLoggerFactory.setDefaultFactory(new Log4JLoggerFactory());
+ allChannels = new DefaultChannelGroup("hedwigproxy");
+ ServerBootstrap bootstrap = new ServerBootstrap(serverSocketChannelFactory);
+ UmbrellaHandler umbrellaHandler = new UmbrellaHandler(allChannels, handlers, false);
+ PubSubServerPipelineFactory pipeline = new PubSubServerPipelineFactory(umbrellaHandler, null, cfg
+ .getMaximumMessageSize());
+
+ bootstrap.setPipelineFactory(pipeline);
+ bootstrap.setOption("child.tcpNoDelay", true);
+ bootstrap.setOption("child.keepAlive", true);
+ bootstrap.setOption("reuseAddress", true);
+
+ // Bind and start to accept incoming connections.
+ allChannels.add(bootstrap.bind(new InetSocketAddress(cfg.getProxyPort())));
+ logger.info("Going into receive loop");
+ }
+
+ public void shutdown() {
+ allChannels.close().awaitUninterruptibly();
+ client.stop();
+ serverSocketChannelFactory.releaseExternalResources();
+ }
+
+ // the following method only exists for unit-testing purposes, should go
+ // away once we make start delivery totally server-side
+ public Handler getStartDeliveryHandler(){
+ return handlers.get(OperationType.START_DELIVERY);
+ }
+
+ /**
+ * @param args
+ */
+ public static void main(String[] args) {
+
+ logger.info("Attempting to start Hedwig Proxy");
+ ProxyConfiguration conf = new ProxyConfiguration();
+ if (args.length > 0) {
+ String confFile = args[0];
+ try {
+ conf.loadConf(new File(confFile).toURI().toURL());
+ } catch (MalformedURLException e) {
+ String msg = "Could not open configuration file: " + confFile;
+ PubSubServer.errorMsgAndExit(msg, e, PubSubServer.RC_INVALID_CONF_FILE);
+ } catch (ConfigurationException e) {
+ String msg = "Malformed configuration file: " + confFile;
+ PubSubServer.errorMsgAndExit(msg, e, PubSubServer.RC_MISCONFIGURED);
+ }
+ logger.info("Using configuration file " + confFile);
+ }
+ try {
+ new HedwigProxy(conf);
+ } catch (Throwable t) {
+ PubSubServer.errorMsgAndExit("Error during startup", t, PubSubServer.RC_OTHER);
+ }
+ }
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyConfiguration.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,36 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.proxy;
+
+import org.apache.hedwig.client.conf.ClientConfiguration;
+
+public class ProxyConfiguration extends ClientConfiguration{
+
+ protected static String PROXY_PORT = "proxy_port";
+ protected static String MAX_MESSAGE_SIZE = "max_message_size";
+
+ public int getProxyPort(){
+ return conf.getInt(PROXY_PORT, 9099);
+ }
+
+ @Override
+ public int getMaximumMessageSize() {
+ return conf.getInt(MAX_MESSAGE_SIZE, 1258291); /* 1.2M */
+ }
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyConsumeHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyConsumeHandler.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyConsumeHandler.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyConsumeHandler.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,57 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.proxy;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.channel.Channel;
+
+import org.apache.hedwig.client.api.Subscriber;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.protocol.PubSubProtocol.ConsumeRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.server.handlers.Handler;
+import org.apache.hedwig.server.netty.UmbrellaHandler;
+
+public class ProxyConsumeHandler implements Handler {
+
+ static final Logger logger = Logger.getLogger(ProxyConsumeHandler.class);
+
+ Subscriber subscriber;
+
+ public ProxyConsumeHandler(Subscriber subscriber) {
+ this.subscriber = subscriber;
+ }
+
+ @Override
+ public void handleRequest(PubSubRequest request, Channel channel) {
+ if (!request.hasConsumeRequest()) {
+ UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
+ "Missing consume request data");
+ return;
+ }
+
+ ConsumeRequest consumeRequest = request.getConsumeRequest();
+ try {
+ subscriber.consume(request.getTopic(), consumeRequest.getSubscriberId(), consumeRequest.getMsgId());
+ } catch (ClientNotSubscribedException e) {
+ // ignore
+ logger.warn("Unexpected consume request", e);
+ }
+
+ }
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyPublishHander.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyPublishHander.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyPublishHander.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyPublishHander.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,62 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.proxy;
+
+import org.jboss.netty.channel.Channel;
+
+import org.apache.hedwig.client.api.Publisher;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.PublishRequest;
+import org.apache.hedwig.protoextensions.PubSubResponseUtils;
+import org.apache.hedwig.server.handlers.Handler;
+import org.apache.hedwig.server.netty.UmbrellaHandler;
+import org.apache.hedwig.util.Callback;
+
+public class ProxyPublishHander implements Handler {
+ Publisher publisher;
+
+ public ProxyPublishHander(Publisher publisher) {
+ this.publisher = publisher;
+ }
+
+ @Override
+ public void handleRequest(final PubSubRequest request, final Channel channel) {
+ if (!request.hasPublishRequest()) {
+ UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
+ "Missing publish request data");
+ return;
+ }
+
+ final PublishRequest publishRequest = request.getPublishRequest();
+
+ publisher.asyncPublish(request.getTopic(), publishRequest.getMsg(), new Callback<Void>() {
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception) {
+ channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
+ }
+
+ @Override
+ public void operationFinished(Object ctx, Void resultOfOperation) {
+ channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
+ }
+ }, null);
+
+ }
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyStartDeliveryHandler.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,129 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.proxy;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.api.MessageHandler;
+import org.apache.hedwig.client.api.Subscriber;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
+import org.apache.hedwig.protoextensions.PubSubResponseUtils;
+import org.apache.hedwig.server.handlers.Handler;
+import org.apache.hedwig.server.netty.UmbrellaHandler;
+import org.apache.hedwig.util.Callback;
+
+public class ProxyStartDeliveryHandler implements Handler {
+
+ static final Logger logger = Logger.getLogger(ProxyStartDeliveryHandler.class);
+
+ Subscriber subscriber;
+ ChannelTracker tracker;
+
+ public ProxyStartDeliveryHandler(Subscriber subscriber, ChannelTracker tracker) {
+ this.subscriber = subscriber;
+ this.tracker = tracker;
+ }
+
+ @Override
+ public void handleRequest(PubSubRequest request, Channel channel) {
+
+ if (!request.hasStartDeliveryRequest()) {
+ UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
+ "Missing start delivery request data");
+ return;
+ }
+
+ final ByteString topic = request.getTopic();
+ final ByteString subscriberId = request.getStartDeliveryRequest().getSubscriberId();
+
+ synchronized (tracker) {
+ // try {
+ // tracker.checkChannelMatches(topic, subscriberId, channel);
+ // } catch (PubSubException e) {
+ // channel.write(PubSubResponseUtils.getResponseForException(e,
+ // request.getTxnId()));
+ // return;
+ // }
+
+ final Channel subscribedChannel = tracker.getChannel(topic, subscriberId);
+
+ if (subscribedChannel == null) {
+ channel.write(PubSubResponseUtils.getResponseForException(
+ new PubSubException.ClientNotSubscribedException("no subscription to start delivery on"),
+ request.getTxnId()));
+ return;
+ }
+
+ MessageHandler handler = new MessageHandler() {
+ @Override
+ public void consume(ByteString topic, ByteString subscriberId, Message msg,
+ final Callback<Void> callback, final Object context) {
+
+ PubSubResponse response = PubSubResponse.newBuilder().setProtocolVersion(
+ ProtocolVersion.VERSION_ONE).setStatusCode(StatusCode.SUCCESS).setTxnId(0).setMessage(msg)
+ .setTopic(topic).setSubscriberId(subscriberId).build();
+
+ ChannelFuture future = subscribedChannel.write(response);
+
+ future.addListener(new ChannelFutureListener() {
+ @Override
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (!future.isSuccess()) {
+ // ignoring this failure, because this will
+ // only happen due to channel disconnect.
+ // Channel disconnect will in turn stop
+ // delivery, and stop these errors
+ return;
+ }
+
+ // Tell the hedwig client, that it can send me
+ // more messages
+ callback.operationFinished(context, null);
+ }
+ });
+ }
+ };
+
+ channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
+
+ try {
+ subscriber.startDelivery(topic, subscriberId, handler);
+ } catch (ClientNotSubscribedException e) {
+ // This should not happen, since we already checked the correct
+ // channel and so on
+ logger.fatal("Unexpected: No subscription when attempting to start delivery", e);
+ throw new RuntimeException(e);
+ }
+
+
+
+ }
+
+ }
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyStopDeliveryHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyStopDeliveryHandler.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyStopDeliveryHandler.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyStopDeliveryHandler.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,73 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.proxy;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.channel.Channel;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.api.Subscriber;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.server.handlers.Handler;
+import org.apache.hedwig.server.netty.UmbrellaHandler;
+
+public class ProxyStopDeliveryHandler implements Handler {
+
+ static final Logger logger = Logger.getLogger(ProxyStopDeliveryHandler.class);
+
+ Subscriber subscriber;
+ ChannelTracker tracker;
+
+ public ProxyStopDeliveryHandler(Subscriber subscriber, ChannelTracker tracker) {
+ this.subscriber = subscriber;
+ this.tracker = tracker;
+ }
+
+ @Override
+ public void handleRequest(PubSubRequest request, Channel channel) {
+ if (!request.hasStopDeliveryRequest()) {
+ UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
+ "Missing stop delivery request data");
+ return;
+ }
+
+ final ByteString topic = request.getTopic();
+ final ByteString subscriberId = request.getStartDeliveryRequest().getSubscriberId();
+
+ synchronized (tracker) {
+ try {
+ tracker.checkChannelMatches(topic, subscriberId, channel);
+ } catch (PubSubException e) {
+ // intentionally ignore this error, since stop delivery doesn't
+ // send back a response
+ return;
+ }
+
+ try {
+ subscriber.stopDelivery(topic, subscriberId);
+ } catch (ClientNotSubscribedException e) {
+ // This should not happen, since we already checked the correct
+ // channel and so on
+ logger.warn("Unexpected: No subscription when attempting to stop delivery", e);
+ }
+ }
+
+ }
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxySubscribeHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxySubscribeHandler.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxySubscribeHandler.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxySubscribeHandler.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,82 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.proxy;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.channel.Channel;
+import org.apache.hedwig.client.api.Subscriber;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.TopicBusyException;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
+import org.apache.hedwig.protoextensions.PubSubResponseUtils;
+import org.apache.hedwig.server.handlers.ChannelDisconnectListener;
+import org.apache.hedwig.server.handlers.Handler;
+import org.apache.hedwig.server.netty.UmbrellaHandler;
+import org.apache.hedwig.util.Callback;
+
+public class ProxySubscribeHandler implements Handler, ChannelDisconnectListener {
+
+ static final Logger logger = Logger.getLogger(ProxySubscribeHandler.class);
+
+ Subscriber subscriber;
+ ChannelTracker tracker;
+
+ public ProxySubscribeHandler(Subscriber subscriber, ChannelTracker tracker) {
+ this.subscriber = subscriber;
+ this.tracker = tracker;
+ }
+
+ @Override
+ public void channelDisconnected(Channel channel) {
+ tracker.channelDisconnected(channel);
+ }
+
+ @Override
+ public void handleRequest(final PubSubRequest request, final Channel channel) {
+ if (!request.hasSubscribeRequest()) {
+ UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
+ "Missing subscribe request data");
+ return;
+ }
+
+ SubscribeRequest subRequest = request.getSubscribeRequest();
+ final TopicSubscriber topicSubscriber = new TopicSubscriber(request.getTopic(), subRequest.getSubscriberId());
+
+ subscriber.asyncSubscribe(topicSubscriber.getTopic(), subRequest.getSubscriberId(), subRequest
+ .getCreateOrAttach(), new Callback<Void>() {
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception) {
+ channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
+ }
+
+ @Override
+ public void operationFinished(Object ctx, Void resultOfOperation) {
+ try {
+ tracker.subscribeSucceeded(topicSubscriber, channel);
+ } catch (TopicBusyException e) {
+ channel.write(PubSubResponseUtils.getResponseForException(e, request.getTxnId()));
+ return;
+ }
+ channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
+ }
+ }, null);
+ }
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyUnsubscribeHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyUnsubscribeHandler.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyUnsubscribeHandler.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/proxy/ProxyUnsubscribeHandler.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,74 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.proxy;
+
+import org.jboss.netty.channel.Channel;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.api.Subscriber;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protoextensions.PubSubResponseUtils;
+import org.apache.hedwig.server.handlers.Handler;
+import org.apache.hedwig.server.netty.UmbrellaHandler;
+import org.apache.hedwig.util.Callback;
+
+public class ProxyUnsubscribeHandler implements Handler {
+
+ Subscriber subscriber;
+ ChannelTracker tracker;
+
+ public ProxyUnsubscribeHandler(Subscriber subscriber, ChannelTracker tracker) {
+ this.subscriber = subscriber;
+ this.tracker = tracker;
+ }
+
+ @Override
+ public void handleRequest(final PubSubRequest request, final Channel channel) {
+ if (!request.hasUnsubscribeRequest()) {
+ UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
+ "Missing unsubscribe request data");
+ return;
+ }
+
+ ByteString topic = request.getTopic();
+ ByteString subscriberId = request.getUnsubscribeRequest().getSubscriberId();
+
+ synchronized (tracker) {
+
+ // Even if unsubscribe fails, the hedwig client closes the channel
+ // on which the subscription is being served. Hence better to tell
+ // the tracker beforehand that this subscription is no longer served
+ tracker.aboutToUnsubscribe(topic, subscriberId);
+
+ subscriber.asyncUnsubscribe(topic, subscriberId, new Callback<Void>() {
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception) {
+ channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
+ }
+
+ @Override
+ public void operationFinished(Object ctx, Void resultOfOperation) {
+ channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
+ }
+ }, null);
+ }
+
+ }
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClient.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClient.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClient.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClient.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,48 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.regions;
+
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.client.netty.HedwigClient;
+
+/**
+ * This is a hub specific implementation of the HedwigClient. All this does
+ * though is to override the HedwigSubscriber with the hub specific child class.
+ * Creating this class so we can call the protected method in the parent to set
+ * the subscriber since we don't want to expose that API to the public.
+ */
+public class HedwigHubClient extends HedwigClient {
+
+ // Constructor when we already have a ChannelFactory instantiated.
+ public HedwigHubClient(ClientConfiguration cfg, ClientSocketChannelFactory channelFactory) {
+ super(cfg, channelFactory);
+ // Override the type of HedwigSubscriber with the hub specific one.
+ setSubscriber(new HedwigHubSubscriber(this));
+ }
+
+ // Constructor when we don't have a ChannelFactory. The super constructor
+ // will create one for us.
+ public HedwigHubClient(ClientConfiguration cfg) {
+ super(cfg);
+ // Override the type of HedwigSubscriber with the hub specific one.
+ setSubscriber(new HedwigHubSubscriber(this));
+ }
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClientFactory.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClientFactory.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClientFactory.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/regions/HedwigHubClientFactory.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,60 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.regions;
+
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+
+import org.apache.hedwig.client.conf.ClientConfiguration;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.util.HedwigSocketAddress;
+
+public class HedwigHubClientFactory {
+
+ private final ServerConfiguration cfg;
+ private final ClientSocketChannelFactory channelFactory;
+
+ // Constructor that takes in a ServerConfiguration and a ChannelFactory
+ // so we can reuse it for all Clients created here.
+ public HedwigHubClientFactory(ServerConfiguration cfg, ClientSocketChannelFactory channelFactory) {
+ this.cfg = cfg;
+ this.channelFactory = channelFactory;
+ }
+
+ /**
+ * Manufacture a hub client whose default server to connect to is the input
+ * HedwigSocketAddress hub.
+ *
+ * @param hub
+ * The hub in another region to connect to.
+ */
+ HedwigHubClient create(final HedwigSocketAddress hub) {
+ // Create a hub specific version of the client to use
+ return new HedwigHubClient(new ClientConfiguration() {
+ @Override
+ protected HedwigSocketAddress getDefaultServerHedwigSocketAddress() {
+ return hub;
+ }
+
+ @Override
+ public boolean isSSLEnabled() {
+ return cfg.isInterRegionSSLEnabled();
+ }
+ }, channelFactory);
+ }
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/regions/HedwigHubSubscriber.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,70 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.regions;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException;
+import org.apache.hedwig.client.netty.HedwigClient;
+import org.apache.hedwig.client.netty.HedwigSubscriber;
+import org.apache.hedwig.exceptions.PubSubException.ClientAlreadySubscribedException;
+import org.apache.hedwig.exceptions.PubSubException.ClientNotSubscribedException;
+import org.apache.hedwig.exceptions.PubSubException.CouldNotConnectException;
+import org.apache.hedwig.exceptions.PubSubException.ServiceDownException;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
+import org.apache.hedwig.util.Callback;
+
+/**
+ * This is a hub specific child class of the HedwigSubscriber. The main thing is
+ * does is wrap the public subscribe/unsubscribe methods by calling the
+ * overloaded protected ones passing in a true value for the input boolean
+ * parameter isHub. That will just make sure we validate the subscriberId
+ * passed, ensuring it is of the right format either for a local or hub
+ * subscriber.
+ */
+public class HedwigHubSubscriber extends HedwigSubscriber {
+
+ public HedwigHubSubscriber(HedwigClient client) {
+ super(client);
+ }
+
+ @Override
+ public void subscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode)
+ throws CouldNotConnectException, ClientAlreadySubscribedException, ServiceDownException,
+ InvalidSubscriberIdException {
+ subscribe(topic, subscriberId, mode, true);
+ }
+
+ @Override
+ public void asyncSubscribe(ByteString topic, ByteString subscriberId, CreateOrAttach mode, Callback<Void> callback,
+ Object context) {
+ asyncSubscribe(topic, subscriberId, mode, callback, context, true);
+ }
+
+ @Override
+ public void unsubscribe(ByteString topic, ByteString subscriberId) throws CouldNotConnectException,
+ ClientNotSubscribedException, ServiceDownException, InvalidSubscriberIdException {
+ unsubscribe(topic, subscriberId, true);
+ }
+
+ @Override
+ public void asyncUnsubscribe(final ByteString topic, final ByteString subscriberId, final Callback<Void> callback,
+ final Object context) {
+ asyncUnsubscribe(topic, subscriberId, callback, context, true);
+ }
+
+}