You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:51 UTC
[41/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/HedwigMessagingSessionFacade.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/HedwigMessagingSessionFacade.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/HedwigMessagingSessionFacade.java
deleted file mode 100644
index 4c23d5c..0000000
--- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/HedwigMessagingSessionFacade.java
+++ /dev/null
@@ -1,624 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.jms.spi;
-
-import com.google.protobuf.ByteString;
-import org.apache.hedwig.client.HedwigClient;
-import org.apache.hedwig.client.api.MessageHandler;
-import org.apache.hedwig.client.api.Publisher;
-import org.apache.hedwig.client.conf.ClientConfiguration;
-import org.apache.hedwig.client.exceptions.AlreadyStartDeliveryException;
-import org.apache.hedwig.client.exceptions.InvalidSubscriberIdException;
-import org.apache.hedwig.exceptions.PubSubException;
-import org.apache.hedwig.jms.MessagingSessionFacade;
-import org.apache.hedwig.jms.SessionImpl;
-import org.apache.hedwig.jms.DebugUtil;
-import org.apache.hedwig.jms.message.MessageImpl;
-import org.apache.hedwig.jms.message.MessageUtil;
-import org.apache.hedwig.protocol.PubSubProtocol;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscriptionOptions;
-import org.apache.hedwig.protocol.PubSubProtocol.SubscribeRequest.CreateOrAttach;
-import org.apache.hedwig.util.Callback;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.Destination;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-import javax.jms.QueueReceiver;
-import javax.jms.QueueSender;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSubscriber;
-import java.util.HashSet;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.ListIterator;
-import java.util.Set;
-
-/**
- * Hedwig-specific implementation of {@link MessagingSessionFacade}. <br/>
- * JMS VIOLATION: This implementation creates a single backend hedwig connection PER session - and
- * DOES NOT share multiple sessions on top of a single connection.
- * <p/>
- * This is a wilful violation of the JMS specification, but exists only because Hedwig does not have
- * any mechanism to support this. <br/>
- * Once hedwig does allow for session multiplexing, we will need to revisit this (or create a new impl)
- * to take into account the changes.
- *
- */
-public class HedwigMessagingSessionFacade implements MessagingSessionFacade, MessageHandler {
-
- private static final Logger logger = LoggerFactory.getLogger(HedwigMessagingSessionFacade.class);
-
- // We simulate noLocal through the connection - which will be shared across sessions.
- private final HedwigConnectionImpl connection;
- private final SessionImpl session;
- private HedwigClient hedwigClient;
- private volatile boolean stopped = false;
-
- /*
- Hedwig server has an ack-until-N approach to acknowledgements: that is, if we acknowledge message N,
- all previous N-1 messages are also
- acknowledged.
- But hedwig-client DOES NOT support this, particularly in the context of throttling.
-
- So, when we are in CLIENT_ACKNOWLEDGE mode and NOT in a transacted session, I am modifying the behavior
- to mirror the expectations of both
- hedwig client and server here in SessionImpl itself (instead of the facade, where this probably belongs).
-
- This approach does not seem to work well due to implicit assumptions in hedwig client ... I am
- modifying it in the following way:
- a) For each message received, maintain it in a List.
- b) Acknowledging a message means traversing this list to find the message with the same seq-id, and
- acknowledging ALL messages up to and including it in the list.
- Since hedwig does ack-until instead of individual acks, this violation of the JMS spec is consistent with hedwig.
- Note that even though hedwig does ack-until, hedwig client on the other hand DOES NOT! It will
- throttle the connection if we do not ack individually ...
- sigh :-(
- */
- private final List<SessionImpl.ReceivedMessage> unAckMessageList = new LinkedList<SessionImpl.ReceivedMessage>();
-
- // Access to both of these sets is synchronized on deliveryStartInfoSet.
- private final Set<DeliveryStartInfo> deliveryStartInfoSet = new HashSet<DeliveryStartInfo>(32);
- private final Set<DeliveryStartInfo> subscribeInfoSet = new HashSet<DeliveryStartInfo>(32);
-
- private static final class DeliveryStartInfo {
- private final String topicName;
- private final String subscriberId;
-
- private DeliveryStartInfo(String subscriberId, String topicName) {
- this.subscriberId = subscriberId;
- this.topicName = topicName;
- }
-
- @Override
- public boolean equals(Object o) {
- if (this == o) return true;
- if (o == null || getClass() != o.getClass()) return false;
-
- DeliveryStartInfo that = (DeliveryStartInfo) o;
-
- if (subscriberId != null ? !subscriberId.equals(that.subscriberId) : that.subscriberId != null)
- return false;
- if (topicName != null ? !topicName.equals(that.topicName) : that.topicName != null) return false;
-
- return true;
- }
-
- @Override
- public int hashCode() {
- int result = topicName != null ? topicName.hashCode() : 0;
- result = 31 * result + (subscriberId != null ? subscriberId.hashCode() : 0);
- return result;
- }
- }
-
-
- public HedwigMessagingSessionFacade(HedwigConnectionImpl connection, SessionImpl session) throws JMSException {
- this.connection = connection;
- this.session = session;
- // always create client ...
- final ClientConfiguration cfg = connection.getHedwigClientConfig();
- if (null == cfg) throw new JMSException("Unable to fetch client config ?");
- this.hedwigClient = new HedwigClient(cfg);
- resetStartInfoSet();
- }
-
- @Override
- public void start() throws JMSException {
- if (!connection.isInStartMode()) throw new JMSException("Connection not yet started ?");
- if (logger.isTraceEnabled()) logger.trace("Creating HedwigClient");
- // create only if there is need for it.
- if (null == this.hedwigClient) {
- this.hedwigClient = new HedwigClient(connection.getHedwigClientConfig());
- resetStartInfoSet();
- }
- this.stopped = false;
- }
-
- @Override
- public void stop() {
- // stopping does not inhibit send.
- if (logger.isTraceEnabled()) logger.trace("Stopping HedwigClient");
- /*
- HedwigClient client = this.hedwigClient;
- this.hedwigClient = null;
- client.close();
- */
- this.stopped = true;
- }
-
-
- @Override
- public void close() {
- HedwigClient client = this.hedwigClient;
- resetStartInfoSet();
-
- this.stopped = true;
- this.hedwigClient = null;
- if (logger.isTraceEnabled()) logger.trace("Closing HedwigClient");
- client.close();
- }
-
- private void resetStartInfoSet(){
- synchronized (deliveryStartInfoSet){
- deliveryStartInfoSet.clear();
- subscribeInfoSet.clear();
- }
- }
-
- @Override
- public DestinationType findDestinationType(String destination) throws JMSException {
- // TODO: For now, we support ONLY topics, so we always return that.
- return DestinationType.TOPIC;
- }
-
- @Override
- public DestinationType findDestinationType(Destination destination) throws JMSException {
- if (destination instanceof Topic) return DestinationType.TOPIC;
- if (destination instanceof Queue) return DestinationType.QUEUE;
-
- // TODO: For now, we support ONLY topics, so we always return that when the type is unknown.
- return DestinationType.TOPIC;
- }
-
- @Override
- public TopicPublisher createTopicPublisher(Destination destination) throws JMSException {
- return new TopicPublisherImpl(this, session, null != destination ?
- session.createTopic(session.toName(destination)) : null);
- }
-
- @Override
- public TopicSubscriber createTopicSubscriber(Destination destination) throws JMSException {
- session.subscriberCreated();
- connection.initConnectionClientID();
- return new TopicSubscriberImpl(session, session.createTopic(session.toName(destination)),
- session.createSubscriberId(SessionImpl.generateRandomString()), true);
- }
-
- @Override
- public TopicSubscriber createTopicSubscriber(Destination destination,
- String messageSelector, boolean noLocal) throws JMSException {
- session.subscriberCreated();
- connection.initConnectionClientID();
- return new TopicSubscriberImpl(session,
- session.createTopic(session.toName(destination)),
- session.createSubscriberId(SessionImpl.generateRandomString()), messageSelector, noLocal, true);
- }
-
- @Override
- public TopicSubscriber createDurableSubscriber(Topic topic, String subscribedId) throws JMSException {
- if (null != session.getMessageListener()) {
- throw new JMSException("Message listener is set - not other form of message receipt can be used");
- }
- session.subscriberCreated();
-
- TopicSubscriberImpl subscriber = new TopicSubscriberImpl(session, topic, subscribedId, false);
- subscriber.start();
- return subscriber;
- }
-
- @Override
- public TopicSubscriber createDurableSubscriber(Topic topic, String subscribedId,
- String messageSelector, boolean noLocal) throws JMSException {
- if (null != session.getMessageListener()) {
- throw new JMSException("Message listener is set - not other form of message receipt can be used");
- }
- session.subscriberCreated();
- connection.initConnectionClientID();
-
- return new TopicSubscriberImpl(session, topic, subscribedId, messageSelector, noLocal, false);
- }
-
- /*
- @Override
- public void unsubscribe(String subscriberId) throws JMSException {
- throw new JMSException("Hedwig requires BOTH topic name and subscriberId to unsubscribe -
- unlike JMS. Need to figure this out.");
- }
- */
-
- // Note: order SENSITIVE !!
- @Override
- public void registerUnAcknowledgedMessage(SessionImpl.ReceivedMessage message) {
- synchronized (unAckMessageList){
- unAckMessageList.add(message);
- }
- }
-
- @Override
- // public void acknowledge(String topicName, String subscriberId, String jmsMessageID)
- public void acknowledge(MessageImpl message) throws JMSException {
- if (this.stopped || null == hedwigClient)
- throw new javax.jms.IllegalStateException("session in stopped or closed state, cant acknowledge message");
-
- /*
- This approach does not seem to work well due to implicit assumptions in hedwig client ...
- I am modifying it in the following way:
- a) For each message received, maintain it in a List.
- b) Acknowledging a message means traversing this list to find the message with the same seq-id,
- and acknowledging ALL messages up to and including it in the list.
- Since hedwig does ack-until instead of individual acks, this violation of the JMS spec is consistent with hedwig.
- Note that even though hedwig does ack-until, hedwig client on the other hand DOES NOT! It will
- throttle the connection if we do not ack individually ...
- sigh :-(
- */
- // sendAcknowledge(topicName, subscriberId, seqId);
-
- LinkedList<SessionImpl.ReceivedMessage> ackList = new LinkedList<SessionImpl.ReceivedMessage>();
- synchronized (unAckMessageList){
- // Should I simply copy and release ?
- ListIterator<SessionImpl.ReceivedMessage> iter = unAckMessageList.listIterator();
-
- boolean found = false;
- while (iter.hasNext()){
- if (iter.next().originalMessage.getServerJmsMessageId().equals(message.getServerJmsMessageId())){
- found = true;
- break;
- }
- }
-
- // probably already acknowledged ?
- if (!found) return ;
- while (iter.hasPrevious()){
- ackList.addFirst(iter.previous());
- iter.remove();
- }
- }
-
- // Now acknowledge the messages in ackList by running its runnable.
- if (logger.isTraceEnabled()) {
- logger.trace("facade acknowledge ackList (" + ackList.size() + ") ... " + ackList);
- }
- for (SessionImpl.ReceivedMessage msg : ackList){
- try {
- msg.originalMessage.getAckRunnable().run();
- } catch (Exception ex){
- // Ignore any exception thrown.
- if (logger.isDebugEnabled()) {
- logger.debug("Ignoring exception thrown while acknowledging messages", ex);
- }
- }
- }
-
- }
-
- private void sendAcknowledge(String topicName, String subscriberId, PubSubProtocol.MessageSeqId seqId)
- throws JMSException {
-
- if (logger.isTraceEnabled()) logger.trace("Acknowledging " +
- MessageUtil.generateJMSMessageIdFromSeqId(seqId) + " for " + topicName + " by " + subscriberId);
- try {
- hedwigClient.getSubscriber().consume(ByteString.copyFromUtf8(topicName),
- ByteString.copyFromUtf8(subscriberId), seqId);
- } catch (PubSubException.ClientNotSubscribedException e) {
- JMSException jEx = new JMSException("Client not subscribed .. " + e);
- jEx.setLinkedException(e);
- throw jEx;
- }
- }
-
-
- public void subscribeToTopic(String topicName, String subscribedId) throws JMSException {
- if (null == hedwigClient)
- throw new javax.jms.IllegalStateException("session in closed state, cant subscribe to topic " + topicName);
-
- final DeliveryStartInfo info = new DeliveryStartInfo(topicName, subscribedId);
- final boolean start;
- synchronized (deliveryStartInfoSet){
- start = ! subscribeInfoSet.contains(info);
-
- if (start) {
- subscribeInfoSet.add(info);
- }
- }
-
- if (! start) {
- if (logger.isDebugEnabled()) logger.debug("Client already subscribed ?");
- return ;
- }
-
- try {
- SubscriptionOptions opts = SubscriptionOptions.newBuilder()
- .setCreateOrAttach(CreateOrAttach.CREATE_OR_ATTACH).build();
- hedwigClient.getSubscriber().subscribe(ByteString.copyFromUtf8(topicName),
- ByteString.copyFromUtf8(subscribedId), opts);
- } catch (PubSubException.CouldNotConnectException e) {
- JMSException je = new JMSException("receive failed, could not connect .. " + e);
- je.setLinkedException(e);
- throw je;
- } catch (PubSubException.ClientAlreadySubscribedException e) {
- JMSException je = new JMSException("receive failed, already subscribed .. " + e);
- je.setLinkedException(e);
- throw je;
- } catch (PubSubException.ServiceDownException e) {
- JMSException je = new JMSException("receive failed, hedwig service down .. " + e);
- je.setLinkedException(e);
- throw je;
- } catch (InvalidSubscriberIdException e) {
- JMSException je = new JMSException("receive failed, invalid subscriber .. " + e);
- je.setLinkedException(e);
- throw je;
- }
- }
-
- public void unsubscribeFromTopic(String topicName, String subscribedId) throws JMSException {
- if (null == hedwigClient)
- throw new javax.jms.IllegalStateException("session in closed state, cant acknowledge message");
-
- // Also implies removal of delivery, right ?
- final DeliveryStartInfo info = new DeliveryStartInfo(topicName, subscribedId);
- synchronized (deliveryStartInfoSet){
- deliveryStartInfoSet.remove(info);
- subscribeInfoSet.remove(info);
- }
-
- try {
- hedwigClient.getSubscriber().unsubscribe(ByteString.copyFromUtf8(topicName),
- ByteString.copyFromUtf8(subscribedId));
- } catch (PubSubException.CouldNotConnectException e) {
- JMSException je = new JMSException("receive failed, could not connect .. " + e);
- je.setLinkedException(e);
- throw je;
- } catch (PubSubException.ServiceDownException e) {
- JMSException je = new JMSException("receive failed, hedwig service down .. " + e);
- je.setLinkedException(e);
- throw je;
- } catch (InvalidSubscriberIdException e) {
- JMSException je = new JMSException("receive failed, invalid subscriber .. " + e);
- je.setLinkedException(e);
- throw je;
- } catch (PubSubException.ClientNotSubscribedException e) {
- JMSException je = new JMSException("receive failed, client not subscribed .. " + e);
- je.setLinkedException(e);
- throw je;
- }
- }
-
- public void stopTopicDelivery(String topicName, String subscribedId) throws JMSException {
- if (null == hedwigClient)
- throw new javax.jms.IllegalStateException("session in closed state, cant acknowledge message");
-
- DeliveryStartInfo info = new DeliveryStartInfo(topicName, subscribedId);
- synchronized (deliveryStartInfoSet){
- deliveryStartInfoSet.remove(info);
- }
-
- try {
- hedwigClient.getSubscriber().stopDelivery(ByteString.copyFromUtf8(topicName),
- ByteString.copyFromUtf8(subscribedId));
- } catch (PubSubException.ClientNotSubscribedException e) {
- if (logger.isTraceEnabled()) logger.trace("Client not subscribed or already unsubscribed ? ", e);
- }
- }
-
- public void startTopicDelivery(String topicName, String subscribedId) throws JMSException {
- if (null == hedwigClient)
- throw new javax.jms.IllegalStateException("session in closed state, cant acknowledge message");
-
- final DeliveryStartInfo info = new DeliveryStartInfo(topicName, subscribedId);
- final boolean start;
- synchronized (deliveryStartInfoSet){
- start = ! deliveryStartInfoSet.contains(info);
-
- if (start) {
- deliveryStartInfoSet.add(info);
- }
- }
-
- if (! start) {
- if (logger.isDebugEnabled()) logger.debug("Client already started delivery ?");
- return ;
- }
-
- try {
- if (logger.isTraceEnabled()) logger.trace("Start topic delivery for " + topicName +
- ", subscriberId " + subscribedId);
- hedwigClient.getSubscriber().startDelivery(ByteString.copyFromUtf8(topicName),
- ByteString.copyFromUtf8(subscribedId), this);
- if (logger.isTraceEnabled()) logger.trace("Start topic delivery for " + topicName +
- ", subscriberId " + subscribedId + " DONE");
- } catch (PubSubException.ClientNotSubscribedException e) {
- if (logger.isDebugEnabled()) logger.debug("Client not subscribed or already unsubscribed ? ", e);
- } catch (AlreadyStartDeliveryException e) {
- if (logger.isDebugEnabled()) logger.debug("Client already started delivery ? ", e);
- }
- }
-
- @Override
- public void deliver(ByteString topic, ByteString subscriberId, PubSubProtocol.Message msg,
- final Callback<Void> callback, final Object context) {
- // Deliver the message to the session.
-
- if (this.stopped) {
- if (logger.isDebugEnabled()) logger.debug("Ignoring message while in stopped mode .. topic - " +
- topic.toStringUtf8() + ", subscriber - " + subscriberId.toStringUtf8() + ", msg - " + msg);
- return ;
- }
-
- if (logger.isTraceEnabled()) logger.trace("recieved message from server : topic - " +
- topic.toStringUtf8() + ", subscriber - " + subscriberId.toStringUtf8() + ", msg - " + msg);
-
- // I am assuming that we can defer the acknowledgement of the message ...
- final String topicName = topic.toStringUtf8();
- final String sid = subscriberId.toStringUtf8();
- final PubSubProtocol.MessageSeqId seqId = msg.getMsgId();
- final Runnable ack = new Runnable(){
- public void run() {
- callback.operationFinished(context, null);
- // Only when auto-send is NOT enabled.
- if (! connection.getHedwigClientConfig().isAutoSendConsumeMessageEnabled()) {
- try {
- sendAcknowledge(topicName, sid, seqId);
- } catch (JMSException e) {
- if (logger.isDebugEnabled()) {
- logger.debug("Unable to send acknowledgement ... " + topicName + ", " +
- sid + ", seqId : " + seqId);
- DebugUtil.dumpJMSStacktrace(logger, e);
- }
- }
- }
- }
- };
-
- try {
- if (logger.isTraceEnabled()) logger.trace("Pushing to session " + session);
-
- MessageImpl messageImpl = MessageUtil.processHedwigMessage(session, msg, topicName, sid, ack);
- session.messageReceived(messageImpl, DestinationType.TOPIC);
- } catch (JMSException e) {
- // Unable to process the incoming message - log and ignore ?
- if (logger.isDebugEnabled()) {
- logger.debug("Unable to consume message");
- DebugUtil.dumpJMSStacktrace(logger, e);
- }
- }
- }
-
- public String getSubscriberId(TopicSubscriber topicSubscriber) throws JMSException {
- if (! (topicSubscriber instanceof TopicSubscriberImpl) )
- throw new JMSException("TopicSubscriber not instanceof of TopicSubscriberImpl ? " +
- topicSubscriber.getClass());
-
- return ((TopicSubscriberImpl) topicSubscriber).getSubscriberId();
- }
-
- @Override
- public boolean enqueueReceivedMessage(MessageConsumer messageConsumer, SessionImpl.ReceivedMessage receivedMessage,
- boolean addFirst) throws JMSException {
- if (! (messageConsumer instanceof TopicSubscriberImpl) )
- throw new JMSException("TopicSubscriber not instanceof of TopicSubscriberImpl ? " +
- messageConsumer.getClass());
-
- return ((TopicSubscriberImpl) messageConsumer).enqueueReceivedMessage(receivedMessage, addFirst);
- }
-
- public Publisher getPublisher() throws javax.jms.IllegalStateException {
- if (null == hedwigClient)
- throw new javax.jms.IllegalStateException("session in closed state, cant acknowledge message");
- return hedwigClient.getPublisher();
- }
-
- public String publish(String topicName, MessageImpl message) throws JMSException {
- try {
- PubSubProtocol.PublishResponse response = getPublisher().publish(
- ByteString.copyFromUtf8(topicName), message.generateHedwigMessage());
- PubSubProtocol.MessageSeqId seqId =
- (null != response && response.hasPublishedMsgId() ? response.getPublishedMsgId() : null);
- if (null == seqId){
- // if (logger.isDebugEnabled())
- // logger.debug("Unexpected NOT to receive the sequence id in response to publish " + response);
- logger.warn("Unexpected NOT to receive the sequence id in response to publish " + response);
- return null;
- }
-
- return MessageUtil.generateJMSMessageIdFromSeqId(seqId);
- } catch (PubSubException.CouldNotConnectException e) {
- JMSException jmsEx = new JMSException("Cant publish to " + topicName + " .. " + e);
- jmsEx.setLinkedException(e);
- throw jmsEx;
- } catch (PubSubException.ServiceDownException e) {
- JMSException jmsEx = new JMSException("Cant publish to " + topicName + " .. " + e);
- jmsEx.setLinkedException(e);
- throw jmsEx;
- }
- }
-
- // Queue methods which are NOT supported yet.
- @Override
- public QueueSender createQueueSender(Destination destination) throws JMSException {
- throw new JMSException("hedwig does not support queues yet");
- }
-
- @Override
- public QueueReceiver createQueueReceiver(Destination destination) throws JMSException {
- throw new JMSException("hedwig does not support queues yet");
- }
-
- @Override
- public QueueReceiver createQueueReceiver(Destination destination, String messageSelector) throws JMSException {
- throw new JMSException("hedwig does not support queues yet");
- }
-
- @Override
- public QueueReceiver createQueueReceiver(Destination destination, String messageSelector,
- boolean noLocal) throws JMSException {
- throw new JMSException("hedwig does not support queues yet");
- }
-
- @Override
- public String getSubscriberId(QueueReceiver queueReceiver) throws JMSException {
- throw new JMSException("hedwig does not support queues yet");
- }
-
- @Override
- public void stopQueueDelivery(String queueName, String subscribedId) throws JMSException {
- throw new JMSException("hedwig does not support queues yet");
- }
-
- @Override
- public void startQueueDelivery(String queueName, String subscriberId) throws JMSException {
- throw new JMSException("hedwig does not support queues yet");
- }
-
- @Override
- public QueueBrowser createBrowser(Queue queue) throws JMSException {
- throw new JMSException("hedwig does not support queues yet");
- }
-
- @Override
- public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
- throw new JMSException("hedwig does not support queues yet");
- }
-
- @Override
- public TemporaryTopic createTemporaryTopic() throws JMSException {
- throw new JMSException("hedwig does not support queues yet");
- }
-
- @Override
- public TemporaryQueue createTemporaryQueue() throws JMSException {
- throw new JMSException("hedwig does not support queues yet");
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/MessageConsumerImpl.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/MessageConsumerImpl.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/MessageConsumerImpl.java
deleted file mode 100644
index a13e259..0000000
--- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/MessageConsumerImpl.java
+++ /dev/null
@@ -1,74 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.jms.spi;
-
-import org.apache.hedwig.jms.selector.Node;
-import org.apache.hedwig.jms.selector.ParseException;
-import org.apache.hedwig.jms.selector.SelectorParser;
-
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-
-/**
- * Base class for consumers ...
- */
-public abstract class MessageConsumerImpl implements MessageConsumer {
- private final String messageSelector;
- private final Node selectorAst;
- // volatile to avoid the need for locking while still ensuring visibility of modifications across threads.
- private volatile MessageListener messageListener;
-
- protected MessageConsumerImpl(String msgSelector) throws InvalidSelectorException {
- {
- msgSelector = null != msgSelector ? msgSelector.trim() : null;
- this.messageSelector = (null == msgSelector || 0 == msgSelector.length()) ?
- null : msgSelector;
- }
- try {
- this.selectorAst = null == this.messageSelector ?
- null : SelectorParser.parseMessageSelector(this.messageSelector);
- } catch (ParseException pEx) {
- InvalidSelectorException jmsEx =
- new InvalidSelectorException("Unable to parse selector '" + this.messageSelector + "'");
- jmsEx.setLinkedException(pEx);
- throw jmsEx;
- }
- }
-
- @Override
- public String getMessageSelector() {
- return messageSelector;
- }
-
- public Node getSelectorAst() {
- return selectorAst;
- }
-
- @Override
- public MessageListener getMessageListener() {
- return messageListener;
- }
-
- @Override
- public void setMessageListener(MessageListener messageListener) throws JMSException {
- this.messageListener = messageListener;
- }
-
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/MessageProducerImpl.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/MessageProducerImpl.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/MessageProducerImpl.java
deleted file mode 100644
index caf4b3e..0000000
--- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/MessageProducerImpl.java
+++ /dev/null
@@ -1,117 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.jms.spi;
-
-import org.apache.hedwig.jms.SessionImpl;
-
-import javax.jms.DeliveryMode;
-import javax.jms.JMSException;
-import javax.jms.MessageProducer;
-
-/**
- *
- */
-public abstract class MessageProducerImpl implements MessageProducer {
-
- static final int DEFAULT_PRIORITY = 4;
-
- private final SessionImpl session;
-
- // We don't really use this, since we always populate the message-id found in the response to publish.
- private boolean disableMessageID = false;
- // We could support this, but don't - it would overly complicate some aspects of the code; deferring for now
- // (we would need to pass this around along all failure paths).
- private boolean disableMessageTimestamp = false;
- // Hedwig supports only PERSISTENT mode, so setting to anything else will just cause it to be ignored.
- private int deliveryMode = DeliveryMode.PERSISTENT;
- // Hedwig does not support priorities, so everything is at default priority !
- // this does not influence actual message delivery.
- private int defaultPriority = DEFAULT_PRIORITY;
- // Hedwig does not support TTL (iirc), so we allow setting/querying this, but it has no
- // actual impact on the message delivery/expiry.
- private long timeToLive = 0;
-
- protected MessageProducerImpl(SessionImpl session) {
- this.session = session;
- }
-
- @Override
- public void setDisableMessageID(boolean disableMessageID) throws JMSException {
- this.disableMessageID = disableMessageID;
- }
-
- @Override
- public boolean getDisableMessageID() throws JMSException {
- return disableMessageID;
- }
-
- protected SessionImpl getSession() {
- return session;
- }
-
-
- @Override
- public void setDisableMessageTimestamp(boolean disableMessageTimestamp) throws JMSException {
- this.disableMessageTimestamp = disableMessageTimestamp;
- }
-
- @Override
- public boolean getDisableMessageTimestamp() throws JMSException {
- return disableMessageTimestamp;
- }
-
- @Override
- public void setDeliveryMode(int deliveryMode) throws JMSException {
- if (DeliveryMode.NON_PERSISTENT != deliveryMode &&
- DeliveryMode.PERSISTENT != deliveryMode) {
- throw new JMSException("Invalid delivery mode specified : " + deliveryMode);
- }
-
- // if (DeliveryMode.NON_PERSISTENT == deliveryMode)
- // throw new JMSException("non-persistent delivery mode is not yet supported");
- this.deliveryMode = deliveryMode;
- }
-
- @Override
- public int getDeliveryMode() throws JMSException {
- return deliveryMode;
- }
-
-
- @Override
- public void setPriority(int defaultPriority) throws JMSException {
- // Not supported, we simply allow it to be set and retrieved ...
- this.defaultPriority = defaultPriority;
- }
-
- @Override
- public int getPriority() throws JMSException {
- return defaultPriority;
- }
-
-
- @Override
- public void setTimeToLive(long timeToLive) throws JMSException {
- this.timeToLive = timeToLive;
- }
-
- @Override
- public long getTimeToLive() throws JMSException {
- return timeToLive;
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/QueueSessionImpl.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/QueueSessionImpl.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/QueueSessionImpl.java
deleted file mode 100644
index 2beeea7..0000000
--- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/QueueSessionImpl.java
+++ /dev/null
@@ -1,84 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hedwig.jms.spi;
-
-import org.apache.hedwig.jms.ConnectionImpl;
-import org.apache.hedwig.jms.MessagingSessionFacade;
-import org.apache.hedwig.jms.SessionImpl;
-
-import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.QueueReceiver;
-import javax.jms.QueueSender;
-import javax.jms.QueueSession;
-import javax.jms.TemporaryTopic;
-import javax.jms.Topic;
-import javax.jms.TopicSubscriber;
-
-/**
- * Session restricted to the queue (point-to-point) domain: receiver and sender creation is
- * forwarded to {@link SessionImpl}, while every topic-domain operation is rejected with
- * {@link javax.jms.IllegalStateException}.
- */
-public class QueueSessionImpl extends SessionImpl implements QueueSession {
-
- // Shared text of the exception raised by every topic-domain method below.
- private static final String TOPIC_OPERATION_REJECTED = "Cant call this method on QueueSession";
-
- public QueueSessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode) throws JMSException {
- super(connection, transacted, acknowledgeMode);
- }
-
- // ---- queue-domain operations : forwarded to SessionImpl ----
-
- /** Creates a receiver for the queue without a message selector. */
- @Override
- public QueueReceiver createReceiver(Queue queue) throws JMSException {
- return super.createReceiverImpl(queue);
- }
-
- /** Creates a receiver for the queue using the given message selector. */
- @Override
- public QueueReceiver createReceiver(Queue queue, String messageSelector) throws JMSException {
- return super.createReceiverImpl(queue, messageSelector);
- }
-
- /** Creates a sender for the queue. */
- @Override
- public QueueSender createSender(Queue queue) throws JMSException {
- return super.createSenderImpl(queue);
- }
-
- // ---- topic-domain operations : JMS requires that these cannot be called on a QueueSession ----
-
- /** Always fails: durable subscribers are not available on a QueueSession. */
- @Override
- public TopicSubscriber createDurableSubscriber(Topic topic, String subscribedId) throws JMSException {
- throw new javax.jms.IllegalStateException(TOPIC_OPERATION_REJECTED);
- }
-
- /** Always fails: durable subscribers are not available on a QueueSession. */
- @Override
- public TopicSubscriber createDurableSubscriber(Topic topic, String subscribedId, String messageSelector,
- boolean noLocal) throws JMSException {
- throw new javax.jms.IllegalStateException(TOPIC_OPERATION_REJECTED);
- }
-
- /** Always fails: temporary topics are not available on a QueueSession. */
- @Override
- public TemporaryTopic createTemporaryTopic() throws JMSException {
- throw new javax.jms.IllegalStateException(TOPIC_OPERATION_REJECTED);
- }
-
- /** Always fails: there is no subscription to remove on a QueueSession. */
- @Override
- public void unsubscribe(String subscribedId) throws JMSException {
- throw new javax.jms.IllegalStateException(TOPIC_OPERATION_REJECTED);
- }
-
- /** Always fails: topics cannot be created from a QueueSession. */
- @Override
- public Topic createTopic(String topicName) throws JMSException {
- throw new javax.jms.IllegalStateException(TOPIC_OPERATION_REJECTED);
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicPublisherImpl.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicPublisherImpl.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicPublisherImpl.java
deleted file mode 100644
index 23dfb54..0000000
--- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicPublisherImpl.java
+++ /dev/null
@@ -1,226 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.jms.spi;
-
-import org.apache.hedwig.jms.SessionImpl;
-import org.apache.hedwig.jms.message.MessageImpl;
-import org.apache.hedwig.jms.message.MessageUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.DeliveryMode;
-import javax.jms.Destination;
-import javax.jms.InvalidDestinationException;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-
-/**
- *
- */
-public class TopicPublisherImpl extends MessageProducerImpl implements TopicPublisher {
-
- private static final Logger logger = LoggerFactory.getLogger(TopicPublisherImpl.class);
-
- private final HedwigMessagingSessionFacade facade;
- private final Topic topic;
-
- public TopicPublisherImpl(HedwigMessagingSessionFacade facade, SessionImpl session, Topic topic) {
- super(session);
- this.facade = facade;
- this.topic = topic;
- }
-
- @Override
- public Topic getTopic() throws JMSException {
- return topic;
- }
-
- @Override
- public void publish(Message message) throws JMSException {
- if (null == getTopic()) throw new UnsupportedOperationException("Need to specify topic");
- publish(getTopic(), message);
- }
-
- @Override
- public void publish(Topic topic, Message message) throws JMSException {
- publish(topic, message, getDeliveryMode(), getPriority(), getTimeToLive());
- }
-
- @Override
- public void publish(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
- if (null == getTopic()) throw new UnsupportedOperationException("Need to specify topic");
- publish(getTopic(), message, deliveryMode, priority, timeToLive);
- }
-
- // all publish/send methods delegate to this ...
- @Override
- public void publish(final Topic topic, final Message message, final int deliveryMode,
- final int priority, final long timeToLive) throws JMSException {
-
- // Simulating this in provider ...
- // if (0 != timeToLive) throw new JMSException("We do not support TTL for messages right now.
- // Specified TTL : " + timeToLive);
-
- if (MessageProducerImpl.DEFAULT_PRIORITY != priority) {
- if (logger.isInfoEnabled())
- logger.info("We do not support message priorities right now. Specified priority : " + priority);
- }
- if (DeliveryMode.PERSISTENT != deliveryMode) {
- if (logger.isInfoEnabled())
- logger.info("We support only PERSISTENT delivery mode. Unsupported mode : " + deliveryMode);
- }
-
- if (null == topic){
- throw new InvalidDestinationException("Topic must be specified to publish");
- }
-
- final MessageImpl copiedMessageImpl;
- if (message instanceof MessageImpl) copiedMessageImpl = MessageUtil.createCloneForDispatch(
- getSession(), (MessageImpl) message, topic.getTopicName(), null);
- else copiedMessageImpl = MessageUtil.createMessageCopy(getSession(), message);
-
- // Note: Ensure that we set properties below on both message (user input) and copiedMessageImpl
- // (the cloned/copied message).
- // We are doing set on both instead of set followed by close/copy to prevent cases where message
- // implementation drops
- // headers (like our own impl earlier !)
-
- // priority ...
- {
- // Set the message priority
- // 3.4.10 JMSPriority "When a message is sent, this field is ignored. After completion of
- // the send, it holds the value specified by the method sending the message."
- // On other hand, we have
- // 3.4.12 Overriding Message Header Fields : "JMS permits an administrator to configure
- // JMS to override the client-specified
- // values for JMSDeliveryMode, JMSExpiration and JMSPriority. If this is done, the header
- // field value must reflect the
- // administratively specified value."
- // For now, to unblock testcases, setting to msgPriority :-) Actually, I think we should
- // set it to Message.DEFAULT_PRIORITY ...
- message.setJMSPriority(priority);
- copiedMessageImpl.setJMSPriority(priority);
- // message.setJMSPriority(Message.DEFAULT_PRIORITY);
- // copiedMessageImpl.setJMSPriority(Message.DEFAULT_PRIORITY);
- }
-
- // delivery mode ...
- {
-
- // 3.4.2 JMSDeliveryMode "The JMSDeliveryMode header field contains the delivery mode
- // specified when the message was sent.
- // When a message is sent, this field is ignored. After completion of the send, it holds
- // the delivery mode specified by the sending method."
- message.setJMSDeliveryMode(deliveryMode);
- copiedMessageImpl.setJMSDeliveryMode(deliveryMode);
- }
-
- // destination ...
- {
- // 3.4.1 JMSDestination "The JMSDestination header field contains the destination to which
- // the message is being sent.
- // When a message is sent, this field is ignored. After completion of the send, it holds
- // the destination object
- // specified by the sending method. When a message is received, its destination value
- // must be equivalent to the
- // value assigned when it was sent."
- message.setJMSDestination(getSession().createTopic(topic.getTopicName()));
- copiedMessageImpl.setJMSDestination(getSession().createTopic(topic.getTopicName()));
- }
-
- {
- // 3.4.4 JMSTimestamp
- // "The JMSTimestamp header field contains the time a message was handed off to a provider to be sent.
- // It is not the time the message was actually transmitted because the actual send may occur later
- // due to transactions or other client side queueing of messages."
- final long timestamp = SessionImpl.currentTimeMillis();
- message.setJMSTimestamp(timestamp);
- copiedMessageImpl.setJMSTimestamp(timestamp);
- }
-
- if (timeToLive > 0) {
- final long expiryTime = SessionImpl.currentTimeMillis() + timeToLive;
- message.setJMSExpiration(expiryTime);
- copiedMessageImpl.setJMSExpiration(expiryTime);
- }
- else {
- // no expiry.
- message.setJMSExpiration(0);
- }
-
-
- if (getSession().getTransacted()){
- // enqueue if within transactions.
- getSession().enqueuePublishWithinTransaction(topic.getTopicName(), copiedMessageImpl, message);
- return ;
- }
-
- if (logger.isTraceEnabled()) logger.trace("Publishing message ... recepient " + topic.getTopicName());
- // facade.getPublisher().publish(ByteString.copyFromUtf8(topic.getTopicName()),
- // copiedMessageImpl.generateHedwigMessage(this));
- String msgId = facade.publish(topic.getTopicName(), copiedMessageImpl);
- getSession().addToLocallyPublishedMessageIds(msgId);
- if (message instanceof MessageImpl) ((MessageImpl) message).setJMSMessageIDInternal(msgId);
- else message.setJMSMessageID(msgId);
-
- if (logger.isTraceEnabled()) logger.trace("Publishing message ... recepient " +
- topic.getTopicName() + ", msgId : " + msgId + " DONE");
-
- // This is not required, we already do this as part of copiedMessageImpl.generateHedwigMessage()
- // message.setJMSTimestamp(SessionImpl.currentTimeMillis());
-
- }
-
- @Override
- public Destination getDestination() throws JMSException {
- return topic;
- }
-
- @Override
- public void close() throws JMSException {
- // This will be a noop actually ... session.close() takes care of closing the publisher.
- }
-
- @Override
- public void send(Message message) throws JMSException {
- publish(message);
- }
-
- @Override
- public void send(Destination destination, Message message) throws JMSException {
- if (!(destination instanceof Topic))
- throw new JMSException("Expected destination to be a Topic : " + destination);
- publish((Topic) destination, message);
- }
-
- @Override
- public void send(Message message, int deliveryMode, int priority, long timeToLive) throws JMSException {
- publish(message, deliveryMode, priority, timeToLive);
- }
-
- @Override
- public void send(Destination destination, Message message, int deliveryMode,
- int priority, long timeToLive) throws JMSException {
- if (!(destination instanceof Topic))
- throw new JMSException("Expected destination to be a Topic : " + destination);
-
- publish((Topic) destination, message, deliveryMode, priority, timeToLive);
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicSessionImpl.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicSessionImpl.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicSessionImpl.java
deleted file mode 100644
index e96f998..0000000
--- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicSessionImpl.java
+++ /dev/null
@@ -1,76 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-package org.apache.hedwig.jms.spi;
-
-import org.apache.hedwig.jms.ConnectionImpl;
-import org.apache.hedwig.jms.SessionImpl;
-
-import javax.jms.JMSException;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-import javax.jms.TemporaryQueue;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSession;
-import javax.jms.TopicSubscriber;
-
-/**
- * Session restricted to the topic (publish/subscribe) domain: subscriber and publisher creation
- * is forwarded to {@link SessionImpl}, while every queue-domain operation is rejected with
- * {@link javax.jms.IllegalStateException}.
- */
-public class TopicSessionImpl extends SessionImpl implements TopicSession {
-
- // Shared text of the exception raised by every queue-domain method below.
- private static final String QUEUE_OPERATION_REJECTED = "Cant call this method on TopicSession";
-
- public TopicSessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode) throws JMSException {
- super(connection, transacted, acknowledgeMode);
- }
-
- // ---- topic-domain operations : forwarded to SessionImpl ----
-
- /** Creates a subscriber for the topic without a selector. */
- @Override
- public TopicSubscriber createSubscriber(Topic topic) throws JMSException {
- return super.createSubscriberImpl(topic);
- }
-
- /** Creates a subscriber for the topic with the given selector and noLocal setting. */
- @Override
- public TopicSubscriber createSubscriber(Topic topic, String messageSelector, boolean noLocal) throws JMSException {
- return super.createSubscriberImpl(topic, messageSelector, noLocal);
- }
-
- /** Creates a publisher for the topic. */
- @Override
- public TopicPublisher createPublisher(Topic topic) throws JMSException {
- return super.createPublisherImpl(topic);
- }
-
- // ---- queue-domain operations : not callable on a TopicSession ----
-
- /** Always fails: temporary queues are not available on a TopicSession. */
- @Override
- public TemporaryQueue createTemporaryQueue() throws JMSException {
- throw new javax.jms.IllegalStateException(QUEUE_OPERATION_REJECTED);
- }
-
- /** Always fails: queues cannot be created from a TopicSession. */
- @Override
- public Queue createQueue(String queueName) throws JMSException {
- throw new javax.jms.IllegalStateException(QUEUE_OPERATION_REJECTED);
- }
-
- /** Always fails: queue browsers are not available on a TopicSession. */
- @Override
- public QueueBrowser createBrowser(Queue queue) throws JMSException {
- throw new javax.jms.IllegalStateException(QUEUE_OPERATION_REJECTED);
- }
-
- /** Always fails: queue browsers are not available on a TopicSession. */
- @Override
- public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
- throw new javax.jms.IllegalStateException(QUEUE_OPERATION_REJECTED);
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicSubscriberImpl.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicSubscriberImpl.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicSubscriberImpl.java
deleted file mode 100644
index 4a51e8d..0000000
--- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/TopicSubscriberImpl.java
+++ /dev/null
@@ -1,323 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.jms.spi;
-
-import org.apache.hedwig.jms.SessionImpl;
-import org.apache.hedwig.jms.DebugUtil;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.InvalidSelectorException;
-import javax.jms.JMSException;
-import javax.jms.Message;
-import javax.jms.MessageListener;
-import javax.jms.Topic;
-import javax.jms.TopicSubscriber;
-import java.util.ArrayList;
-import java.util.LinkedList;
-import java.util.List;
-
-/**
- * Subscriber to a topic.
- *
- */
-public class TopicSubscriberImpl extends MessageConsumerImpl implements TopicSubscriber {
-
- private static final Logger logger = LoggerFactory.getLogger(TopicSubscriberImpl.class);
-
- private final SessionImpl session;
- private final Topic topic;
- private final String subscriberId;
- private final boolean noLocal;
-
- private final boolean forceUnsubscribe;
- private volatile boolean registered = false;
- private boolean closed = false;
-
- // Any publically exposed object MUST NOT rely on 'this' for its locking semantics unless it is
- // explicitly exposing this behavior.
- private final Object lockObject = new Object();
- private final LinkedList<SessionImpl.ReceivedMessage> pendingMessageList
- = new LinkedList<SessionImpl.ReceivedMessage>();
-
- public TopicSubscriberImpl(SessionImpl session, Topic topic, String subscriberId,
- boolean forceUnsubscribe) throws JMSException {
- super(null);
- this.session = session;
- this.topic = topic;
- this.subscriberId = subscriberId;
- // default is false right ?
- this.noLocal = false;
- this.forceUnsubscribe = forceUnsubscribe;
-
- // I am not sure if we have to register with session immediately on create or not ...
- registerWithSession();
- }
-
- public TopicSubscriberImpl(SessionImpl session, Topic topic, String subscriberId,
- String messageSelector, boolean noLocal, boolean forceUnsubscribe) throws JMSException {
- super(messageSelector);
- this.session = session;
- this.topic = topic;
- this.subscriberId = subscriberId;
-
- this.noLocal = noLocal;
- this.forceUnsubscribe = forceUnsubscribe;
-
- if (null == getSelectorAst()){
- // Only if NOT empty string - treat empty string as null selector spec.
- if (null != messageSelector && 0 != messageSelector.trim().length()){
- throw new InvalidSelectorException("Invalid selector specified '" + messageSelector + "'");
- }
- }
- else {
- session.registerTopicSubscriptionInfo(new SessionImpl.TopicSubscription(topic.getTopicName(),
- subscriberId), getSelectorAst());
- }
-
- // I am not sure if we have to register with session immediately on create or not ...
- registerWithSession();
- }
-
- @Override
- public Topic getTopic() {
- return topic;
- }
-
- @Override
- public boolean getNoLocal() {
- return noLocal;
- }
-
- public String getSubscriberId() {
- return subscriberId;
- }
-
- @Override
- public void setMessageListener(MessageListener messageListener) throws JMSException {
- super.setMessageListener(messageListener);
- registerWithSession();
- }
-
- private void registerWithSession() throws JMSException {
-
- // Fail fast ... volatile perf hit is ok in comparison to rest.
- if (this.registered) return ;
-
- final boolean register;
- synchronized (lockObject){
- // if (closed) throw new JMSException("Already closed");
- if (closed) return ;
-
- if (!this.registered) {
- this.registered = true;
- register = true;
- }
- else register = false;
- }
- if (register) this.session.registerTopicSubscriber(this);
- }
-
- @Override
- public Message receive() throws JMSException {
- return receive(0);
- }
-
-
- @Override
- public Message receive(final long maxTimeout) throws JMSException {
- return receiveImpl(maxTimeout, true);
- }
-
- private Message receiveImpl(final long maxTimeout, boolean canWait) throws JMSException {
- final long waitTimeout;
- final long startTime;
-
- // periodically wake up !
- if (canWait){
- if (maxTimeout <= 0) waitTimeout = 1000;
- else {
- long duration = maxTimeout / 16;
- if (duration <= 0) duration = 1;
- waitTimeout = duration;
- }
- startTime = SessionImpl.currentTimeMillis();
- }
- else {
- waitTimeout = 0;
- startTime = 0;
- }
-
- registerWithSession();
-
- // check before lock ...
- if (null != getMessageListener()) {
- throw new javax.jms.IllegalStateException(
- "There is a message listener already subscribed for this subscriber");
- }
-
- final SessionImpl.ReceivedMessage message;
- final List<SessionImpl.ReceivedMessage> ackList = new ArrayList<SessionImpl.ReceivedMessage>(4);
-
- synchronized (lockObject){
-
-outer:
- while (true) {
-
- // Should we ignore cached messages instead of this ?
- // Once closed, wont help much anyway, right ?
- if (closed) {
- message = null;
- break outer;
- }
-
- // While we waited, it could have been set.
- if (null != getMessageListener()) {
- throw new javax.jms.IllegalStateException(
- "There is a message listener already subscribed for this subscriber");
- }
-
- while (canWait && pendingMessageList.isEmpty()){
-
- // Should we ignore cached messages instead of this ?
- // Once closed, wont help much anyway, right ?
- if (closed) {
- message = null;
- break outer;
- }
-
- if (0 != maxTimeout && startTime + maxTimeout < SessionImpl.currentTimeMillis()) {
- message = null;
- break outer;
- }
-
- try {
- lockObject.wait(waitTimeout);
- } catch (InterruptedException iEx){
- JMSException jEx = new JMSException("Interrupted .. " + iEx);
- jEx.setLinkedException(iEx);
- throw jEx;
- }
- }
-
-
- if (pendingMessageList.isEmpty()) {
- message = null;
- break outer;
- }
- SessionImpl.ReceivedMessage tmessage = pendingMessageList.remove();
- ackList.add(tmessage);
-
- if (noLocal){
- if (session.isLocallyPublished(tmessage.originalMessage.getJMSMessageID())){
- // find next message.
- continue;
- }
- }
- if (session.isMessageExpired(tmessage.originalMessage)) continue;
- // use this message then.
- message = tmessage;
- break;
- }
- }
-
- if (logger.isTraceEnabled()) logger.trace("Acklist receive (" + ackList.size() + ") ... " + ackList);
- for (SessionImpl.ReceivedMessage ackMessage : ackList){
- session.handleAutomaticMessageAcknowledgement(ackMessage, this);
- }
-
- if (logger.isTraceEnabled()) logger.trace("receive response " + (null != message ? message.msg : null));
- return null != message ? message.msg : null;
- }
-
- @Override
- public Message receiveNoWait() throws JMSException {
- return receiveImpl(0, false);
- }
-
- @Override
- public void close() throws JMSException {
-
- final boolean unregister;
- final boolean unsubscribe;
-
- synchronized (lockObject){
- if (closed) return ;
- closed = true;
-
- // This means that we drop all pending messages ...
- // gc friendly.
- pendingMessageList.clear();
-
- unregister = registered;
- this.registered = false;
-
- unsubscribe = this.forceUnsubscribe;
- }
-
- if (unregister) this.session.unregisterTopicSubscriber(this);
-
- // this.session.stopTopicDelivery(topic.getTopicName(), subscriberId);
- if (unsubscribe) session.unsubscribeFromTopic(topic.getTopicName(), subscriberId);
-
- // nothing else to be done ...
- }
-
- boolean enqueueReceivedMessage(SessionImpl.ReceivedMessage receivedMessage, final boolean addFirst) {
- if (logger.isTraceEnabled())
- logger.trace("Enqueing message " + receivedMessage + " to subscriber " + subscriberId +
- " for topic " + topic.toString() + ", addFirst : " + addFirst);
-
- String infoMsg = null;
- String traceMsg = null;
- synchronized (lockObject){
- // ignore
- if (closed) return false;
- // If number of buffered messages > some max limit, evict them - else we run out of memory !
- if (pendingMessageList.size() > SessionImpl.MAX_SUBSCRIBER_BUFFERED_MESSAGES) {
- // simply discard it with an error logged.
- infoMsg = "Discarding " + pendingMessageList.size() + " messages since there are no consumers for them";
- pendingMessageList.clear();
- }
-
- // Note: Selector evaluation will happen in SessionImpl.
- // if (!selectorMatched(receivedMessage)) return false;
-
- if (addFirst) pendingMessageList.addFirst(receivedMessage);
- else pendingMessageList.add(receivedMessage);
-
- lockObject.notifyAll();
- if (logger.isTraceEnabled()) traceMsg = "pendingMessageList (" + pendingMessageList.size() +
- ") : \n" + pendingMessageList + "\n---\n next : " + pendingMessageList.getFirst();
- }
-
- if (null != infoMsg) logger.info(infoMsg);
- if (logger.isTraceEnabled() && null != traceMsg) logger.trace(traceMsg);
-
- return true;
- }
-
- public void start() {
- try {
- registerWithSession();
- } catch (JMSException jEx){
- // ignore.
- DebugUtil.dumpJMSStacktrace(logger, jEx);
- }
- }
-}
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/package-info.html
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/package-info.html b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/package-info.html
deleted file mode 100644
index fe6c1e1..0000000
--- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/spi/package-info.html
+++ /dev/null
@@ -1,30 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one or more
- contributor license agreements. See the NOTICE file distributed with
- this work for additional information regarding copyright ownership.
- The ASF licenses this file to You under the Apache License, Version 2.0
- (the "License"); you may not use this file except in compliance with
- the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
--->
-
-
-Contains all the implementation which interacts directly with Hedwig (except for message parsing
-which is in message package). <br/>
-This package does not, typically, adhere to JMS MT-constraints - and needs to be MT-safe : it can be invoked
-by underlying hedwig thread-pools and by client JMS invocations concurrently. <br/>
-
-Primarily provides :
-<ul>
- <li>The HedwigConnectionImpl which is (by default) looked up via JNDI. This bootstraps access to rest of system.</li>
- <li>The default MessagingSessionFacade implementation for Hedwig.</li>
- <li>Associated implementations relevant to the classes exposed by the Facade - Topic handling
- (no support for Queue yet), etc</li>
-</ul>
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/protobuf/JmsHeader.proto
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/protobuf/JmsHeader.proto b/hedwig-client-jms/src/main/protobuf/JmsHeader.proto
deleted file mode 100644
index 2338587..0000000
--- a/hedwig-client-jms/src/main/protobuf/JmsHeader.proto
+++ /dev/null
@@ -1,54 +0,0 @@
-/*
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-
-option java_package = "org.apache.hedwig.jms.message.header";
-option optimize_for = SPEED;
-// change ?
-package Hedwig.Jms.Header;
-
-// Protocol version identifiers; VERSION_ONE is the only value defined.
-enum ProtocolVersion{
- VERSION_ONE = 1;
-}
-
-// A single typed value: `type` names the kind of value carried and there is one optional
-// field per kind. Presumably only the field matching `type` is populated - TODO confirm
-// against the code that builds and reads these messages.
-message JmsValue {
- // Kinds of value a JmsValue can carry.
- enum ValueType {
- BOOLEAN = 1;
- BYTE = 2;
- SHORT = 3;
- INT = 4;
- LONG = 5;
- FLOAT = 6;
- DOUBLE = 7;
- STRING = 8;
- // raw bytes. (custom correlation id, for example, uses this : though we dont support it right now).
- BYTES = 9;
- };
-
- // The only required field.
- required ValueType type = 1;
-
- optional bool booleanValue = 2;
- // byte and short values share the sint32 wire type with int values.
- optional sint32 byteValue = 3;
- optional sint32 shortValue = 4;
- optional sint32 intValue = 5;
- optional sint64 longValue = 6;
- optional float floatValue = 7;
- optional double doubleValue = 8;
- optional string stringValue = 9;
- optional bytes bytesValue = 10;
-}
-
-
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/resources/findbugsExclude.xml
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/resources/findbugsExclude.xml b/hedwig-client-jms/src/main/resources/findbugsExclude.xml
deleted file mode 100644
index bae9e09..0000000
--- a/hedwig-client-jms/src/main/resources/findbugsExclude.xml
+++ /dev/null
@@ -1,48 +0,0 @@
-<!--
- Licensed to the Apache Software Foundation (ASF) under one
- or more contributor license agreements. See the NOTICE file
- distributed with this work for additional information
- regarding copyright ownership. The ASF licenses this file
- to you under the Apache License, Version 2.0 (the
- "License"); you may not use this file except in compliance
- with the License. You may obtain a copy of the License at
-
- http://www.apache.org/licenses/LICENSE-2.0
-
- Unless required by applicable law or agreed to in writing, software
- distributed under the License is distributed on an "AS IS" BASIS,
- WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- See the License for the specific language governing permissions and
- limitations under the License.
-//-->
-<FindBugsFilter>
- <Match>
- <!-- generated code, we can't be held responsible for findbugs in it //-->
- <Or>
- <Class name="~org\.apache\.hedwig\.jms\.message\.header\.JmsHeader.*" />
- <Class name="~org\.apache\.hedwig\.jms\.selector\.SelectorParser.*" />
- <Class name="~org\.apache\.hedwig\.jms\.selector\.SimpleCharStream.*" />
- <Class name="~org\.apache\.hedwig\.jms\.selector\.ParseException.*" />
- <Class name="~org\.apache\.hedwig\.jms\.selector\.SimpleNode.*" />
- <Class name="~org\.apache\.hedwig\.jms\.selector\.TokenMgrError.*" />
- </Or>
- </Match>
- <Match>
- <Or>
- <Class name="~org\.apache\.hedwig\.jms\.selector\.ValueComparisonFunction.*" />
- <Class name="~org\.apache\.hedwig\.jms\.selector\.LogicalComparisonFunction.*" />
- </Or>
- <Bug pattern="NP_BOOLEAN_RETURN_NULL" />
- </Match>
- <Match>
- <Class name="~org\.apache\.hedwig\.jms\.selector\.PropertyExprFunction.*" />
- <Bug pattern="BX_UNBOXING_IMMEDIATELY_REBOXED" />
- </Match>
- <Match>
- <Class name="~org\.apache\.hedwig\.jms\.message\.MessageUtil" />
- <Or>
- <Bug pattern="NP_LOAD_OF_KNOWN_NULL_VALUE" />
- <Bug pattern="NP_NULL_PARAM_DEREF_NONVIRTUAL" />
- </Or>
- </Match>
-</FindBugsFilter>
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/resources/log4j.properties
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/resources/log4j.properties b/hedwig-client-jms/src/main/resources/log4j.properties
deleted file mode 100644
index 27d78f1..0000000
--- a/hedwig-client-jms/src/main/resources/log4j.properties
+++ /dev/null
@@ -1,35 +0,0 @@
-#
-#
-# Licensed to the Apache Software Foundation (ASF) under one
-# or more contributor license agreements. See the NOTICE file
-# distributed with this work for additional information
-# regarding copyright ownership. The ASF licenses this file
-# to you under the Apache License, Version 2.0 (the
-# "License"); you may not use this file except in compliance
-# with the License. You may obtain a copy of the License at
-#
-# http://www.apache.org/licenses/LICENSE-2.0
-#
-# Unless required by applicable law or agreed to in writing,
-# software distributed under the License is distributed on an
-# "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY
-# KIND, either express or implied. See the License for the
-# specific language governing permissions and limitations
-# under the License.
-#
-#
-
-# Root logger: level "off" with the CONSOLE appender attached.
-# The commented-out lines are alternative levels (trace / info).
-# log4j.rootLogger=trace, CONSOLE
-# log4j.rootLogger=info, CONSOLE
-log4j.rootLogger=off, CONSOLE
-
-# CONSOLE appender writes through a ConsoleAppender; its own threshold is "off" as well.
-log4j.appender.CONSOLE=org.apache.log4j.ConsoleAppender
-log4j.appender.CONSOLE.Threshold=off
-# log4j.appender.CONSOLE.Threshold=info
-# log4j.appender.CONSOLE.Threshold=trace
-# Line format: ISO8601 timestamp, level, [thread:class@line], message.
-log4j.appender.CONSOLE.layout=org.apache.log4j.PatternLayout
-log4j.appender.CONSOLE.layout.ConversionPattern=%d{ISO8601} - %-5p - [%t:%C{1}@%L] - %m%n
-# Logger for the org.apache hierarchy: OFF (alternative levels below).
-log4j.logger.org.apache=OFF
-# log4j.logger.org.apache=INFO
-# log4j.logger.org.apache=TRACE
-
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/test/java/org/apache/activemq/AutoFailTestSupport.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/test/java/org/apache/activemq/AutoFailTestSupport.java b/hedwig-client-jms/src/test/java/org/apache/activemq/AutoFailTestSupport.java
deleted file mode 100644
index f4c0f7e..0000000
--- a/hedwig-client-jms/src/test/java/org/apache/activemq/AutoFailTestSupport.java
+++ /dev/null
@@ -1,153 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one or more
- * contributor license agreements. See the NOTICE file distributed with
- * this work for additional information regarding copyright ownership.
- * The ASF licenses this file to You under the Apache License, Version 2.0
- * (the "License"); you may not use this file except in compliance with
- * the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.activemq;
-
-import java.util.Map;
-import java.util.Map.Entry;
-import java.util.concurrent.atomic.AtomicBoolean;
-
-import junit.framework.TestCase;
-import org.apache.hedwig.JmsTestBase;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-/**
- * Base class for tests that must not run forever: an optional watchdog thread
- * terminates the JVM with {@link #EXIT_ERROR} when a test case exceeds its
- * allotted time, so that one hanging test cannot block the whole test run.
- */
-public abstract class AutoFailTestSupport extends JmsTestBase {
-    public static final int EXIT_SUCCESS = 0;
-    public static final int EXIT_ERROR = 1;
-    private static final Logger LOG = LoggerFactory.getLogger(AutoFailTestSupport.class);
-
-    private long maxTestTime = 5 * 60 * 1000; // 5 mins by default
-    private Thread autoFailThread;
-
-    private boolean verbose = true;
-    private boolean useAutoFail; // Disable auto fail by default
-    private AtomicBoolean isTestSuccess;
-
-    /**
-     * Starts the auto fail thread (when auto fail is enabled) before the
-     * regular set up is performed.
-     */
-    @Override
-    protected void setUp() throws Exception {
-        if (isAutoFail()) {
-            startAutoFailThread();
-        }
-        super.setUp();
-    }
-
-    /**
-     * Performs the regular clean up and then stops the auto fail thread. The
-     * thread is stopped even when the clean up throws, so that a failing
-     * clean up does not leave the watchdog running.
-     */
-    @Override
-    protected void tearDown() throws Exception {
-        try {
-            super.tearDown();
-        } finally {
-            // Stops the auto fail thread only after performing any clean up
-            stopAutoFailThread();
-        }
-    }
-
-    /**
-     * Manually start the auto fail thread. To start it automatically, just set
-     * the auto fail to true before calling any setup methods. As a rule, this
-     * method is used only when you are not sure if the setUp and tearDown
-     * methods are propagated correctly.
-     */
-    public void startAutoFailThread() {
-        setAutoFail(true);
-        isTestSuccess = new AtomicBoolean(false);
-        autoFailThread = new Thread(new Runnable() {
-            public void run() {
-                try {
-                    // Wait for the test to finish successfully
-                    Thread.sleep(getMaxTestTime());
-                } catch (InterruptedException ignored) {
-                    // Expected: stopAutoFailThread() interrupts the sleep once the test is done
-                } finally {
-                    // The flag is set by stopAutoFailThread(); if it is still
-                    // unset here, the test is treated as having timed out.
-                    if (!isTestSuccess.get()) {
-                        LOG.error("Test case has exceeded the maximum allotted time to run of: "
-                                + getMaxTestTime() + " ms.");
-                        dumpAllThreads(getName());
-                        System.exit(EXIT_ERROR);
-                    }
-                }
-            }
-        }, "AutoFailThread");
-
-        // Log once, and only when verbose output is enabled
-        if (verbose) {
-            LOG.info("Starting auto fail thread...");
-        }
-        autoFailThread.start();
-    }
-
-    /**
-     * Manually stops the auto fail thread. As a rule, this method is used only
-     * when you are not sure if the setUp and tearDown methods are propagated
-     * correctly. Does nothing when auto fail is disabled or no auto fail
-     * thread is alive.
-     */
-    public void stopAutoFailThread() {
-        if (isAutoFail() && autoFailThread != null && autoFailThread.isAlive()) {
-            // Mark the test as successful before interrupting, so that the
-            // auto fail thread ends without exiting the JVM
-            isTestSuccess.set(true);
-
-            if (verbose) {
-                LOG.info("Stopping auto fail thread...");
-            }
-            autoFailThread.interrupt();
-        }
-    }
-
-    /**
-     * Sets the auto fail value. As a rule, this should be used only before any
-     * setup method is called, to automatically enable the auto fail thread in
-     * the setup method of the test case.
-     *
-     * @param val {@code true} to enable the auto fail thread
-     */
-    public void setAutoFail(boolean val) {
-        this.useAutoFail = val;
-    }
-
-    /**
-     * @return whether auto fail is enabled
-     */
-    public boolean isAutoFail() {
-        return this.useAutoFail;
-    }
-
-    /**
-     * The assigned value will only be reflected when the auto fail thread has
-     * started its run. Value is in milliseconds.
-     *
-     * @param val maximum test run time in milliseconds
-     */
-    public void setMaxTestTime(long val) {
-        this.maxTestTime = val;
-    }
-
-    /**
-     * @return the maximum test run time in milliseconds
-     */
-    public long getMaxTestTime() {
-        return this.maxTestTime;
-    }
-
-    /**
-     * Prints every thread returned by {@link Thread#getAllStackTraces()},
-     * followed by its stack trace elements, to {@code System.err}.
-     *
-     * @param prefix text printed in front of each thread's header line
-     */
-    public static void dumpAllThreads(String prefix) {
-        Map<Thread, StackTraceElement[]> stacks = Thread.getAllStackTraces();
-        for (Entry<Thread, StackTraceElement[]> stackEntry : stacks.entrySet()) {
-            System.err.println(prefix + " " + stackEntry.getKey());
-            for (StackTraceElement element : stackEntry.getValue()) {
-                System.err.println(" " + element);
-            }
-        }
-    }
-}