You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@zookeeper.apache.org by ma...@apache.org on 2010/08/19 23:25:22 UTC

svn commit: r987314 [10/16] - in /hadoop/zookeeper/trunk: ./ src/contrib/hedwig/ src/contrib/hedwig/client/ src/contrib/hedwig/client/src/ src/contrib/hedwig/client/src/main/ src/contrib/hedwig/client/src/main/cpp/ src/contrib/hedwig/client/src/main/cp...

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/delivery/FIFODeliveryManager.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,561 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.delivery;
+
+import java.util.HashMap;
+import java.util.HashSet;
+import java.util.Map;
+import java.util.Queue;
+import java.util.Set;
+import java.util.SortedMap;
+import java.util.TreeMap;
+import java.util.concurrent.BlockingQueue;
+import java.util.concurrent.ConcurrentLinkedQueue;
+import java.util.concurrent.LinkedBlockingQueue;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.log4j.Logger;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.ProtocolVersion;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+import org.apache.hedwig.protocol.PubSubProtocol.StatusCode;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.common.UnexpectedError;
+import org.apache.hedwig.server.persistence.Factory;
+import org.apache.hedwig.server.persistence.MapMethods;
+import org.apache.hedwig.server.persistence.PersistenceManager;
+import org.apache.hedwig.server.persistence.ScanCallback;
+import org.apache.hedwig.server.persistence.ScanRequest;
+import org.apache.hedwig.server.subscriptions.MessageFilter;
+
+public class FIFODeliveryManager implements Runnable, DeliveryManager {
+
+    protected static final Logger logger = Logger.getLogger(FIFODeliveryManager.class);
+
+    protected interface DeliveryManagerRequest {
+        public void performRequest();
+    }
+
+    /**
+     * the main queue that the single-threaded delivery manager works off of
+     */
+    BlockingQueue<DeliveryManagerRequest> requestQueue = new LinkedBlockingQueue<DeliveryManagerRequest>();
+
+    /**
+     * The queue of all subscriptions that are facing a transient error either
+     * in scanning from the persistence manager, or in sending to the consumer
+     */
+    Queue<ActiveSubscriberState> retryQueue = new ConcurrentLinkedQueue<ActiveSubscriberState>();
+
+    /**
+     * Stores a mapping from topic to the delivery pointers on the topic. The
+     * delivery pointers are stored in a sorted map from seq-id to the set of
+     * subscribers at that seq-id
+     */
+    Map<ByteString, SortedMap<Long, Set<ActiveSubscriberState>>> perTopicDeliveryPtrs;
+
+    /**
+     * Mapping from delivery end point to the subscriber state that we are
+     * serving at that end point. This prevents us e.g., from serving two
+     * subscriptions to the same endpoint
+     */
+    Map<TopicSubscriber, ActiveSubscriberState> subscriberStates;
+
+    private PersistenceManager persistenceMgr;
+
+    private ServerConfiguration cfg;
+
+    // Boolean indicating if this thread should continue running. This is used
+    // when we want to stop the thread during a PubSubServer shutdown.
+    protected boolean keepRunning = true;
+
+    public FIFODeliveryManager(PersistenceManager persistenceMgr, ServerConfiguration cfg) {
+        this.persistenceMgr = persistenceMgr;
+        perTopicDeliveryPtrs = new HashMap<ByteString, SortedMap<Long, Set<ActiveSubscriberState>>>();
+        subscriberStates = new HashMap<TopicSubscriber, ActiveSubscriberState>();
+        new Thread(this, "DeliveryManagerThread").start();
+        this.cfg = cfg;
+    }
+
+    /**
+     * ===================================================================== Our
+     * usual enqueue function, stop if error because of unbounded queue, should
+     * never happen
+     * 
+     */
+    protected void enqueueWithoutFailure(DeliveryManagerRequest request) {
+        if (!requestQueue.offer(request)) {
+            throw new UnexpectedError("Could not enqueue object: " + request + " to delivery manager request queue.");
+        }
+    }
+
+    /**
+     * ====================================================================
+     * Public interface of the delivery manager
+     */
+
+    /**
+     * Tells the delivery manager to start sending out messages for a particular
+     * subscription
+     * 
+     * @param topic
+     * @param subscriberId
+     * @param seqIdToStartFrom
+     *            Message sequence-id from where delivery should be started
+     * @param endPoint
+     *            The delivery end point to which send messages to
+     * @param filter
+     *            Only messages passing this filter should be sent to this
+     *            subscriber
+     * @param isHubSubscriber
+     *            There are some seq-id intricacies. To a hub subscriber, we
+     *            should send only a subset of the seq-id vector
+     */
+    public void startServingSubscription(ByteString topic, ByteString subscriberId, MessageSeqId seqIdToStartFrom,
+            DeliveryEndPoint endPoint, MessageFilter filter, boolean isHubSubscriber) {
+
+        ActiveSubscriberState subscriber = new ActiveSubscriberState(topic, subscriberId, seqIdToStartFrom
+                .getLocalComponent() - 1, endPoint, filter, isHubSubscriber);
+
+        enqueueWithoutFailure(subscriber);
+    }
+
+    public void stopServingSubscriber(ByteString topic, ByteString subscriberId) {
+        ActiveSubscriberState subState = subscriberStates.get(new TopicSubscriber(topic, subscriberId));
+
+        if (subState != null) {
+            stopServingSubscriber(subState);
+        }
+    }
+
+    /**
+     * Due to some error or disconnection or unsusbcribe, someone asks us to
+     * stop serving a particular endpoint
+     * 
+     * @param endPoint
+     */
+    protected void stopServingSubscriber(ActiveSubscriberState subscriber) {
+        enqueueWithoutFailure(new StopServingSubscriber(subscriber));
+    }
+
+    /**
+     * Instructs the delivery manager to backoff on the given subscriber and
+     * retry sending after some time
+     * 
+     * @param subscriber
+     */
+
+    public void retryErroredSubscriberAfterDelay(ActiveSubscriberState subscriber) {
+
+        subscriber.setLastScanErrorTime(System.currentTimeMillis());
+
+        if (!retryQueue.offer(subscriber)) {
+            throw new UnexpectedError("Could not enqueue to delivery manager retry queue");
+        }
+    }
+
+    /**
+     * Instructs the delivery manager to move the delivery pointer for a given
+     * subscriber
+     * 
+     * @param subscriber
+     * @param prevSeqId
+     * @param newSeqId
+     */
+    public void moveDeliveryPtrForward(ActiveSubscriberState subscriber, long prevSeqId, long newSeqId) {
+        enqueueWithoutFailure(new DeliveryPtrMove(subscriber, prevSeqId, newSeqId));
+    }
+
+    /*
+     * ==========================================================================
+     * == End of public interface, internal machinery begins.
+     */
+    public void run() {
+        while (keepRunning) {
+            DeliveryManagerRequest request = null;
+
+            try {
+                // We use a timeout of 1 second, so that we can wake up once in
+                // a while to check if there is something in the retry queue.
+                request = requestQueue.poll(1, TimeUnit.SECONDS);
+            } catch (InterruptedException e) {
+                Thread.currentThread().interrupt();
+            }
+
+            // First retry any subscriptions that had failed and need a retry
+            retryErroredSubscribers();
+
+            if (request == null) {
+                continue;
+            }
+
+            request.performRequest();
+
+        }
+    }
+
+    /**
+     * Stop method which will enqueue a ShutdownDeliveryManagerRequest.
+     */
+    public void stop() {
+        enqueueWithoutFailure(new ShutdownDeliveryManagerRequest());
+    }
+
+    protected void retryErroredSubscribers() {
+        long lastInterestingFailureTime = System.currentTimeMillis() - cfg.getScanBackoffPeriodMs();
+        ActiveSubscriberState subscriber;
+
+        while ((subscriber = retryQueue.peek()) != null) {
+            if (subscriber.getLastScanErrorTime() > lastInterestingFailureTime) {
+                // Not enough time has elapsed yet, will retry later
+                // Since the queue is fifo, no need to check later items
+                return;
+            }
+
+            // retry now
+            subscriber.deliverNextMessage();
+            retryQueue.poll();
+        }
+    }
+
+    protected void removeDeliveryPtr(ActiveSubscriberState subscriber, Long seqId, boolean isAbsenceOk,
+            boolean pruneTopic) {
+
+        assert seqId != null;
+
+        // remove this subscriber from the delivery pointers data structure
+        ByteString topic = subscriber.getTopic();
+        SortedMap<Long, Set<ActiveSubscriberState>> deliveryPtrs = perTopicDeliveryPtrs.get(topic);
+
+        if (deliveryPtrs == null && !isAbsenceOk) {
+            throw new UnexpectedError("No delivery pointers found while disconnecting " + "channel for topic:" + topic);
+        }
+
+        if (!MapMethods.removeFromMultiMap(deliveryPtrs, seqId, subscriber) && !isAbsenceOk) {
+
+            throw new UnexpectedError("Could not find subscriber:" + subscriber + " at the expected delivery pointer");
+        }
+
+        if (pruneTopic && deliveryPtrs.isEmpty()) {
+            perTopicDeliveryPtrs.remove(topic);
+        }
+
+    }
+
+    protected long getMinimumSeqId(ByteString topic) {
+        SortedMap<Long, Set<ActiveSubscriberState>> deliveryPtrs = perTopicDeliveryPtrs.get(topic);
+
+        if (deliveryPtrs == null || deliveryPtrs.isEmpty()) {
+            return Long.MAX_VALUE - 1;
+        }
+        return deliveryPtrs.firstKey();
+    }
+
+    protected void addDeliveryPtr(ActiveSubscriberState subscriber, Long seqId) {
+
+        // If this topic doesn't exist in the per-topic delivery pointers table,
+        // create an entry for it
+        SortedMap<Long, Set<ActiveSubscriberState>> deliveryPtrs = MapMethods.getAfterInsertingIfAbsent(
+                perTopicDeliveryPtrs, subscriber.getTopic(), TreeMapLongToSetSubscriberFactory.instance);
+
+        MapMethods.addToMultiMap(deliveryPtrs, seqId, subscriber, HashMapSubscriberFactory.instance);
+    }
+
+    public class ActiveSubscriberState implements ScanCallback, DeliveryCallback, DeliveryManagerRequest {
+        ByteString topic;
+        ByteString subscriberId;
+        long lastLocalSeqIdDelivered;
+        boolean connected = true;
+        DeliveryEndPoint deliveryEndPoint;
+        long lastScanErrorTime = -1;
+        long localSeqIdDeliveringNow;
+        long lastSeqIdCommunicatedExternally;
+        // TODO make use of these variables
+        MessageFilter filter;
+        boolean isHubSubscriber;
+        final static int SEQ_ID_SLACK = 10;
+
+        public ActiveSubscriberState(ByteString topic, ByteString subscriberId, long lastLocalSeqIdDelivered,
+                DeliveryEndPoint deliveryEndPoint, MessageFilter filter, boolean isHubSubscriber) {
+            this.topic = topic;
+            this.subscriberId = subscriberId;
+            this.lastLocalSeqIdDelivered = lastLocalSeqIdDelivered;
+            this.deliveryEndPoint = deliveryEndPoint;
+            this.filter = filter;
+            this.isHubSubscriber = isHubSubscriber;
+        }
+
+        public void setNotConnected() {
+            this.connected = false;
+            deliveryEndPoint.close();
+        }
+
+        public ByteString getTopic() {
+            return topic;
+        }
+
+        public long getLastLocalSeqIdDelivered() {
+            return lastLocalSeqIdDelivered;
+        }
+
+        public long getLastScanErrorTime() {
+            return lastScanErrorTime;
+        }
+
+        public void setLastScanErrorTime(long lastScanErrorTime) {
+            this.lastScanErrorTime = lastScanErrorTime;
+        }
+
+        protected boolean isConnected() {
+            return connected;
+        }
+
+        public void deliverNextMessage() {
+
+            if (!isConnected()) {
+                return;
+            }
+
+            localSeqIdDeliveringNow = persistenceMgr.getSeqIdAfterSkipping(topic, lastLocalSeqIdDelivered, 1);
+
+            ScanRequest scanRequest = new ScanRequest(topic, localSeqIdDeliveringNow,
+            /* callback= */this, /* ctx= */null);
+
+            persistenceMgr.scanSingleMessage(scanRequest);
+        }
+
+        /**
+         * ===============================================================
+         * {@link ScanCallback} methods
+         */
+
+        public void messageScanned(Object ctx, Message message) {
+            if (!connected) {
+                return;
+            }
+
+            // We're using a simple all-to-all network topology, so no region
+            // should ever need to forward messages to any other region.
+            // Otherwise, with the current logic, messages will end up
+            // ping-pong-ing back and forth between regions with subscriptions
+            // to each other without termination (or in any other cyclic
+            // configuration).
+            if (isHubSubscriber && !message.getSrcRegion().equals(cfg.getMyRegionByteString())) {
+                sendingFinished();
+                return;
+            }
+
+            /**
+             * The method below will invoke our sendingFinished() method when
+             * done
+             */
+            PubSubResponse response = PubSubResponse.newBuilder().setProtocolVersion(ProtocolVersion.VERSION_ONE)
+                    .setStatusCode(StatusCode.SUCCESS).setTxnId(0).setMessage(message).build();
+
+            deliveryEndPoint.send(response, //
+                    // callback =
+                    this);
+
+        }
+
+        public void scanFailed(Object ctx, Exception exception) {
+            if (!connected) {
+                return;
+            }
+
+            // wait for some time and then retry
+            retryErroredSubscriberAfterDelay(this);
+        }
+
+        public void scanFinished(Object ctx, ReasonForFinish reason) {
+            // no-op
+        }
+
+        /**
+         * ===============================================================
+         * {@link DeliveryCallback} methods
+         */
+        public void sendingFinished() {
+            if (!connected) {
+                return;
+            }
+
+            lastLocalSeqIdDelivered = localSeqIdDeliveringNow;
+            
+            if (lastLocalSeqIdDelivered > lastSeqIdCommunicatedExternally + SEQ_ID_SLACK){
+                // Note: The order of the next 2 statements is important. We should
+                // submit a request to change our delivery pointer only *after* we
+                // have actually changed it. Otherwise, there is a race condition
+                // with removal of this channel, w.r.t, maintaining the deliveryPtrs
+                // tree map.
+                long prevId = lastSeqIdCommunicatedExternally;
+                lastSeqIdCommunicatedExternally = lastLocalSeqIdDelivered;
+                moveDeliveryPtrForward(this, prevId, lastLocalSeqIdDelivered);
+            }
+            deliverNextMessage();
+        }
+        
+        public long getLastSeqIdCommunicatedExternally() {
+            return lastSeqIdCommunicatedExternally;
+        }
+            
+
+        public void permanentErrorOnSend() {
+            stopServingSubscriber(this);
+        }
+
+        public void transientErrorOnSend() {
+            retryErroredSubscriberAfterDelay(this);
+        }
+
+        /**
+         * ===============================================================
+         * {@link DeliveryManagerRequest} methods
+         */
+        public void performRequest() {
+
+            // Put this subscriber in the channel to subscriber mapping
+            ActiveSubscriberState prevSubscriber = subscriberStates.put(new TopicSubscriber(topic, subscriberId), this);
+
+            if (prevSubscriber != null) {
+                stopServingSubscriber(prevSubscriber);
+            }
+
+            lastSeqIdCommunicatedExternally = lastLocalSeqIdDelivered;
+            addDeliveryPtr(this, lastLocalSeqIdDelivered);
+            
+            deliverNextMessage();
+        };
+
+        @Override
+        public String toString() {
+            StringBuilder sb = new StringBuilder();
+            sb.append("Topic: ");
+            sb.append(topic.toStringUtf8());
+            sb.append("DeliveryPtr: ");
+            sb.append(lastLocalSeqIdDelivered);
+            return sb.toString();
+
+        }
+    }
+
+    protected class StopServingSubscriber implements DeliveryManagerRequest {
+        ActiveSubscriberState subscriber;
+
+        public StopServingSubscriber(ActiveSubscriberState subscriber) {
+            this.subscriber = subscriber;
+        }
+
+        @Override
+        public void performRequest() {
+
+            // This will automatically stop delivery, and disconnect the channel
+            subscriber.setNotConnected();
+
+            // if the subscriber has moved on, a move request for its delivery
+            // pointer must be pending in the request queue. Note that the
+            // subscriber first changes its delivery pointer and then submits a
+            // request to move so this works.
+            removeDeliveryPtr(subscriber, subscriber.getLastSeqIdCommunicatedExternally(), //
+                    // isAbsenceOk=
+                    true,
+                    // pruneTopic=
+                    true);
+        }
+
+    }
+
+    protected class DeliveryPtrMove implements DeliveryManagerRequest {
+
+        ActiveSubscriberState subscriber;
+        Long oldSeqId;
+        Long newSeqId;
+
+        public DeliveryPtrMove(ActiveSubscriberState subscriber, Long oldSeqId, Long newSeqId) {
+            this.subscriber = subscriber;
+            this.oldSeqId = oldSeqId;
+            this.newSeqId = newSeqId;
+        }
+
+        @Override
+        public void performRequest() {
+            ByteString topic = subscriber.getTopic();
+            long prevMinSeqId = getMinimumSeqId(topic);
+
+            if (subscriber.isConnected()) {
+                removeDeliveryPtr(subscriber, oldSeqId, //
+                        // isAbsenceOk=
+                        false,
+                        // pruneTopic=
+                        false);
+
+                addDeliveryPtr(subscriber, newSeqId);
+            } else {
+                removeDeliveryPtr(subscriber, oldSeqId, //
+                        // isAbsenceOk=
+                        true,
+                        // pruneTopic=
+                        true);
+            }
+
+            long nowMinSeqId = getMinimumSeqId(topic);
+
+            if (nowMinSeqId > prevMinSeqId) {
+                persistenceMgr.deliveredUntil(topic, nowMinSeqId);
+            }
+        }
+    }
+
+    protected class ShutdownDeliveryManagerRequest implements DeliveryManagerRequest {
+        // This is a simple type of Request we will enqueue when the
+        // PubSubServer is shut down and we want to stop the DeliveryManager
+        // thread.
+        public void performRequest() {
+            keepRunning = false;
+        }
+    }
+
+    /**
+     * ====================================================================
+     * 
+     * Dumb factories for our map methods
+     */
+    protected static class TreeMapLongToSetSubscriberFactory implements
+            Factory<SortedMap<Long, Set<ActiveSubscriberState>>> {
+        static TreeMapLongToSetSubscriberFactory instance = new TreeMapLongToSetSubscriberFactory();
+
+        @Override
+        public SortedMap<Long, Set<ActiveSubscriberState>> newInstance() {
+            return new TreeMap<Long, Set<ActiveSubscriberState>>();
+        }
+    }
+
+    protected static class HashMapSubscriberFactory implements Factory<Set<ActiveSubscriberState>> {
+        static HashMapSubscriberFactory instance = new HashMapSubscriberFactory();
+
+        @Override
+        public Set<ActiveSubscriberState> newInstance() {
+            return new HashSet<ActiveSubscriberState>();
+        }
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/BaseHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/BaseHandler.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/BaseHandler.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/BaseHandler.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.handlers;
+
+import org.jboss.netty.channel.Channel;
+
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.ServerNotResponsibleForTopicException;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protoextensions.PubSubResponseUtils;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.topics.TopicManager;
+import org.apache.hedwig.util.Callback;
+import org.apache.hedwig.util.HedwigSocketAddress;
+
+public abstract class BaseHandler implements Handler{
+
+    protected TopicManager topicMgr;
+    protected ServerConfiguration cfg;
+
+    protected BaseHandler(TopicManager tm, ServerConfiguration cfg) {
+        this.topicMgr = tm;
+        this.cfg = cfg;
+    }
+
+
+    public void handleRequest(final PubSubRequest request, final Channel channel) {
+        topicMgr.getOwner(request.getTopic(), request.getShouldClaim(),
+                new Callback<HedwigSocketAddress>() {
+                    @Override
+                    public void operationFailed(Object ctx, PubSubException exception) {
+                        channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
+                    }
+
+                    @Override
+                    public void operationFinished(Object ctx, HedwigSocketAddress owner) {
+                        if (!owner.equals(cfg.getServerAddr())) {
+                            channel.write(PubSubResponseUtils.getResponseForException(
+                                    new ServerNotResponsibleForTopicException(owner.toString()), request.getTxnId()));
+                            return;
+                        }
+                        handleRequestAtOwner(request, channel);
+                    }
+                }, null);
+    }
+
+    public abstract void handleRequestAtOwner(PubSubRequest request, Channel channel);
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/ChannelDisconnectListener.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/ChannelDisconnectListener.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/ChannelDisconnectListener.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/ChannelDisconnectListener.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,29 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.handlers;
+
+import org.jboss.netty.channel.Channel;
+
+public interface ChannelDisconnectListener {
+    
+    /**
+     * Act on a particular channel being disconnected
+     * @param channel
+     */
+    public void channelDisconnected(Channel channel);
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/ConsumeHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/ConsumeHandler.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/ConsumeHandler.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/ConsumeHandler.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,64 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.handlers;
+
+import org.jboss.netty.channel.Channel;
+
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.ConsumeRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.netty.UmbrellaHandler;
+import org.apache.hedwig.server.subscriptions.SubscriptionManager;
+import org.apache.hedwig.server.topics.TopicManager;
+import org.apache.hedwig.util.Callback;
+
+public class ConsumeHandler extends BaseHandler {
+
+    SubscriptionManager sm;
+    Callback<Void> noopCallback = new NoopCallback<Void>();
+
+    class NoopCallback<T> implements Callback<T> {
+        @Override
+        public void operationFailed(Object ctx, PubSubException exception) {
+        }
+
+        public void operationFinished(Object ctx, T resultOfOperation) {
+        };
+    }
+
+    @Override
+    public void handleRequestAtOwner(PubSubRequest request, Channel channel) {
+        if (!request.hasConsumeRequest()) {
+            UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
+                    "Missing consume request data");
+            return;
+        }
+
+        ConsumeRequest consumeRequest = request.getConsumeRequest();
+
+        sm.setConsumeSeqIdForSubscriber(request.getTopic(), consumeRequest.getSubscriberId(),
+                consumeRequest.getMsgId(), noopCallback, null);
+
+    }
+
+    public ConsumeHandler(TopicManager tm, SubscriptionManager sm, ServerConfiguration cfg) {
+        super(tm, cfg);
+        this.sm = sm;
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/Handler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/Handler.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/Handler.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/Handler.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,37 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.handlers;
+
+import org.jboss.netty.channel.Channel;
+
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+
+public interface Handler {
+    
+    /**
+     * Handle a request synchronously or asynchronously. After handling the
+     * request, the appropriate response should be written on the given channel
+     * 
+     * @param request
+     *            The request to handle
+     * 
+     * @param channel
+     *            The channel on which to write the response
+     */
+    public void handleRequest(final PubSubRequest request, final Channel channel);
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/PublishHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/PublishHandler.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/PublishHandler.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/PublishHandler.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,68 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.handlers;
+
+import org.jboss.netty.channel.Channel;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.Message;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protoextensions.PubSubResponseUtils;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.netty.UmbrellaHandler;
+import org.apache.hedwig.server.persistence.PersistRequest;
+import org.apache.hedwig.server.persistence.PersistenceManager;
+import org.apache.hedwig.server.topics.TopicManager;
+import org.apache.hedwig.util.Callback;
+
+public class PublishHandler extends BaseHandler {
+
+    private PersistenceManager persistenceMgr;
+
+    public PublishHandler(TopicManager topicMgr, PersistenceManager persistenceMgr, ServerConfiguration cfg) {
+        super(topicMgr, cfg);
+        this.persistenceMgr = persistenceMgr;
+    }
+
+    @Override
+    public void handleRequestAtOwner(final PubSubRequest request, final Channel channel) {
+        if (!request.hasPublishRequest()) {
+            UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
+                    "Missing publish request data");
+            return;
+        }
+
+        Message msgToSerialize = Message.newBuilder(request.getPublishRequest().getMsg()).setSrcRegion(
+                cfg.getMyRegionByteString()).build();
+
+        PersistRequest persistRequest = new PersistRequest(request.getTopic(), msgToSerialize,
+                new Callback<Long>() {
+                    @Override
+                    public void operationFailed(Object ctx, PubSubException exception) {
+                        channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
+                    }
+
+                    @Override
+                    public void operationFinished(Object ctx, Long resultOfOperation) {
+                        channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
+                    }
+                }, null);
+
+        persistenceMgr.persistMessage(persistRequest);
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/SubscribeHandler.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,153 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.handlers;
+
+import java.util.concurrent.ConcurrentHashMap;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFutureListener;
+
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.client.data.TopicSubscriber;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.exceptions.PubSubException.ServerNotResponsibleForTopicException;
+import org.apache.hedwig.protocol.PubSubProtocol.MessageSeqId;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest;
+import org.apache.hedwig.protoextensions.PubSubResponseUtils;
+import org.apache.hedwig.protoextensions.SubscriptionStateUtils;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.delivery.ChannelEndPoint;
+import org.apache.hedwig.server.delivery.DeliveryManager;
+import org.apache.hedwig.server.netty.UmbrellaHandler;
+import org.apache.hedwig.server.persistence.PersistenceManager;
+import org.apache.hedwig.server.subscriptions.SubscriptionManager;
+import org.apache.hedwig.server.subscriptions.TrueFilter;
+import org.apache.hedwig.server.topics.TopicManager;
+import org.apache.hedwig.util.Callback;
+
+public class SubscribeHandler extends BaseHandler implements ChannelDisconnectListener{
+    static Logger logger = Logger.getLogger(SubscribeHandler.class);
+
+    private DeliveryManager deliveryMgr;
+    private PersistenceManager persistenceMgr;
+    private SubscriptionManager subMgr;
+    ConcurrentHashMap<TopicSubscriber, Channel> sub2Channel;
+    ConcurrentHashMap<Channel, TopicSubscriber> channel2sub;
+
+    public SubscribeHandler(TopicManager topicMgr, DeliveryManager deliveryManager, PersistenceManager persistenceMgr,
+            SubscriptionManager subMgr, ServerConfiguration cfg) {
+        super(topicMgr, cfg);
+        this.deliveryMgr = deliveryManager;
+        this.persistenceMgr = persistenceMgr;
+        this.subMgr = subMgr;
+        sub2Channel = new ConcurrentHashMap<TopicSubscriber, Channel>();
+        channel2sub = new ConcurrentHashMap<Channel, TopicSubscriber>();
+    }
+
+    public void channelDisconnected(Channel channel) {
+        // Evils of synchronized programming: there is a race between a channel
+        // getting disconnected, and us adding it to the maps when a subscribe
+        // succeeds
+        synchronized (channel) {
+            TopicSubscriber topicSub = channel2sub.remove(channel);
+            if (topicSub != null) {
+                sub2Channel.remove(topicSub);
+            }
+        }
+    }
+
+    @Override
+    public void handleRequestAtOwner(final PubSubRequest request, final Channel channel) {
+
+        if (!request.hasSubscribeRequest()) {
+            UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
+                    "Missing subscribe request data");
+            return;
+        }
+
+        final ByteString topic = request.getTopic();
+
+        MessageSeqId seqId;
+        try {
+            seqId = persistenceMgr.getCurrentSeqIdForTopic(topic);
+        } catch (ServerNotResponsibleForTopicException e) {
+            channel.write(PubSubResponseUtils.getResponseForException(e, request.getTxnId())).addListener(
+                    ChannelFutureListener.CLOSE);
+            return;
+        }
+
+        final SubscribeRequest subRequest = request.getSubscribeRequest();
+        final ByteString subscriberId = subRequest.getSubscriberId();
+
+        MessageSeqId lastSeqIdPublished = MessageSeqId.newBuilder(seqId).setLocalComponent(seqId.getLocalComponent()).build();
+
+        subMgr.serveSubscribeRequest(topic, subRequest, lastSeqIdPublished, new Callback<MessageSeqId>() {
+
+            @Override
+            public void operationFailed(Object ctx, PubSubException exception) {
+                channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId())).addListener(
+                        ChannelFutureListener.CLOSE);
+            }
+
+            @Override
+            public void operationFinished(Object ctx, MessageSeqId resultOfOperation) {
+
+                TopicSubscriber topicSub = new TopicSubscriber(topic, subscriberId);
+
+                // race with channel getting disconnected while we are adding it
+                // to the 2 maps
+                synchronized (channel) {
+                    if (!channel.isConnected()) {
+                        // channel got disconnected while we were processing the
+                        // subscribe request,
+                        // nothing much we can do in this case
+                        return;
+                    }
+
+                    if (null != sub2Channel.putIfAbsent(topicSub, channel)) {
+                        // there was another channel mapped to this sub
+                        PubSubException pse = new PubSubException.TopicBusyException(
+                                "subscription for this topic, subscriberId is already being served on a different channel");
+                        channel.write(PubSubResponseUtils.getResponseForException(pse, request.getTxnId()))
+                                .addListener(ChannelFutureListener.CLOSE);
+                        return;
+                    } else {
+                        // channel2sub is just a cache, so we can add to it
+                        // without synchronization
+                        channel2sub.put(channel, topicSub);
+                    }
+                }
+                // First write success and then tell the delivery manager,
+                // otherwise the first message might go out before the response
+                // to the subscribe
+                channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
+
+                // want to start 1 ahead of the consume ptr
+                MessageSeqId seqIdToStartFrom = MessageSeqId.newBuilder(resultOfOperation).setLocalComponent(
+                        resultOfOperation.getLocalComponent() + 1).build();
+                deliveryMgr.startServingSubscription(topic, subscriberId, seqIdToStartFrom,
+                        new ChannelEndPoint(channel), TrueFilter.instance(), SubscriptionStateUtils
+                                .isHubSubscriber(subRequest.getSubscriberId()));
+            }
+        }, null);
+
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/handlers/UnsubscribeHandler.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,72 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.handlers;
+
+import org.jboss.netty.channel.Channel;
+import com.google.protobuf.ByteString;
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubRequest;
+import org.apache.hedwig.protocol.PubSubProtocol.UnsubscribeRequest;
+import org.apache.hedwig.protoextensions.PubSubResponseUtils;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.delivery.DeliveryManager;
+import org.apache.hedwig.server.netty.UmbrellaHandler;
+import org.apache.hedwig.server.subscriptions.SubscriptionManager;
+import org.apache.hedwig.server.topics.TopicManager;
+import org.apache.hedwig.util.Callback;
+
+public class UnsubscribeHandler extends BaseHandler {
+    SubscriptionManager subMgr;
+    DeliveryManager deliveryMgr;
+
+    public UnsubscribeHandler(TopicManager tm, ServerConfiguration cfg, SubscriptionManager subMgr,
+            DeliveryManager deliveryMgr) {
+        super(tm, cfg);
+        this.subMgr = subMgr;
+        this.deliveryMgr = deliveryMgr;
+    }
+
+    @Override
+    public void handleRequestAtOwner(final PubSubRequest request, final Channel channel) {
+        if (!request.hasUnsubscribeRequest()) {
+            UmbrellaHandler.sendErrorResponseToMalformedRequest(channel, request.getTxnId(),
+                    "Missing unsubscribe request data");
+            return;
+        }
+
+        final UnsubscribeRequest unsubRequest = request.getUnsubscribeRequest();
+        final ByteString topic = request.getTopic();
+        final ByteString subscriberId = unsubRequest.getSubscriberId();
+
+        subMgr.unsubscribe(topic, subscriberId, new Callback<Void>() {
+            @Override
+            public void operationFailed(Object ctx, PubSubException exception) {
+                channel.write(PubSubResponseUtils.getResponseForException(exception, request.getTxnId()));
+            }
+
+            @Override
+            public void operationFinished(Object ctx, Void resultOfOperation) {
+                deliveryMgr.stopServingSubscriber(topic, subscriberId);
+                channel.write(PubSubResponseUtils.getSuccessResponse(request.getTxnId()));
+
+            }
+        }, null);
+
+    }
+
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/netty/PubSubServer.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,364 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.netty;
+
+import java.io.File;
+import java.io.IOException;
+import java.net.InetSocketAddress;
+import java.net.MalformedURLException;
+import java.util.Collections;
+import java.util.HashMap;
+import java.util.Map;
+import java.util.concurrent.Executors;
+import java.util.concurrent.ScheduledExecutorService;
+import java.util.concurrent.SynchronousQueue;
+
+import org.apache.bookkeeper.client.BookKeeper;
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.log4j.Logger;
+import org.apache.zookeeper.KeeperException;
+import org.apache.zookeeper.WatchedEvent;
+import org.apache.zookeeper.Watcher;
+import org.apache.zookeeper.ZooKeeper;
+import org.jboss.netty.bootstrap.ServerBootstrap;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.channel.group.DefaultChannelGroup;
+import org.jboss.netty.channel.socket.ClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.ServerSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioClientSocketChannelFactory;
+import org.jboss.netty.channel.socket.nio.NioServerSocketChannelFactory;
+import org.jboss.netty.logging.InternalLoggerFactory;
+import org.jboss.netty.logging.Log4JLoggerFactory;
+
+import org.apache.hedwig.exceptions.PubSubException;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.apache.hedwig.server.common.ServerConfiguration;
+import org.apache.hedwig.server.common.TerminateJVMExceptionHandler;
+import org.apache.hedwig.server.delivery.DeliveryManager;
+import org.apache.hedwig.server.delivery.FIFODeliveryManager;
+import org.apache.hedwig.server.handlers.ConsumeHandler;
+import org.apache.hedwig.server.handlers.Handler;
+import org.apache.hedwig.server.handlers.PublishHandler;
+import org.apache.hedwig.server.handlers.SubscribeHandler;
+import org.apache.hedwig.server.handlers.UnsubscribeHandler;
+import org.apache.hedwig.server.persistence.BookkeeperPersistenceManager;
+import org.apache.hedwig.server.persistence.LocalDBPersistenceManager;
+import org.apache.hedwig.server.persistence.PersistenceManager;
+import org.apache.hedwig.server.persistence.PersistenceManagerWithRangeScan;
+import org.apache.hedwig.server.persistence.ReadAheadCache;
+import org.apache.hedwig.server.regions.HedwigHubClientFactory;
+import org.apache.hedwig.server.regions.RegionManager;
+import org.apache.hedwig.server.ssl.SslServerContextFactory;
+import org.apache.hedwig.server.subscriptions.AbstractSubscriptionManager;
+import org.apache.hedwig.server.subscriptions.InMemorySubscriptionManager;
+import org.apache.hedwig.server.subscriptions.SubscriptionManager;
+import org.apache.hedwig.server.subscriptions.ZkSubscriptionManager;
+import org.apache.hedwig.server.topics.TopicManager;
+import org.apache.hedwig.server.topics.TrivialOwnAllTopicManager;
+import org.apache.hedwig.server.topics.ZkTopicManager;
+import org.apache.hedwig.util.ConcurrencyUtils;
+import org.apache.hedwig.util.Either;
+import org.apache.hedwig.zookeeper.SafeAsyncCallback;
+
+public class PubSubServer {
+
+    static Logger logger = Logger.getLogger(PubSubServer.class);
+
+    // Netty related variables
+    ServerSocketChannelFactory serverChannelFactory;
+    ClientSocketChannelFactory clientChannelFactory;
+    ServerConfiguration conf;
+    ChannelGroup allChannels;
+
+    // Manager components that make up the PubSubServer
+    PersistenceManager pm;
+    DeliveryManager dm;
+    TopicManager tm;
+    SubscriptionManager sm;
+    RegionManager rm;
+
+    ZooKeeper zk; // null if we are in standalone mode
+    BookKeeper bk; // null if we are in standalone mode
+
+    // we use this to prevent long stack chains from building up in callbacks
+    ScheduledExecutorService scheduler;
+
+    protected PersistenceManager instantiatePersistenceManager(TopicManager topicMgr) throws IOException,
+            InterruptedException {
+
+        PersistenceManagerWithRangeScan underlyingPM;
+
+        if (conf.isStandalone()) {
+
+            underlyingPM = LocalDBPersistenceManager.instance();
+
+        } else {
+            try {
+                bk = new BookKeeper(zk, clientChannelFactory);
+            } catch (KeeperException e) {
+                logger.error("Could not instantiate bookkeeper client", e);
+                throw new IOException(e);
+            }
+            underlyingPM = new BookkeeperPersistenceManager(bk, zk, topicMgr, conf, scheduler);
+
+        }
+
+        PersistenceManager pm = underlyingPM;
+
+        if (conf.getReadAheadEnabled()) {
+            pm = new ReadAheadCache(underlyingPM, conf).start();
+        }
+
+        return pm;
+    }
+
+    protected SubscriptionManager instantiateSubscriptionManager(TopicManager tm, PersistenceManager pm) {
+        if (conf.isStandalone()) {
+            return new InMemorySubscriptionManager(tm, pm, conf, scheduler);
+        } else {
+            return new ZkSubscriptionManager(zk, tm, pm, conf, scheduler);
+        }
+
+    }
+
+    protected RegionManager instantiateRegionManager(PersistenceManager pm, ScheduledExecutorService scheduler) {
+        return new RegionManager(pm, conf, zk, scheduler, new HedwigHubClientFactory(conf, clientChannelFactory));
+    }
+
+    protected void instantiateZookeeperClient() throws IOException {
+        if (!conf.isStandalone()) {
+            zk = new ZooKeeper(conf.getZkHost(), conf.getZkTimeout(), new Watcher() {
+                @Override
+                public void process(WatchedEvent event) {
+                }
+            });
+        }
+    }
+
+    protected TopicManager instantiateTopicManager() throws IOException {
+        TopicManager tm;
+
+        if (conf.isStandalone()) {
+            tm = new TrivialOwnAllTopicManager(conf, scheduler);
+        } else {
+            try {
+                tm = new ZkTopicManager(zk, conf, scheduler);
+            } catch (PubSubException e) {
+                logger.error("Could not instantiate zk-topic manager", e);
+                throw new IOException(e);
+            }
+        }
+        return tm;
+    }
+
+    protected Map<OperationType, Handler> initializeNettyHandlers(TopicManager tm, DeliveryManager dm,
+            PersistenceManager pm, SubscriptionManager sm) {
+        Map<OperationType, Handler> handlers = new HashMap<OperationType, Handler>();
+        handlers.put(OperationType.PUBLISH, new PublishHandler(tm, pm, conf));
+        handlers.put(OperationType.SUBSCRIBE, new SubscribeHandler(tm, dm, pm, sm, conf));
+        handlers.put(OperationType.UNSUBSCRIBE, new UnsubscribeHandler(tm, conf, sm, dm));
+        handlers.put(OperationType.CONSUME, new ConsumeHandler(tm, sm, conf));
+        handlers = Collections.unmodifiableMap(handlers);
+        return handlers;
+    }
+
+    protected void initializeNetty(SslServerContextFactory sslFactory, Map<OperationType, Handler> handlers) {
+        boolean isSSLEnabled = (sslFactory != null) ? true : false;
+        InternalLoggerFactory.setDefaultFactory(new Log4JLoggerFactory());
+        ServerBootstrap bootstrap = new ServerBootstrap(serverChannelFactory);
+        UmbrellaHandler umbrellaHandler = new UmbrellaHandler(allChannels, handlers, isSSLEnabled);
+        PubSubServerPipelineFactory pipeline = new PubSubServerPipelineFactory(umbrellaHandler, sslFactory, conf
+                .getMaximumMessageSize());
+
+        bootstrap.setPipelineFactory(pipeline);
+        bootstrap.setOption("child.tcpNoDelay", true);
+        bootstrap.setOption("child.keepAlive", true);
+        bootstrap.setOption("reuseAddress", true);
+
+        // Bind and start to accept incoming connections.
+        allChannels.add(bootstrap.bind(isSSLEnabled ? new InetSocketAddress(conf.getSSLServerPort())
+                : new InetSocketAddress(conf.getServerPort())));
+        logger.info("Going into receive loop");
+    }
+
+    public void shutdown() {
+        // TODO: tell bk to close logs
+
+        // Shutdown the ZooKeeper and BookKeeper clients only if we are
+        // not in stand-alone mode.
+        try {
+            if (zk != null)
+                zk.close();
+            if (bk != null)
+                bk.halt();
+        } catch (InterruptedException e) {
+            logger.error("Error while closing ZooKeeper client!");
+        }
+
+        // Stop the RegionManager.
+        rm.stop();
+        
+        // Stop the DeliveryManager and ReadAheadCache threads (if
+        // applicable).
+        // TODO: It'd be cleaner and more general to modify the interfaces to
+        // include a stop method. If the specific implementation starts threads,
+        // then the stop method should take care of that clean up.
+        if (pm instanceof ReadAheadCache) {
+            ((ReadAheadCache) pm).stop();
+        }
+        if (dm instanceof FIFODeliveryManager) {
+            ((FIFODeliveryManager) dm).stop();
+        }
+
+        // Stop the SubscriptionManager if needed.
+        if (sm instanceof AbstractSubscriptionManager) {
+            ((AbstractSubscriptionManager) sm).stop();
+        }
+        
+        // Close and release the Netty channels and resources
+        allChannels.close().awaitUninterruptibly();
+        serverChannelFactory.releaseExternalResources();
+        clientChannelFactory.releaseExternalResources();
+        scheduler.shutdown();
+    }
+
+    /**
+     * Starts the hedwig server on the given port
+     * 
+     * @param port
+     * @throws ConfigurationException
+     *             if there is something wrong with the given configuration
+     * @throws IOException
+     * @throws InterruptedException
+     * @throws ConfigurationException
+     */
+    public PubSubServer(final ServerConfiguration conf, final Thread.UncaughtExceptionHandler exceptionHandler)
+            throws Exception {
+
+        // First validate the conf
+        this.conf = conf;
+        conf.validate();
+
+        // We need a custom thread group, so that we can override the uncaught
+        // exception method
+        ThreadGroup tg = new ThreadGroup("hedwig") {
+            @Override
+            public void uncaughtException(Thread t, Throwable e) {
+                exceptionHandler.uncaughtException(t, e);
+            }
+        };
+        // ZooKeeper threads register their own handler. But if some work that
+        // we do in ZK threads throws an exception, we want our handler to be
+        // called, not theirs.
+        SafeAsyncCallback.setUncaughtExceptionHandler(exceptionHandler);
+
+        final SynchronousQueue<Either<Object, Exception>> queue = new SynchronousQueue<Either<Object, Exception>>();
+
+        new Thread(tg, new Runnable() {
+            @Override
+            public void run() {
+                try {
+                    // Since zk is needed by almost everyone,try to see if we
+                    // need that first
+                    scheduler = Executors.newSingleThreadScheduledExecutor();
+                    serverChannelFactory = new NioServerSocketChannelFactory(Executors.newCachedThreadPool(), Executors
+                            .newCachedThreadPool());
+                    clientChannelFactory = new NioClientSocketChannelFactory(Executors.newCachedThreadPool(), Executors
+                            .newCachedThreadPool());
+
+                    instantiateZookeeperClient();
+                    tm = instantiateTopicManager();
+                    pm = instantiatePersistenceManager(tm);
+                    dm = new FIFODeliveryManager(pm, conf);
+                    sm = instantiateSubscriptionManager(tm, pm);
+                    rm = instantiateRegionManager(pm, scheduler);
+                    sm.addListener(rm);
+
+                    allChannels = new DefaultChannelGroup("hedwig");
+                    // Initialize the Netty Handlers (used by the
+                    // UmbrellaHandler) once so they can be shared by
+                    // both the SSL and non-SSL channels.
+                    Map<OperationType, Handler> handlers = initializeNettyHandlers(tm, dm, pm, sm);
+                    // Initialize Netty for the regular non-SSL channels
+                    initializeNetty(null, handlers);
+                    if (conf.isSSLEnabled()) {
+                        initializeNetty(new SslServerContextFactory(conf), handlers);
+                    }
+                } catch (Exception e) {
+                    ConcurrencyUtils.put(queue, Either.right(e));
+                    return;
+                }
+
+                ConcurrencyUtils.put(queue, Either.of(new Object(), (Exception) null));
+            }
+
+        }).start();
+
+        Either<Object, Exception> either = ConcurrencyUtils.take(queue);
+        if (either.left() == null) {
+            throw either.right();
+        }
+    }
+
+    public PubSubServer(ServerConfiguration conf) throws Exception {
+        this(conf, new TerminateJVMExceptionHandler());
+    }
+
+    /**
+     * 
+     * @param msg
+     * @param rc
+     *            : code to exit with
+     */
+    public static void errorMsgAndExit(String msg, Throwable t, int rc) {
+        logger.fatal(msg, t);
+        System.err.println(msg);
+        System.exit(rc);
+    }
+
+    public final static int RC_INVALID_CONF_FILE = 1;
+    public final static int RC_MISCONFIGURED = 2;
+    public final static int RC_OTHER = 3;
+
+    /**
+     * @param args
+     */
+    public static void main(String[] args) {
+
+        logger.info("Attempting to start Hedwig");
+        ServerConfiguration conf = new ServerConfiguration();
+        if (args.length > 0) {
+            String confFile = args[0];
+            try {
+                conf.loadConf(new File(confFile).toURI().toURL());
+            } catch (MalformedURLException e) {
+                String msg = "Could not open configuration file: " + confFile;
+                errorMsgAndExit(msg, e, RC_INVALID_CONF_FILE);
+            } catch (ConfigurationException e) {
+                String msg = "Malformed configuration file: " + confFile;
+                errorMsgAndExit(msg, e, RC_MISCONFIGURED);
+            }
+            logger.info("Using configuration file " + confFile);
+        }
+        try {
+            new PubSubServer(conf);
+        } catch (Throwable t) {
+            errorMsgAndExit("Error during startup", t, RC_OTHER);
+        }
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/netty/PubSubServerPipelineFactory.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/netty/PubSubServerPipelineFactory.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/netty/PubSubServerPipelineFactory.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/netty/PubSubServerPipelineFactory.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,76 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.netty;
+
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+import org.jboss.netty.channel.Channels;
+import org.jboss.netty.handler.codec.frame.LengthFieldBasedFrameDecoder;
+import org.jboss.netty.handler.codec.frame.LengthFieldPrepender;
+import org.jboss.netty.handler.codec.protobuf.ProtobufDecoder;
+import org.jboss.netty.handler.codec.protobuf.ProtobufEncoder;
+import org.jboss.netty.handler.ssl.SslHandler;
+
+import org.apache.hedwig.protocol.PubSubProtocol;
+import org.apache.hedwig.server.ssl.SslServerContextFactory;
+
+public class PubSubServerPipelineFactory implements ChannelPipelineFactory {
+
+    // TODO: make these conf settings
+    final static int MAX_WORKER_THREADS = 32;
+    final static int MAX_CHANNEL_MEMORY_SIZE = 10 * 1024 * 1024;
+    final static int MAX_TOTAL_MEMORY_SIZE = 100 * 1024 * 1024;
+
+    private UmbrellaHandler uh;
+    private SslServerContextFactory sslFactory;
+    private int maxMessageSize;
+
+    /**
+     * 
+     * @param uh
+     * @param sslFactory
+     *            may be null if ssl is disabled
+     * @param cfg
+     */
+    public PubSubServerPipelineFactory(UmbrellaHandler uh, SslServerContextFactory sslFactory, int maxMessageSize) {
+        this.uh = uh;
+        this.sslFactory = sslFactory;
+        this.maxMessageSize = maxMessageSize;
+    }
+
+    public ChannelPipeline getPipeline() throws Exception {
+        ChannelPipeline pipeline = Channels.pipeline();
+        if (sslFactory != null) {
+            pipeline.addLast("ssl", new SslHandler(sslFactory.getEngine()));
+        }
+        pipeline.addLast("lengthbaseddecoder",
+                new LengthFieldBasedFrameDecoder(maxMessageSize, 0, 4, 0, 4));
+        pipeline.addLast("lengthprepender", new LengthFieldPrepender(4));
+
+        pipeline.addLast("protobufdecoder", new ProtobufDecoder(PubSubProtocol.PubSubRequest.getDefaultInstance()));
+        pipeline.addLast("protobufencoder", new ProtobufEncoder());
+
+        // pipeline.addLast("executor", new ExecutionHandler(
+        // new OrderedMemoryAwareThreadPoolExecutor(MAX_WORKER_THREADS,
+        // MAX_CHANNEL_MEMORY_SIZE, MAX_TOTAL_MEMORY_SIZE)));
+        //
+        // Dependency injection.
+        pipeline.addLast("umbrellahandler", uh);
+        return pipeline;
+    }
+}

Added: hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java
URL: http://svn.apache.org/viewvc/hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java?rev=987314&view=auto
==============================================================================
--- hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java (added)
+++ hadoop/zookeeper/trunk/src/contrib/hedwig/server/src/main/java/org/apache/hedwig/server/netty/UmbrellaHandler.java Thu Aug 19 21:25:13 2010
@@ -0,0 +1,158 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one
+ * or more contributor license agreements.  See the NOTICE file
+ * distributed with this work for additional information
+ * regarding copyright ownership.  The ASF licenses this file
+ * to you under the Apache License, Version 2.0 (the
+ * "License"); you may not use this file except in compliance
+ * with the License.  You may obtain a copy of the License at
+ *
+ *     http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.hedwig.server.netty;
+
+import java.io.IOException;
+import java.util.Map;
+
+import org.apache.log4j.Logger;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+import org.jboss.netty.channel.ChannelFutureListener;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelPipelineCoverage;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelHandler;
+import org.jboss.netty.channel.group.ChannelGroup;
+import org.jboss.netty.handler.codec.frame.CorruptedFrameException;
+import org.jboss.netty.handler.codec.frame.TooLongFrameException;
+import org.jboss.netty.handler.ssl.SslHandler;
+
+import org.apache.hedwig.exceptions.PubSubException.MalformedRequestException;
+import org.apache.hedwig.protocol.PubSubProtocol;
+import org.apache.hedwig.protocol.PubSubProtocol.OperationType;
+import org.apache.hedwig.protocol.PubSubProtocol.PubSubResponse;
+import org.apache.hedwig.protoextensions.PubSubResponseUtils;
+import org.apache.hedwig.server.handlers.ChannelDisconnectListener;
+import org.apache.hedwig.server.handlers.Handler;
+
+@ChannelPipelineCoverage("all")
+public class UmbrellaHandler extends SimpleChannelHandler {
+    static Logger logger = Logger.getLogger(UmbrellaHandler.class);
+
+    private Map<OperationType, Handler> handlers;
+    private ChannelGroup allChannels;
+    private ChannelDisconnectListener subscribeHandler;
+    private boolean isSSLEnabled = false;
+
+    public UmbrellaHandler(ChannelGroup allChannels, Map<OperationType, Handler> handlers,
+            boolean isSSLEnabled) {
+        this.allChannels = allChannels;
+        this.isSSLEnabled = isSSLEnabled;
+        this.handlers = handlers;
+        subscribeHandler = (ChannelDisconnectListener) handlers.get(OperationType.SUBSCRIBE);
+    }
+
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
+        Throwable throwable = e.getCause();
+
+        // Add here if there are more exceptions we need to be able to tolerate.
+        // 1. IOException may be thrown when a channel is forcefully closed by
+        // the other end, or by the ProtobufDecoder when an invalid protobuf is
+        // received
+        // 2. TooLongFrameException is thrown by the LengthBasedDecoder if it
+        // receives a packet that is too big
+        // 3. CorruptedFramException is thrown by the LengthBasedDecoder when
+        // the length is negative etc.
+        if (throwable instanceof IOException || throwable instanceof TooLongFrameException
+                || throwable instanceof CorruptedFrameException) {
+            e.getChannel().close();
+            if (logger.isDebugEnabled()) {
+                logger.debug("Uncaught exception", throwable);
+            }
+        } else {
+            // call our uncaught exception handler, which might decide to
+            // shutdown the system
+            Thread thread = Thread.currentThread();
+            thread.getUncaughtExceptionHandler().uncaughtException(thread, throwable);
+        }
+
+    }
+
+    @Override
+    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+        // If SSL is NOT enabled, then we can add this channel to the
+        // ChannelGroup. Otherwise, that is done when the channel is connected
+        // and the SSL handshake has completed successfully.
+        if (!isSSLEnabled) {
+            allChannels.add(ctx.getChannel());
+        }
+    }
+
+    @Override
+    public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+        if (isSSLEnabled) {
+            ctx.getPipeline().get(SslHandler.class).handshake(e.getChannel()).addListener(new ChannelFutureListener() {
+                public void operationComplete(ChannelFuture future) throws Exception {
+                    if (future.isSuccess()) {
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("SSL handshake has completed successfully!");
+                        }
+                        allChannels.add(future.getChannel());
+                    } else {
+                        future.getChannel().close();
+                    }
+                }
+            });
+        }
+    }
+
+    @Override
+    public void channelDisconnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+        Channel channel = ctx.getChannel();
+        // subscribe handler needs to know about channel disconnects
+        subscribeHandler.channelDisconnected(channel);
+        channel.close();
+    }
+
+    public static void sendErrorResponseToMalformedRequest(Channel channel, long txnId, String msg) {
+        if (logger.isDebugEnabled()) {
+            logger.debug("Malformed request from " + channel.getRemoteAddress() + " msg, = " + msg);
+        }
+        MalformedRequestException mre = new MalformedRequestException(msg);
+        PubSubResponse response = PubSubResponseUtils.getResponseForException(mre, txnId);
+        channel.write(response);
+    }
+
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+
+        if (!(e.getMessage() instanceof PubSubProtocol.PubSubRequest)) {
+            ctx.sendUpstream(e);
+            return;
+        }
+
+        PubSubProtocol.PubSubRequest request = (PubSubProtocol.PubSubRequest) e.getMessage();
+
+        Handler handler = handlers.get(request.getType());
+        Channel channel = ctx.getChannel();
+        long txnId = request.getTxnId();
+
+        if (handler == null) {
+            sendErrorResponseToMalformedRequest(channel, txnId, "Request type " + request.getType().getNumber()
+                    + " unknown");
+            return;
+        }
+
+        handler.handleRequest(request, channel);
+    }
+
+}