You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@bookkeeper.apache.org by si...@apache.org on 2016/03/16 04:44:56 UTC
[46/49] bookkeeper git commit: BOOKKEEPER-769: Remove the Hedwig Code
http://git-wip-us.apache.org/repos/asf/bookkeeper/blob/9a8d62b1/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/SessionImpl.java
----------------------------------------------------------------------
diff --git a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/SessionImpl.java b/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/SessionImpl.java
deleted file mode 100644
index f86d955..0000000
--- a/hedwig-client-jms/src/main/java/org/apache/hedwig/jms/SessionImpl.java
+++ /dev/null
@@ -1,2009 +0,0 @@
-/**
- * Licensed to the Apache Software Foundation (ASF) under one
- * or more contributor license agreements. See the NOTICE file
- * distributed with this work for additional information
- * regarding copyright ownership. The ASF licenses this file
- * to you under the Apache License, Version 2.0 (the
- * "License"); you may not use this file except in compliance
- * with the License. You may obtain a copy of the License at
- *
- * http://www.apache.org/licenses/LICENSE-2.0
- *
- * Unless required by applicable law or agreed to in writing, software
- * distributed under the License is distributed on an "AS IS" BASIS,
- * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
- * See the License for the specific language governing permissions and
- * limitations under the License.
- */
-package org.apache.hedwig.jms;
-
-import org.apache.hedwig.jms.message.BytesMessageImpl;
-import org.apache.hedwig.jms.message.MapMessageImpl;
-import org.apache.hedwig.jms.message.MessageImpl;
-import org.apache.hedwig.jms.message.MessageUtil;
-import org.apache.hedwig.jms.message.ObjectMessageImpl;
-import org.apache.hedwig.jms.message.StreamMessageImpl;
-import org.apache.hedwig.jms.message.TextMessageImpl;
-import org.apache.hedwig.jms.selector.Node;
-import org.apache.hedwig.jms.selector.SelectorParser;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import javax.jms.BytesMessage;
-import javax.jms.Destination;
-import javax.jms.InvalidDestinationException;
-import javax.jms.JMSException;
-import javax.jms.MapMessage;
-import javax.jms.Message;
-import javax.jms.MessageConsumer;
-import javax.jms.MessageListener;
-import javax.jms.MessageProducer;
-import javax.jms.ObjectMessage;
-import javax.jms.Queue;
-import javax.jms.QueueBrowser;
-import javax.jms.QueueReceiver;
-import javax.jms.QueueSender;
-import javax.jms.Session;
-import javax.jms.StreamMessage;
-import javax.jms.TemporaryQueue;
-import javax.jms.TemporaryTopic;
-import javax.jms.TextMessage;
-import javax.jms.Topic;
-import javax.jms.TopicPublisher;
-import javax.jms.TopicSubscriber;
-import javax.jms.TransactionRolledBackException;
-import java.io.Serializable;
-import java.util.ArrayList;
-import java.util.Collections;
-import java.util.HashMap;
-import java.util.IdentityHashMap;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Map;
-import java.util.Set;
-import java.util.UUID;
-import java.util.concurrent.ConcurrentHashMap;
-import java.util.concurrent.CopyOnWriteArrayList;
-import java.util.concurrent.atomic.AtomicInteger;
-
-/**
- * Implementation of jms Session.
- * IS NOT MT-safe (2.8) - except for close()
- *
- * We are yet to support/implement this - must pass a flag through constructor on "how" this object
- * was created and use that to throw exception...
- *
- */
-public class SessionImpl implements Session {
-
-
- private final static Logger logger = LoggerFactory.getLogger(SessionImpl.class);
-
- // Session buffer limit (system property "Session.MAX_BUFFERED_MESSAGES", default 8k - too high ?); run() discards its buffered lists when they grow past this while it is waiting.
- public static final int MAX_SESSION_BUFFERED_MESSAGES =
- Integer.getInteger("Session.MAX_BUFFERED_MESSAGES", 1024 * 8);
- // Presumably the per-subscriber buffer limit (system property "Session.MAX_SUBSCRIBER_BUFFERED_MESSAGES", default 512 - too low/high ?).
- public static final int MAX_SUBSCRIBER_BUFFERED_MESSAGES =
- Integer.getInteger("Session.MAX_SUBSCRIBER_BUFFERED_MESSAGES", 512);
-
- // Number of consecutive times the dispatch loop re-checks for (and recovers) rolled back async
- // deliveries before it gives up and closes the session (system property of the same name, default 9).
- public static final int RETRY_DISPATCH_TO_TRANSACTION_ATTEMPTS =
- Integer.getInteger("Session.RETRY_DISPATCH_TO_TRANSACTION_ATTEMPTS", 9);
-
- private final boolean transacted;
- private final int acknowledgeMode;
- private volatile MessageListener messageListener = null; // session level listener; volatile because the listener thread reads it
-
- private final ConnectionImpl connection;
- private final MessagingSessionFacade sessionFacade;
-
- private final Object lockObject = new Object();
- // Message processing locks on this object itself - everything else on lockObject. This is to
- // prevent interactions with hedwig threading idioms.
- // messageList is a leaf in call graph - so it must not cause MT interactions with other locks
- // acquired prior to it. NOTE(review): run() and handleRollbackInDispatch() guard both lists with lockObject - confirm the statement above.
- private final List<ReceivedMessage> messageList = new LinkedList<ReceivedMessage>();
- private final List<TransactedReceiveOperation> rolledbackMessageList
- = new LinkedList<TransactedReceiveOperation>();
-
- private StateManager sessionState = new StateManager(StateManager.State.STOPPED, lockObject); // sessions begin in STOPPED
-
- // Simply encapsulating all state within a single class.
- private final Subscriptions subscriptions = new Subscriptions();
-
- private boolean messageListenerThreadStarted = false; // read by close() under lockObject to decide whether to wait for the thread
- private final Thread messageListenerThread; // runs this session's run() method (see constructor)
- private boolean messageListenerThreadFinished = false; // set by run() on exit, under lockObject
-
- public SessionImpl(ConnectionImpl connection, boolean transacted, int acknowledgeMode) throws JMSException {
- if (Session.AUTO_ACKNOWLEDGE != acknowledgeMode &&
- Session.CLIENT_ACKNOWLEDGE != acknowledgeMode &&
- Session.DUPS_OK_ACKNOWLEDGE != acknowledgeMode){
- // On;y if not transacted !
- if (!transacted){
- throw new javax.jms.IllegalStateException("Unknown/unsupported acknowledgeMode specified : " +
- acknowledgeMode);
- }
- }
- this.transacted = transacted;
- this.acknowledgeMode = acknowledgeMode;
- this.connection = connection;
- this.sessionFacade = connection.createMessagingSessionFacade(this);
- this.messageListenerThread = new Thread(this, "JMS message listener thread");
- // not daemon, right ?
- this.messageListenerThread.setDaemon(false);
- }
-
- /**
-  * Creates an empty bytes message bound to this session.
-  *
-  * @throws JMSException a javax.jms.IllegalStateException when the session is in close mode
-  */
- @Override
- public BytesMessage createBytesMessage() throws JMSException {
-     if (sessionState.isInCloseMode()) {
-         throw new javax.jms.IllegalStateException("Already closed");
-     }
-     return new BytesMessageImpl(this);
- }
-
- /**
-  * Creates an empty map message bound to this session.
-  *
-  * @throws JMSException a javax.jms.IllegalStateException when the session is in close mode
-  */
- @Override
- public MapMessage createMapMessage() throws JMSException {
-     if (sessionState.isInCloseMode()) {
-         throw new javax.jms.IllegalStateException("Already closed");
-     }
-     return new MapMessageImpl(this);
- }
-
- /**
-  * Creates a plain (body-less) message bound to this session.
-  *
-  * @throws JMSException a javax.jms.IllegalStateException when the session is in close mode
-  */
- @Override
- public Message createMessage() throws JMSException {
-     if (sessionState.isInCloseMode()) {
-         throw new javax.jms.IllegalStateException("Already closed");
-     }
-     return new MessageImpl(this);
- }
-
- /**
-  * Creates an object message bound to this session with no payload (null).
-  *
-  * @throws JMSException a javax.jms.IllegalStateException when the session is in close mode
-  */
- @Override
- public ObjectMessage createObjectMessage() throws JMSException {
-     if (sessionState.isInCloseMode()) {
-         throw new javax.jms.IllegalStateException("Already closed");
-     }
-     return new ObjectMessageImpl(this, null);
- }
-
- /**
-  * Creates an object message bound to this session carrying the given payload.
-  *
-  * @param serializable payload handed to the new message
-  * @throws JMSException a javax.jms.IllegalStateException when the session is in close mode
-  */
- @Override
- public ObjectMessage createObjectMessage(Serializable serializable) throws JMSException {
-     if (sessionState.isInCloseMode()) {
-         throw new javax.jms.IllegalStateException("Already closed");
-     }
-     return new ObjectMessageImpl(this, serializable);
- }
-
- /**
-  * Creates an empty stream message bound to this session.
-  *
-  * @throws JMSException a javax.jms.IllegalStateException when the session is in close mode
-  */
- @Override
- public StreamMessage createStreamMessage() throws JMSException {
-     if (sessionState.isInCloseMode()) {
-         throw new javax.jms.IllegalStateException("Already closed");
-     }
-     return new StreamMessageImpl(this);
- }
-
- /**
-  * Creates a text message bound to this session without an initial payload.
-  *
-  * @throws JMSException a javax.jms.IllegalStateException when the session is in close mode
-  */
- @Override
- public TextMessage createTextMessage() throws JMSException {
-     if (sessionState.isInCloseMode()) {
-         throw new javax.jms.IllegalStateException("Already closed");
-     }
-     return new TextMessageImpl(this);
- }
-
- /**
-  * Creates a text message bound to this session with the given payload.
-  *
-  * @param payload text handed to the new message
-  * @throws JMSException a javax.jms.IllegalStateException when the session is in close mode
-  */
- @Override
- public TextMessage createTextMessage(String payload) throws JMSException {
-     if (sessionState.isInCloseMode()) {
-         throw new javax.jms.IllegalStateException("Already closed");
-     }
-     return new TextMessageImpl(this, payload);
- }
-
- /** Returns the transacted flag this session was constructed with. */
- @Override
- public boolean getTransacted() {
-     return this.transacted;
- }
-
- /** Returns the acknowledge mode this session was constructed with. */
- @Override
- public int getAcknowledgeMode() {
-     return this.acknowledgeMode;
- }
-
- /**
-  * Commits the current transaction state of this session.
-  *
-  * @throws JMSException a javax.jms.IllegalStateException when the session is not transacted
-  *                      or is in close mode, or whatever committing the transaction state throws
-  */
- @Override
- public void commit() throws JMSException {
-     // Deliberately no isStarted() check here: apparently, we can send even if the
-     // connection is not open.
-     if (!getTransacted()) {
-         throw new javax.jms.IllegalStateException("Session not transacted");
-     }
-     if (sessionState.isInCloseMode()) {
-         throw new javax.jms.IllegalStateException("Already closed");
-     }
-
-     commitTransactionState();
- }
-
- /**
-  * Rolls back the current transaction state of this session.
-  *
-  * @throws JMSException a javax.jms.IllegalStateException when the session is not started,
-  *                      not transacted, or in close mode (checked in that order)
-  */
- @Override
- public void rollback() throws JMSException {
-     if (!sessionState.isStarted()) {
-         throw new javax.jms.IllegalStateException("Session not open");
-     }
-     if (!getTransacted()) {
-         throw new javax.jms.IllegalStateException("Session not transacted");
-     }
-     if (sessionState.isInCloseMode()) {
-         throw new javax.jms.IllegalStateException("Already closed");
-     }
-
-     rollbackTransactionState();
- }
- // Starts this session: rolls back transaction state, starts the facade, then subscribes to topics and starts delivery for every subscription that has consumers.
- void start() throws JMSException {
- final StateManager.State prevState;
- final Map<Subscription, CopyOnWriteArrayList<MessageConsumer>> subscriptionToSubscriberMapCopy;
-
- if (logger.isTraceEnabled()) logger.trace("Attempting to start session");
-
- synchronized (lockObject){
- // Do not throw an exception here: the connection might be starting while another thread
- // is doing a close() - there is a
- // potential race there, so an already closed session is silently ignored.
- // if (isClosed()) throw new javax.jms.IllegalStateException("Already closed");
- if (isClosed()) return ;
- if (sessionState.isStarted()) return ;
-
- if (sessionState.isTransitionState()){
- sessionState.waitForTransientStateChange(StateManager.WAIT_TIME_FOR_TRANSIENT_STATE_CHANGE, logger);
- // Not expected actually, present to guard against future changes ...
- if (sessionState.isTransitionState())
- throw new JMSException("Connection did not make state change to steady state ?");
-
- if (isClosed()) throw new javax.jms.IllegalStateException("Already closed");
- if (sessionState.isStarted()) return ;
-
- assert sessionState.isStopped();
- // Steady STOPPED state reached - fall through and start.
- }
-
- prevState = sessionState.getCurrentState();
- sessionState.setCurrentState(StateManager.State.STARTING);
-
- // Copy to prevent concurrent mod exceptions - is it required here ? Not sure ...
- subscriptionToSubscriberMapCopy = subscriptions.createSubscriptionToSubscriberMapCopy();
- }
-
- StateManager.State nextState = prevState; // restored in the finally block unless startup reaches the end of the try
-
- try {
- rollbackTransactionState();
- // Note: this part of the code IS thread-safe for our private state.
-
- // Validate state - in terms of listeners, etc : we are relying on the single thread semantics of JMS
- // to NOT do any complex locking, etc.
-
- if (null != getMessageListener()){
- // There CANNOT be any subscriber with listeners registered.
- for (MessageConsumer consumer : subscriptions.getAllConsumersSet()){
- if (null != consumer.getMessageListener()) {
- throw new JMSException("Session's message listener is already set - " +
- "cannot have a consumer with listener also set.");
- }
- }
- }
-
-
- sessionFacade.start();
-
- if (logger.isTraceEnabled()) logger.trace("Starting " + subscriptionToSubscriberMapCopy.size() +
- " subscribers");
-
- // Subscribe to all the subscriberIds; failures below are logged at debug level and do not abort the start.
- for (Map.Entry<Subscription, CopyOnWriteArrayList<MessageConsumer>> entry :
- subscriptionToSubscriberMapCopy.entrySet()){
-
- if (entry.getValue().isEmpty()) continue;
- if (entry.getKey().isTopic()){
- TopicSubscription topicSubscription = (TopicSubscription) entry.getKey();
- try {
- sessionFacade.subscribeToTopic(topicSubscription.topicName, topicSubscription.subscriberId);
- } catch (JMSException e) {
- // Log and ignore
- // This CAN fail, it is ok to fail !
- if (logger.isDebugEnabled()) {
- logger.debug("(Potentially Benign error) Error subscribing from topic for entry : " +
- topicSubscription);
- DebugUtil.dumpJMSStacktrace(logger, e);
- }
- }
- try {
- sessionFacade.startTopicDelivery(topicSubscription.topicName,
- topicSubscription.subscriberId);
- } catch (JMSException e) {
- // Log and ignore
- if (logger.isDebugEnabled()) {
- logger.debug("Error starting topic delivery for entry : " + entry.getKey());
- DebugUtil.dumpJMSStacktrace(logger, e);
- }
- }
- }
- else {
- assert entry.getKey().isQueue();
-
- QueueSubscription queueSubscription = (QueueSubscription) entry.getKey();
-
- // There is no notion like subscription to queue, right ?
- /*
- try {
- sessionFacade.subscribeToQueue(queueSubscription.queueName,
- queueSubscription.subscriberId);
- } catch (JMSException e) {
- // Log and ignore
- // This CAN fail, it is ok to fail !
- if (logger.isDebugEnabled()) {
- logger.debug("(Potentially Benign error) Error subscribing from queue for entry : " +
- queueSubscription);
- Util.dumpJMSStacktrace(logger, e);
- }
- }
- */
- try {
- sessionFacade.startQueueDelivery(queueSubscription.queueName, queueSubscription.subscriberId);
- } catch (JMSException e) {
- // Log and ignore
- if (logger.isDebugEnabled()) {
- logger.debug("Error starting queue delivery for entry : " + entry.getKey());
- DebugUtil.dumpJMSStacktrace(logger, e);
- }
- }
- }
- }
- nextState = StateManager.State.STARTED;
- } finally {
- // Publish the final state (STARTED, or the previous state on failure) and wake up waiters on lockObject.
- synchronized (lockObject){
- lockObject.notifyAll();
- sessionState.setCurrentState(nextState);
- }
- }
- }
- // Stops this session: rolls back transaction state, stops delivery for every subscription that has consumers, then stops the facade.
- void stop() throws JMSException {
- final StateManager.State prevState;
- final Map<Subscription, CopyOnWriteArrayList<MessageConsumer>> subscriptionToSubscriberMapCopy;
-
- if (logger.isTraceEnabled()) logger.trace("Attempting to stop connection"); // NOTE(review): text says "connection" although this stops the session
-
- synchronized (lockObject){
- if (isClosed()) throw new javax.jms.IllegalStateException("Already closed");
- if (sessionState.isStopped()) return ;
-
- if (sessionState.isTransitionState()){
- sessionState.waitForTransientStateChange(StateManager.WAIT_TIME_FOR_TRANSIENT_STATE_CHANGE, logger);
- // Not expected actually, present to guard against future changes ...
- if (sessionState.isTransitionState())
- throw new JMSException("Connection did not make state change to steady state ?");
-
- if (isClosed()) throw new javax.jms.IllegalStateException("Already closed");
- if (sessionState.isStopped()) return ;
-
- assert sessionState.isStarted();
-
- // Steady STARTED state reached - fall through and stop.
- }
-
- prevState = sessionState.getCurrentState();
- sessionState.setCurrentState(StateManager.State.STOPPING);
-
- // Copy to prevent concurrent mod exceptions.
- subscriptionToSubscriberMapCopy = subscriptions.createSubscriptionToSubscriberMapCopy();
- }
-
- StateManager.State nextState = prevState; // restored in the finally block unless the stop reaches the end of the try
- try {
- rollbackTransactionState();
- // In case there are other things to be done ...
-
- // Stop delivery for all the subscriberIds; failures are logged at debug level and ignored.
- if (logger.isTraceEnabled()) logger.trace("Stopping " +
- subscriptionToSubscriberMapCopy.size() + " subscribers");
- for (Map.Entry<Subscription, CopyOnWriteArrayList<MessageConsumer>> entry :
- subscriptionToSubscriberMapCopy.entrySet()){
-
- if (entry.getValue().isEmpty()) continue;
- if (entry.getKey().isTopic()){
- TopicSubscription topicSubscription = (TopicSubscription) entry.getKey();
- try {
- stopTopicDelivery(topicSubscription.topicName, topicSubscription.subscriberId);
- } catch (JMSException e) {
- // Log and ignore
- if (logger.isDebugEnabled()) {
- logger.debug("Error unsubscribing from topic for entry : " + topicSubscription);
- DebugUtil.dumpJMSStacktrace(logger, e);
- }
- }
- }
- else {
- assert entry.getKey().isQueue();
-
- QueueSubscription queueSubscription = (QueueSubscription) entry.getKey();
- try {
- stopQueueDelivery(queueSubscription.queueName, queueSubscription.subscriberId);
- } catch (JMSException e) {
- // Log and ignore
- if (logger.isDebugEnabled()) {
- logger.debug("Error unsubscribing from queue for entry : " + queueSubscription);
- DebugUtil.dumpJMSStacktrace(logger, e);
- }
- }
- }
- }
- // stop facade AFTER subscribers are stopped.
- sessionFacade.stop();
- nextState = StateManager.State.STOPPED;
- } finally {
- synchronized (lockObject){ // publish the final state (STOPPED, or the previous state on failure) and wake up waiters
- lockObject.notifyAll();
- sessionState.setCurrentState(nextState);
- }
- }
- }
-
-
- /**
- *
- * Closes the session. <br/>
- * Since a provider may allocate some resources on behalf of a session outside the JVM, clients
- * should close the resources
- * when they are not needed. Relying on garbage collection to eventually reclaim these resources
- * may not be timely enough.<br/>
- * <p/>
- * There is no need to close the producers and consumers of a closed session.
- * <p/>
- *
- *
- * A blocked message consumer receive call returns null when this session is closed.
- * <p/>
- * Closing a transacted session must roll back the transaction in progress.<br/>
- * This method is the only Session method that can be called concurrently.<br/>
- * Invoking any other Session method on a closed session must throw a JMSException.IllegalStateException.<br/>
- * Closing a closed session must not throw an exception.<br/>
- *
- */
- private static final ThreadLocal<Boolean> closeFromWithinListener = new ThreadLocal<Boolean>(){
- @Override
- protected Boolean initialValue() {
- return false;
- }
- };
-
- @Override
- public void close() throws JMSException {
- final StateManager.State prevState;
- final Set<MessageConsumer> subscriberSetCopy = Collections.newSetFromMap(
- new IdentityHashMap<MessageConsumer, Boolean>());
-
- if (logger.isTraceEnabled()) logger.trace("Attempting to close session");
-
- synchronized (lockObject){
- if (isClosed()) return ;
- if (! sessionState.isStopped()) {
- if (sessionState.isTransitionState()){
- sessionState.waitForTransientStateChange(StateManager.WAIT_TIME_FOR_TRANSIENT_STATE_CHANGE, logger);
- // Not expected actually, present to guard against future changes ...
- if (sessionState.isTransitionState())
- throw new JMSException("Connection did not make state change to steady state ?");
-
- if (isClosed()) return ;
-
- assert sessionState.isStarted() || sessionState.isStopped();
- }
- }
-
- prevState = sessionState.getCurrentState();
- sessionState.setCurrentState(StateManager.State.CLOSING);
- // Copy to prevent concurrent mod exceptions.
- subscriberSetCopy.addAll(subscriptions.getAllConsumersSet());
- }
-
- StateManager.State nextState = prevState;
-
- try {
- rollbackTransactionState();
- connection.removeSession(this);
-
- // Close all publishers - doing this within synchronized block to prevent any possibility
- // of race conditions.
- // Potentially expensive, but it is a tradeoff between correctness and performance :-(
- if (logger.isTraceEnabled()) logger.trace("Closing " + subscriberSetCopy.size() + " subscribers");
- for (MessageConsumer subscriber : subscriberSetCopy){
- try {
- subscriber.close();
- } catch (JMSException e) {
- // Log and ignore
- if (logger.isDebugEnabled()) {
- logger.debug("Error unsubscribing from destination for entry : " + subscriber);
- DebugUtil.dumpJMSStacktrace(logger, e);
- }
- }
- }
- sessionFacade.close();
- nextState = StateManager.State.CLOSED;
- } finally {
-
- // set status and notify.
- synchronized (lockObject){
- lockObject.notifyAll();
- sessionState.setCurrentState(nextState);
- }
- }
-
- if (logger.isTraceEnabled()) logger.trace(this + "Waiting for messageListenerThreadStarted " +
- messageListenerThreadStarted + ", messageListenerThreadFinished " + messageListenerThreadFinished);
-
- // spin on messageListenerThreadFinished
- // spin ONLY if we are NOT within the listener already !
- if (! closeFromWithinListener.get()){
- synchronized (lockObject){
- long waitTime = 100;
- long retryCount = StateManager.WAIT_TIME_FOR_TRANSIENT_STATE_CHANGE / waitTime;
- if (messageListenerThreadStarted) {
- while (!messageListenerThreadFinished) {
- try {
- lockObject.wait(waitTime);
- } catch (InterruptedException e) {
- // ignore ...
- if (logger.isDebugEnabled()) logger.debug("interrupted ?", e);
- }
- retryCount --;
- // Fail if we have waiting long enough ... hardcoded for now.
- if (retryCount <= 0) break;
- }
- }
- }
- }
-
- if (logger.isTraceEnabled()) logger.trace("Waiting for messageListenerThreadFinished " +
- messageListenerThreadFinished + " DONE");
- }
-
- /**
-  * Session recovery is not implemented: after the state checks this always throws.
-  *
-  * @throws JMSException a javax.jms.IllegalStateException when the session is in close mode
-  *                      or transacted; otherwise a plain JMSException ("recovery : TODO")
-  */
- @Override
- public void recover() throws JMSException {
-     // Typically will be in stopped state.
-     if (sessionState.isInCloseMode()) {
-         throw new javax.jms.IllegalStateException("Already closed");
-     }
-     if (getTransacted()) {
-         throw new javax.jms.IllegalStateException("cannot invoke recover in transacted session.");
-     }
-
-     throw new JMSException("recovery : TODO");
- }
-
- /** Returns the session level message listener, or null when none has been set. */
- @Override
- public MessageListener getMessageListener() {
-     return this.messageListener;
- }
-
- /**
-  * Sets the session level message listener. Replacing the listener with a different instance
-  * is refused once the session is in start mode.
-  *
-  * @param messageListener the listener to install (may be the one already installed)
-  * @throws JMSException a javax.jms.IllegalStateException when the session is in close mode;
-  *                      a plain JMSException when a different listener is set in start mode
-  */
- @Override
- public void setMessageListener(final MessageListener messageListener) throws JMSException {
-     if (sessionState.isInCloseMode()) {
-         throw new javax.jms.IllegalStateException("Already closed");
-     }
-
-     // Explicitly forbidding it for now : leads to too many complexities otherwise.
-     // start session ONLY AFTER you have set the listener.
-     final boolean listenerChanged = messageListener != this.messageListener;
-     if (listenerChanged && sessionState.isInStartMode()) {
-         throw new JMSException("Set the message listener BEFORE starting session (and/or connection)");
-     }
-
-     this.messageListener = messageListener;
- }
- // Body of the message listener thread: until the session is closed, waits for buffered work and hands it to dispatchReceivedMessagesToSubscribers().
- @Override
- public void run() {
- if (logger.isTraceEnabled()) logger.trace("Session thread started");
- try {
- while (!isClosed()){
- final List<ReceivedMessage> messageListCopy;
- final List<TransactedReceiveOperation> rolledbackMessageListCopy;
- final MessageListener msglistener;
- synchronized (lockObject){ // wait until: started, a listener or subscriber exists, and at least one list is non-empty
- while (!isClosed() &&
- (!sessionState.isStarted() ||
- (null == getMessageListener() && 0 == subscriptions.getNumSubscribers()) ||
- (messageList.isEmpty() && rolledbackMessageList.isEmpty())
- )
- ) {
- // Check for buffer over-runs due to no listener being available !
- if (messageList.size() > MAX_SESSION_BUFFERED_MESSAGES){
- // simply discard it; the discard is logged at info level.
- if (logger.isInfoEnabled()) logger.info("Discarding " + messageList.size() +
- " messages since there are no consumers for them");
- messageList.clear();
- }
- // Same over-run check for the rolled back (to be recovered) operations.
- if (rolledbackMessageList.size() > MAX_SESSION_BUFFERED_MESSAGES){
- // simply discard it; the discard is logged at info level.
- if (logger.isInfoEnabled()) logger.info("Discarding " + rolledbackMessageList.size() +
- " messages since there are no consumers for them from recovered list.");
- rolledbackMessageList.clear();
- }
-
- if (logger.isTraceEnabled()) logger.trace(this + "sessionState : " + sessionState +
- ", listener " + getMessageListener() + ", numSubscribers : " +
- subscriptions.getNumSubscribers());
- try {
- lockObject.wait(500);
- } catch (InterruptedException e) {
- // Should we ignore this ? There is no way this thread can be interrupted currently -
- // while closing it will cause issues !
- // Log and forget
- if (logger.isDebugEnabled()) {
- logger.debug("Ignoring interrupted exception", e);
- }
- }
- }
-
- if (isClosed()) break;
- msglistener = getMessageListener();
- messageListCopy = new ArrayList<ReceivedMessage>(messageList); // snapshot both lists and clear them under the lock ...
- rolledbackMessageListCopy = new ArrayList<TransactedReceiveOperation>(rolledbackMessageList);
- messageList.clear();
- rolledbackMessageList.clear();
- assert subscriptions.getNumSubscribers() > 0 || null != msglistener;
- }
-
- if (logger.isTraceEnabled()) logger.trace("Processing " + messageListCopy.size() +
- " messages using listener ? " + (null != msglistener));
- // ... so that the dispatch below runs without holding lockObject.
- dispatchReceivedMessagesToSubscribers(msglistener, messageListCopy, rolledbackMessageListCopy);
- }
- } finally {
- String msg = null;
- synchronized (lockObject){ // mark the thread as finished and wake up close(), which waits on lockObject for this flag
- lockObject.notifyAll();
- messageListenerThreadFinished = true;
- if (logger.isTraceEnabled()) msg = "Exiting thread and setting " +
- messageListenerThreadFinished;
- }
- if (logger.isTraceEnabled()) logger.trace(msg);
- }
- }
- // Dispatches one drained batch: recovers rolled back receives first, then routes each received message to the session listener or to the consumers of its subscription.
- private void dispatchReceivedMessagesToSubscribers(MessageListener sessionMessageListener,
- List<ReceivedMessage> messageListCopy,
- List<TransactedReceiveOperation> rolledbackMessageListCopy) {
- assert null != messageListCopy;
-
- // Doing it before processing messageList.
- handleRollbackInDispatch(rolledbackMessageListCopy);
-
- for (final ReceivedMessage receivedMessage : messageListCopy){
-
- if (isClosed()) break;
-
- // It is possible that a previous listener rolled back the transaction ... check that before
- // delivering the other messages !
- // Else we will mess up the order of message delivery.
- {
- int retryCount = 0;
- while (retryCount < RETRY_DISPATCH_TO_TRANSACTION_ATTEMPTS){
- if (! handleRollbackInDispatch(null)) break;
- retryCount ++;
- }
- if (RETRY_DISPATCH_TO_TRANSACTION_ATTEMPTS == retryCount){
- // Every attempt found new rollbacks: we can't do much - close session and abort.
- try {
- SessionImpl.this.close();
- } catch (JMSException e) {
- if (logger.isDebugEnabled()) logger.debug("Exception closing session", e);
- }
- return ;
- }
- }
-
- final Subscription subscription = createSubscription(receivedMessage.destinationType,
- receivedMessage.originalMessage.getSourceName(), receivedMessage.originalMessage.getSubscriberId());
-
- // COW - so no need to worry about concurrent-mod's or inconsistent states - other than
- // potential stale state,
- // which is fine since MessageConsumer's are essentially immutable from basic state point
- // of view (subscriberId, destination).
- CopyOnWriteArrayList<? extends MessageConsumer> subscriberList =
- subscriptions.getSubscribers(subscription);
- if (null == subscriberList) continue;
-
- if (! subscriberList.listIterator().hasNext()) continue; // no consumers for this subscription: skip the message
-
- // For selector support - pick up the last register
- Node ast = subscriptions.getSelectorExpression(subscription);
- if (logger.isTraceEnabled()) logger.trace("subscription : " + subscription + ", selector : " + ast);
- if (null != ast){
- // final Boolean value = SelectorParser.evaluateSelector(ast, receivedMessage.originalMessage);
- final Boolean value = SelectorParser.evaluateSelector(ast, receivedMessage.msg);
-
- if (null == value){
- if (logger.isDebugEnabled()) {
- logger.debug("Unable to evaluate selector ? ... ignoring message");
- logger.debug("Message : " + receivedMessage.msg);
- }
- receivedMessage.originalMessage.getAckRunnable().run(); // run the ack runnable and drop the message
- continue;
- }
- if (! Boolean.TRUE.equals(value)){
- if (logger.isTraceEnabled()) logger.trace("Selector DID NOT evaluate to true (" +
- value + "), ignore message ignoring message");
- receivedMessage.originalMessage.getAckRunnable().run(); // run the ack runnable and drop the message
- continue;
- }
- }
-
- // A session level listener, when present, receives the message instead of the individual consumers.
- if (null != sessionMessageListener){
- // Since there was at least one subscriber when we started this loop (which might
- // not be case anymore, but that is just an uncontrollable harmless race)
- // we can send it to messageListener for the session.
- if (logger.isTraceEnabled()) logger.trace("Dispatching " + receivedMessage.originalMessage +
- " to session listener");
-
- if (isMessageExpired(receivedMessage.originalMessage)){
- // message already expired.
- // This means we acknowledge for all subscribers with this subscription id ...
- receivedMessage.originalMessage.getAckRunnable().run();
- continue;
- }
-
- try {
- final MessageImpl message = MessageUtil.createCloneForDispatch(this,
- receivedMessage.originalMessage, receivedMessage.originalMessage.getSourceName(),
- receivedMessage.originalMessage.getSubscriberId());
- deliverToListener(sessionMessageListener, receivedMessage, message, false);
- } catch (JMSException e) {
- // Unexpected not to be able to clone ...
- if (logger.isDebugEnabled()) {
- logger.debug("Unexpected exception trying to process message");
- DebugUtil.dumpJMSStacktrace(logger, e);
- }
- }
- continue;
- }
-
- for (final MessageConsumer subscriber : subscriberList){
- if (isClosed()) break;
- try {
- final MessageListener subscriberListener = subscriber.getMessageListener();
- // Clone - since each subscriber can modify the message.
- // NOTE(review): an earlier note promised cloning only when subscriberList has more than one
- // subscriber; the clone below is made unconditionally, so that optimization is not implemented.
- if (getNoLocal(subscription, subscriber)){
- if (isLocallyPublished(receivedMessage.originalMessage.getJMSMessageID())){
- // This means we acknowledge for all subscribers with this subscription id ...
- receivedMessage.originalMessage.getAckRunnable().run();
- continue;
- }
- }
- if (isMessageExpired(receivedMessage.originalMessage)){
- receivedMessage.originalMessage.getAckRunnable().run();
- continue;
- }
-
- final MessageImpl message = MessageUtil.createCloneForDispatch(this,
- receivedMessage.originalMessage, receivedMessage.originalMessage.getSourceName(),
- receivedMessage.originalMessage.getSubscriberId());
-
- if (logger.isTraceEnabled()) logger.trace("Dispatching " + message +
- " to subscriber subscriberListener ? " + (subscriberListener != null));
- // Consumers with a listener get the callback here; the others get the message enqueued through the facade.
- if (null != subscriberListener) {
- deliverToListener(subscriberListener, receivedMessage, message, false);
- }
- else {
- sessionFacade.enqueueReceivedMessage(subscriber,
- new ReceivedMessage(receivedMessage.originalMessage, message,
- receivedMessage.destinationType), false);
- }
-
- if (logger.isTraceEnabled()) logger.trace("Dispatching " + message +
- " to subscriberListener ? " + (subscriberListener != null) + ", DONE");
- } catch (JMSException e) {
- // Unexpected not to be able to clone ...
- if (logger.isDebugEnabled()) {
- logger.debug("Unexpected exception trying to process message", e);
- }
- continue ;
- }
- }
- }
-
- if (logger.isTraceEnabled()) logger.trace("dispatchReceivedMessagesToSubscribers() DONE");
- }
-
- private boolean getNoLocal(Subscription subscription, MessageConsumer subscriber) throws JMSException {
- if (subscription.isTopic()) return ((TopicSubscriber) subscriber).getNoLocal();
- // nothing equivalent for queue.
- // if (subscription.isQueue()) return ((QueueReceiver) subscriber).getNoLocal();
- return false;
- }
-
- // Note that rollback can happen WHILE a listener is being run - so we need to check this
- // between EACH message delivery :-(
- // Not just as part of block draining of the queue.
- // returns true if there was any async operation to rollback (specifically async !).
- private boolean handleRollbackInDispatch(List<TransactedReceiveOperation> rolledbackMessageListCopy) {
- if (null == rolledbackMessageListCopy) {
- // Attempt to drain the queue.
- synchronized (lockObject){
- if (rolledbackMessageList.isEmpty()) return false;
- rolledbackMessageListCopy = new ArrayList<TransactedReceiveOperation>(rolledbackMessageList);
- rolledbackMessageList.clear();
- }
- }
-
- if (logger.isTraceEnabled()) logger.trace("rolledbackMessageList (" +
- rolledbackMessageListCopy.size() + ") ... " + rolledbackMessageListCopy);
-
- LinkedList<TransactedReceiveOperation> listenerDeliveryList = new LinkedList<TransactedReceiveOperation>();
- for (TransactedReceiveOperation receiveOp : rolledbackMessageListCopy){
- if (isClosed()) break;
- receiveOp.recover(listenerDeliveryList);
- }
- for (TransactedReceiveOperation receiveOp : listenerDeliveryList){
- if (isClosed()) break;
- receiveOp.recoverForListener();
- }
-
- return listenerDeliveryList.size() > 0;
- }
- // Delivers one message to a listener, retrying when the listener throws a RuntimeException; marks the thread as "inside a listener" for close().
- private void deliverToListener(MessageListener sessionMessageListener, ReceivedMessage receivedMessage,
- MessageImpl theMessage, boolean redelivery) {
-
- // NOT a re-entrant method: the thread-local flag below is removed in the finally block.
- closeFromWithinListener.set(true);
- try {
- int errorRetry = 0;
- boolean success = false;
- final int retryFor = // 3 attempts for non-transacted AUTO/CLIENT acknowledge sessions, a single attempt otherwise
- (!getTransacted() &&
- (Session.AUTO_ACKNOWLEDGE == getAcknowledgeMode() ||
- Session.CLIENT_ACKNOWLEDGE == getAcknowledgeMode()))
- ? 3 : 1;
-
- while (errorRetry < retryFor && !isClosed()){
- try {
- if (redelivery || errorRetry > 0) theMessage.setJMSRedelivered(true);
-
- // Changed my mind, always ack the message before processing it. This seems to
- // be consistent with activemq testcases too ...
- handleAutomaticMessageAcknowledgement(receivedMessage, sessionMessageListener);
- sessionMessageListener.onMessage(theMessage);
- success = true;
- break ;
- } catch (RuntimeException rEx){
- // Badly behaved client, retry ...
- if (logger.isInfoEnabled())
- logger.info("Unexpected runtime exception from client message listener.", rEx);
- }
- errorRetry ++;
- }
-
- if (isClosed()) return ;
-
- if (!success){
- // If failed, then reset transaction state - so that next txn will not get affected by this.
- rollbackTransactionState();
- // We gave up delivering the message ...
- if (retryFor > 1) {
- if (logger.isInfoEnabled())
- logger.info("Delivery of message to listener resulted in repeated failures, " +
- " dropping message - session recovery should be used to handle it.");
- }
- else {
- if (logger.isInfoEnabled()) logger.info("Use session recovery to handle message");
- }
- }
- } finally {
- closeFromWithinListener.remove();
- }
- }
-
- public MessagingSessionFacade.DestinationType findDestinationType(String destination) throws JMSException {
- return sessionFacade.findDestinationType(destination); // lookup by destination name is delegated to the facade
- }
-
- public MessagingSessionFacade.DestinationType findDestinationType(Destination destination) throws JMSException {
- return sessionFacade.findDestinationType(destination); // lookup by Destination object is delegated to the facade
- }
-
- @Override
- public MessageProducer createProducer(Destination destination) throws JMSException { // queue sender or topic publisher, per resolved type
- if (sessionState.isInCloseMode()) throw new javax.jms.IllegalStateException("Already closed");
- connection.initConnectionClientID();
-
- return createProducerImpl(findDestinationType(destination), destination); // NOTE: no null-destination guard here, unlike createConsumer
- }
-
- private MessageProducer createProducerImpl(MessagingSessionFacade.DestinationType type,
- Destination destination) throws JMSException { // dispatches on the already-resolved type; the facade builds the producer
- switch (type){
- case QUEUE:
- return sessionFacade.createQueueSender(destination);
- case TOPIC:
- return sessionFacade.createTopicPublisher(destination);
- default: // any other type constant: the destination could not be classified as queue or topic
- throw new JMSException("Unable to find destination type " + destination +
- ", please use explicit queue/topic methods to create producer");
- }
- }
-
- // Delegate to this IF this method can be invoked (specifically, if not a QueueSession).
- protected TopicPublisher createPublisherImpl(Topic topic) throws JMSException {
- if (sessionState.isInCloseMode()) throw new javax.jms.IllegalStateException("Already closed");
- if (null == topic) throw new InvalidDestinationException("Illegal destination");
- connection.initConnectionClientID();
-
- return (TopicPublisher) createProducerImpl(MessagingSessionFacade.DestinationType.TOPIC, topic); // type is forced to TOPIC, no lookup
- }
-
-
- @Override
- public MessageConsumer createConsumer(Destination destination) throws JMSException { // queue receiver or topic subscriber, per resolved type
- if (sessionState.isInCloseMode()) throw new javax.jms.IllegalStateException("Already closed");
- if (null == destination) throw new InvalidDestinationException("Illegal destination");
- connection.initConnectionClientID();
-
- return createConsumerImpl(findDestinationType(destination), destination);
- }
-
- private MessageConsumer createConsumerImpl(MessagingSessionFacade.DestinationType type,
- Destination destination) throws JMSException { // dispatches on the already-resolved type; the facade builds the consumer
- switch (type){
- case QUEUE:
- return sessionFacade.createQueueReceiver(destination);
- case TOPIC:
- return sessionFacade.createTopicSubscriber(destination);
- default: // any other type constant: the destination could not be classified as queue or topic
- throw new JMSException("Unable to find destination type " + destination +
- ", please use explicit queue/topic methods to create consumer");
- }
- }
-
- // Delegate to this IF this method can be invoked (specifically, if not a QueueSession).
- protected TopicSubscriber createSubscriberImpl(Topic topic) throws JMSException {
- if (sessionState.isInCloseMode()) throw new javax.jms.IllegalStateException("Already closed");
- if (null == topic) throw new InvalidDestinationException("Illegal destination");
- connection.initConnectionClientID();
-
- return (TopicSubscriber) createConsumerImpl(MessagingSessionFacade.DestinationType.TOPIC, topic); // type is forced to TOPIC, no lookup
- }
-
- // Delegate to this IF this method can be invoked (specifically, if not a QueueSession).
- protected TopicSubscriber createSubscriberImpl(Topic topic, String messageSelector, boolean noLocal)
- throws JMSException {
-
- if (sessionState.isInCloseMode()) throw new javax.jms.IllegalStateException("Already closed");
- if (null == topic) throw new InvalidDestinationException("Illegal destination");
- connection.initConnectionClientID();
-
- return (TopicSubscriber) createConsumerImpl(MessagingSessionFacade.DestinationType.TOPIC, // type is forced to TOPIC, no lookup
- topic, messageSelector, noLocal);
- }
-
-
- @Override
- public MessageConsumer createConsumer(Destination destination, String messageSelector) throws JMSException {
- if (sessionState.isInCloseMode()) throw new javax.jms.IllegalStateException("Already closed");
- if (null == destination) throw new InvalidDestinationException("Illegal destination");
- connection.initConnectionClientID();
-
- return createConsumer(destination, messageSelector, false); // noLocal defaults to false; the overload repeats the same checks
- }
-
- @Override
- public MessageConsumer createConsumer(Destination destination, String messageSelector,
- boolean noLocal) throws JMSException { // selector-aware variant; queue receiver or topic subscriber, per resolved type
-
- if (sessionState.isInCloseMode()) throw new javax.jms.IllegalStateException("Already closed");
- if (null == destination) throw new InvalidDestinationException("Illegal destination");
- connection.initConnectionClientID();
-
- return createConsumerImpl(findDestinationType(destination), destination, messageSelector, noLocal);
- }
-
- private MessageConsumer createConsumerImpl(MessagingSessionFacade.DestinationType type, Destination destination,
- String messageSelector, boolean noLocal) throws JMSException { // selector and noLocal are passed through to the facade unchanged
- switch (type){
- case QUEUE:
- return sessionFacade.createQueueReceiver(destination, messageSelector, noLocal);
- case TOPIC:
- return sessionFacade.createTopicSubscriber(destination, messageSelector, noLocal);
- default: // any other type constant: the destination could not be classified as queue or topic
- throw new JMSException("Unable to find destination type " + destination +
- ", please use explicit queue/topic methods to create consumer");
- }
- }
-
- // TODO: Check if it is actually a Queue !
- // Builds a Queue handle for the given name. The name is only wrapped; it is not checked against
- // the provider (see the TODO above).
- // Throws javax.jms.IllegalStateException when the session is in close mode, and
- // InvalidDestinationException when queueName is null.
- @Override
- public Queue createQueue(String queueName) throws JMSException {
-     if (sessionState.isInCloseMode()) throw new javax.jms.IllegalStateException("Already closed");
-     // Reject a missing name up front, as createTopic does, instead of handing back a Queue
-     // whose getQueueName() returns null.
-     if (null == queueName) throw new InvalidDestinationException("Illegal destination");
-
-     return (Queue) getDestination(MessagingSessionFacade.DestinationType.QUEUE, queueName);
- }
-
- // Delegate to this IF this method can be invoked (specifically, if not a TopicSession).
- protected QueueReceiver createReceiverImpl(Queue queue) throws JMSException {
- return sessionFacade.createQueueReceiver(queue); // no close-mode or null check here; the facade call is made as-is
- }
-
- // Delegate to this IF this method can be invoked. NOTE(review): this said "if not QueueSession"; presumably "if not TopicSession" was meant, as on the one-argument overload - confirm.
- protected QueueReceiver createReceiverImpl(Queue queue, String messageSelector) throws JMSException {
- return sessionFacade.createQueueReceiver(queue, messageSelector); // no close-mode or null check here; the facade call is made as-is
- }
-
- // Delegate to this IF this method can be invoked. NOTE(review): this said "if not QueueSession"; presumably "if not TopicSession" was meant, since this creates a queue sender - confirm.
- protected QueueSender createSenderImpl(Queue queue) throws JMSException {
- return sessionFacade.createQueueSender(queue); // no close-mode or null check here; the facade call is made as-is
- }
-
- // TODO: Check if it is actually a Topic ! (the name is only wrapped into a Topic handle, nothing is looked up)
- @Override
- public Topic createTopic(String topicName) throws JMSException {
- if (sessionState.isInCloseMode()) throw new javax.jms.IllegalStateException("Already closed");
- if (null == topicName) throw new InvalidDestinationException("Illegal destination");
- connection.initConnectionClientID();
-
- return (Topic) getDestination(MessagingSessionFacade.DestinationType.TOPIC, topicName);
- }
-
- @Override
- public TopicSubscriber createDurableSubscriber(Topic topic, String subscribedId) throws JMSException {
- if (sessionState.isInCloseMode()) throw new javax.jms.IllegalStateException("Already closed");
-
- if (null == topic) throw new InvalidDestinationException("Illegal destination");
- if (null == subscribedId) throw new JMSException("Illegal subscribedId");
- connection.initConnectionClientID();
-
- subscriptions.registerSubscriberIdToTopic(subscribedId, topic.getTopicName()); // remembered so unsubscribe(subscribedId) can find the topic name
- return sessionFacade.createDurableSubscriber(topic, createSubscriberId(subscribedId)); // the facade sees the client-id qualified subscriber id
- }
-
- @Override
- public TopicSubscriber createDurableSubscriber(Topic topic, String subscribedId, String messageSelector,
- boolean noLocal) throws JMSException { // selector-aware variant; same checks and registration as the two-argument overload
- if (sessionState.isInCloseMode()) throw new javax.jms.IllegalStateException("Already closed");
-
- if (null == topic) throw new InvalidDestinationException("Illegal destination");
- if (null == subscribedId) throw new JMSException("Illegal subscribedId");
- connection.initConnectionClientID();
-
- subscriptions.registerSubscriberIdToTopic(subscribedId, topic.getTopicName()); // remembered so unsubscribe(subscribedId) can find the topic name
- return sessionFacade.createDurableSubscriber(topic, createSubscriberId(subscribedId),
- messageSelector, noLocal);
- }
-
- // Removes the subscription registered under subscribedId: the topic name recorded for that id is
- // looked up and the facade is asked to unsubscribe the client-id qualified subscriber id from it.
- // Throws javax.jms.IllegalStateException when the session is in close mode, and
- // InvalidDestinationException when subscribedId is null.
- @Override
- public void unsubscribe(String subscribedId) throws JMSException {
-     if (sessionState.isInCloseMode()) throw new javax.jms.IllegalStateException("Already closed");
-     // Without this guard a null name would be folded into the provider id as the text "null"
-     // by createSubscriberId, and an unsubscribe for that made-up id would reach the facade.
-     if (null == subscribedId) throw new InvalidDestinationException("Illegal subscribedId");
-
-     final String topicName = subscriptions.findTopicNameForSubscriberId(subscribedId);
-     sessionFacade.unsubscribeFromTopic(topicName, createSubscriberId(subscribedId));
- }
-
- // Derives the provider-level subscriber id from the connection's client id and the JMS
- // subscription name. createDurableSubscriber and unsubscribe both rely on this producing the
- // same id for the same inputs.
- public String createSubscriberId(final String subscribedId) throws JMSException {
-     // Some arbitrary combination of client id and subscriber id:
-     //   CLIENT_ID:<clientId>|SUBSCRIBER_ID:<subscribedId>
-     return "CLIENT_ID:" + connection.getClientID() + '|' + "SUBSCRIBER_ID:" + subscribedId;
- }
-
- /*
- public String createTemporaryTopicId() throws JMSException {
- final String clientId = connection.getClientID();
- StringBuilder sb = new StringBuilder();
-
- // Some arbitrary combination of client id and subscriber id.
- sb.append("CLIENT_ID:");
- sb.append(clientId);
- sb.append('|');
- sb.append("TOPIC_ID:");
- sb.append(generateRandomString());
-
- return sb.toString();
- }
- */
-
- @Override
- public QueueBrowser createBrowser(Queue queue) throws JMSException {
- if (sessionState.isInCloseMode()) throw new javax.jms.IllegalStateException("Already closed");
-
- return sessionFacade.createBrowser(queue); // browser construction is delegated to the facade; queue is not null-checked here
- }
-
- @Override
- public QueueBrowser createBrowser(Queue queue, String messageSelector) throws JMSException {
- if (sessionState.isInCloseMode()) throw new javax.jms.IllegalStateException("Already closed");
-
- return sessionFacade.createBrowser(queue, messageSelector); // selector is passed through unchanged; queue is not null-checked here
- }
-
- @Override
- public TemporaryQueue createTemporaryQueue() throws JMSException {
- if (sessionState.isInCloseMode()) throw new javax.jms.IllegalStateException("Already closed");
-
- return sessionFacade.createTemporaryQueue(); // creation is delegated to the facade
- }
-
- @Override
- public TemporaryTopic createTemporaryTopic() throws JMSException {
- if (sessionState.isInCloseMode()) throw new javax.jms.IllegalStateException("Already closed");
-
- return sessionFacade.createTemporaryTopic(); // creation is delegated to the facade
- }
-
- public void subscriberCreated() { // currently a no-op: the only statement in the body is commented out
- // subscriberCreatedCount ++;
- }
-
- // Explicit acknowledgement entry point. Throws javax.jms.IllegalStateException when the session
- // is in close mode; otherwise the request only reaches the facade for a non-transacted
- // CLIENT_ACKNOWLEDGE session and is silently ignored in every other configuration.
- public void acknowledge(MessageImpl message) throws JMSException {
-     if (sessionState.isInCloseMode()) {
-         throw new javax.jms.IllegalStateException("Already closed");
-     }
-
-     final boolean explicitAckMode = Session.CLIENT_ACKNOWLEDGE == getAcknowledgeMode();
-     if (explicitAckMode && !getTransacted()) {
-         sessionFacade.acknowledge(message);
-     }
- }
-
- // Returns the topic or queue name of the destination. A destination that is neither a Topic nor
- // a Queue (including null) is rejected with javax.jms.IllegalStateException.
- public String toName(Destination destination) throws JMSException {
-     if (destination instanceof Topic) {
-         final Topic topic = (Topic) destination;
-         return topic.getTopicName();
-     }
-     // Topic is tested first, so an object implementing both interfaces is named as a topic.
-     if (destination instanceof Queue) {
-         final Queue queue = (Queue) destination;
-         return queue.getQueueName();
-     }
-
-     throw new javax.jms.IllegalStateException("Unknown/unsupported destination " + destination);
- }
-
- public static Topic asTopic(final String topicName){ // wraps a name in a minimal Topic; a new object is created on every call
- return new Topic() {
- @Override
- public String getTopicName() throws JMSException {
- return topicName;
- }
-
- @Override
- public String toString(){ // equals/hashCode are not overridden, so two wrappers of the same name are not equal
- return topicName;
- }
- };
- }
-
-
- public static Queue asQueue(final String queueName){ // wraps a name in a minimal Queue; a new object is created on every call
- return new Queue() {
- @Override
- public String getQueueName() throws JMSException {
- return queueName;
- }
-
- @Override
- public String toString(){ // equals/hashCode are not overridden, so two wrappers of the same name are not equal
- return queueName;
- }
- };
- }
- // TODO: Convert to JNDI lookup. For now the name is simply wrapped via asTopic / asQueue.
- public Destination getDestination(final MessagingSessionFacade.DestinationType type,
- final String destination) throws JMSException {
- switch (type){
- case TOPIC:
- return asTopic(destination);
- case QUEUE:
- return asQueue(destination);
- default: // any type constant other than TOPIC / QUEUE is rejected
- throw new JMSException("Unknown destination type " + type +
- " for destination " + destination);
- }
- }
-
- public static String generateRandomString() { // returns the string form of a freshly generated random UUID
- // UUID is expensive, but using it for now ...
- return UUID.randomUUID().toString();
- }
-
- public void registerTopicSubscriptionInfo(TopicSubscription topicSubscription, Node selectorAst) {
- subscriptions.registerTopicSubscriptionSelector(topicSubscription, selectorAst); // selector AST is recorded against the subscription
- }
-
- public void registerQueueSubscriptionInfo(QueueSubscription queueSubscription, Node selectorAst) {
- subscriptions.registerQueueSubscriptionSelector(queueSubscription, selectorAst); // selector AST is recorded against the subscription
- }
-
- // Registers the subscriber under its topic name and facade-provided subscriber id. NOTE(review): this comment used to claim a boolean return ("true IF an explicit subscribe is needed"); the method is void and that decision is taken inside registerSubscriber.
- public void registerTopicSubscriber(TopicSubscriber topicSubscriber) throws JMSException {
- registerSubscriber(topicSubscriber, MessagingSessionFacade.DestinationType.TOPIC,
- topicSubscriber.getTopic().getTopicName(), sessionFacade.getSubscriberId(topicSubscriber));
- }
-
- public void registerQueueSubscriber(QueueReceiver queueReceiver) throws JMSException { // queue counterpart of registerTopicSubscriber
- registerSubscriber(queueReceiver, MessagingSessionFacade.DestinationType.QUEUE,
- queueReceiver.getQueue().getQueueName(), sessionFacade.getSubscriberId(queueReceiver));
- }
-
- private void registerSubscriber(MessageConsumer subscriber, MessagingSessionFacade.DestinationType type,
- final String destination, final String subscriberId) throws JMSException { // bookkeeping under lockObject, then subscribe / start delivery / start listener thread outside it
-
- assert MessagingSessionFacade.DestinationType.QUEUE == type ||
- MessagingSessionFacade.DestinationType.TOPIC == type;
-
- boolean needSubscription = false;
- boolean needDelivery = false;
- if (logger.isTraceEnabled()) logger.trace("Registering ... " + subscriber + " for " + destination +
- ", sid " + subscriberId);
-
- synchronized (lockObject){
- if (sessionState.isInCloseMode()) throw new JMSException("Already closed");
-
- // Already subscribed: nothing more to do, not even starting the listener thread.
- if (! subscriptions.addToSubscriberSet(subscriber)) return ;
-
- if (subscriptions.addToSubscribers(subscriber, type, destination, subscriberId)) {
- // needSubscription = sessionState.isStarted();
- needSubscription = ! sessionState.isInCloseMode();
- needDelivery = sessionState.isStarted();
- }
- }
-
- // TODO: There is a potential race here between registering/starting subscription and
- // stopping/closing subscription(s) elsewhere.
- // We should resolve it by taking a per List lock (which is guaranteed to be non-null here)
- // and a per List subscription status.
- // For now, NOT handling crazy edge-cases like this - under most circumstances, this will
- // fail for other reasons anyway !
-
- // Clients must only use a Session in a thread safe manner, so it is ok to do this
- // outside the lock.
- if (needSubscription){
- if (logger.isTraceEnabled()) logger.trace("Subscribing ... " + subscriber + " for " +
- destination + ", sid " + subscriberId);
-
- if (MessagingSessionFacade.DestinationType.TOPIC == type){
- // Only for topics, right ?
- try {
- sessionFacade.subscribeToTopic(destination, subscriberId);
- } catch (JMSException e){
- // It might be possible for this to fail ...
- // Log and ignore: delivery below is still attempted after a failed subscribe.
- if (logger.isDebugEnabled()) {
- logger.debug("Error subscribing from topic for entry : " + subscriberId);
- DebugUtil.dumpJMSStacktrace(logger, e);
- }
- }
- }
- if (logger.isTraceEnabled()) logger.trace("Subscribing ... " + subscriber + " for " +
- destination + ", sid " + subscriberId + " DONE");
- }
-
- if (needDelivery) {
- if (MessagingSessionFacade.DestinationType.TOPIC == type){
- if (logger.isTraceEnabled()) logger.trace("Topic delivery ... " + subscriber + " for " +
- destination + ", sid " + subscriberId);
- sessionFacade.startTopicDelivery(destination, subscriberId);
- if (logger.isTraceEnabled()) logger.trace("Topic delivery ... " + subscriber + " for " +
- destination + ", sid " + subscriberId + " DONE");
- }
- else {
- if (logger.isTraceEnabled()) logger.trace("Queue delivery ... " + subscriber + " for " +
- destination + ", sid " + subscriberId);
- sessionFacade.startQueueDelivery(destination, subscriberId);
- if (logger.isTraceEnabled()) logger.trace("Queue delivery ... " + subscriber + " for " +
- destination + ", sid " + subscriberId + " DONE");
- }
- }
-
- if (logger.isTraceEnabled()) logger.trace("registerSubscriber ... " + messageListenerThreadStarted);
- if (! messageListenerThreadStarted){
- try {
- this.messageListenerThread.start();
- } catch (IllegalThreadStateException itse){
- // ignore
- // This should not happen, it will happen when Session is used in an MT-unsafe manner,
- // contrary to what is expected from JMS.
- if (logger.isDebugEnabled()) logger.debug("Unexpected", itse);
- }
- messageListenerThreadStarted = true; // set even when start() threw IllegalThreadStateException
- }
- if (logger.isTraceEnabled()) logger.trace("registerSubscriber ... DONE");
- }
-
- public void unregisterTopicSubscriber(TopicSubscriber topicSubscriber) throws JMSException { // resolves topic name and subscriber id, then delegates
- unregisterSubscriber(topicSubscriber, MessagingSessionFacade.DestinationType.TOPIC,
- topicSubscriber.getTopic().getTopicName(), sessionFacade.getSubscriberId(topicSubscriber));
- }
-
- public void unregisterQueueReceiver(QueueReceiver queueReceiver) throws JMSException { // resolves queue name and subscriber id, then delegates
- unregisterSubscriber(queueReceiver, MessagingSessionFacade.DestinationType.QUEUE,
- queueReceiver.getQueue().getQueueName(), sessionFacade.getSubscriberId(queueReceiver));
- }
-
- private void unregisterSubscriber(MessageConsumer subscriber, MessagingSessionFacade.DestinationType type,
- final String destination, final String subscriberId) throws JMSException { // bookkeeping under lockObject, delivery is stopped outside it
-
- assert MessagingSessionFacade.DestinationType.QUEUE == type ||
- MessagingSessionFacade.DestinationType.TOPIC == type;
- final boolean stopDelivery;
-
- synchronized (lockObject){
- // Only a fully closed session is skipped; if in closing, continue on anyway ...
- if (isClosed()) return ;
-
- stopDelivery = subscriptions.removeSubscriber(subscriber, type, destination, subscriberId);
- if (stopDelivery) {
- if (! subscriptions.getAllConsumersSet().remove(subscriber)) return ; // not in the consumer set: delivery is left running
- }
- }
-
- // Session is expected to be used in a MT safe manner, since it is MT-unsafe.
- if (stopDelivery){
- if (MessagingSessionFacade.DestinationType.TOPIC == type){
- stopTopicDelivery(destination, subscriberId);
- }
- else {
- stopQueueDelivery(destination, subscriberId);
- }
- }
- }
-
- public void handleAutomaticMessageAcknowledgement(ReceivedMessage receivedMessage,
- MessageListener sessionMessageListener) { // listener (async) flavour: the receive is tagged with the listener
- doHandleAutomaticMessageAcknowledgement(new TransactedReceiveOperation(receivedMessage,
- sessionMessageListener));
- }
-
- public void handleAutomaticMessageAcknowledgement(ReceivedMessage receivedMessage, MessageConsumer subscriber) { // consumer (sync) flavour
- doHandleAutomaticMessageAcknowledgement(new TransactedReceiveOperation(receivedMessage, subscriber));
- }
-
- // This is a provider internal method.
- // Decides what happens to the acknowledgement of a received message: nothing on a closed
- // session, deferred into the transaction log on a transacted session, sent immediately in
- // AUTO_ACKNOWLEDGE / DUPS_OK_ACKNOWLEDGE mode, and left to the client in any other mode.
- private void doHandleAutomaticMessageAcknowledgement(TransactedReceiveOperation receiveOperation) {
-     if (isClosed()) {
-         return;
-     }
-
-     if (transacted) {
-         // The ack is not sent now; the operation's commit() sends it later.
-         enqueueReceiveWithinTransaction(receiveOperation);
-         return;
-     }
-
-     if (Session.AUTO_ACKNOWLEDGE != getAcknowledgeMode() &&
-         Session.DUPS_OK_ACKNOWLEDGE != getAcknowledgeMode()) {
-         return;
-     }
-
-     // Ignore (any) exceptions which might be thrown ...
-     try {
-         if (logger.isTraceEnabled()) {
-             logger.trace("acknowledging ... " + receiveOperation);
-         }
-         receiveOperation.receivedMessage.originalMessage.getAckRunnable().run();
-     } catch (Exception ackFailure) {
-         if (logger.isDebugEnabled()) {
-             logger.debug("Ignoring exception while sending ack ... ", ackFailure);
-         }
-     }
- }
-
-
- public void unsubscribeFromTopic(String topicName, String subscribedId) throws JMSException {
- sessionFacade.unsubscribeFromTopic(topicName, subscribedId); // plain pass-through; subscribedId is used as given, not run through createSubscriberId
- }
-
- public void stopTopicDelivery(String topicName, String subscribedId) throws JMSException {
- sessionFacade.stopTopicDelivery(topicName, subscribedId); // plain pass-through to the facade
- }
-
- public void stopQueueDelivery(String queueName, String subscribedId) throws JMSException {
- sessionFacade.stopQueueDelivery(queueName, subscribedId); // plain pass-through to the facade
- }
-
- // Entry point for a message arriving from the provider: the message is appended to messageList,
- // registered as unacknowledged when the session is a non-transacted CLIENT_ACKNOWLEDGE one, and
- // threads waiting on lockObject are woken. Messages arriving on a closed session are dropped.
- public void messageReceived(final MessageImpl msg, MessagingSessionFacade.DestinationType type)
-     throws JMSException {
-
-     final ReceivedMessage incoming = new ReceivedMessage(msg, msg, type);
-     String pendingTrace = null;
-
-     synchronized (lockObject) {
-         // ignore if closed ... continue on if in closing state.
-         if (isClosed()) {
-             return;
-         }
-
-         messageList.add(incoming);
-
-         final boolean clientAcknowledged = !getTransacted() && CLIENT_ACKNOWLEDGE == getAcknowledgeMode();
-         if (clientAcknowledged) {
-             sessionFacade.registerUnAcknowledgedMessage(incoming);
-         }
-
-         lockObject.notifyAll();
-
-         // The text is built while holding the lock; it is written to the log after release.
-         if (logger.isTraceEnabled()) {
-             pendingTrace = "messageReceived from " + msg.getSourceName() +
-                 ", for " + msg.getSubscriberId() + " = " + msg;
-         }
-     }
-
-     if (logger.isTraceEnabled()) {
-         logger.trace(pendingTrace);
-     }
- }
-
- // A simple immutable data structure holding the details of a message which has been received.
- public static class ReceivedMessage {
-     // The message as received. Ensure that it is NOT modified in any way !
-     public final MessageImpl originalMessage;
-     // The message returned to the client (to the listener and/or in the TopicSubscriberImpl),
-     // created as a clone of the originalMessage.
-     public final MessageImpl msg;
-
-     public final MessagingSessionFacade.DestinationType destinationType;
-
-     private ReceivedMessage(MessageImpl originalMessage, MessageImpl msg,
-                             MessagingSessionFacade.DestinationType destinationType) {
-         this.destinationType = destinationType;
-         this.msg = msg;
-         this.originalMessage = originalMessage;
-     }
-
-     @Override
-     public String toString() {
-         return "ReceivedMessage"
-             + "{originalMessage=" + originalMessage
-             + ", msg=" + msg
-             + ", destinationType=" + destinationType
-             + '}';
-     }
- }
-
- // Required to catch resource leaks: a session that is collected without being in close mode is logged and closed. NOTE(review): Object.finalize() is deprecated in newer Java releases - confirm the target JDK.
- @Override
- protected void finalize() throws Throwable {
- super.finalize();
- if (!sessionState.isInCloseMode()) {
- if (logger.isErrorEnabled()) logger.error("Session was NOT closed before it went out of scope");
- close(); // anything thrown by close() propagates out of finalize()
- }
- }
-
- public boolean isClosed() {
- return sessionState.isClosed(); // a different check from sessionState.isInCloseMode(), which the create* methods use
- }
-
-
- // TODO: Introduce a daemon thread which periodically updates this - each call to
- // System.currentTimeMillis is a native call,
- // and can be very expensive if there are a lot of concurrent invocations to it : learnings
- // from XMPP server circa 2006 !
- public static long currentTimeMillis() {
- return System.currentTimeMillis(); // single indirection point so the clock source can be swapped later (see TODO)
- }
-
- public Subscription createSubscription(MessagingSessionFacade.DestinationType type, String name,
- String subscriberId){ // factory for the Subscription value matching the destination type
- switch(type){
- case QUEUE:
- return new QueueSubscription(name, subscriberId); // constructor throws NullPointerException on a null name or id
- case TOPIC:
- return new TopicSubscription(name, subscriberId); // constructor throws NullPointerException on a null name or id
- default:
- throw new IllegalArgumentException("Unknown destination type " + type +
- " for destination " + name + ", subscriberId " + subscriberId);
- }
- }
- // Common view over TopicSubscription and QueueSubscription: tells the two kinds apart.
- public interface Subscription {
-     // True for a subscription on a topic.
-     boolean isTopic();
-
-     // True for a subscription on a queue.
-     boolean isQueue();
- }
-
- // Immutable (topicName, subscriberId) pair with value equality. Neither component may be null.
- public static final class TopicSubscription implements Subscription {
-     public final String topicName;
-     public final String subscriberId;
-
-     // Throws NullPointerException if either argument is null.
-     public TopicSubscription(String topicName, String subscriberId) {
-         final boolean missingArgument = (null == topicName) || (null == subscriberId);
-         if (missingArgument) {
-             throw new NullPointerException("Unexpected null as parameter topicName: " +
-                 topicName + ", subscriberId: " + subscriberId);
-         }
-         this.topicName = topicName;
-         this.subscriberId = subscriberId;
-     }
-
-     @Override
-     public boolean isTopic() {
-         return true;
-     }
-
-     @Override
-     public boolean isQueue() {
-         return false;
-     }
-
-     // Equal only to another TopicSubscription with the same subscriber id and topic name.
-     @Override
-     public boolean equals(Object o) {
-         if (this == o) {
-             return true;
-         }
-         if (null == o || getClass() != o.getClass()) {
-             return false;
-         }
-         final TopicSubscription other = (TopicSubscription) o;
-         return subscriberId.equals(other.subscriberId) && topicName.equals(other.topicName);
-     }
-
-     @Override
-     public int hashCode() {
-         return 31 * topicName.hashCode() + subscriberId.hashCode();
-     }
-
-     @Override
-     public String toString() {
-         return "TopicSubscription"
-             + "{topicName='" + topicName + '\''
-             + ", subscriberId='" + subscriberId + '\''
-             + '}';
-     }
- }
-
- // Immutable (queueName, subscriberId) pair with value equality. Neither component may be null.
- public static final class QueueSubscription implements Subscription {
-     public final String queueName;
-     public final String subscriberId;
-
-     // Throws NullPointerException if either argument is null.
-     public QueueSubscription(String queueName, String subscriberId) {
-         final boolean missingArgument = (null == queueName) || (null == subscriberId);
-         if (missingArgument) {
-             throw new NullPointerException("Unexpected null as parameter queueName: " +
-                 queueName + ", subscriberId: " + subscriberId);
-         }
-         this.queueName = queueName;
-         this.subscriberId = subscriberId;
-     }
-
-     @Override
-     public boolean isTopic() {
-         return false;
-     }
-
-     @Override
-     public boolean isQueue() {
-         return true;
-     }
-
-     // Equal only to another QueueSubscription with the same subscriber id and queue name.
-     @Override
-     public boolean equals(Object o) {
-         if (this == o) {
-             return true;
-         }
-         if (null == o || getClass() != o.getClass()) {
-             return false;
-         }
-         final QueueSubscription other = (QueueSubscription) o;
-         return subscriberId.equals(other.subscriberId) && queueName.equals(other.queueName);
-     }
-
-     @Override
-     public int hashCode() {
-         return 31 * queueName.hashCode() + subscriberId.hashCode();
-     }
-
-     @Override
-     public String toString() {
-         return "QueueSubscription"
-             + "{queueName='" + queueName + '\''
-             + ", subscriberId='" + subscriberId + '\''
-             + '}';
-     }
- }
-
-
- // One entry of the session's transaction log: a buffered send or a receive awaiting its ack.
- private static interface TransactedOperation {
-     // Undoes the operation when the transaction is rolled back.
-     void rollback();
-
-     // Applies the operation when the transaction is committed.
-     void commit() throws JMSException;
-
-     // Whether the operation needs the session to be started.
-     boolean requiresStartedSession();
- }
-
- // Transaction log entry for a send: the message is held here and only published on commit.
- private class TransactedSendOperation implements TransactedOperation {
-     private final String destination;
-     private final MessageImpl messageImpl;
-     private final Message userMessage;
-
-     private TransactedSendOperation(String destination, MessageImpl messageImpl, Message userMessage) {
-         this.userMessage = userMessage;
-         this.messageImpl = messageImpl;
-         this.destination = destination;
-     }
-
-     // Publishes messageImpl and writes the returned message id back onto the caller's message.
-     @Override
-     public void commit() throws JMSException {
-         final String assignedId = SessionImpl.this.sessionFacade.publish(destination, messageImpl);
-         if (userMessage instanceof MessageImpl) {
-             ((MessageImpl) userMessage).setJMSMessageIDInternal(assignedId);
-         } else {
-             userMessage.setJMSMessageID(assignedId);
-         }
-     }
-
-     // Nothing is published before commit, so there is nothing to undo.
-     @Override
-     public void rollback() {
-         // noop ...
-     }
-
-     @Override
-     public boolean requiresStartedSession() {
-         return false;
-     }
-
-     @Override
-     public String toString() {
-         return "TransactedSendOperation"
-             + "{destination='" + destination + '\''
-             + ", messageImpl=" + messageImpl
-             + ", userMessage=" + userMessage
-             + '}';
-     }
- }
-
- private class TransactedReceiveOperation implements TransactedOperation { // log entry for a receive; exactly one of sessionMessageListener / subscriber is non-null
- private final ReceivedMessage receivedMessage;
- private final MessageListener sessionMessageListener;
- private final MessageConsumer subscriber;
-
- private TransactedReceiveOperation(ReceivedMessage receivedMessage, MessageListener sessionMessageListener) { // listener (async) delivery
- this.receivedMessage = receivedMessage;
- this.sessionMessageListener = sessionMessageListener;
- this.subscriber = null;
- }
-
- private TransactedReceiveOperation(ReceivedMessage receivedMessage, MessageConsumer subscriber) { // consumer (sync) delivery
- this.receivedMessage = receivedMessage;
- this.subscriber = subscriber;
- this.sessionMessageListener = null;
- }
-
- @Override
- public boolean requiresStartedSession() {
- return true;
- }
-
- @Override
- public void rollback() {
-
- // Async dispatch: queue this operation on rolledbackMessageList and wake waiters on the session lock.
- if (null != sessionMessageListener) {
- synchronized (SessionImpl.this.lockObject){
- rolledbackMessageList.add(this);
- SessionImpl.this.lockObject.notifyAll();
- }
- }
- // If rollback in sync mode, do in same thread - else a rollback, receive WILL see messages
- // in different order !
- // This is also required since in async mode, session IS NOT MT-safe - and so is
- // expecting this behavior.
- else if (null != subscriber){
- try {
- MessageImpl theMessage = MessageUtil.createCloneForDispatch(SessionImpl.this,
- receivedMessage.originalMessage,
- receivedMessage.originalMessage.getSourceName(),
- receivedMessage.originalMessage.getSubscriberId());
- theMessage.setJMSRedelivered(true); // the fresh clone is flagged as a redelivery
-
- sessionFacade.enqueueReceivedMessage(subscriber,
- new ReceivedMessage(receivedMessage.originalMessage, theMessage,
- receivedMessage.destinationType), true);
- } catch (JMSException e) { // logged at debug level only; the message is then not re-enqueued
- if (logger.isDebugEnabled()) {
- logger.debug("Unable to enqueue received message to");
- DebugUtil.dumpJMSStacktrace(logger, e);
- }
- }
- }
- }
-
- @Override
- public void commit() throws JMSException { // sends the deferred ack; failures are logged at debug level and swallowed
- try {
- receivedMessage.originalMessage.getAckRunnable().run();
- } catch (Exception ex){
- if (logger.isDebugEnabled()) {
- logger.debug("Ignoring exception while sending ack ... ", ex);
- }
- }
- }
-
- /**
- * Recovery is slightly tricky - we have two cases here :
- * a) Messages which are to be consumed via a subscriber in sync mode - via receive() and its variants.
- * b) Messages which are to be consumed via the subscriber's or session's async mode - via listener.
- *
- *
- * We have to ensure that message recovery will result in the exact SAME order of message
- * delivery to client as it was done first time
- * (when rollback was triggered).
- * To ensure this, the recover method is called in REVERSE order in which operations were
- * enqueued in the txn (log) queue.
- *
- * To handle (a), sessionFacade.enqueueReceivedMessage pushes message to beginning of pending
- * message queue in subscriber.
- * Taken along with reverse order of unwinding of txn log, this ensures the desired behavior for (a).
- *
- * To ensure desired behavior for (b), we pass a listenerDeliveryList as parameter - which is
- * used to maintain the
- * order of how to invoke onMessage to recover for async dispatch. Note: we keep adding to
- * beginning of this list to ensure that
- * in the end, oldest message in txn log is the first message in listenerDeliveryList when
- * we attempt recovery.
- *
- */
- public void recover(LinkedList<TransactedReceiveOperation> listenerDeliveryList) {
- // Do the actual recovery ...
- if (null != subscriber){
- // already handled in rollback ...
- assert false : "unexpected ...";
- }
- else if (null != sessionMessageListener){
- listenerDeliveryList.addFirst(this);
- }
- }
-
- public void recoverForListener(){ // redelivers a fresh clone of the original message to the listener
- assert null == subscriber;
- assert null != sessionMessageListener;
-
- try {
- final MessageImpl message = MessageUtil.createCloneForDispatch(SessionImpl.this,
- receivedMessage.originalMessage,
- receivedMessage.originalMessage.getSourceName(),
- receivedMessage.originalMessage.getSubscriberId());
- deliverToListener(sessionMessageListener, receivedMessage, message, true);
- } catch (JMSException e) {
- // Unexpected not to be able to clone ...
- if (logger.isDebugEnabled()) {
- logger.debug("Unexpected exception trying to process message");
- DebugUtil.dumpJMSStacktrace(logger, e);
- }
- }
- }
-
- @Override
- public String toString() {
- final StringBuilder sb = new StringBuilder();
- sb.append("TransactedReceiveOperation");
- sb.append("{receivedMessage=").append(receivedMessage);
- sb.append(", sessionMessageListener=").append(sessionMessageListener);
- sb.append(", subscriber=").append(subscriber);
- sb.append('}');
- return sb.toString();
- }
- }
-
-
- // For txn support: the operations of the current transaction, in the order they were enqueued.
- private final Object transactionLock = new Object(); // every access to transactedOperation visible here holds this lock
- private final List<TransactedOperation> transactedOperation = new LinkedList<TransactedOperation>();
-
- /**
- * Rolls back everything recorded in the current transaction: the log is copied and cleared
- * while holding transactionLock, then the copy is rolled back outside the lock.
- */
- private void rollbackTransactionState(){
- final ArrayList<TransactedOperation> snapshot = new ArrayList<TransactedOperation>();
- synchronized (transactionLock){
- snapshot.addAll(transactedOperation);
- transactedOperation.clear();
- }
- rollbackTransactionState(snapshot);
- }
-
- /**
- * Rolls back the given operations, last recorded first.
- *
- * @param transactedOperationCopy the operations to undo, in the order they were recorded.
- */
- private void rollbackTransactionState(ArrayList<TransactedOperation> transactedOperationCopy){
-
- if (logger.isDebugEnabled()) {
- logger.debug("Attempting to rollback " + transactedOperationCopy.size() + " operations");
- }
- if (logger.isTraceEnabled()) {
- logger.trace("Operations : " + transactedOperationCopy);
- }
-
- // Rollback MUST be in reverse order !
- int remaining = transactedOperationCopy.size();
- while (remaining > 0){
- remaining--;
- transactedOperationCopy.get(remaining).rollback();
- }
- }
-
-
- /**
- * Records a publish as part of the current transaction.
- *
- * Note: messageImpl MUST be a copy of what the user sent - so that later modifications by the
- * user WILL NOT affect what is recorded here.
- */
- public void enqueuePublishWithinTransaction(String topicName, MessageImpl messageImpl, Message userMessage) {
- final TransactedSendOperation pendingSend =
- new TransactedSendOperation(topicName, messageImpl, userMessage);
- synchronized (transactionLock){
- transactedOperation.add(pendingSend);
- }
- }
-
- /** Appends the given receive operation to the transaction log, under transactionLock. */
- private void enqueueReceiveWithinTransaction(TransactedReceiveOperation receiveOperation) {
- synchronized (this.transactionLock){
- this.transactedOperation.add(receiveOperation);
- }
- }
-
- /**
- * Commits every operation recorded in the current transaction, in the order it was recorded.
- * The log is copied and cleared while holding transactionLock; the commit runs on the copy.
- *
- * @throws JMSException a TransactionRolledBackException if the session is not started and an
- * operation requires a started session, or if committing an operation fails (the failure is
- * attached as the linked exception).
- */
- private void commitTransactionState() throws JMSException {
- final ArrayList<TransactedOperation> pending = new ArrayList<TransactedOperation>();
- synchronized (transactionLock){
- pending.addAll(transactedOperation);
- transactedOperation.clear();
- }
-
- if (!sessionState.isStarted()){
- // Only send op's can be committed on a session that is not started - a receive cannot
- // be acked.
- // NOTE(review): no rollback is invoked on this path although the log was already cleared
- // - confirm callers deal with that.
- for (int i = 0; i < pending.size(); i++){
- if (pending.get(i).requiresStartedSession()) {
- throw new TransactionRolledBackException("Commit failed : session is not open - cant ack message");
- }
- }
- }
-
- for (int i = 0; i < pending.size(); i++){
- final TransactedOperation op = pending.get(i);
- try {
- op.commit();
- } catch (JMSException commitFailure){
- if (logger.isDebugEnabled()) {
- logger.debug("Commit failed for " + op, commitFailure);
- }
- // NOTE(review): the whole snapshot is rolled back, including operations whose commit()
- // already returned - confirm rollback() tolerates that.
- rollbackTransactionState(pending);
- final TransactionRolledBackException rolledBack = new TransactionRolledBackException("Commit failed");
- rolledBack.setLinkedException(commitFailure);
- throw rolledBack;
- }
- }
- }
-
- /**
- * Tells whether the message carries a non-zero JMS expiration that is earlier than
- * SessionImpl.currentTimeMillis().
- */
- public boolean isMessageExpired(MessageImpl message) {
- if (0 == message.getJMSExpiration()) {
- // An expiration of zero is never treated as expired.
- return false;
- }
- return SessionImpl.currentTimeMillis() > message.getJMSExpiration();
- }
-
- /** Asks the owning connection whether the given message id is locally published. */
- public boolean isLocallyPublished(String messageId) {
- final boolean locallyPublished = connection.isLocallyPublished(messageId);
- return locallyPublished;
- }
-
- /** Forwards the message id to the owning connection's locally-published id registry. */
- public void addToLocallyPublishedMessageIds(String messageId) {
- this.connection.addToLocallyPublishedMessageIds(messageId);
- }
-
- /** @return the connection this session delegates to. */
- public ConnectionImpl getConnection() {
- return this.connection;
- }
-
- private static final class Subscriptions {
- // Keeps track of number of subscribers created. This will prevent setMessageListener from
- // succeeding in case subscriberCreatedCount > 0
- // Their use is mutually exclusive.
- // private int subscriberCreatedCount = 0;
- private AtomicInteger numSubscribers = new AtomicInteger(0);
-
- private final ConcurrentHashMap<TopicSubscription, Node> topicSubscriptionToSelectorMap =
- new ConcurrentHashMap<TopicSubscription, Node>(32);
- private final ConcurrentHashMap<QueueSubscription, Node> queueSubscriptionToSelectorMap =
- new ConcurrentHashMap<QueueSubscription, Node>(32);
-
- // We make use of concurrent api for this map (and its list) since we will be reading
- // it heavily concurrently while modifying it rarely.
- private final Map<Subscription, CopyOnWriteArrayList<MessageConsumer>> topicSubscriptionToSubscriberMap =
- new ConcurrentHashMap<Subscription, CopyOnWriteArrayList<MessageConsumer>>();
- private final Map<Subscription, CopyOnWriteArrayList<MessageConsumer>> queueSubscriptionToSubscriberMap =
- new ConcurrentHashMap<Subscription, CopyOnWriteArrayList<MessageConsumer>>();
-
- // The value for the key is irrelevant - the IdentityHashMap only backs an identity-based Set.
- private final Set<MessageConsumer> allConsumersSet = Collections.newSetFromMap(
- new IdentityHashMap<MessageConsumer, Boolean>());
-
- private static final int SUBSCRIBER_ID_TO_DESTINATION_CACHE_SIZE =
- Integer.getInteger("SUBSCRIBER_ID_TO_DESTINATION_CACHE_SIZE", 1024);
- // Guarded by synchronizing on the map itself (see register/find below) - query/modify ONLY in that context !
- private final Map<String, String> topicSubscriberIdToTopicName =
- new LRUCacheMap<String, String>(SUBSCRIBER_ID_TO_DESTINATION_CACHE_SIZE, true);
-
- /**
- * Builds a new HashMap holding the entries of both the topic and the queue subscriber maps.
- * The copy is shallow: the subscriber lists themselves are shared with the internal maps.
- */
- public Map<Subscription, CopyOnWriteArrayList<MessageConsumer>> createSubscriptionToSubscriberMapCopy() {
- final Map<Subscription, CopyOnWriteArrayList<MessageConsumer>> merged =
- new HashMap<Subscription, CopyOnWriteArrayList<MessageConsumer>>();
- // Topic entries first, then queue entries.
- merged.putAll(this.topicSubscriptionToSubscriberMap);
- merged.putAll(this.queueSubscriptionToSubscriberMap);
- return merged;
- }
-
- /** @return the internal (live, not copied) set of all consumers. */
- public Set<MessageConsumer> getAllConsumersSet() {
- return this.allConsumersSet;
- }
-
- /**
- * Looks up the subscribers registered for the given subscription.
- *
- * @return the list stored for the subscription in the topic or queue map, or null if none.
- * @throws IllegalArgumentException if the subscription is neither a topic nor a queue one.
- */
- public CopyOnWriteArrayList<? extends MessageConsumer> getSubscribers(Subscription subscription) {
- final Map<Subscription, CopyOnWriteArrayList<MessageConsumer>> source;
- if (subscription.isTopic()) {
- assert subscription instanceof TopicSubscription;
- source = topicSubscriptionToSubscriberMap;
- } else if (subscription.isQueue()) {
- assert subscription instanceof QueueSubscription;
- source = queueSubscriptionToSubscriberMap;
- } else {
- throw new IllegalArgumentException("Unknown subscription type " + subscription);
- }
- return source.get(subscription);
- }
-
- /**
- * Looks up the selector registered for the given subscription.
- *
- * @return the Node stored for the subscription in the topic or queue selector map, or null.
- * @throws IllegalArgumentException if the subscription is neither a topic nor a queue one.
- */
- public Node getSelectorExpression(Subscription subscription) {
- final Map<? extends Subscription, Node> selectors;
- if (subscription.isTopic()) {
- assert subscription instanceof TopicSubscription;
- selectors = topicSubscriptionToSelectorMap;
- } else if (subscription.isQueue()) {
- assert subscription instanceof QueueSubscription;
- selectors = queueSubscriptionToSelectorMap;
- } else {
- throw new IllegalArgumentException("Unknown subscription type " + subscription);
- }
- return selectors.get(subscription);
- }
-
- /**
- * Associates a subscriber id with a topic name.
- *
- * @throws JMSException if the subscriber id is already associated with a different topic.
- */
- public void registerSubscriberIdToTopic(String subscribedId, String topicName) throws JMSException {
- synchronized (topicSubscriberIdToTopicName){
- final String existing = topicSubscriberIdToTopicName.get(subscribedId);
- if (null == existing || existing.equals(topicName)) {
- // No mapping yet, or the same one again: (re)store it.
- topicSubscriberIdToTopicName.put(subscribedId, topicName);
- return;
- }
- throw new JMSException("There is already a subscription in this session for " +
- "same subscriberId for topic " + existing);
- }
- }
-
- /**
- * Returns the topic name registered for the given subscriber id.
- *
- * @throws JMSException if no topic name is registered for the subscriber id.
- */
- public String findTopicNameForSubscriberId(String subscribedId) throws JMSException {
- synchronized (topicSubscriberIdToTopicName){
- final String found = topicSubscriberIdToTopicName.get(subscribedId);
- if (found != null){
- return found;
- }
- throw new JMSException("Unable to find topicName for subscriberId " + subscribedId);
- }
- }
-
- /** Stores (or replaces) the selector AST for the given topic subscription. */
- public void registerTopicSubscriptionSelector(TopicSubscription topicSubscription, Node selectorAst) {
- this.topicSubscriptionToSelectorMap.put(topicSubscription, selectorAst);
- }
-
- /** Stores (or replaces) the selector AST for the given queue subscription. */
- public void registerQueueSubscriptionSelector(QueueSubscription queueSubscription, Node selectorAst) {
- this.queueSubscriptionToSelectorMap.put(queueSubscription, selectorAst);
- }
-
- /** @return true if the consumer was not already present in the set of all consumers. */
- public boolean addToSubscriberSet(MessageConsumer consumer) {
- final boolean added = allConsumersSet.add(consumer);
- return added;
- }
-
- /**
- * Registers the subscriber under the subscription built from destination and subscriberId,
- * in the queue or topic map depending on type.
- *
- * @return whatever createIfMissingAndAdd reports for the addition.
- * @throws IllegalArgumentException if type is neither QUEUE nor TOPIC.
- */
- public boolean addToSubscribers(MessageConsumer subscriber, MessagingSessionFacade.DestinationType type,
- String destination, String subscriberId) {
- switch (type){
- case TOPIC: {
- final TopicSubscription key = new TopicSubscription(destination, subscriberId);
- return createIfMissingAndAdd(topicSubscriptionToSubscriberMap, key, subscriber);
- }
- case QUEUE: {
- final QueueSubscription key = new QueueSubscription(destination, subscriberId);
- return createIfMissingAndAdd(queueSubscriptionToSubscriberMap, key, subscriber);
- }
- default:
- throw new IllegalArgumentException("Unknown subscription type " + type);
- }
- }
-
- public boolean removeSubscriber(MessageConsumer subscriber, MessagingSessionFacade.DestinationType type,
- String destinatio
<TRUNCATED>