You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:51 UTC

[41/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/HedwigMessagingSessionFacade.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/HedwigMessagingSessionFacade.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/HedwigMessagingSessionFacade.java
deleted file mode 100644
index 4c23d5c..0000000
--- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/HedwigMessagingSessionFacade.java
+++ /dev/null
@@ -1,624 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.jms.spi;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.client.HedwigClient;
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.api.Publisher;
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
-import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.jms.MessagingSessionFacade;
-import org.apache.hedwig.jms.SessionImpl;
-import org.apache.hedwig.jms.DebugUtil;
-import org.apache.hedwig.jms.message.MessageImpl;
-import org.apache.hedwig.jms.message.MessageUtil;
-import org.apache.hedwig.protocol.PubSubProtocol;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.util.Callback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-import javax.jms.QueueReceiver;
-import javax.jms.QueueSender;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSubscriber;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Set;
-
-/**
- * Hedwig-specific implementation of the messaging session facade. <br/>
- * JMS VIOLATION: This implementation creates a single backend hedwig connection PER session - and
- * DOES NOT share multiple sessions on top of a single connection.
- * <p/>
- * This is a wilful violation of the JMS specification, but exists only because Hedwig does not have
- * any notion to support this. <br/>
- * Once hedwig does allow for session multiplexing, we will need to revisit this (or create a new impl)
- * to take into account the changes.
- *
- */
-public class HedwigMessagingSessionFacade implements MessagingSessionFacade, MessageHandler {
-
-    private static final Logger logger = LoggerFactory.getLogger(HedwigMessagingSessionFacade.class);
-
-    // We simulate noLocal through the connection - which will be shared across sessions.
-    private final HedwigConnectionImpl connection;
-    private final SessionImpl session;
-    // Set to null by close() and re-created by start(); a null value is treated as "session closed".
-    private HedwigClient hedwigClient;
-    // Set by stop() and close(), cleared by start(); deliver() ignores incoming messages while true.
-    private volatile boolean stopped = false;
-
-    /*
-     Hedwig server has an ack-until-N approach to acknowledgements : that is, if we acknowledge message N,
-     all previous N-1 messages are also
-     acknowledged.
-     But hedwig-client DOES NOT support this : particularly in context of throttling.
-
-     So, when we are in CLIENT_ACKNOWLEDGE mode and NOT in transacted session, I am modifying the behavior
-     to mirror expectation of both
-     hedwig client and server here in SessionImpl itself (instead of facade where this probably belongs better).
-
-     This approach does not seem to work fine due to implicit assumptions in hedwig client ... I am
-     modifying it in the following way :
-     a) For each message received, maintain it in a List.
-     b) Acknowledging a message means traversing this list to find the message with the same seq-id : and
-     acknowledge ALL messages until that one in the list.
-     Since hedwig does ack until, instead of individual ack, this violation of JMS spec is consistent with hedwig.
-     Note that even though hedwig does ack until, hedwig client on the other hand DOES NOT ! It will
-     throttle connection if we do not ack individually ...
-     sigh :-(
-      */
-    // Guarded by synchronized (unAckMessageList); kept in registration order.
-    private final List<SessionImpl.ReceivedMessage> unAckMessageList = new LinkedList<SessionImpl.ReceivedMessage>();
-
-    // Both of these are accessed only while synchronized on deliveryStartInfoSet.
-    private final Set<DeliveryStartInfo> deliveryStartInfoSet = new HashSet<DeliveryStartInfo>(32);
-    private final Set<DeliveryStartInfo> subscribeInfoSet = new HashSet<DeliveryStartInfo>(32);
-
-    /**
-     * Immutable (topic name, subscriber id) pair used as a set key to remember which
-     * subscriptions and deliveries this facade has already started.
-     */
-    private static final class DeliveryStartInfo {
-        private final String topicName;
-        private final String subscriberId;
-
-        /**
-         * The parameter order matches the call sites in the enclosing class, which all pass
-         * (topicName, subscriberId). The previous declaration had the two parameters swapped,
-         * so each value ended up in the wrong field.
-         *
-         * @param topicName    name of the topic; may be null.
-         * @param subscriberId id of the subscriber; may be null.
-         */
-        private DeliveryStartInfo(String topicName, String subscriberId) {
-            this.topicName = topicName;
-            this.subscriberId = subscriberId;
-        }
-
-        @Override
-        public boolean equals(Object o) {
-            if (this == o) return true;
-            if (o == null || getClass() != o.getClass()) return false;
-
-            DeliveryStartInfo that = (DeliveryStartInfo) o;
-            return sameString(subscriberId, that.subscriberId) && sameString(topicName, that.topicName);
-        }
-
-        @Override
-        public int hashCode() {
-            int result = topicName != null ? topicName.hashCode() : 0;
-            result = 31 * result + (subscriberId != null ? subscriberId.hashCode() : 0);
-            return result;
-        }
-
-        // Null-safe string equality.
-        private static boolean sameString(String a, String b) {
-            return a != null ? a.equals(b) : b == null;
-        }
-    }
-
-
-    /**
-     * Builds the facade for one session and eagerly creates its backing hedwig client.
-     *
-     * @param connection owning connection; supplies the hedwig client configuration.
-     * @param session    the JMS session this facade serves.
-     * @throws JMSException if the connection returns no client configuration.
-     */
-    public HedwigMessagingSessionFacade(HedwigConnectionImpl connection, SessionImpl session) throws JMSException {
-        this.connection = connection;
-        this.session = session;
-
-        // The client is always created up front.
-        final ClientConfiguration clientConfig = connection.getHedwigClientConfig();
-        if (clientConfig == null) {
-            throw new JMSException("Unable to fetch client config ?");
-        }
-        this.hedwigClient = new HedwigClient(clientConfig);
-        resetStartInfoSet();
-    }
-
-    /**
-     * Marks the facade as started, re-creating the hedwig client if a previous close() dropped it.
-     *
-     * @throws JMSException if the owning connection is not in start mode.
-     */
-    @Override
-    public void start() throws JMSException {
-        final boolean connectionStarted = connection.isInStartMode();
-        if (!connectionStarted) {
-            throw new JMSException("Connection not yet started ?");
-        }
-        if (logger.isTraceEnabled()) {
-            logger.trace("Creating HedwigClient");
-        }
-
-        // A client is only built when none exists.
-        if (this.hedwigClient == null) {
-            this.hedwigClient = new HedwigClient(connection.getHedwigClientConfig());
-            resetStartInfoSet();
-        }
-        this.stopped = false;
-    }
-
-    /**
-     * Puts the facade in stopped mode. Only the stopped flag is changed: the hedwig client is
-     * left open, because stopping does not inhibit send.
-     */
-    @Override
-    public void stop() {
-        if (logger.isTraceEnabled()) {
-            logger.trace("Stopping HedwigClient");
-        }
-        this.stopped = true;
-    }
-
-
-    /**
-     * Closes the backing hedwig client and forgets all subscription/delivery bookkeeping.
-     * Calling this on an already closed facade is a no-op instead of a NullPointerException.
-     */
-    @Override
-    public void close() {
-        final HedwigClient client = this.hedwigClient;
-        resetStartInfoSet();
-
-        this.stopped = true;
-        this.hedwigClient = null;
-        if (null == client) {
-            // The field is nulled by an earlier close(); nothing is left to release.
-            if (logger.isTraceEnabled()) logger.trace("HedwigClient already closed");
-            return;
-        }
-        if (logger.isTraceEnabled()) logger.trace("Closing HedwigClient");
-        client.close();
-    }
-
-    // Forgets every recorded subscription and started delivery; both sets are cleared under the
-    // deliveryStartInfoSet monitor.
-    private void resetStartInfoSet(){
-        synchronized (deliveryStartInfoSet){
-            deliveryStartInfoSet.clear();
-            subscribeInfoSet.clear();
-        }
-    }
-
-    /**
-     * Resolves the destination type for a destination given by name.
-     *
-     * @param destination destination name; not inspected.
-     * @return always {@code DestinationType.TOPIC}.
-     */
-    @Override
-    public DestinationType findDestinationType(String destination) throws JMSException {
-        // TODO: For now, we support ONLY topic's, so always returning that.
-        return DestinationType.TOPIC;
-    }
-
-    /**
-     * Resolves the destination type from the runtime type of {@code destination}.
-     *
-     * @param destination the destination to classify.
-     * @return {@code QUEUE} only for a {@link Queue} that is not also a {@link Topic};
-     *         {@code TOPIC} for everything else, including unknown types.
-     */
-    @Override
-    public DestinationType findDestinationType(Destination destination) throws JMSException {
-        final boolean queueOnly = (destination instanceof Queue) && !(destination instanceof Topic);
-        if (queueOnly) {
-            return DestinationType.QUEUE;
-        }
-
-        // TODO: For now, we support ONLY topic's, so topics and unknown destinations both map to TOPIC.
-        return DestinationType.TOPIC;
-    }
-
-    /**
-     * Creates a publisher for the topic named by {@code destination}.
-     *
-     * @param destination target destination; when null the publisher is constructed with a null topic.
-     */
-    @Override
-    public TopicPublisher createTopicPublisher(Destination destination) throws JMSException {
-        return new TopicPublisherImpl(this, session, null != destination ?
-            session.createTopic(session.toName(destination)) : null);
-    }
-
-    /**
-     * Creates a subscriber on the topic named by {@code destination}, using a randomly generated
-     * subscriber id.
-     */
-    @Override
-    public TopicSubscriber createTopicSubscriber(Destination destination) throws JMSException {
-        session.subscriberCreated();
-        connection.initConnectionClientID();
-        // NOTE(review): the trailing 'true' is 'false' in the durable variants - presumably it marks
-        // the subscription as non-durable; confirm against TopicSubscriberImpl.
-        return new TopicSubscriberImpl(session, session.createTopic(session.toName(destination)),
-                session.createSubscriberId(SessionImpl.generateRandomString()), true);
-    }
-
-    /**
-     * Creates a subscriber on the topic named by {@code destination}, using a randomly generated
-     * subscriber id, and passes the selector and noLocal flag through to the subscriber.
-     */
-    @Override
-    public TopicSubscriber createTopicSubscriber(Destination destination,
-                                                 String messageSelector, boolean noLocal) throws JMSException {
-        session.subscriberCreated();
-        connection.initConnectionClientID();
-        // NOTE(review): the trailing 'true' is 'false' in the durable variants - presumably it marks
-        // the subscription as non-durable; confirm against TopicSubscriberImpl.
-        return new TopicSubscriberImpl(session,
-                session.createTopic(session.toName(destination)),
-                session.createSubscriberId(SessionImpl.generateRandomString()), messageSelector, noLocal, true);
-    }
-
-    /**
-     * Creates a subscriber on {@code topic} with the caller-supplied subscriber id and starts it
-     * before returning.
-     *
-     * @throws JMSException if the session already has a message listener set.
-     */
-    @Override
-    public TopicSubscriber createDurableSubscriber(Topic topic, String subscribedId) throws JMSException {
-        if (null != session.getMessageListener()) {
-            throw new JMSException("Message listener is set - not other form of message receipt can be used");
-        }
-        session.subscriberCreated();
-
-        // NOTE(review): unlike the selector/noLocal overload, this one does not call
-        // connection.initConnectionClientID() and does call subscriber.start() - confirm the
-        // asymmetry is intended.
-        TopicSubscriberImpl subscriber = new TopicSubscriberImpl(session, topic, subscribedId, false);
-        subscriber.start();
-        return subscriber;
-    }
-
-    /**
-     * Creates a subscriber on {@code topic} with the caller-supplied subscriber id, selector and
-     * noLocal flag. The subscriber is returned without being started here.
-     *
-     * @throws JMSException if the session already has a message listener set.
-     */
-    @Override
-    public TopicSubscriber createDurableSubscriber(Topic topic, String subscribedId,
-                                                   String messageSelector, boolean noLocal) throws JMSException {
-        if (null != session.getMessageListener()) {
-            throw new JMSException("Message listener is set - not other form of message receipt can be used");
-        }
-        session.subscriberCreated();
-        connection.initConnectionClientID();
-
-        return new TopicSubscriberImpl(session, topic, subscribedId, messageSelector, noLocal, false);
-    }
-
-    /*
-    @Override
-    public void unsubscribe(String subscriberId) throws JMSException {
-        throw new JMSException("Hedwig requires BOTH topic name and subscriberId to unsubscribe -
-        unlike JMS. Need to figure this out.");
-    }
-    */
-
-    // Note: order SENSITIVE !!
-    // Messages are appended in call order; acknowledge() later acknowledges everything from the
-    // head of this list up to and including the matching message.
-    @Override
-    public void registerUnAcknowledgedMessage(SessionImpl.ReceivedMessage message) {
-        synchronized (unAckMessageList){
-            unAckMessageList.add(message);
-        }
-    }
-
-    /**
-     * Acknowledges {@code message} together with every message that was registered before it and
-     * is still un-acknowledged ("ack until" semantics, applied message by message on the client).
-     * If the message is not in the un-acknowledged list nothing happens.
-     *
-     * @throws JMSException (IllegalStateException) if the facade is stopped or closed.
-     */
-    @Override
-    public void acknowledge(MessageImpl message) throws JMSException {
-        if (this.stopped || null == hedwigClient) {
-          throw new javax.jms.IllegalStateException("session in stopped or closed state, cant acknowledge message");
-        }
-
-        final LinkedList<SessionImpl.ReceivedMessage> ackList = new LinkedList<SessionImpl.ReceivedMessage>();
-        synchronized (unAckMessageList) {
-            // Locate the position of the message being acknowledged.
-            int matchIndex = -1;
-            int position = 0;
-            for (SessionImpl.ReceivedMessage pending : unAckMessageList) {
-                if (pending.originalMessage.getServerJmsMessageId().equals(message.getServerJmsMessageId())) {
-                    matchIndex = position;
-                    break;
-                }
-                position++;
-            }
-
-            // Not found: probably already acknowledged.
-            if (matchIndex < 0) {
-                return;
-            }
-
-            // Move the head of the list, up to and including the match, into ackList (order kept).
-            final List<SessionImpl.ReceivedMessage> head = unAckMessageList.subList(0, matchIndex + 1);
-            ackList.addAll(head);
-            head.clear();
-        }
-
-        if (logger.isTraceEnabled()) {
-            logger.trace("facade acknowledge ackList (" + ackList.size() + ") ... " + ackList);
-        }
-
-        // Run each message's ack runnable outside the lock; failures are logged and skipped.
-        for (SessionImpl.ReceivedMessage received : ackList) {
-            try {
-                received.originalMessage.getAckRunnable().run();
-            } catch (Exception ackFailure) {
-                if (logger.isDebugEnabled()) {
-                    logger.debug("Ignoring exception thrown while acknowledging messages", ackFailure);
-                }
-            }
-        }
-    }
-
-    // Sends a consume request for seqId on (topicName, subscriberId) through the hedwig subscriber.
-    // A "client not subscribed" failure is rethrown as a JMSException linked to the original.
-    private void sendAcknowledge(String topicName, String subscriberId, PubSubProtocol.MessageSeqId seqId)
-        throws JMSException {
-
-        if (logger.isTraceEnabled()) {
-            final String jmsMessageId = MessageUtil.generateJMSMessageIdFromSeqId(seqId);
-            logger.trace("Acknowledging " + jmsMessageId + " for " + topicName + " by " + subscriberId);
-        }
-
-        try {
-            hedwigClient.getSubscriber().consume(
-                ByteString.copyFromUtf8(topicName),
-                ByteString.copyFromUtf8(subscriberId),
-                seqId);
-        } catch (PubSubException.ClientNotSubscribedException notSubscribed) {
-            final JMSException wrapped = new JMSException("Client not subscribed .. " + notSubscribed);
-            wrapped.setLinkedException(notSubscribed);
-            throw wrapped;
-        }
-    }
-
-
-    /**
-     * Subscribes (create-or-attach) to {@code topicName} as {@code subscribedId}, at most once per
-     * (topic, subscriber) pair for the lifetime of the current client.
-     * <p/>
-     * If the subscribe call fails, the pair is removed from the bookkeeping again so that a later
-     * call can retry; previously a failed attempt made every retry a silent no-op. The pair is
-     * kept when hedwig reports that the client is already subscribed.
-     *
-     * @throws JMSException if the facade is closed or the subscribe request fails.
-     */
-    public void subscribeToTopic(String topicName, String subscribedId) throws JMSException {
-        // Read the field once so the null check and the use see the same client.
-        final HedwigClient client = this.hedwigClient;
-        if (null == client)
-          throw new javax.jms.IllegalStateException("session in closed state, cant subscribe to topic " + topicName);
-
-        final DeliveryStartInfo info = new DeliveryStartInfo(topicName, subscribedId);
-        final boolean start;
-        synchronized (deliveryStartInfoSet){
-            // Set.add returns true only when the entry was not present yet.
-            start = subscribeInfoSet.add(info);
-        }
-
-        if (! start) {
-            if (logger.isDebugEnabled()) logger.debug("Client already subscribed ?");
-            return ;
-        }
-
-        boolean keepRegistered = false;
-        try {
-            SubscriptionOptions opts = SubscriptionOptions.newBuilder()
-                .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
-            client.getSubscriber().subscribe(ByteString.copyFromUtf8(topicName),
-                    ByteString.copyFromUtf8(subscribedId), opts);
-            keepRegistered = true;
-        } catch (PubSubException.CouldNotConnectException e) {
-            JMSException je = new JMSException("receive failed, could not connect .. " + e);
-            je.setLinkedException(e);
-            throw je;
-        } catch (PubSubException.ClientAlreadySubscribedException e) {
-            // The subscription does exist, so the bookkeeping entry is still accurate.
-            keepRegistered = true;
-            JMSException je = new JMSException("receive failed, already subscribed .. " + e);
-            je.setLinkedException(e);
-            throw je;
-        } catch (PubSubException.ServiceDownException e) {
-            JMSException je = new JMSException("receive failed, hedwig service down .. " + e);
-            je.setLinkedException(e);
-            throw je;
-        } catch (InvalidSubscriberIdException e) {
-            JMSException je = new JMSException("receive failed, invalid subscriber .. " + e);
-            je.setLinkedException(e);
-            throw je;
-        } finally {
-            if (! keepRegistered) {
-                synchronized (deliveryStartInfoSet){
-                    subscribeInfoSet.remove(info);
-                }
-            }
-        }
-    }
-
-    /**
-     * Unsubscribes {@code subscribedId} from {@code topicName} and drops the pair from both the
-     * subscription and the delivery bookkeeping.
-     * <p/>
-     * The error messages now describe the unsubscribe operation; they were previously copied
-     * from the acknowledge/receive paths.
-     *
-     * @throws JMSException if the facade is closed or the unsubscribe request fails.
-     */
-    public void unsubscribeFromTopic(String topicName, String subscribedId) throws JMSException {
-        // Read the field once so the null check and the use see the same client.
-        final HedwigClient client = this.hedwigClient;
-        if (null == client)
-          throw new javax.jms.IllegalStateException("session in closed state, cant unsubscribe from topic " + topicName);
-
-        // Unsubscribing also implies that delivery is no longer started.
-        final DeliveryStartInfo info = new DeliveryStartInfo(topicName, subscribedId);
-        synchronized (deliveryStartInfoSet){
-            deliveryStartInfoSet.remove(info);
-            subscribeInfoSet.remove(info);
-        }
-
-        try {
-            client.getSubscriber().unsubscribe(ByteString.copyFromUtf8(topicName),
-                ByteString.copyFromUtf8(subscribedId));
-        } catch (PubSubException.CouldNotConnectException e) {
-            JMSException je = new JMSException("unsubscribe failed, could not connect .. " + e);
-            je.setLinkedException(e);
-            throw je;
-        } catch (PubSubException.ServiceDownException e) {
-            JMSException je = new JMSException("unsubscribe failed, hedwig service down .. " + e);
-            je.setLinkedException(e);
-            throw je;
-        } catch (InvalidSubscriberIdException e) {
-            JMSException je = new JMSException("unsubscribe failed, invalid subscriber .. " + e);
-            je.setLinkedException(e);
-            throw je;
-        } catch (PubSubException.ClientNotSubscribedException e) {
-            JMSException je = new JMSException("unsubscribe failed, client not subscribed .. " + e);
-            je.setLinkedException(e);
-            throw je;
-        }
-    }
-
-    /**
-     * Stops message delivery for (topicName, subscribedId) and forgets that delivery was started.
-     * A "client not subscribed" response from hedwig is only trace-logged.
-     *
-     * @throws JMSException (IllegalStateException) if the facade is closed.
-     */
-    public void stopTopicDelivery(String topicName, String subscribedId) throws JMSException {
-        // Read the field once so the null check and the use see the same client.
-        final HedwigClient client = this.hedwigClient;
-        if (null == client)
-          throw new javax.jms.IllegalStateException("session in closed state, cant stop delivery for topic " + topicName);
-
-        DeliveryStartInfo info = new DeliveryStartInfo(topicName, subscribedId);
-        synchronized (deliveryStartInfoSet){
-            deliveryStartInfoSet.remove(info);
-        }
-
-        try {
-            client.getSubscriber().stopDelivery(ByteString.copyFromUtf8(topicName),
-                ByteString.copyFromUtf8(subscribedId));
-        } catch (PubSubException.ClientNotSubscribedException e) {
-            if (logger.isTraceEnabled()) logger.trace("Client not subscribed or already unsubscribed ? ", e);
-        }
-    }
-
-    /**
-     * Starts delivery of messages for (topicName, subscribedId) to this facade, at most once per
-     * pair until delivery is stopped again.
-     * <p/>
-     * If hedwig reports that the client is not subscribed, delivery was not started, so the pair
-     * is removed from the bookkeeping and a later call can try again.
-     *
-     * @throws JMSException (IllegalStateException) if the facade is closed.
-     */
-    public void startTopicDelivery(String topicName, String subscribedId) throws JMSException {
-        // Read the field once so the null check and the use see the same client.
-        final HedwigClient client = this.hedwigClient;
-        if (null == client)
-          throw new javax.jms.IllegalStateException("session in closed state, cant start delivery for topic " + topicName);
-
-        final DeliveryStartInfo info = new DeliveryStartInfo(topicName, subscribedId);
-        final boolean start;
-        synchronized (deliveryStartInfoSet){
-            // Set.add returns true only when the entry was not present yet.
-            start = deliveryStartInfoSet.add(info);
-        }
-
-        if (! start) {
-            if (logger.isDebugEnabled()) logger.debug("Client already started delivery ?");
-            return ;
-        }
-
-        try {
-            if (logger.isTraceEnabled()) logger.trace("Start topic delivery for " + topicName +
-                ", subscriberId " + subscribedId);
-            client.getSubscriber().startDelivery(ByteString.copyFromUtf8(topicName),
-                ByteString.copyFromUtf8(subscribedId), this);
-            if (logger.isTraceEnabled()) logger.trace("Start topic delivery for " + topicName +
-                ", subscriberId " + subscribedId + " DONE");
-        } catch (PubSubException.ClientNotSubscribedException e) {
-            // Delivery never started: undo the registration made above.
-            synchronized (deliveryStartInfoSet){
-                deliveryStartInfoSet.remove(info);
-            }
-            if (logger.isDebugEnabled()) logger.debug("Client not subscribed or already unsubscribed ? ", e);
-        } catch (AlreadyStartDeliveryException e) {
-            // Delivery is running, so the registration stays.
-            if (logger.isDebugEnabled()) logger.debug("Client already started delivery ? ", e);
-        }
-    }
-
-    /**
-     * Hedwig delivery callback: converts the incoming hedwig message into a JMS message and hands
-     * it to the session, attaching a runnable that performs the (deferred) acknowledgement.
-     * <p/>
-     * While the facade is stopped the message is only debug-logged and dropped; in that case the
-     * hedwig {@code callback} is not invoked.
-     */
-    @Override
-    public void deliver(ByteString topic, ByteString subscriberId, PubSubProtocol.Message msg,
-                        final Callback<Void> callback, final Object context) {
-        // Deliver the message to the session.
-
-        if (this.stopped) {
-            if (logger.isDebugEnabled()) logger.debug("Ignoring message while in stopped mode .. topic - " +
-                topic.toStringUtf8() + ", subscriber - " + subscriberId.toStringUtf8() + ", msg - " + msg);
-            return ;
-        }
-
-        if (logger.isTraceEnabled()) logger.trace("recieved message from server : topic - " +
-                topic.toStringUtf8() + ", subscriber - " + subscriberId.toStringUtf8() + ", msg - " + msg);
-
-        // I am assuming that we can defer the acknowledgement of the message ...
-        final String topicName = topic.toStringUtf8();
-        final String sid = subscriberId.toStringUtf8();
-        final PubSubProtocol.MessageSeqId seqId = msg.getMsgId();
-        // Run later (see acknowledge()): first completes the hedwig callback, then sends an explicit
-        // consume unless the client is configured to auto-send it.
-        final Runnable ack = new Runnable(){
-            public void run() {
-                callback.operationFinished(context, null);
-                // Only when auto-send is NOT enabled.
-                if (! connection.getHedwigClientConfig().isAutoSendConsumeMessageEnabled()) {
-                    try {
-                        sendAcknowledge(topicName, sid, seqId);
-                    } catch (JMSException e) {
-                        // Best effort: a failed consume is only reported at debug level.
-                        if (logger.isDebugEnabled()) {
-                            logger.debug("Unable to send acknowledgement ... " + topicName + ", " +
-                                sid + ", seqId : " + seqId);
-                            DebugUtil.dumpJMSStacktrace(logger, e);
-                        }
-                    }
-                }
-            }
-        };
-
-        try {
-            if (logger.isTraceEnabled()) logger.trace("Pushing to session " + session);
-
-            MessageImpl messageImpl = MessageUtil.processHedwigMessage(session, msg, topicName, sid, ack);
-            // Deliveries handled here are always reported to the session as TOPIC messages.
-            session.messageReceived(messageImpl, DestinationType.TOPIC);
-        } catch (JMSException e) {
-            // Unable to process the incoming message - log and ignore ?
-            if (logger.isDebugEnabled()) {
-                logger.debug("Unable to consume message");
-                DebugUtil.dumpJMSStacktrace(logger, e);
-            }
-        }
-    }
-
-    /**
-     * Returns the subscriber id of a subscriber created by this provider.
-     *
-     * @throws JMSException if {@code topicSubscriber} is not a {@code TopicSubscriberImpl}.
-     */
-    public String getSubscriberId(TopicSubscriber topicSubscriber) throws JMSException {
-        if (topicSubscriber instanceof TopicSubscriberImpl) {
-            final TopicSubscriberImpl impl = (TopicSubscriberImpl) topicSubscriber;
-            return impl.getSubscriberId();
-        }
-
-        throw new JMSException("TopicSubscriber not instanceof of TopicSubscriberImpl ? " +
-            topicSubscriber.getClass());
-    }
-
-    /**
-     * Forwards a received message to the consumer's own queue.
-     *
-     * @param addFirst passed through unchanged to the consumer.
-     * @return whatever the consumer's enqueueReceivedMessage returns.
-     * @throws JMSException if {@code messageConsumer} is not a {@code TopicSubscriberImpl}.
-     */
-    @Override
-    public boolean enqueueReceivedMessage(MessageConsumer messageConsumer, SessionImpl.ReceivedMessage receivedMessage,
-                                          boolean addFirst) throws JMSException {
-        if (messageConsumer instanceof TopicSubscriberImpl) {
-            final TopicSubscriberImpl subscriber = (TopicSubscriberImpl) messageConsumer;
-            return subscriber.enqueueReceivedMessage(receivedMessage, addFirst);
-        }
-
-        throw new JMSException("TopicSubscriber not instanceof of TopicSubscriberImpl ? " +
-            messageConsumer.getClass());
-    }
-
-    /**
-     * Returns the hedwig publisher of the current client.
-     *
-     * @throws javax.jms.IllegalStateException if the facade is closed. The message now names this
-     *         operation; it previously claimed an acknowledge had failed.
-     */
-    public Publisher getPublisher() throws javax.jms.IllegalStateException {
-        // Read the field once so the null check and the use see the same client.
-        final HedwigClient client = this.hedwigClient;
-        if (null == client)
-          throw new javax.jms.IllegalStateException("session in closed state, publisher not available");
-        return client.getPublisher();
-    }
-
-    /**
-     * Publishes {@code message} to {@code topicName}.
-     *
-     * @return the JMS message id derived from the published sequence id, or null (after a warning)
-     *         when the publish response carries no sequence id.
-     * @throws JMSException if the facade is closed, hedwig cannot be reached, or the service is down.
-     */
-    public String publish(String topicName, MessageImpl message) throws JMSException {
-        try {
-            final PubSubProtocol.PublishResponse response = getPublisher().publish(
-                ByteString.copyFromUtf8(topicName), message.generateHedwigMessage());
-
-            PubSubProtocol.MessageSeqId publishedId = null;
-            if (response != null && response.hasPublishedMsgId()) {
-                publishedId = response.getPublishedMsgId();
-            }
-
-            if (publishedId != null) {
-                return MessageUtil.generateJMSMessageIdFromSeqId(publishedId);
-            }
-
-            logger.warn("Unexpected NOT to receive the sequence id in response to publish " + response);
-            return null;
-        } catch (PubSubException.CouldNotConnectException connectFailure) {
-            final JMSException wrapped = new JMSException("Cant publish to " + topicName + " .. " + connectFailure);
-            wrapped.setLinkedException(connectFailure);
-            throw wrapped;
-        } catch (PubSubException.ServiceDownException serviceDown) {
-            final JMSException wrapped = new JMSException("Cant publish to " + topicName + " .. " + serviceDown);
-            wrapped.setLinkedException(serviceDown);
-            throw wrapped;
-        }
-    }
-
-    // Queue methods which are NOT supported yet.
-    /** Unsupported: always throws, because this facade has no queue support. */
-    @Override
-    public QueueSender createQueueSender(Destination destination) throws JMSException {
-        throw new JMSException("hedwig does not support queues yet");
-    }
-
-    /** Unsupported: always throws, because this facade has no queue support. */
-    @Override
-    public QueueReceiver createQueueReceiver(Destination destination) throws JMSException {
-        throw new JMSException("hedwig does not support queues yet");
-    }
-
-    /** Unsupported: always throws, because this facade has no queue support. */
-    @Override
-    public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException {
-        throw new JMSException("hedwig does not support queues yet");
-    }
-
-    /** Unsupported: always throws, because this facade has no queue support. */
-    @Override
-    public QueueReceiver createQueueReceiver(Destination destination, String messageSelector,
-                                             boolean noLocal) throws JMSException {
-        throw new JMSException("hedwig does not support queues yet");
-    }
-
-    /** Unsupported: always throws, because this facade has no queue support. */
-    @Override
-    public String getSubscriberId(QueueReceiver queueReceiver) throws JMSException {
-        throw new JMSException("hedwig does not support queues yet");
-    }
-
-    /** Unsupported: always throws, because this facade has no queue support. */
-    @Override
-    public void stopQueueDelivery(String queueName, String subscribedId) throws JMSException {
-        throw new JMSException("hedwig does not support queues yet");
-    }
-
-    /** Unsupported: always throws, because this facade has no queue support. */
-    @Override
-    public void startQueueDelivery(String queueName, String subscriberId) throws JMSException {
-        throw new JMSException("hedwig does not support queues yet");
-    }
-
-    /** Unsupported: always throws, because this facade has no queue support. */
-    @Override
-    public QueueBrowser createBrowser(Queue queue) throws JMSException {
-        throw new JMSException("hedwig does not support queues yet");
-    }
-
-    /** Unsupported: always throws, because this facade has no queue support. */
-    @Override
-    public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
-        throw new JMSException("hedwig does not support queues yet");
-    }
-
-    /**
-     * Unsupported: always throws. The message now refers to temporary topics; it previously
-     * reported missing queue support, which is unrelated to this call.
-     */
-    @Override
-    public TemporaryTopic createTemporaryTopic() throws JMSException {
-        throw new JMSException("hedwig does not support temporary topics yet");
-    }
-
-    /** Unsupported: always throws, because this facade has no queue support. */
-    @Override
-    public TemporaryQueue createTemporaryQueue() throws JMSException {
-        throw new JMSException("hedwig does not support queues yet");
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/MessageConsumerImpl.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/MessageConsumerImpl.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/MessageConsumerImpl.java
deleted file mode 100644
index a13e259..0000000
--- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/MessageConsumerImpl.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.jms.spi;
-
-import org.apache.hedwig.jms.selector.Node;
-import org.apache.hedwig.jms.selector.ParseException;
-import org.apache.hedwig.jms.selector.SelectorParser;
-
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-
-/**
- * Common base for message consumers: keeps the normalised message selector, its parsed form and
- * the currently registered message listener.
- */
-public abstract class MessageConsumerImpl implements MessageConsumer {
-    // Trimmed selector text; null when no selector, or only whitespace, was supplied.
-    private final String messageSelector;
-    // Parsed selector; null whenever messageSelector is null.
-    private final Node selectorAst;
-    // volatile so that a listener set on one thread is visible to the others without locking.
-    private volatile MessageListener messageListener;
-
-    /**
-     * @param msgSelector raw selector text; null and blank values both mean "no selector".
-     * @throws InvalidSelectorException if a non-blank selector cannot be parsed; the parser's
-     *         exception is attached as the linked exception.
-     */
-    protected MessageConsumerImpl(String msgSelector) throws InvalidSelectorException {
-        String normalized = null;
-        if (msgSelector != null) {
-            final String trimmed = msgSelector.trim();
-            if (trimmed.length() != 0) {
-                normalized = trimmed;
-            }
-        }
-        this.messageSelector = normalized;
-
-        Node parsed = null;
-        if (normalized != null) {
-            try {
-                parsed = SelectorParser.parseMessageSelector(normalized);
-            } catch (ParseException parseFailure) {
-                final InvalidSelectorException invalid =
-                    new InvalidSelectorException("Unable to parse selector '" + normalized + "'");
-                invalid.setLinkedException(parseFailure);
-                throw invalid;
-            }
-        }
-        this.selectorAst = parsed;
-    }
-
-    @Override
-    public String getMessageSelector() {
-        return this.messageSelector;
-    }
-
-    /** Returns the parsed selector, or null when there is no selector. */
-    public Node getSelectorAst() {
-        return this.selectorAst;
-    }
-
-    @Override
-    public MessageListener getMessageListener() {
-        return this.messageListener;
-    }
-
-    @Override
-    public void setMessageListener(MessageListener messageListener) throws JMSException {
-        this.messageListener = messageListener;
-    }
-
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/MessageProducerImpl.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/MessageProducerImpl.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/MessageProducerImpl.java
deleted file mode 100644
index caf4b3e..0000000
--- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/MessageProducerImpl.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.jms.spi;
-
-import org.apache.hedwig.jms.SessionImpl;
-
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-
-/**
- * Shared state for message producers: the owning session plus the producer-level settings
- * exposed by {@link MessageProducer} (message-id / timestamp switches, delivery mode,
- * priority and time-to-live). The accessors below only store and return those settings.
- */
-public abstract class MessageProducerImpl implements MessageProducer {
-
-    static final int DEFAULT_PRIORITY = 4;
-
-    private final SessionImpl session;
-
-    // Kept for API completeness only: the message-id is always populated (it is found in the
-    // response of publish), so this switch is not really used.
-    private boolean disableMessageID = false;
-    // Could be supported, but is not for now: honouring it would mean passing the flag along
-    // all failure paths, which would overly complicate some aspects of the code.
-    private boolean disableMessageTimestamp = false;
-    // Hedwig supports only PERSISTENT mode, so setting anything else will just cause it to be ignored.
-    private int deliveryMode = DeliveryMode.PERSISTENT;
-    // Hedwig does not support priorities, so everything is at default priority;
-    // this value does not influence actual message delivery.
-    private int defaultPriority = DEFAULT_PRIORITY;
-    // Hedwig does not support TTL (iirc): the value can be set and queried, but it has no
-    // actual impact on message delivery/expiry.
-    private long timeToLive = 0;
-
-    /**
-     * @param session session this producer belongs to.
-     */
-    protected MessageProducerImpl(SessionImpl session) {
-        this.session = session;
-    }
-
-    /**
-     * @return the session passed to the constructor.
-     */
-    protected SessionImpl getSession() {
-        return this.session;
-    }
-
-    @Override
-    public boolean getDisableMessageID() throws JMSException {
-        return this.disableMessageID;
-    }
-
-    @Override
-    public void setDisableMessageID(boolean disableMessageID) throws JMSException {
-        this.disableMessageID = disableMessageID;
-    }
-
-    @Override
-    public boolean getDisableMessageTimestamp() throws JMSException {
-        return this.disableMessageTimestamp;
-    }
-
-    @Override
-    public void setDisableMessageTimestamp(boolean disableMessageTimestamp) throws JMSException {
-        this.disableMessageTimestamp = disableMessageTimestamp;
-    }
-
-    @Override
-    public int getDeliveryMode() throws JMSException {
-        return this.deliveryMode;
-    }
-
-    /**
-     * Stores the delivery mode after checking that it is one of the two JMS constants.
-     *
-     * @throws JMSException if the value is neither PERSISTENT nor NON_PERSISTENT.
-     */
-    @Override
-    public void setDeliveryMode(int deliveryMode) throws JMSException {
-        final boolean recognised =
-            DeliveryMode.PERSISTENT == deliveryMode || DeliveryMode.NON_PERSISTENT == deliveryMode;
-        if (!recognised) {
-            throw new JMSException("Invalid delivery mode specified : " + deliveryMode);
-        }
-
-        // NON_PERSISTENT is accepted and stored as-is; it is not rejected here.
-        this.deliveryMode = deliveryMode;
-    }
-
-    @Override
-    public int getPriority() throws JMSException {
-        return this.defaultPriority;
-    }
-
-    @Override
-    public void setPriority(int defaultPriority) throws JMSException {
-        // Priorities are not supported; the value is merely remembered so it can be read back.
-        this.defaultPriority = defaultPriority;
-    }
-
-    @Override
-    public long getTimeToLive() throws JMSException {
-        return this.timeToLive;
-    }
-
-    @Override
-    public void setTimeToLive(long timeToLive) throws JMSException {
-        this.timeToLive = timeToLive;
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/QueueSessionImpl.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/QueueSessionImpl.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/QueueSessionImpl.java
deleted file mode 100644
index 2beeea7..0000000
--- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/QueueSessionImpl.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hedwig.jms.spi;
-
-import org.apache.hedwig.jms.ConnectionImpl;
-import org.apache.hedwig.jms.MessagingSessionFacade;
-import org.apache.hedwig.jms.SessionImpl;
-
-import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.QueueReceiver;
-import javax.jms.QueueSender;
-import javax.jms.QueueSession;
-import javax.jms.TemporaryTopic;
-import javax.jms.Topic;
-import javax.jms.TopicSubscriber;
-
-/**
- * Session variant for the queue domain: queue operations are forwarded to the
- * {@link SessionImpl} helpers, topic-only operations are rejected.
- */
-public class QueueSessionImpl extends SessionImpl implements QueueSession {
-
-    // Message carried by every rejection thrown from the topic-only methods below.
-    private static final String TOPIC_OPERATION_REJECTED = "Cant call this method on QueueSession";
-
-    public QueueSessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode) throws JMSException {
-        super(connection, transacted, acknowledgeMode);
-    }
-
-    // ---- queue operations: forwarded to the SessionImpl helpers ----
-
-    @Override
-    public QueueReceiver createReceiver(Queue queue) throws JMSException {
-        return super.createReceiverImpl(queue);
-    }
-
-    @Override
-    public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
-        return super.createReceiverImpl(queue, messageSelector);
-    }
-
-    @Override
-    public QueueSender createSender(Queue queue) throws JMSException {
-        return super.createSenderImpl(queue);
-    }
-
-    // ---- topic operations: JMS requires that these cannot be called on a QueueSession ----
-
-    @Override
-    public TopicSubscriber createDurableSubscriber(Topic topic, String subscribedId) throws JMSException {
-        throw new javax.jms.IllegalStateException(TOPIC_OPERATION_REJECTED);
-    }
-
-    @Override
-    public TopicSubscriber createDurableSubscriber(Topic topic, String subscribedId, String messageSelector,
-                                                   boolean noLocal) throws JMSException {
-        throw new javax.jms.IllegalStateException(TOPIC_OPERATION_REJECTED);
-    }
-
-    @Override
-    public TemporaryTopic createTemporaryTopic() throws JMSException {
-        throw new javax.jms.IllegalStateException(TOPIC_OPERATION_REJECTED);
-    }
-
-    @Override
-    public void unsubscribe(String subscribedId) throws JMSException {
-        throw new javax.jms.IllegalStateException(TOPIC_OPERATION_REJECTED);
-    }
-
-    @Override
-    public Topic createTopic(String topicName) throws JMSException {
-        throw new javax.jms.IllegalStateException(TOPIC_OPERATION_REJECTED);
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicPublisherImpl.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicPublisherImpl.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicPublisherImpl.java
deleted file mode 100644
index 23dfb54..0000000
--- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicPublisherImpl.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.jms.spi;
-
-import org.apache.hedwig.jms.SessionImpl;
-import org.apache.hedwig.jms.message.MessageImpl;
-import org.apache.hedwig.jms.message.MessageUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.InvalidDestinationException;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-
-/**
- *
- */
-public class TopicPublisherImpl extends MessageProducerImpl implements TopicPublisher {
-
-    private static final Logger logger = LoggerFactory.getLogger(TopicPublisherImpl.class);
-
-    private final HedwigMessagingSessionFacade facade;
-    private final Topic topic;
-
-    public TopicPublisherImpl(HedwigMessagingSessionFacade facade, SessionImpl session, Topic topic) {
-        super(session);
-        this.facade = facade;
-        this.topic = topic;
-    }
-
-    @Override
-    public Topic getTopic() throws JMSException {
-        return topic;
-    }
-
-    @Override
-    public void publish(Message message) throws JMSException {
-        if (null == getTopic()) throw new UnsupportedOperationException("Need to specify topic");
-        publish(getTopic(), message);
-    }
-
-    @Override
-    public void publish(Topic topic, Message message) throws JMSException {
-        publish(topic, message, getDeliveryMode(), getPriority(), getTimeToLive());
-    }
-
-    @Override
-    public void publish(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
-        if (null == getTopic()) throw new UnsupportedOperationException("Need to specify topic");
-        publish(getTopic(), message, deliveryMode, priority, timeToLive);
-    }
-
-    // all publish/send methods delegate to this ...
-    @Override
-    public void publish(final Topic topic, final Message message, final int deliveryMode,
-                        final int priority, final long timeToLive) throws JMSException {
-
-        // Simulating this in provider ...
-        // if (0 != timeToLive) throw new JMSException("We do not support TTL for messages right now.
-        // Specified TTL : " + timeToLive);
-
-        if (MessageProducerImpl.DEFAULT_PRIORITY != priority) {
-            if (logger.isInfoEnabled())
-              logger.info("We do not support message priorities right now. Specified priority : " + priority);
-        }
-        if (DeliveryMode.PERSISTENT != deliveryMode) {
-            if (logger.isInfoEnabled())
-              logger.info("We support only PERSISTENT delivery mode. Unsupported mode : " + deliveryMode);
-        }
-
-        if (null == topic){
-            throw new InvalidDestinationException("Topic must be specified to publish");
-        }
-
-        final MessageImpl copiedMessageImpl;
-        if (message instanceof MessageImpl) copiedMessageImpl = MessageUtil.createCloneForDispatch(
-            getSession(), (MessageImpl) message, topic.getTopicName(), null);
-        else copiedMessageImpl = MessageUtil.createMessageCopy(getSession(), message);
-
-        // Note: Ensure that we set properties below on both message (user input) and copiedMessageImpl
-        // (the cloned/copied message).
-        // We are doing set on both instead of set followed by close/copy to prevent cases where message
-        // implementation drops
-        // headers (like our own impl earlier !)
-
-        // priority ...
-        {
-            // Set the message priority
-            // 3.4.10 JMSPriority "When a message is sent, this field is ignored. After completion of
-            // the send, it holds the value specified by the method sending the message."
-            // On other hand, we have
-            // 3.4.12 Overriding Message Header Fields : "JMS permits an administrator to configure
-            // JMS to override the client-specified
-            // values for JMSDeliveryMode, JMSExpiration and JMSPriority. If this is done, the header
-            // field value must reflect the
-            // administratively specified value."
-            // For now, to unblock testcases, setting to msgPriority :-) Actually, I think we should
-            // set it to Message.DEFAULT_PRIORITY ...
-            message.setJMSPriority(priority);
-            copiedMessageImpl.setJMSPriority(priority);
-            // message.setJMSPriority(Message.DEFAULT_PRIORITY);
-            // copiedMessageImpl.setJMSPriority(Message.DEFAULT_PRIORITY);
-        }
-
-        // delivery mode ...
-        {
-
-            // 3.4.2 JMSDeliveryMode "The JMSDeliveryMode header field contains the delivery mode
-            // specified when the message was sent.
-            // When a message is sent, this field is ignored. After completion of the send, it holds
-            // the delivery mode specified by the sending method."
-            message.setJMSDeliveryMode(deliveryMode);
-            copiedMessageImpl.setJMSDeliveryMode(deliveryMode);
-        }
-
-        // destination ...
-        {
-            // 3.4.1 JMSDestination "The JMSDestination header field contains the destination to which
-            // the message is being sent.
-            // When a message is sent, this field is ignored. After completion of the send, it holds
-            // the destination object
-            // specified by the sending method. When a message is received, its destination value
-            // must be equivalent to the
-            // value assigned when it was sent."
-            message.setJMSDestination(getSession().createTopic(topic.getTopicName()));
-            copiedMessageImpl.setJMSDestination(getSession().createTopic(topic.getTopicName()));
-        }
-
-        {
-            // 3.4.4 JMSTimestamp
-            // "The JMSTimestamp header field contains the time a message was handed off to a provider to be sent.
-            // It is not the time the message was actually transmitted because the actual send may occur later
-            // due to transactions or other client side queueing of messages."
-            final long timestamp = SessionImpl.currentTimeMillis();
-            message.setJMSTimestamp(timestamp);
-            copiedMessageImpl.setJMSTimestamp(timestamp);
-        }
-
-        if (timeToLive > 0) {
-            final long expiryTime = SessionImpl.currentTimeMillis() + timeToLive;
-            message.setJMSExpiration(expiryTime);
-            copiedMessageImpl.setJMSExpiration(expiryTime);
-        }
-        else {
-            // no expiry.
-            message.setJMSExpiration(0);
-        }
-
-
-        if (getSession().getTransacted()){
-            // enqueue if within transactions.
-            getSession().enqueuePublishWithinTransaction(topic.getTopicName(), copiedMessageImpl, message);
-            return ;
-        }
-
-        if (logger.isTraceEnabled()) logger.trace("Publishing message ... recepient " + topic.getTopicName());
-        // facade.getPublisher().publish(ByteString.copyFromUtf8(topic.getTopicName()),
-        // copiedMessageImpl.generateHedwigMessage(this));
-        String msgId = facade.publish(topic.getTopicName(), copiedMessageImpl);
-        getSession().addToLocallyPublishedMessageIds(msgId);
-        if (message instanceof MessageImpl) ((MessageImpl) message).setJMSMessageIDInternal(msgId);
-        else message.setJMSMessageID(msgId);
-
-        if (logger.isTraceEnabled()) logger.trace("Publishing message ... recepient " +
-            topic.getTopicName() + ", msgId : " + msgId + " DONE");
-
-        // This is not required, we already do this as part of copiedMessageImpl.generateHedwigMessage()
-        // message.setJMSTimestamp(SessionImpl.currentTimeMillis());
-
-    }
-
-    @Override
-    public Destination getDestination() throws JMSException {
-        return topic;
-    }
-
-    @Override
-    public void close() throws JMSException {
-        // This will be a noop actually ... session.close() takes care of closing the publisher.
-    }
-
-    @Override
-    public void send(Message message) throws JMSException {
-        publish(message);
-    }
-
-    @Override
-    public void send(Destination destination, Message message) throws JMSException {
-        if (!(destination instanceof Topic))
-          throw new JMSException("Expected destination to be a Topic : " + destination);
-        publish((Topic) destination, message);
-    }
-
-    @Override
-    public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
-        publish(message, deliveryMode, priority, timeToLive);
-    }
-
-    @Override
-    public void send(Destination destination, Message message, int deliveryMode,
-                     int priority, long timeToLive) throws JMSException {
-        if (!(destination instanceof Topic))
-          throw new JMSException("Expected destination to be a Topic : " + destination);
-
-        publish((Topic) destination, message, deliveryMode, priority, timeToLive);
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicSessionImpl.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicSessionImpl.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicSessionImpl.java
deleted file mode 100644
index e96f998..0000000
--- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicSessionImpl.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hedwig.jms.spi;
-
-import org.apache.hedwig.jms.ConnectionImpl;
-import org.apache.hedwig.jms.SessionImpl;
-
-import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-import javax.jms.TemporaryQueue;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-
-/**
- * Session variant for the topic domain: topic operations are forwarded to the
- * {@link SessionImpl} helpers, queue-only operations are rejected.
- */
-public class TopicSessionImpl extends SessionImpl implements TopicSession {
-
-    // Message carried by every rejection thrown from the queue-only methods below.
-    private static final String QUEUE_OPERATION_REJECTED = "Cant call this method on TopicSession";
-
-    public TopicSessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode) throws JMSException {
-        super(connection, transacted, acknowledgeMode);
-    }
-
-    // ---- topic operations: forwarded to the SessionImpl helpers ----
-
-    @Override
-    public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
-        return super.createSubscriberImpl(topic);
-    }
-
-    @Override
-    public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
-        return super.createSubscriberImpl(topic, messageSelector, noLocal);
-    }
-
-    @Override
-    public TopicPublisher createPublisher(Topic topic) throws JMSException {
-        return super.createPublisherImpl(topic);
-    }
-
-    // ---- queue operations: always rejected on a TopicSession ----
-
-    @Override
-    public TemporaryQueue createTemporaryQueue() throws JMSException {
-        throw new javax.jms.IllegalStateException(QUEUE_OPERATION_REJECTED);
-    }
-
-    @Override
-    public Queue createQueue(String queueName) throws JMSException {
-        throw new javax.jms.IllegalStateException(QUEUE_OPERATION_REJECTED);
-    }
-
-    @Override
-    public QueueBrowser createBrowser(Queue queue) throws JMSException {
-        throw new javax.jms.IllegalStateException(QUEUE_OPERATION_REJECTED);
-    }
-
-    @Override
-    public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
-        throw new javax.jms.IllegalStateException(QUEUE_OPERATION_REJECTED);
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicSubscriberImpl.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicSubscriberImpl.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicSubscriberImpl.java
deleted file mode 100644
index 4a51e8d..0000000
--- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicSubscriberImpl.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements.  See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership.  The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.jms.spi;
-
-import org.apache.hedwig.jms.SessionImpl;
-import org.apache.hedwig.jms.DebugUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.Topic;
-import javax.jms.TopicSubscriber;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Subscriber to a topic.
- *
- * <p>Messages handed over through {@code enqueueReceivedMessage} are buffered in a pending list
- * and drained by the {@code receive} variants. The pending list and the {@code closed} flag are
- * only touched while holding a private lock object.
- */
-public class TopicSubscriberImpl extends MessageConsumerImpl implements TopicSubscriber {
-
-    private static final Logger logger = LoggerFactory.getLogger(TopicSubscriberImpl.class);
-
-    private final SessionImpl session;
-    private final Topic topic;
-    private final String subscriberId;
-    private final boolean noLocal;
-
-    // When true, close() also unsubscribes from the topic.
-    private final boolean forceUnsubscribe;
-    // volatile so registerWithSession() can do a cheap check before taking the lock.
-    private volatile boolean registered = false;
-    // Guarded by lockObject.
-    private boolean closed = false;
-
-    // Any publically exposed object MUST NOT rely on 'this' for its locking semantics unless it is
-    // explicitly exposing this behavior.
-    private final Object lockObject = new Object();
-    // Guarded by lockObject.
-    private final LinkedList<SessionImpl.ReceivedMessage> pendingMessageList
-        = new LinkedList<SessionImpl.ReceivedMessage>();
-
-    /**
-     * Creates a subscriber without a selector and with {@code noLocal} set to false, then
-     * registers it with the session.
-     */
-    public TopicSubscriberImpl(SessionImpl session, Topic topic, String subscriberId,
-                               boolean forceUnsubscribe) throws JMSException {
-        super(null);
-        this.session = session;
-        this.topic = topic;
-        this.subscriberId = subscriberId;
-        this.noLocal = false;
-        this.forceUnsubscribe = forceUnsubscribe;
-
-        // It is not certain that registration has to happen right at creation, but it is done here.
-        registerWithSession();
-    }
-
-    /**
-     * Creates a subscriber with an optional selector, then registers it with the session.
-     * When the selector parses to an AST, that AST is recorded with the session for this
-     * topic/subscriber pair.
-     *
-     * @throws InvalidSelectorException if a non-blank selector was given but no AST is available.
-     */
-    public TopicSubscriberImpl(SessionImpl session, Topic topic, String subscriberId,
-                               String messageSelector, boolean noLocal, boolean forceUnsubscribe) throws JMSException {
-        super(messageSelector);
-        this.session = session;
-        this.topic = topic;
-        this.subscriberId = subscriberId;
-
-        this.noLocal = noLocal;
-        this.forceUnsubscribe = forceUnsubscribe;
-
-        if (null == getSelectorAst()) {
-            // Only if NOT empty string - treat empty string as null selector spec.
-            if (null != messageSelector && 0 != messageSelector.trim().length()) {
-                throw new InvalidSelectorException("Invalid selector specified '" + messageSelector + "'");
-            }
-        } else {
-            session.registerTopicSubscriptionInfo(new SessionImpl.TopicSubscription(topic.getTopicName(),
-                subscriberId), getSelectorAst());
-        }
-
-        // It is not certain that registration has to happen right at creation, but it is done here.
-        registerWithSession();
-    }
-
-    @Override
-    public Topic getTopic() {
-        return topic;
-    }
-
-    @Override
-    public boolean getNoLocal() {
-        return noLocal;
-    }
-
-    public String getSubscriberId() {
-        return subscriberId;
-    }
-
-    /** Stores the listener and makes sure this subscriber is registered with the session. */
-    @Override
-    public void setMessageListener(MessageListener messageListener) throws JMSException {
-        super.setMessageListener(messageListener);
-        registerWithSession();
-    }
-
-    /**
-     * Registers this subscriber with the session unless it is already registered or closed.
-     * The flag is flipped under the lock; the call into the session happens after the lock
-     * has been released.
-     */
-    private void registerWithSession() throws JMSException {
-
-        // Fail fast ... volatile perf hit is ok in comparison to rest.
-        if (this.registered) {
-            return;
-        }
-
-        final boolean register;
-        synchronized (lockObject) {
-            // A closed subscriber is silently left unregistered (no exception).
-            if (closed) {
-                return;
-            }
-
-            if (!this.registered) {
-                this.registered = true;
-                register = true;
-            } else {
-                register = false;
-            }
-        }
-        if (register) {
-            this.session.registerTopicSubscriber(this);
-        }
-    }
-
-    @Override
-    public Message receive() throws JMSException {
-        return receive(0);
-    }
-
-
-    @Override
-    public Message receive(final long maxTimeout) throws JMSException {
-        return receiveImpl(maxTimeout, true);
-    }
-
-    /**
-     * Takes the next deliverable message from the pending list.
-     *
-     * <p>When {@code canWait} is true the call waits on the lock in slices (1000 ms when
-     * {@code maxTimeout <= 0}, otherwise {@code maxTimeout / 16} with a minimum of 1 ms) until a
-     * message is pending, the subscriber is closed, or a non-zero {@code maxTimeout} has elapsed.
-     * Every message removed from the pending list - including those skipped because of
-     * {@code noLocal} or expiry - is passed to the session's automatic acknowledgement handling
-     * once the lock has been released.
-     *
-     * @param maxTimeout maximum time to wait in milliseconds; 0 means no deadline.
-     * @param canWait    false to return immediately when nothing is pending.
-     * @return the message, or null when closed, timed out, or nothing is pending.
-     * @throws javax.jms.IllegalStateException if a message listener is set on this subscriber.
-     * @throws JMSException if the waiting thread is interrupted; the thread's interrupt status
-     *                      is restored before the exception is thrown.
-     */
-    private Message receiveImpl(final long maxTimeout, boolean canWait) throws JMSException {
-        final long waitTimeout;
-        final long startTime;
-
-        // periodically wake up !
-        if (canWait) {
-            if (maxTimeout <= 0) {
-                waitTimeout = 1000;
-            } else {
-                long duration = maxTimeout / 16;
-                if (duration <= 0) {
-                    duration = 1;
-                }
-                waitTimeout = duration;
-            }
-            startTime = SessionImpl.currentTimeMillis();
-        } else {
-            waitTimeout = 0;
-            startTime = 0;
-        }
-
-        registerWithSession();
-
-        // check before lock ...
-        if (null != getMessageListener()) {
-            throw new javax.jms.IllegalStateException(
-                    "There is a message listener already subscribed for this subscriber");
-        }
-
-        final SessionImpl.ReceivedMessage message;
-        final List<SessionImpl.ReceivedMessage> ackList = new ArrayList<SessionImpl.ReceivedMessage>(4);
-
-        synchronized (lockObject) {
-
-            outer:
-            while (true) {
-
-                // Once closed, nothing is returned (close() has dropped the pending messages anyway).
-                if (closed) {
-                    message = null;
-                    break outer;
-                }
-
-                // While we waited, it could have been set.
-                if (null != getMessageListener()) {
-                    throw new javax.jms.IllegalStateException(
-                            "There is a message listener already subscribed for this subscriber");
-                }
-
-                while (canWait && pendingMessageList.isEmpty()) {
-
-                    // close() may have happened while this thread was waiting.
-                    if (closed) {
-                        message = null;
-                        break outer;
-                    }
-
-                    if (0 != maxTimeout && startTime + maxTimeout < SessionImpl.currentTimeMillis()) {
-                        message = null;
-                        break outer;
-                    }
-
-                    try {
-                        lockObject.wait(waitTimeout);
-                    } catch (InterruptedException iEx) {
-                        // Catching InterruptedException clears the interrupt status; restore it so
-                        // code further up the stack can still observe the interruption.
-                        Thread.currentThread().interrupt();
-                        JMSException jEx = new JMSException("Interrupted .. " + iEx);
-                        jEx.setLinkedException(iEx);
-                        throw jEx;
-                    }
-                }
-
-
-                if (pendingMessageList.isEmpty()) {
-                    message = null;
-                    break outer;
-                }
-                SessionImpl.ReceivedMessage tmessage = pendingMessageList.remove();
-                // Everything taken off the list gets acknowledged below, delivered or not.
-                ackList.add(tmessage);
-
-                if (noLocal) {
-                    if (session.isLocallyPublished(tmessage.originalMessage.getJMSMessageID())) {
-                        // find next message.
-                        continue;
-                    }
-                }
-                if (session.isMessageExpired(tmessage.originalMessage)) {
-                    continue;
-                }
-                // use this message then.
-                message = tmessage;
-                break;
-            }
-        }
-
-        if (logger.isTraceEnabled()) {
-            logger.trace("Acklist receive (" + ackList.size() + ") ... " + ackList);
-        }
-        for (SessionImpl.ReceivedMessage ackMessage : ackList) {
-            session.handleAutomaticMessageAcknowledgement(ackMessage, this);
-        }
-
-        if (logger.isTraceEnabled()) {
-            logger.trace("receive response " + (null != message ? message.msg : null));
-        }
-        return null != message ? message.msg : null;
-    }
-
-    @Override
-    public Message receiveNoWait() throws JMSException {
-        return receiveImpl(0, false);
-    }
-
-    /**
-     * Closes the subscriber; a second call is a no-op. Pending messages are dropped, the
-     * subscriber is unregistered from the session if it was registered, and - when
-     * {@code forceUnsubscribe} was requested at construction - the topic subscription is removed.
-     * The calls into the session are made after the lock has been released.
-     */
-    @Override
-    public void close() throws JMSException {
-
-        final boolean unregister;
-        final boolean unsubscribe;
-
-        synchronized (lockObject) {
-            if (closed) {
-                return;
-            }
-            closed = true;
-
-            // This means that we drop all pending messages ...
-            // gc friendly.
-            pendingMessageList.clear();
-
-            unregister = registered;
-            this.registered = false;
-
-            unsubscribe = this.forceUnsubscribe;
-        }
-
-        if (unregister) {
-            this.session.unregisterTopicSubscriber(this);
-        }
-
-        if (unsubscribe) {
-            session.unsubscribeFromTopic(topic.getTopicName(), subscriberId);
-        }
-
-        // nothing else to be done ...
-    }
-
-    /**
-     * Buffers a received message for a later {@code receive} call and wakes up waiting receivers.
-     * If the buffer already holds more than {@code SessionImpl.MAX_SUBSCRIBER_BUFFERED_MESSAGES}
-     * messages, the whole buffer is discarded first (and that is logged at info level).
-     *
-     * @param receivedMessage message to buffer.
-     * @param addFirst        true to put the message at the head of the buffer instead of the tail.
-     * @return false if the subscriber is closed (the message is ignored), true otherwise.
-     */
-    boolean enqueueReceivedMessage(SessionImpl.ReceivedMessage receivedMessage, final boolean addFirst) {
-        if (logger.isTraceEnabled()) {
-            logger.trace("Enqueing message " + receivedMessage + " to subscriber " + subscriberId +
-                " for topic " + topic.toString() + ", addFirst : " + addFirst);
-        }
-
-        // Log text is built under the lock but emitted only after the lock has been released.
-        String infoMsg = null;
-        String traceMsg = null;
-        synchronized (lockObject) {
-            // ignore
-            if (closed) {
-                return false;
-            }
-            // If number of buffered messages > some max limit, evict them - else we run out of memory !
-            if (pendingMessageList.size() > SessionImpl.MAX_SUBSCRIBER_BUFFERED_MESSAGES) {
-                // simply discard it with an error logged.
-                infoMsg = "Discarding " + pendingMessageList.size() + " messages since there are no consumers for them";
-                pendingMessageList.clear();
-            }
-
-            // Note: Selector evaluation will happen in SessionImpl, not here.
-
-            if (addFirst) {
-                pendingMessageList.addFirst(receivedMessage);
-            } else {
-                pendingMessageList.add(receivedMessage);
-            }
-
-            lockObject.notifyAll();
-            if (logger.isTraceEnabled()) {
-                traceMsg = "pendingMessageList (" + pendingMessageList.size() +
-                    ") : \n" + pendingMessageList + "\n---\n next : " + pendingMessageList.getFirst();
-            }
-        }
-
-        if (null != infoMsg) {
-            logger.info(infoMsg);
-        }
-        if (logger.isTraceEnabled() && null != traceMsg) {
-            logger.trace(traceMsg);
-        }
-
-        return true;
-    }
-
-    /** Makes sure the subscriber is registered with the session; a failure is logged, not thrown. */
-    public void start() {
-        try {
-            registerWithSession();
-        } catch (JMSException jEx) {
-            // Deliberately not propagated: start() only dumps the stack trace via DebugUtil.
-            DebugUtil.dumpJMSStacktrace(logger, jEx);
-        }
-    }
-}

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/package-info.html
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/package-info.html b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/package-info.html
deleted file mode 100644
index fe6c1e1..0000000
--- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/package-info.html
+++ /dev/null
@@ -1,30 +0,0 @@
-<!--
-   Licensed to the Apache Software Foundation (ASF) under one or more
-   contributor license agreements.  See the NOTICE file distributed with
-   this work for additional information regarding copyright ownership.
-   The ASF licenses this file to You under the Apache License, Version 2.0
-   (the "License"); you may not use this file except in compliance with
-   the License.  You may obtain a copy of the License at
-
-       http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
--->
-
-
-Contains all the implementation which interacts directly with Hedwig (except for message parsing
-which is in message package). <br/>
-This package does not, typically, adhere to JMS MT-constraints - and needs to be MT-safe : it can be invoked
-by underlying hedwig thread-pools and by client JMS invocations concurrently. <br/>
-
-Primarily provides :
-<ul>
-  <li>The HedwigConnectionImpl which is (by default) looked up via JNDI. This bootstraps access to rest of system.</li>
-  <li>The default MessagingSessionFacade implementation for Hedwig.</li>
-  <li>Associated implementations relevant to the classes exposed by the Facade - Topic handling
-    (no support for Queue yet), etc</li>
-</ul>

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/protobuf/JmsHeader.proto
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/protobuf/JmsHeader.proto b/hedwig-client-jms/src/main/protobuf/JmsHeader.proto
deleted file mode 100644
index 2338587..0000000
--- a/hedwig-client-jms/src/main/protobuf/JmsHeader.proto
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *     http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-option java_package = "org.apache.hedwig.jms.message.header";
-option optimize_for = SPEED;
-// change ?
-package Hedwig.Jms.Header;
-
-enum ProtocolVersion{
-    VERSION_ONE = 1;  // the only protocol version defined in this file
-}
-
-message JmsValue {  // one typed value: a ValueType tag plus one optional field per ValueType constant
-    enum ValueType {
-        BOOLEAN = 1;
-        BYTE = 2;
-        SHORT = 3;
-        INT = 4;
-        LONG = 5;
-        FLOAT = 6;
-        DOUBLE = 7;
-        STRING = 8;
-        // raw bytes. (custom correlation id, for example, uses this : though we dont support it right now).
-        BYTES = 9;
-    };
-
-    required ValueType type = 1;  // proto2 'required': a message without it fails to parse/serialize
-
-    optional bool booleanValue = 2;  // NOTE(review): presumably only the field matching 'type' is set - confirm in the (de)serialization code
-    optional sint32 byteValue = 3;  // protobuf has no 8-bit scalar; carried as a ZigZag-encoded sint32
-    optional sint32 shortValue = 4;  // protobuf has no 16-bit scalar; carried as a ZigZag-encoded sint32
-    optional sint32 intValue = 5;  // sint32: ZigZag varint, compact for negative values
-    optional sint64 longValue = 6;  // sint64: ZigZag varint, compact for negative values
-    optional float floatValue = 7;
-    optional double doubleValue = 8;
-    optional string stringValue = 9;
-    optional bytes bytesValue = 10;  // field number is 10 although ValueType.BYTES is 9 (field 1 is taken by 'type')
-}
-
-

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/resources/findbugsExclude.xml
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/resources/findbugsExclude.xml b/hedwig-client-jms/src/main/resources/findbugsExclude.xml
deleted file mode 100644
index bae9e09..0000000
--- a/hedwig-client-jms/src/main/resources/findbugsExclude.xml
+++ /dev/null
@@ -1,48 +0,0 @@
-<!--
-    Licensed to the Apache Software Foundation (ASF) under one
-    or more contributor license agreements.  See the NOTICE file
-    distributed with this work for additional information
-    regarding copyright ownership.  The ASF licenses this file
-    to you under the Apache License, Version 2.0 (the
-    "License"); you may not use this file except in compliance
-    with the License.  You may obtain a copy of the License at
-
-      http://www.apache.org/licenses/LICENSE-2.0
-
-   Unless required by applicable law or agreed to in writing, software
-   distributed under the License is distributed on an "AS IS" BASIS,
-   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
-   See the License for the specific language governing permissions and
-   limitations under the License.
-//-->
-<FindBugsFilter>
-  <Match>
-    <!-- generated code, we can't be held responsible for findbugs in it //-->
-    <Or>
-      <Class name="~org\.apache\.hedwig\.jms\.message\.header\.JmsHeader.*" />
-      <Class name="~org\.apache\.hedwig\.jms\.selector\.SelectorParser.*" />
-      <Class name="~org\.apache\.hedwig\.jms\.selector\.SimpleCharStream.*" />
-      <Class name="~org\.apache\.hedwig\.jms\.selector\.ParseException.*" />
-      <Class name="~org\.apache\.hedwig\.jms\.selector\.SimpleNode.*" />
-      <Class name="~org\.apache\.hedwig\.jms\.selector\.TokenMgrError.*" />
-    </Or>
-  </Match>
-  <Match>
-    <Or>
-      <Class name="~org\.apache\.hedwig\.jms\.selector\.ValueComparisonFunction.*" />
-      <Class name="~org\.apache\.hedwig\.jms\.selector\.LogicalComparisonFunction.*" />
-    </Or>
-    <Bug pattern="NP_BOOLEAN_RETURN_NULL" />
-  </Match>
-  <Match>
-    <Class name="~org\.apache\.hedwig\.jms\.selector\.PropertyExprFunction.*" />
-    <Bug pattern="BX_UNBOXING_IMMEDIATELY_REBOXED" />
-  </Match>
-  <Match>
-    <Class name="~org\.apache\.hedwig\.jms\.message\.MessageUtil" />
-    <Or>
-      <Bug pattern="NP_LOAD_OF_KNOWN_NULL_VALUE" />
-      <Bug pattern="NP_NULL_PARAM_DEREF_NONVIRTUAL" />
-    </Or>
-  </Match>
-</FindBugsFilter>

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/resources/log4j.properties b/hedwig-client-jms/src/main/resources/log4j.properties
deleted file mode 100644
index 27d78f1..0000000
--- a/hedwig-client-jms/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,35 +0,0 @@
-#
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements.  See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership.  The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License.  You may obtain a copy of the License at
-#
-#   http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied.  See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#
-
-# log4j.rootLogger=trace, CONSOLE
-# log4j.rootLogger=info, CONSOLE
-log4j.rootLogger=off, CONSOLE
-
-log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
-log4j.appender.CONSOLE.Threshold=off
-# log4j.appender.CONSOLE.Threshold=info
-# log4j.appender.CONSOLE.Threshold=trace
-log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
-log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
-log4j.logger.org.apache=OFF
-# log4j.logger.org.apache=INFO
-# log4j.logger.org.apache=TRACE
-

http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/AutoFailTestSupport.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/AutoFailTestSupport.java b/hedwig-client-jms/src/test/java/org/apache/activemq/AutoFailTestSupport.java
deleted file mode 100644
index f4c0f7e..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/AutoFailTestSupport.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements.  See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License.  You may obtain a copy of the License at
- *
- *      http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import junit.framework.TestCase;
-import org.apache.hedwig.JmsTestBase;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Enforces a maximum run time on a test case so that a hanging test cannot
- * stall the whole test run.
- *
- * <p>When auto fail is enabled, {@link #setUp()} starts a watchdog thread that
- * sleeps for {@link #getMaxTestTime()} milliseconds. If that thread wakes up, or
- * is interrupted, before {@link #stopAutoFailThread()} has flagged the test as
- * finished, it dumps the stacks of all threads to {@code System.err} and
- * terminates the JVM with {@link #EXIT_ERROR}.
- */
-public abstract class AutoFailTestSupport extends JmsTestBase {
-    public static final int EXIT_SUCCESS = 0;
-    public static final int EXIT_ERROR = 1;
-    private static final Logger LOG = LoggerFactory.getLogger(AutoFailTestSupport.class);
-
-    private long maxTestTime = 5 * 60 * 1000; // 5 mins by default
-    private Thread autoFailThread;
-
-    private final boolean verbose = true;
-    private boolean useAutoFail; // Disable auto fail by default
-    private AtomicBoolean isTestSuccess;
-
-    /**
-     * Starts the auto fail thread (if auto fail is enabled) and then runs the
-     * superclass set up.
-     *
-     * @throws Exception if the superclass set up fails
-     */
-    @Override
-    protected void setUp() throws Exception {
-        // Runs the auto fail thread before performing any setup
-        if (isAutoFail()) {
-            startAutoFailThread();
-        }
-        super.setUp();
-    }
-
-    /**
-     * Runs the superclass tear down and then stops the auto fail thread.
-     *
-     * @throws Exception if the superclass tear down fails; in that case the
-     *         auto fail thread is left running
-     */
-    @Override
-    protected void tearDown() throws Exception {
-        super.tearDown();
-
-        // Stops the auto fail thread only after performing any clean up
-        stopAutoFailThread();
-    }
-
-    /**
-     * Manually start the auto fail thread. To start it automatically, just set
-     * the auto fail to true before calling any setup methods. As a rule, this
-     * method is used only when you are not sure, if the setUp and tearDown
-     * method is propagated correctly.
-     *
-     * <p>Calling this method also switches auto fail on.
-     */
-    public void startAutoFailThread() {
-        setAutoFail(true);
-        isTestSuccess = new AtomicBoolean(false);
-        autoFailThread = new Thread(new Runnable() {
-            @Override
-            public void run() {
-                try {
-                    // Wait for test to finish successfully
-                    Thread.sleep(getMaxTestTime());
-                } catch (InterruptedException ignored) {
-                    // Expected path: stopAutoFailThread() interrupts the sleep
-                    // once the test is done; the success flag is checked below.
-                } finally {
-                    // Check if the test was able to tear down successfully,
-                    // which usually means, it has finished its run.
-                    if (!isTestSuccess.get()) {
-                        LOG.error("Test case has exceeded the maximum allotted time to run of: "
-                                  + getMaxTestTime() + " ms.");
-                        dumpAllThreads(getName());
-                        System.exit(EXIT_ERROR);
-                    }
-                }
-            }
-        }, "AutoFailThread");
-
-        // Logged once; previously the same message was emitted a second time
-        // unconditionally right after this guarded call.
-        if (verbose) {
-            LOG.info("Starting auto fail thread...");
-        }
-        autoFailThread.start();
-    }
-
-    /**
-     * Manually stops the auto fail thread. As a rule, this method is used only
-     * when you are not sure, if the setUp and tearDown method is propagated
-     * correctly.
-     *
-     * <p>Does nothing unless auto fail is enabled and the thread is still alive.
-     */
-    public void stopAutoFailThread() {
-        if (isAutoFail() && autoFailThread != null && autoFailThread.isAlive()) {
-            // Flag success before interrupting so the watchdog does not exit the JVM.
-            isTestSuccess.set(true);
-
-            if (verbose) {
-                LOG.info("Stopping auto fail thread...");
-            }
-            autoFailThread.interrupt();
-        }
-    }
-
-    /**
-     * Sets the auto fail value. As a rule, this should be used only before any
-     * setup methods is called to automatically enable the auto fail thread in
-     * the setup method of the test case.
-     *
-     * @param val {@code true} to enable auto fail, {@code false} to disable it
-     */
-    public void setAutoFail(boolean val) {
-        this.useAutoFail = val;
-    }
-
-    /**
-     * @return {@code true} if auto fail is currently enabled
-     */
-    public boolean isAutoFail() {
-        return this.useAutoFail;
-    }
-
-    /**
-     * The assigned value will only be reflected when the auto fail thread has
-     * started its run. Value is in milliseconds.
-     *
-     * @param val maximum time, in milliseconds, the test may run before it is failed
-     */
-    public void setMaxTestTime(long val) {
-        this.maxTestTime = val;
-    }
-
-    /**
-     * @return the maximum allotted test time in milliseconds
-     */
-    public long getMaxTestTime() {
-        return this.maxTestTime;
-    }
-
-    /**
-     * Prints the stack trace of every live thread to {@code System.err}.
-     *
-     * @param prefix text printed in front of each thread's header line
-     */
-    public static void dumpAllThreads(String prefix) {
-        Map<Thread, StackTraceElement[]> stacks = Thread.getAllStackTraces();
-        for (Entry<Thread, StackTraceElement[]> stackEntry : stacks.entrySet()) {
-            System.err.println(prefix + " " + stackEntry.getKey());
-            for (StackTraceElement element : stackEntry.getValue()) {
-                System.err.println("     " + element);
-            }
-        }
-    }
-}