You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ma...@apache.org on 2010/08/19 23:25:22 UTC
svn commit: r987314 [10/16] - in /hadoop/zookeeper/trunk: ./
src/contrib/hedwig/ src/contrib/hedwig/client/
src/contrib/hedwig/client/src/ src/contrib/hedwig/client/src/main/
src/contrib/hedwig/client/src/main/cpp/
src/contrib/hedwig/client/src/main/cp...
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,561 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.delivery;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.common.UnexpectedError;
+import org.apache.hedwig.server.persistence.Factory;
+import org.apache.hedwig.server.persistence.MapMethods;
+import org.apache.hedwig.server.persistence.PersistenceManager;
+import org.apache.hedwig.server.persistence.ScanCallback;
+import org.apache.hedwig.server.persistence.ScanRequest;
+import org.apache.hedwig.server.subscriptions.MessageFilter;
+
+public class FIFODeliveryManager implements Runnable, DeliveryManager {
+
+ protected static final Logger logger = Logger.getLogger(FIFODeliveryManager.class);
+
+ protected interface DeliveryManagerRequest {
+ public void performRequest();
+ }
+
+ /**
+ * the main queue that the single-threaded delivery manager works off of
+ */
+ BlockingQueue<DeliveryManagerRequest> requestQueue = new LinkedBlockingQueue<DeliveryManagerRequest>();
+
+ /**
+ * The queue of all subscriptions that are facing a transient error either
+ * in scanning from the persistence manager, or in sending to the consumer
+ */
+ Queue<ActiveSubscriberState> retryQueue = new ConcurrentLinkedQueue<ActiveSubscriberState>();
+
+ /**
+ * Stores a mapping from topic to the delivery pointers on the topic. The
+ * delivery pointers are stored in a sorted map from seq-id to the set of
+ * subscribers at that seq-id
+ */
+ Map<ByteString, SortedMap<Long, Set<ActiveSubscriberState>>> perTopicDeliveryPtrs;
+
+ /**
+ * Mapping from delivery end point to the subscriber state that we are
+ * serving at that end point. This prevents us e.g., from serving two
+ * subscriptions to the same endpoint
+ */
+ Map<TopicSubscriber, ActiveSubscriberState> subscriberStates;
+
+ private PersistenceManager persistenceMgr;
+
+ private ServerConfiguration cfg;
+
+ // Boolean indicating if this thread should continue running. This is used
+ // when we want to stop the thread during a PubSubServer shutdown.
+ protected boolean keepRunning = true;
+
+ public FIFODeliveryManager(PersistenceManager persistenceMgr, ServerConfiguration cfg) {
+ this.persistenceMgr = persistenceMgr;
+ perTopicDeliveryPtrs = new HashMap<ByteString, SortedMap<Long, Set<ActiveSubscriberState>>>();
+ subscriberStates = new HashMap<TopicSubscriber, ActiveSubscriberState>();
+ new Thread(this, "DeliveryManagerThread").start();
+ this.cfg = cfg;
+ }
+
+ /**
+ * ===================================================================== Our
+ * usual enqueue function, stop if error because of unbounded queue, should
+ * never happen
+ *
+ */
+ protected void enqueueWithoutFailure(DeliveryManagerRequest request) {
+ if (!requestQueue.offer(request)) {
+ throw new UnexpectedError("Could not enqueue object: " + request + " to delivery manager request queue.");
+ }
+ }
+
+ /**
+ * ====================================================================
+ * Public interface of the delivery manager
+ */
+
+ /**
+ * Tells the delivery manager to start sending out messages for a particular
+ * subscription
+ *
+ * @param topic
+ * @param subscriberId
+ * @param seqIdToStartFrom
+ * Message sequence-id from where delivery should be started
+ * @param endPoint
+ * The delivery end point to which send messages to
+ * @param filter
+ * Only messages passing this filter should be sent to this
+ * subscriber
+ * @param isHubSubscriber
+ * There are some seq-id intricacies. To a hub subscriber, we
+ * should send only a subset of the seq-id vector
+ */
+ public void startServingSubscription(ByteString topic, ByteString subscriberId, MessageSeqId seqIdToStartFrom,
+ DeliveryEndPoint endPoint, MessageFilter filter, boolean isHubSubscriber) {
+
+ ActiveSubscriberState subscriber = new ActiveSubscriberState(topic, subscriberId, seqIdToStartFrom
+ .getLocalComponent() - 1, endPoint, filter, isHubSubscriber);
+
+ enqueueWithoutFailure(subscriber);
+ }
+
+ public void stopServingSubscriber(ByteString topic, ByteString subscriberId) {
+ ActiveSubscriberState subState = subscriberStates.get(new TopicSubscriber(topic, subscriberId));
+
+ if (subState != null) {
+ stopServingSubscriber(subState);
+ }
+ }
+
+ /**
+ * Due to some error or disconnection or unsusbcribe, someone asks us to
+ * stop serving a particular endpoint
+ *
+ * @param endPoint
+ */
+ protected void stopServingSubscriber(ActiveSubscriberState subscriber) {
+ enqueueWithoutFailure(new StopServingSubscriber(subscriber));
+ }
+
+ /**
+ * Instructs the delivery manager to backoff on the given subscriber and
+ * retry sending after some time
+ *
+ * @param subscriber
+ */
+
+ public void retryErroredSubscriberAfterDelay(ActiveSubscriberState subscriber) {
+
+ subscriber.setLastScanErrorTime(System.currentTimeMillis());
+
+ if (!retryQueue.offer(subscriber)) {
+ throw new UnexpectedError("Could not enqueue to delivery manager retry queue");
+ }
+ }
+
+ /**
+ * Instructs the delivery manager to move the delivery pointer for a given
+ * subscriber
+ *
+ * @param subscriber
+ * @param prevSeqId
+ * @param newSeqId
+ */
+ public void moveDeliveryPtrForward(ActiveSubscriberState subscriber, long prevSeqId, long newSeqId) {
+ enqueueWithoutFailure(new DeliveryPtrMove(subscriber, prevSeqId, newSeqId));
+ }
+
+ /*
+ * ==========================================================================
+ * == End of public interface, internal machinery begins.
+ */
+ public void run() {
+ while (keepRunning) {
+ DeliveryManagerRequest request = null;
+
+ try {
+ // We use a timeout of 1 second, so that we can wake up once in
+ // a while to check if there is something in the retry queue.
+ request = requestQueue.poll(1, TimeUnit.SECONDS);
+ } catch (InterruptedException e) {
+ Thread.currentThread().interrupt();
+ }
+
+ // First retry any subscriptions that had failed and need a retry
+ retryErroredSubscribers();
+
+ if (request == null) {
+ continue;
+ }
+
+ request.performRequest();
+
+ }
+ }
+
+ /**
+ * Stop method which will enqueue a ShutdownDeliveryManagerRequest.
+ */
+ public void stop() {
+ enqueueWithoutFailure(new ShutdownDeliveryManagerRequest());
+ }
+
+ protected void retryErroredSubscribers() {
+ long lastInterestingFailureTime = System.currentTimeMillis() - cfg.getScanBackoffPeriodMs();
+ ActiveSubscriberState subscriber;
+
+ while ((subscriber = retryQueue.peek()) != null) {
+ if (subscriber.getLastScanErrorTime() > lastInterestingFailureTime) {
+ // Not enough time has elapsed yet, will retry later
+ // Since the queue is fifo, no need to check later items
+ return;
+ }
+
+ // retry now
+ subscriber.deliverNextMessage();
+ retryQueue.poll();
+ }
+ }
+
+ protected void removeDeliveryPtr(ActiveSubscriberState subscriber, Long seqId, boolean isAbsenceOk,
+ boolean pruneTopic) {
+
+ assert seqId != null;
+
+ // remove this subscriber from the delivery pointers data structure
+ ByteString topic = subscriber.getTopic();
+ SortedMap<Long, Set<ActiveSubscriberState>> deliveryPtrs = perTopicDeliveryPtrs.get(topic);
+
+ if (deliveryPtrs == null && !isAbsenceOk) {
+ throw new UnexpectedError("No delivery pointers found while disconnecting " + "channel for topic:" + topic);
+ }
+
+ if (!MapMethods.removeFromMultiMap(deliveryPtrs, seqId, subscriber) && !isAbsenceOk) {
+
+ throw new UnexpectedError("Could not find subscriber:" + subscriber + " at the expected delivery pointer");
+ }
+
+ if (pruneTopic && deliveryPtrs.isEmpty()) {
+ perTopicDeliveryPtrs.remove(topic);
+ }
+
+ }
+
+ protected long getMinimumSeqId(ByteString topic) {
+ SortedMap<Long, Set<ActiveSubscriberState>> deliveryPtrs = perTopicDeliveryPtrs.get(topic);
+
+ if (deliveryPtrs == null || deliveryPtrs.isEmpty()) {
+ return Long.MAX_VALUE - 1;
+ }
+ return deliveryPtrs.firstKey();
+ }
+
+ protected void addDeliveryPtr(ActiveSubscriberState subscriber, Long seqId) {
+
+ // If this topic doesn't exist in the per-topic delivery pointers table,
+ // create an entry for it
+ SortedMap<Long, Set<ActiveSubscriberState>> deliveryPtrs = MapMethods.getAfterInsertingIfAbsent(
+ perTopicDeliveryPtrs, subscriber.getTopic(), TreeMapLongToSetSubscriberFactory.instance);
+
+ MapMethods.addToMultiMap(deliveryPtrs, seqId, subscriber, HashMapSubscriberFactory.instance);
+ }
+
+ public class ActiveSubscriberState implements ScanCallback, DeliveryCallback, DeliveryManagerRequest {
+ ByteString topic;
+ ByteString subscriberId;
+ long lastLocalSeqIdDelivered;
+ boolean connected = true;
+ DeliveryEndPoint deliveryEndPoint;
+ long lastScanErrorTime = -1;
+ long localSeqIdDeliveringNow;
+ long lastSeqIdCommunicatedExternally;
+ // TODO make use of these variables
+ MessageFilter filter;
+ boolean isHubSubscriber;
+ final static int SEQ_ID_SLACK = 10;
+
+ public ActiveSubscriberState(ByteString topic, ByteString subscriberId, long lastLocalSeqIdDelivered,
+ DeliveryEndPoint deliveryEndPoint, MessageFilter filter, boolean isHubSubscriber) {
+ this.topic = topic;
+ this.subscriberId = subscriberId;
+ this.lastLocalSeqIdDelivered = lastLocalSeqIdDelivered;
+ this.deliveryEndPoint = deliveryEndPoint;
+ this.filter = filter;
+ this.isHubSubscriber = isHubSubscriber;
+ }
+
+ public void setNotConnected() {
+ this.connected = false;
+ deliveryEndPoint.close();
+ }
+
+ public ByteString getTopic() {
+ return topic;
+ }
+
+ public long getLastLocalSeqIdDelivered() {
+ return lastLocalSeqIdDelivered;
+ }
+
+ public long getLastScanErrorTime() {
+ return lastScanErrorTime;
+ }
+
+ public void setLastScanErrorTime(long lastScanErrorTime) {
+ this.lastScanErrorTime = lastScanErrorTime;
+ }
+
+ protected boolean isConnected() {
+ return connected;
+ }
+
+ public void deliverNextMessage() {
+
+ if (!isConnected()) {
+ return;
+ }
+
+ localSeqIdDeliveringNow = persistenceMgr.getSeqIdAfterSkipping(topic, lastLocalSeqIdDelivered, 1);
+
+ ScanRequest scanRequest = new ScanRequest(topic, localSeqIdDeliveringNow,
+ /* callback= */this, /* ctx= */null);
+
+ persistenceMgr.scanSingleMessage(scanRequest);
+ }
+
+ /**
+ * ===============================================================
+ * {@link ScanCallback} methods
+ */
+
+ public void messageScanned(Object ctx, Message message) {
+ if (!connected) {
+ return;
+ }
+
+ // We're using a simple all-to-all network topology, so no region
+ // should ever need to forward messages to any other region.
+ // Otherwise, with the current logic, messages will end up
+ // ping-pong-ing back and forth between regions with subscriptions
+ // to each other without termination (or in any other cyclic
+ // configuration).
+ if (isHubSubscriber && !message.getSrcRegion().equals(cfg.getMyRegionByteString())) {
+ sendingFinished();
+ return;
+ }
+
+ /**
+ * The method below will invoke our sendingFinished() method when
+ * done
+ */
+ PubSubResponse response = PubSubResponse.newBuilder().setProtocolVersion(ProtocolVersion.VERSION_ONE)
+ .setStatusCode(StatusCode.SUCCESS).setTxnId(0).setMessage(message).build();
+
+ deliveryEndPoint.send(response, //
+ // callback =
+ this);
+
+ }
+
+ public void scanFailed(Object ctx, Exception exception) {
+ if (!connected) {
+ return;
+ }
+
+ // wait for some time and then retry
+ retryErroredSubscriberAfterDelay(this);
+ }
+
+ public void scanFinished(Object ctx, ReasonForFinish reason) {
+ // no-op
+ }
+
+ /**
+ * ===============================================================
+ * {@link DeliveryCallback} methods
+ */
+ public void sendingFinished() {
+ if (!connected) {
+ return;
+ }
+
+ lastLocalSeqIdDelivered = localSeqIdDeliveringNow;
+
+ if (lastLocalSeqIdDelivered > lastSeqIdCommunicatedExternally + SEQ_ID_SLACK){
+ // Note: The order of the next 2 statements is important. We should
+ // submit a request to change our delivery pointer only *after* we
+ // have actually changed it. Otherwise, there is a race condition
+ // with removal of this channel, w.r.t, maintaining the deliveryPtrs
+ // tree map.
+ long prevId = lastSeqIdCommunicatedExternally;
+ lastSeqIdCommunicatedExternally = lastLocalSeqIdDelivered;
+ moveDeliveryPtrForward(this, prevId, lastLocalSeqIdDelivered);
+ }
+ deliverNextMessage();
+ }
+
+ public long getLastSeqIdCommunicatedExternally() {
+ return lastSeqIdCommunicatedExternally;
+ }
+
+
+ public void permanentErrorOnSend() {
+ stopServingSubscriber(this);
+ }
+
+ public void transientErrorOnSend() {
+ retryErroredSubscriberAfterDelay(this);
+ }
+
+ /**
+ * ===============================================================
+ * {@link DeliveryManagerRequest} methods
+ */
+ public void performRequest() {
+
+ // Put this subscriber in the channel to subscriber mapping
+ ActiveSubscriberState prevSubscriber = subscriberStates.put(new TopicSubscriber(topic, subscriberId), this);
+
+ if (prevSubscriber != null) {
+ stopServingSubscriber(prevSubscriber);
+ }
+
+ lastSeqIdCommunicatedExternally = lastLocalSeqIdDelivered;
+ addDeliveryPtr(this, lastLocalSeqIdDelivered);
+
+ deliverNextMessage();
+ };
+
+ @Override
+ public String toString() {
+ StringBuilder sb = new StringBuilder();
+ sb.append("Topic: ");
+ sb.append(topic.toStringUtf8());
+ sb.append("DeliveryPtr: ");
+ sb.append(lastLocalSeqIdDelivered);
+ return sb.toString();
+
+ }
+ }
+
+ protected class StopServingSubscriber implements DeliveryManagerRequest {
+ ActiveSubscriberState subscriber;
+
+ public StopServingSubscriber(ActiveSubscriberState subscriber) {
+ this.subscriber = subscriber;
+ }
+
+ @Override
+ public void performRequest() {
+
+ // This will automatically stop delivery, and disconnect the channel
+ subscriber.setNotConnected();
+
+ // if the subscriber has moved on, a move request for its delivery
+ // pointer must be pending in the request queue. Note that the
+ // subscriber first changes its delivery pointer and then submits a
+ // request to move so this works.
+ removeDeliveryPtr(subscriber, subscriber.getLastSeqIdCommunicatedExternally(), //
+ // isAbsenceOk=
+ true,
+ // pruneTopic=
+ true);
+ }
+
+ }
+
+ protected class DeliveryPtrMove implements DeliveryManagerRequest {
+
+ ActiveSubscriberState subscriber;
+ Long oldSeqId;
+ Long newSeqId;
+
+ public DeliveryPtrMove(ActiveSubscriberState subscriber, Long oldSeqId, Long newSeqId) {
+ this.subscriber = subscriber;
+ this.oldSeqId = oldSeqId;
+ this.newSeqId = newSeqId;
+ }
+
+ @Override
+ public void performRequest() {
+ ByteString topic = subscriber.getTopic();
+ long prevMinSeqId = getMinimumSeqId(topic);
+
+ if (subscriber.isConnected()) {
+ removeDeliveryPtr(subscriber, oldSeqId, //
+ // isAbsenceOk=
+ false,
+ // pruneTopic=
+ false);
+
+ addDeliveryPtr(subscriber, newSeqId);
+ } else {
+ removeDeliveryPtr(subscriber, oldSeqId, //
+ // isAbsenceOk=
+ true,
+ // pruneTopic=
+ true);
+ }
+
+ long nowMinSeqId = getMinimumSeqId(topic);
+
+ if (nowMinSeqId > prevMinSeqId) {
+ persistenceMgr.deliveredUntil(topic, nowMinSeqId);
+ }
+ }
+ }
+
+ protected class ShutdownDeliveryManagerRequest implements DeliveryManagerRequest {
+ // This is a simple type of Request we will enqueue when the
+ // PubSubServer is shut down and we want to stop the DeliveryManager
+ // thread.
+ public void performRequest() {
+ keepRunning = false;
+ }
+ }
+
+ /**
+ * ====================================================================
+ *
+ * Dumb factories for our map methods
+ */
+ protected static class TreeMapLongToSetSubscriberFactory implements
+ Factory<SortedMap<Long, Set<ActiveSubscriberState>>> {
+ static TreeMapLongToSetSubscriberFactory instance = new TreeMapLongToSetSubscriberFactory();
+
+ @Override
+ public SortedMap<Long, Set<ActiveSubscriberState>> newInstance() {
+ return new TreeMap<Long, Set<ActiveSubscriberState>>();
+ }
+ }
+
+ protected static class HashMapSubscriberFactory implements Factory<Set<ActiveSubscriberState>> {
+ static HashMapSubscriberFactory instance = new HashMapSubscriberFactory();
+
+ @Override
+ public Set<ActiveSubscriberState> newInstance() {
+ return new HashSet<ActiveSubscriberState>();
+ }
+ }
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/BaseHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/BaseHandler.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/BaseHandler.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/BaseHandler.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.handlers;
+
+import org.jboss.netty.channel.Channel;
+
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.ServerNotResponsibleForTopicException;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protoextensions.PubSubResponseUtils;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.topics.TopicManager;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.HedwigSocketAddress;
+
+public abstract class BaseHandler implements Handler{
+
+ protected TopicManager topicMgr;
+ protected ServerConfiguration cfg;
+
+ protected BaseHandler(TopicManager tm, ServerConfiguration cfg) {
+ this.topicMgr = tm;
+ this.cfg = cfg;
+ }
+
+
+ public void handleRequest(final PubSubRequest request, final Channel channel) {
+ topicMgr.getOwner(request.getTopic(), request.getShouldClaim(),
+ new Callback<HedwigSocketAddress>() {
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception) {
+ channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
+ }
+
+ @Override
+ public void operationFinished(Object ctx, HedwigSocketAddress owner) {
+ if (!owner.equals(cfg.getServerAddr())) {
+ channel.write(PubSubResponseUtils.getResponseForException(
+ new ServerNotResponsibleForTopicException(owner.toString()), request.getTxnId()));
+ return;
+ }
+ handleRequestAtOwner(request, channel);
+ }
+ }, null);
+ }
+
+ public abstract void handleRequestAtOwner(PubSubRequest request, Channel channel);
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/ChannelDisconnectListener.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/ChannelDisconnectListener.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/ChannelDisconnectListener.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/ChannelDisconnectListener.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.handlers;
+
+import org.jboss.netty.channel.Channel;
+
+public interface ChannelDisconnectListener {
+
+ /**
+ * Act on a particular channel being disconnected
+ * @param channel
+ */
+ public void channelDisconnected(Channel channel);
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/ConsumeHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/ConsumeHandler.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/ConsumeHandler.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/ConsumeHandler.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.handlers;
+
+import org.jboss.netty.channel.Channel;
+
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.ConsumeRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.netty.UmbrellaHandler;
+import org.apache.hedwig.server.subscriptions.SubscriptionManager;
+import org.apache.hedwig.server.topics.TopicManager;
+import org.apache.hedwig.util.Callback;
+
+public class ConsumeHandler extends BaseHandler {
+
+ SubscriptionManager sm;
+ Callback<Void> noopCallback = new NoopCallback<Void>();
+
+ class NoopCallback<T> implements Callback<T> {
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception) {
+ }
+
+ public void operationFinished(Object ctx, T resultOfOperation) {
+ };
+ }
+
+ @Override
+ public void handleRequestAtOwner(PubSubRequest request, Channel channel) {
+ if (!request.hasConsumeRequest()) {
+ UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
+ "Missing consume request data");
+ return;
+ }
+
+ ConsumeRequest consumeRequest = request.getConsumeRequest();
+
+ sm.setConsumeSeqIdForSubscriber(request.getTopic(), consumeRequest.getSubscriberId(),
+ consumeRequest.getMsgId(), noopCallback, null);
+
+ }
+
+ public ConsumeHandler(TopicManager tm, SubscriptionManager sm, ServerConfiguration cfg) {
+ super(tm, cfg);
+ this.sm = sm;
+ }
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/Handler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/Handler.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/Handler.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/Handler.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.handlers;
+
+import org.jboss.netty.channel.Channel;
+
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+
+public interface Handler {
+
+ /**
+ * Handle a request synchronously or asynchronously. After handling the
+ * request, the appropriate response should be written on the given channel
+ *
+ * @param request
+ * The request to handle
+ *
+ * @param channel
+ * The channel on which to write the response
+ */
+ public void handleRequest(final PubSubRequest request, final Channel channel);
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/PublishHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/PublishHandler.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/PublishHandler.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/PublishHandler.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.handlers;
+
+import org.jboss.netty.channel.Channel;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protoextensions.PubSubResponseUtils;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.netty.UmbrellaHandler;
+import org.apache.hedwig.server.persistence.PersistRequest;
+import org.apache.hedwig.server.persistence.PersistenceManager;
+import org.apache.hedwig.server.topics.TopicManager;
+import org.apache.hedwig.util.Callback;
+
+public class PublishHandler extends BaseHandler {
+
+ private PersistenceManager persistenceMgr;
+
+ public PublishHandler(TopicManager topicMgr, PersistenceManager persistenceMgr, ServerConfiguration cfg) {
+ super(topicMgr, cfg);
+ this.persistenceMgr = persistenceMgr;
+ }
+
+ @Override
+ public void handleRequestAtOwner(final PubSubRequest request, final Channel channel) {
+ if (!request.hasPublishRequest()) {
+ UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
+ "Missing publish request data");
+ return;
+ }
+
+ Message msgToSerialize = Message.newBuilder(request.getPublishRequest().getMsg()).setSrcRegion(
+ cfg.getMyRegionByteString()).build();
+
+ PersistRequest persistRequest = new PersistRequest(request.getTopic(), msgToSerialize,
+ new Callback<Long>() {
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception) {
+ channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
+ }
+
+ @Override
+ public void operationFinished(Object ctx, Long resultOfOperation) {
+ channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
+ }
+ }, null);
+
+ persistenceMgr.persistMessage(persistRequest);
+ }
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.handlers;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.ServerNotResponsibleForTopicException;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
+import org.apache.hedwig.protoextensions.PubSubResponseUtils;
+import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.delivery.ChannelEndPoint;
+import org.apache.hedwig.server.delivery.DeliveryManager;
+import org.apache.hedwig.server.netty.UmbrellaHandler;
+import org.apache.hedwig.server.persistence.PersistenceManager;
+import org.apache.hedwig.server.subscriptions.SubscriptionManager;
+import org.apache.hedwig.server.subscriptions.TrueFilter;
+import org.apache.hedwig.server.topics.TopicManager;
+import org.apache.hedwig.util.Callback;
+
+public class SubscribeHandler extends BaseHandler implements ChannelDisconnectListener{
+ static Logger logger = Logger.getLogger(SubscribeHandler.class);
+
+ private DeliveryManager deliveryMgr;
+ private PersistenceManager persistenceMgr;
+ private SubscriptionManager subMgr;
+ ConcurrentHashMap<TopicSubscriber, Channel> sub2Channel;
+ ConcurrentHashMap<Channel, TopicSubscriber> channel2sub;
+
+ public SubscribeHandler(TopicManager topicMgr, DeliveryManager deliveryManager, PersistenceManager persistenceMgr,
+ SubscriptionManager subMgr, ServerConfiguration cfg) {
+ super(topicMgr, cfg);
+ this.deliveryMgr = deliveryManager;
+ this.persistenceMgr = persistenceMgr;
+ this.subMgr = subMgr;
+ sub2Channel = new ConcurrentHashMap<TopicSubscriber, Channel>();
+ channel2sub = new ConcurrentHashMap<Channel, TopicSubscriber>();
+ }
+
+ public void channelDisconnected(Channel channel) {
+ // Evils of synchronized programming: there is a race between a channel
+ // getting disconnected, and us adding it to the maps when a subscribe
+ // succeeds
+ synchronized (channel) {
+ TopicSubscriber topicSub = channel2sub.remove(channel);
+ if (topicSub != null) {
+ sub2Channel.remove(topicSub);
+ }
+ }
+ }
+
+ @Override
+ public void handleRequestAtOwner(final PubSubRequest request, final Channel channel) {
+
+ if (!request.hasSubscribeRequest()) {
+ UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
+ "Missing subscribe request data");
+ return;
+ }
+
+ final ByteString topic = request.getTopic();
+
+ MessageSeqId seqId;
+ try {
+ seqId = persistenceMgr.getCurrentSeqIdForTopic(topic);
+ } catch (ServerNotResponsibleForTopicException e) {
+ channel.write(PubSubResponseUtils.getResponseForException(e, request.getTxnId())).addListener(
+ ChannelFutureListener.CLOSE);
+ return;
+ }
+
+ final SubscribeRequest subRequest = request.getSubscribeRequest();
+ final ByteString subscriberId = subRequest.getSubscriberId();
+
+ MessageSeqId lastSeqIdPublished = MessageSeqId.newBuilder(seqId).setLocalComponent(seqId.getLocalComponent()).build();
+
+ subMgr.serveSubscribeRequest(topic, subRequest, lastSeqIdPublished, new Callback<MessageSeqId>() {
+
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception) {
+ channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId())).addListener(
+ ChannelFutureListener.CLOSE);
+ }
+
+ @Override
+ public void operationFinished(Object ctx, MessageSeqId resultOfOperation) {
+
+ TopicSubscriber topicSub = new TopicSubscriber(topic, subscriberId);
+
+ // race with channel getting disconnected while we are adding it
+ // to the 2 maps
+ synchronized (channel) {
+ if (!channel.isConnected()) {
+ // channel got disconnected while we were processing the
+ // subscribe request,
+ // nothing much we can do in this case
+ return;
+ }
+
+ if (null != sub2Channel.putIfAbsent(topicSub, channel)) {
+ // there was another channel mapped to this sub
+ PubSubException pse = new PubSubException.TopicBusyException(
+ "subscription for this topic, subscriberId is already being served on a different channel");
+ channel.write(PubSubResponseUtils.getResponseForException(pse, request.getTxnId()))
+ .addListener(ChannelFutureListener.CLOSE);
+ return;
+ } else {
+ // channel2sub is just a cache, so we can add to it
+ // without synchronization
+ channel2sub.put(channel, topicSub);
+ }
+ }
+ // First write success and then tell the delivery manager,
+ // otherwise the first message might go out before the response
+ // to the subscribe
+ channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
+
+ // want to start 1 ahead of the consume ptr
+ MessageSeqId seqIdToStartFrom = MessageSeqId.newBuilder(resultOfOperation).setLocalComponent(
+ resultOfOperation.getLocalComponent() + 1).build();
+ deliveryMgr.startServingSubscription(topic, subscriberId, seqIdToStartFrom,
+ new ChannelEndPoint(channel), TrueFilter.instance(), SubscriptionStateUtils
+ .isHubSubscriber(subRequest.getSubscriberId()));
+ }
+ }, null);
+
+ }
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.handlers;
+
+import org.jboss.netty.channel.Channel;
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.UnsubscribeRequest;
+import org.apache.hedwig.protoextensions.PubSubResponseUtils;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.delivery.DeliveryManager;
+import org.apache.hedwig.server.netty.UmbrellaHandler;
+import org.apache.hedwig.server.subscriptions.SubscriptionManager;
+import org.apache.hedwig.server.topics.TopicManager;
+import org.apache.hedwig.util.Callback;
+
+public class UnsubscribeHandler extends BaseHandler {
+ SubscriptionManager subMgr;
+ DeliveryManager deliveryMgr;
+
+ public UnsubscribeHandler(TopicManager tm, ServerConfiguration cfg, SubscriptionManager subMgr,
+ DeliveryManager deliveryMgr) {
+ super(tm, cfg);
+ this.subMgr = subMgr;
+ this.deliveryMgr = deliveryMgr;
+ }
+
+ @Override
+ public void handleRequestAtOwner(final PubSubRequest request, final Channel channel) {
+ if (!request.hasUnsubscribeRequest()) {
+ UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
+ "Missing unsubscribe request data");
+ return;
+ }
+
+ final UnsubscribeRequest unsubRequest = request.getUnsubscribeRequest();
+ final ByteString topic = request.getTopic();
+ final ByteString subscriberId = unsubRequest.getSubscriberId();
+
+ subMgr.unsubscribe(topic, subscriberId, new Callback<Void>() {
+ @Override
+ public void operationFailed(Object ctx, PubSubException exception) {
+ channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
+ }
+
+ @Override
+ public void operationFinished(Object ctx, Void resultOfOperation) {
+ deliveryMgr.stopServingSubscriber(topic, subscriberId);
+ channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
+
+ }
+ }, null);
+
+ }
+
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,364 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.netty;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
+
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.ServerSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.logging.InternalLoggerFactory;
+import org.jboss.netty.logging.Log4JLoggerFactory;
+
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.common.TerminateJVMExceptionHandler;
+import org.apache.hedwig.server.delivery.DeliveryManager;
+import org.apache.hedwig.server.delivery.FIFODeliveryManager;
+import org.apache.hedwig.server.handlers.ConsumeHandler;
+import org.apache.hedwig.server.handlers.Handler;
+import org.apache.hedwig.server.handlers.PublishHandler;
+import org.apache.hedwig.server.handlers.SubscribeHandler;
+import org.apache.hedwig.server.handlers.UnsubscribeHandler;
+import org.apache.hedwig.server.persistence.BookkeeperPersistenceManager;
+import org.apache.hedwig.server.persistence.LocalDBPersistenceManager;
+import org.apache.hedwig.server.persistence.PersistenceManager;
+import org.apache.hedwig.server.persistence.PersistenceManagerWithRangeScan;
+import org.apache.hedwig.server.persistence.ReadAheadCache;
+import org.apache.hedwig.server.regions.HedwigHubClientFactory;
+import org.apache.hedwig.server.regions.RegionManager;
+import org.apache.hedwig.server.ssl.SslServerContextFactory;
+import org.apache.hedwig.server.subscriptions.AbstractSubscriptionManager;
+import org.apache.hedwig.server.subscriptions.InMemorySubscriptionManager;
+import org.apache.hedwig.server.subscriptions.SubscriptionManager;
+import org.apache.hedwig.server.subscriptions.ZkSubscriptionManager;
+import org.apache.hedwig.server.topics.TopicManager;
+import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager;
+import org.apache.hedwig.server.topics.ZkTopicManager;
+import org.apache.hedwig.util.ConcurrencyUtils;
+import org.apache.hedwig.util.Either;
+import org.apache.hedwig.zookeeper.SafeAsyncCallback;
+
+public class PubSubServer {
+
+ static Logger logger = Logger.getLogger(PubSubServer.class);
+
+ // Netty related variables
+ ServerSocketChannelFactory serverChannelFactory;
+ ClientSocketChannelFactory clientChannelFactory;
+ ServerConfiguration conf;
+ ChannelGroup allChannels;
+
+ // Manager components that make up the PubSubServer
+ PersistenceManager pm;
+ DeliveryManager dm;
+ TopicManager tm;
+ SubscriptionManager sm;
+ RegionManager rm;
+
+ ZooKeeper zk; // null if we are in standalone mode
+ BookKeeper bk; // null if we are in standalone mode
+
+ // we use this to prevent long stack chains from building up in callbacks
+ ScheduledExecutorService scheduler;
+
+ protected PersistenceManager instantiatePersistenceManager(TopicManager topicMgr) throws IOException,
+ InterruptedException {
+
+ PersistenceManagerWithRangeScan underlyingPM;
+
+ if (conf.isStandalone()) {
+
+ underlyingPM = LocalDBPersistenceManager.instance();
+
+ } else {
+ try {
+ bk = new BookKeeper(zk, clientChannelFactory);
+ } catch (KeeperException e) {
+ logger.error("Could not instantiate bookkeeper client", e);
+ throw new IOException(e);
+ }
+ underlyingPM = new BookkeeperPersistenceManager(bk, zk, topicMgr, conf, scheduler);
+
+ }
+
+ PersistenceManager pm = underlyingPM;
+
+ if (conf.getReadAheadEnabled()) {
+ pm = new ReadAheadCache(underlyingPM, conf).start();
+ }
+
+ return pm;
+ }
+
+ protected SubscriptionManager instantiateSubscriptionManager(TopicManager tm, PersistenceManager pm) {
+ if (conf.isStandalone()) {
+ return new InMemorySubscriptionManager(tm, pm, conf, scheduler);
+ } else {
+ return new ZkSubscriptionManager(zk, tm, pm, conf, scheduler);
+ }
+
+ }
+
+ protected RegionManager instantiateRegionManager(PersistenceManager pm, ScheduledExecutorService scheduler) {
+ return new RegionManager(pm, conf, zk, scheduler, new HedwigHubClientFactory(conf, clientChannelFactory));
+ }
+
+ protected void instantiateZookeeperClient() throws IOException {
+ if (!conf.isStandalone()) {
+ zk = new ZooKeeper(conf.getZkHost(), conf.getZkTimeout(), new Watcher() {
+ @Override
+ public void process(WatchedEvent event) {
+ }
+ });
+ }
+ }
+
+ protected TopicManager instantiateTopicManager() throws IOException {
+ TopicManager tm;
+
+ if (conf.isStandalone()) {
+ tm = new TrivialOwnAllTopicManager(conf, scheduler);
+ } else {
+ try {
+ tm = new ZkTopicManager(zk, conf, scheduler);
+ } catch (PubSubException e) {
+ logger.error("Could not instantiate zk-topic manager", e);
+ throw new IOException(e);
+ }
+ }
+ return tm;
+ }
+
+ protected Map<OperationType, Handler> initializeNettyHandlers(TopicManager tm, DeliveryManager dm,
+ PersistenceManager pm, SubscriptionManager sm) {
+ Map<OperationType, Handler> handlers = new HashMap<OperationType, Handler>();
+ handlers.put(OperationType.PUBLISH, new PublishHandler(tm, pm, conf));
+ handlers.put(OperationType.SUBSCRIBE, new SubscribeHandler(tm, dm, pm, sm, conf));
+ handlers.put(OperationType.UNSUBSCRIBE, new UnsubscribeHandler(tm, conf, sm, dm));
+ handlers.put(OperationType.CONSUME, new ConsumeHandler(tm, sm, conf));
+ handlers = Collections.unmodifiableMap(handlers);
+ return handlers;
+ }
+
+ protected void initializeNetty(SslServerContextFactory sslFactory, Map<OperationType, Handler> handlers) {
+ boolean isSSLEnabled = (sslFactory != null) ? true : false;
+ InternalLoggerFactory.setDefaultFactory(new Log4JLoggerFactory());
+ ServerBootstrap bootstrap = new ServerBootstrap(serverChannelFactory);
+ UmbrellaHandler umbrellaHandler = new UmbrellaHandler(allChannels, handlers, isSSLEnabled);
+ PubSubServerPipelineFactory pipeline = new PubSubServerPipelineFactory(umbrellaHandler, sslFactory, conf
+ .getMaximumMessageSize());
+
+ bootstrap.setPipelineFactory(pipeline);
+ bootstrap.setOption("child.tcpNoDelay", true);
+ bootstrap.setOption("child.keepAlive", true);
+ bootstrap.setOption("reuseAddress", true);
+
+ // Bind and start to accept incoming connections.
+ allChannels.add(bootstrap.bind(isSSLEnabled ? new InetSocketAddress(conf.getSSLServerPort())
+ : new InetSocketAddress(conf.getServerPort())));
+ logger.info("Going into receive loop");
+ }
+
+ public void shutdown() {
+ // TODO: tell bk to close logs
+
+ // Shutdown the ZooKeeper and BookKeeper clients only if we are
+ // not in stand-alone mode.
+ try {
+ if (zk != null)
+ zk.close();
+ if (bk != null)
+ bk.halt();
+ } catch (InterruptedException e) {
+ logger.error("Error while closing ZooKeeper client!");
+ }
+
+ // Stop the RegionManager.
+ rm.stop();
+
+ // Stop the DeliveryManager and ReadAheadCache threads (if
+ // applicable).
+ // TODO: It'd be cleaner and more general to modify the interfaces to
+ // include a stop method. If the specific implementation starts threads,
+ // then the stop method should take care of that clean up.
+ if (pm instanceof ReadAheadCache) {
+ ((ReadAheadCache) pm).stop();
+ }
+ if (dm instanceof FIFODeliveryManager) {
+ ((FIFODeliveryManager) dm).stop();
+ }
+
+ // Stop the SubscriptionManager if needed.
+ if (sm instanceof AbstractSubscriptionManager) {
+ ((AbstractSubscriptionManager) sm).stop();
+ }
+
+ // Close and release the Netty channels and resources
+ allChannels.close().awaitUninterruptibly();
+ serverChannelFactory.releaseExternalResources();
+ clientChannelFactory.releaseExternalResources();
+ scheduler.shutdown();
+ }
+
+ /**
+ * Starts the hedwig server on the given port
+ *
+ * @param port
+ * @throws ConfigurationException
+ * if there is something wrong with the given configuration
+ * @throws IOException
+ * @throws InterruptedException
+ * @throws ConfigurationException
+ */
+ public PubSubServer(final ServerConfiguration conf, final Thread.UncaughtExceptionHandler exceptionHandler)
+ throws Exception {
+
+ // First validate the conf
+ this.conf = conf;
+ conf.validate();
+
+ // We need a custom thread group, so that we can override the uncaught
+ // exception method
+ ThreadGroup tg = new ThreadGroup("hedwig") {
+ @Override
+ public void uncaughtException(Thread t, Throwable e) {
+ exceptionHandler.uncaughtException(t, e);
+ }
+ };
+ // ZooKeeper threads register their own handler. But if some work that
+ // we do in ZK threads throws an exception, we want our handler to be
+ // called, not theirs.
+ SafeAsyncCallback.setUncaughtExceptionHandler(exceptionHandler);
+
+ final SynchronousQueue<Either<Object, Exception>> queue = new SynchronousQueue<Either<Object, Exception>>();
+
+ new Thread(tg, new Runnable() {
+ @Override
+ public void run() {
+ try {
+ // Since zk is needed by almost everyone,try to see if we
+ // need that first
+ scheduler = Executors.newSingleThreadScheduledExecutor();
+ serverChannelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors
+ .newCachedThreadPool());
+ clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors
+ .newCachedThreadPool());
+
+ instantiateZookeeperClient();
+ tm = instantiateTopicManager();
+ pm = instantiatePersistenceManager(tm);
+ dm = new FIFODeliveryManager(pm, conf);
+ sm = instantiateSubscriptionManager(tm, pm);
+ rm = instantiateRegionManager(pm, scheduler);
+ sm.addListener(rm);
+
+ allChannels = new DefaultChannelGroup("hedwig");
+ // Initialize the Netty Handlers (used by the
+ // UmbrellaHandler) once so they can be shared by
+ // both the SSL and non-SSL channels.
+ Map<OperationType, Handler> handlers = initializeNettyHandlers(tm, dm, pm, sm);
+ // Initialize Netty for the regular non-SSL channels
+ initializeNetty(null, handlers);
+ if (conf.isSSLEnabled()) {
+ initializeNetty(new SslServerContextFactory(conf), handlers);
+ }
+ } catch (Exception e) {
+ ConcurrencyUtils.put(queue, Either.right(e));
+ return;
+ }
+
+ ConcurrencyUtils.put(queue, Either.of(new Object(), (Exception) null));
+ }
+
+ }).start();
+
+ Either<Object, Exception> either = ConcurrencyUtils.take(queue);
+ if (either.left() == null) {
+ throw either.right();
+ }
+ }
+
+ public PubSubServer(ServerConfiguration conf) throws Exception {
+ this(conf, new TerminateJVMExceptionHandler());
+ }
+
+ /**
+ *
+ * @param msg
+ * @param rc
+ * : code to exit with
+ */
+ public static void errorMsgAndExit(String msg, Throwable t, int rc) {
+ logger.fatal(msg, t);
+ System.err.println(msg);
+ System.exit(rc);
+ }
+
+ public final static int RC_INVALID_CONF_FILE = 1;
+ public final static int RC_MISCONFIGURED = 2;
+ public final static int RC_OTHER = 3;
+
+ /**
+ * @param args
+ */
+ public static void main(String[] args) {
+
+ logger.info("Attempting to start Hedwig");
+ ServerConfiguration conf = new ServerConfiguration();
+ if (args.length > 0) {
+ String confFile = args[0];
+ try {
+ conf.loadConf(new File(confFile).toURI().toURL());
+ } catch (MalformedURLException e) {
+ String msg = "Could not open configuration file: " + confFile;
+ errorMsgAndExit(msg, e, RC_INVALID_CONF_FILE);
+ } catch (ConfigurationException e) {
+ String msg = "Malformed configuration file: " + confFile;
+ errorMsgAndExit(msg, e, RC_MISCONFIGURED);
+ }
+ logger.info("Using configuration file " + confFile);
+ }
+ try {
+ new PubSubServer(conf);
+ } catch (Throwable t) {
+ errorMsgAndExit("Error during startup", t, RC_OTHER);
+ }
+ }
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/netty/PubSubServerPipelineFactory.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/netty/PubSubServerPipelineFactory.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/netty/PubSubServerPipelineFactory.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/netty/PubSubServerPipelineFactory.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.netty;
+
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
+import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
+import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
+import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
+import org.jboss.netty.handler.ssl.SslHandler;
+
+import org.apache.hedwig.protocol.PubSubProtocol;
+import org.apache.hedwig.server.ssl.SslServerContextFactory;
+
+public class PubSubServerPipelineFactory implements ChannelPipelineFactory {
+
+ // TODO: make these conf settings
+ final static int MAX_WORKER_THREADS = 32;
+ final static int MAX_CHANNEL_MEMORY_SIZE = 10 * 1024 * 1024;
+ final static int MAX_TOTAL_MEMORY_SIZE = 100 * 1024 * 1024;
+
+ private UmbrellaHandler uh;
+ private SslServerContextFactory sslFactory;
+ private int maxMessageSize;
+
+ /**
+ *
+ * @param uh
+ * @param sslFactory
+ * may be null if ssl is disabled
+ * @param cfg
+ */
+ public PubSubServerPipelineFactory(UmbrellaHandler uh, SslServerContextFactory sslFactory, int maxMessageSize) {
+ this.uh = uh;
+ this.sslFactory = sslFactory;
+ this.maxMessageSize = maxMessageSize;
+ }
+
+ public ChannelPipeline getPipeline() throws Exception {
+ ChannelPipeline pipeline = Channels.pipeline();
+ if (sslFactory != null) {
+ pipeline.addLast("ssl", new SslHandler(sslFactory.getEngine()));
+ }
+ pipeline.addLast("lengthbaseddecoder",
+ new LengthFieldBasedFrameDecoder(maxMessageSize, 0, 4, 0, 4));
+ pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
+
+ pipeline.addLast("protobufdecoder", new ProtobufDecoder(PubSubProtocol.PubSubRequest.getDefaultInstance()));
+ pipeline.addLast("protobufencoder", new ProtobufEncoder());
+
+ // pipeline.addLast("executor", new ExecutionHandler(
+ // new OrderedMemoryAwareThreadPoolExecutor(MAX_WORKER_THREADS,
+ // MAX_CHANNEL_MEMORY_SIZE, MAX_TOTAL_MEMORY_SIZE)));
+ //
+ // Dependency injection.
+ pipeline.addLast("umbrellahandler", uh);
+ return pipeline;
+ }
+}
Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements. See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership. The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.netty;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.handler.codec.frame.CorruptedFrameException;
+import org.jboss.netty.handler.codec.frame.TooLongFrameException;
+import org.jboss.netty.handler.ssl.SslHandler;
+
+import org.apache.hedwig.exceptions.PubSubException.MalformedRequestException;
+import org.apache.hedwig.protocol.PubSubProtocol;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+import org.apache.hedwig.protoextensions.PubSubResponseUtils;
+import org.apache.hedwig.server.handlers.ChannelDisconnectListener;
+import org.apache.hedwig.server.handlers.Handler;
+
+@ChannelPipelineCoverage("all")
+public class UmbrellaHandler extends SimpleChannelHandler {
+ static Logger logger = Logger.getLogger(UmbrellaHandler.class);
+
+ private Map<OperationType, Handler> handlers;
+ private ChannelGroup allChannels;
+ private ChannelDisconnectListener subscribeHandler;
+ private boolean isSSLEnabled = false;
+
+ public UmbrellaHandler(ChannelGroup allChannels, Map<OperationType, Handler> handlers,
+ boolean isSSLEnabled) {
+ this.allChannels = allChannels;
+ this.isSSLEnabled = isSSLEnabled;
+ this.handlers = handlers;
+ subscribeHandler = (ChannelDisconnectListener) handlers.get(OperationType.SUBSCRIBE);
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
+ Throwable throwable = e.getCause();
+
+ // Add here if there are more exceptions we need to be able to tolerate.
+ // 1. IOException may be thrown when a channel is forcefully closed by
+ // the other end, or by the ProtobufDecoder when an invalid protobuf is
+ // received
+ // 2. TooLongFrameException is thrown by the LengthBasedDecoder if it
+ // receives a packet that is too big
+ // 3. CorruptedFramException is thrown by the LengthBasedDecoder when
+ // the length is negative etc.
+ if (throwable instanceof IOException || throwable instanceof TooLongFrameException
+ || throwable instanceof CorruptedFrameException) {
+ e.getChannel().close();
+ if (logger.isDebugEnabled()) {
+ logger.debug("Uncaught exception", throwable);
+ }
+ } else {
+ // call our uncaught exception handler, which might decide to
+ // shutdown the system
+ Thread thread = Thread.currentThread();
+ thread.getUncaughtExceptionHandler().uncaughtException(thread, throwable);
+ }
+
+ }
+
+ @Override
+ public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+ // If SSL is NOT enabled, then we can add this channel to the
+ // ChannelGroup. Otherwise, that is done when the channel is connected
+ // and the SSL handshake has completed successfully.
+ if (!isSSLEnabled) {
+ allChannels.add(ctx.getChannel());
+ }
+ }
+
+ @Override
+ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+ if (isSSLEnabled) {
+ ctx.getPipeline().get(SslHandler.class).handshake(e.getChannel()).addListener(new ChannelFutureListener() {
+ public void operationComplete(ChannelFuture future) throws Exception {
+ if (future.isSuccess()) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("SSL handshake has completed successfully!");
+ }
+ allChannels.add(future.getChannel());
+ } else {
+ future.getChannel().close();
+ }
+ }
+ });
+ }
+ }
+
+ @Override
+ public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+ Channel channel = ctx.getChannel();
+ // subscribe handler needs to know about channel disconnects
+ subscribeHandler.channelDisconnected(channel);
+ channel.close();
+ }
+
+ public static void sendErrorResponseToMalformedRequest(Channel channel, long txnId, String msg) {
+ if (logger.isDebugEnabled()) {
+ logger.debug("Malformed request from " + channel.getRemoteAddress() + " msg, = " + msg);
+ }
+ MalformedRequestException mre = new MalformedRequestException(msg);
+ PubSubResponse response = PubSubResponseUtils.getResponseForException(mre, txnId);
+ channel.write(response);
+ }
+
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+
+ if (!(e.getMessage() instanceof PubSubProtocol.PubSubRequest)) {
+ ctx.sendUpstream(e);
+ return;
+ }
+
+ PubSubProtocol.PubSubRequest request = (PubSubProtocol.PubSubRequest) e.getMessage();
+
+ Handler handler = handlers.get(request.getType());
+ Channel channel = ctx.getChannel();
+ long txnId = request.getTxnId();
+
+ if (handler == null) {
+ sendErrorResponseToMalformedRequest(channel, txnId, "Request type " + request.getType().getNumber()
+ + " unknown");
+ return;
+ }
+
+ handler.handleRequest(request, channel);
+ }
+
+}