You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@servicemix.apache.org by gn...@apache.org on 2009/03/20 13:08:51 UTC

svn commit: r756427 - in /servicemix/smx4/nmr/trunk/jbi/cluster: engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/

Author: gnodet
Date: Fri Mar 20 12:08:51 2009
New Revision: 756427

URL: http://svn.apache.org/viewvc?rev=756427&view=rev
Log:
SMX4NMR-128: add some synchronizations around the requestor to avoid it being used concurrently by multiple threads

Modified:
    servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ClusterEngine.java
    servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/AbstractPollingRequestorPool.java
    servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/GenericJmsRequestorPool.java

Modified: servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ClusterEngine.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ClusterEngine.java?rev=756427&r1=756426&r2=756427&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ClusterEngine.java (original)
+++ servicemix/smx4/nmr/trunk/jbi/cluster/engine/src/main/java/org/apache/servicemix/jbi/cluster/engine/ClusterEngine.java Fri Mar 20 12:08:51 2009
@@ -554,23 +554,31 @@
     //-------------------------------------------------------------------------
 
     public void process(Exchange exchange) {
-        JmsRequestor item = null;
         try {
             String corrId = (String) exchange.getProperty(PROPERTY_CORR_ID + "." + name);
             if (corrId != null) {
-                item = pool.resume(corrId);
+                JmsRequestor item = pool.resume(corrId);
+                synchronized (item) {
+                    try {
+                        processExchange(item, exchange);
+                    } finally {
+                        item.close();
+                    }
+                }
             } else {
-                item = pool.newRequestor();
-                item.begin();
+                JmsRequestor item = pool.newRequestor();
+                synchronized (item) {
+                    try {
+                        item.begin();
+                        processExchange(item, exchange);
+                    } finally {
+                        item.close();
+                    }
+                }
             }
-            processExchange(item, exchange);
         } catch (Exception e) {
             // TODO what if the problem is a JMS exception or related
             fail(exchange,  e);
-        } finally {
-            if (item != null) {
-                item.close();
-            }
         }
     }
 
@@ -705,171 +713,173 @@
      * @throws Exception if an error occurs
      */
     protected void processExchange(JmsRequestor requestor, Exchange exchange)  throws Exception {
-        decrementPendingExchangeIfNeeded(exchange);
-        boolean rollbackOnErrors;
-        if (exchange.getRole() == Role.Consumer) {
-            rollbackOnErrors = Boolean.TRUE.equals(exchange.getProperty(PROPERTY_ROLLBACK_ON_ERRORS + "." + name));
-        } else {
-            rollbackOnErrors = this.rollbackOnErrors;
-        }
-        if (exchange.getStatus() == Status.Active) {
-            Message msg = exchange.getFault(false);
-            int type;
-            if (msg != null) {
-                type = JBI_MESSAGE_FAULT;
+        synchronized (requestor) {
+            decrementPendingExchangeIfNeeded(exchange);
+            boolean rollbackOnErrors;
+            if (exchange.getRole() == Role.Consumer) {
+                rollbackOnErrors = Boolean.TRUE.equals(exchange.getProperty(PROPERTY_ROLLBACK_ON_ERRORS + "." + name));
             } else {
-                msg = exchange.getOut(false);
+                rollbackOnErrors = this.rollbackOnErrors;
+            }
+            if (exchange.getStatus() == Status.Active) {
+                Message msg = exchange.getFault(false);
+                int type;
                 if (msg != null) {
-                    type = JBI_MESSAGE_OUT;
+                    type = JBI_MESSAGE_FAULT;
                 } else {
-                    msg = exchange.getIn(false);
+                    msg = exchange.getOut(false);
                     if (msg != null) {
-                        type = JBI_MESSAGE_IN;
+                        type = JBI_MESSAGE_OUT;
                     } else {
-                        throw new IllegalStateException("No normalized message on an active exchange: " + exchange);
+                        msg = exchange.getIn(false);
+                        if (msg != null) {
+                            type = JBI_MESSAGE_IN;
+                        } else {
+                            throw new IllegalStateException("No normalized message on an active exchange: " + exchange);
+                        }
                     }
                 }
-            }
-            javax.jms.Message message = requestor.getSession().createObjectMessage(msg);
-            message.setIntProperty(JBI_MESSAGE, type);
-            if (type == JBI_MESSAGE_IN) {
-                rollbackOnErrors = this.rollbackOnErrors;
-                exchange.setProperty(PROPERTY_ROLLBACK_ON_ERRORS + "." + name, rollbackOnErrors);
-                message.setStringProperty(JBI_MEP, exchange.getPattern().getWsdlUri());
-                if (exchange.getProperty(MessageExchangeImpl.INTERFACE_NAME_PROP) != null) {
-                    message.setStringProperty(JBI_INTERFACE, exchange.getProperty(MessageExchangeImpl.INTERFACE_NAME_PROP).toString());
-                }
-                if (exchange.getOperation() != null) {
-                    message.setStringProperty(JBI_OPERATION, exchange.getOperation().toString());
-                }
-                if (exchange.getProperty(MessageExchangeImpl.SERVICE_NAME_PROP) != null) {
-                    message.setStringProperty(JBI_SERVICE, exchange.getProperty(MessageExchangeImpl.SERVICE_NAME_PROP).toString());
-                }
-                if (exchange.getProperty(MessageExchangeImpl.SERVICE_ENDPOINT_PROP) != null) {
-                    ServiceEndpoint se = (ServiceEndpoint) exchange.getProperty(MessageExchangeImpl.SERVICE_ENDPOINT_PROP);
-                    message.setStringProperty(JBI_ENDPOINT, "{" + se.getServiceName().toString() + "}" + se.getEndpointName());
-                }
-                // TODO: write exchange properties
-            }
-            message.setBooleanProperty(PROPERTY_ROLLBACK_ON_ERRORS, rollbackOnErrors);
-            boolean expectResponse;
-            if (!rollbackOnErrors) {
-                expectResponse = true;
-            } else {
-                switch (exchange.getPattern()) {
-                    case InOnly:
-                        expectResponse = false;
-                        break;
-                    case RobustInOnly:
-                        expectResponse = exchange.getRole() == Role.Provider;
-                        break;
-                    case InOut:
-                        expectResponse = exchange.getRole() == Role.Provider;
-                        break;
-                    default:
-                        // TODO:
-                        expectResponse = true;
-                        break;
-                }
-            }
-            if (expectResponse) {
-                exchanges.put(exchange.getId(), exchange);
-                message.setStringProperty(PROPERTY_SENDER_CLUSTER_NAME, name);
-                message.setStringProperty(PROPERTY_SENDER_CORR_ID, exchange.getId());
-                if (requestor.getMessage() != null) {
-                    message.setStringProperty(ClusterEngine.PROPERTY_CLUSTER_NAME, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CLUSTER_NAME));
-                    message.setStringProperty(ClusterEngine.PROPERTY_CORR_ID, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CORR_ID));
+                javax.jms.Message message = requestor.getSession().createObjectMessage(msg);
+                message.setIntProperty(JBI_MESSAGE, type);
+                if (type == JBI_MESSAGE_IN) {
+                    rollbackOnErrors = this.rollbackOnErrors;
+                    exchange.setProperty(PROPERTY_ROLLBACK_ON_ERRORS + "." + name, rollbackOnErrors);
+                    message.setStringProperty(JBI_MEP, exchange.getPattern().getWsdlUri());
+                    if (exchange.getProperty(MessageExchangeImpl.INTERFACE_NAME_PROP) != null) {
+                        message.setStringProperty(JBI_INTERFACE, exchange.getProperty(MessageExchangeImpl.INTERFACE_NAME_PROP).toString());
+                    }
+                    if (exchange.getOperation() != null) {
+                        message.setStringProperty(JBI_OPERATION, exchange.getOperation().toString());
+                    }
+                    if (exchange.getProperty(MessageExchangeImpl.SERVICE_NAME_PROP) != null) {
+                        message.setStringProperty(JBI_SERVICE, exchange.getProperty(MessageExchangeImpl.SERVICE_NAME_PROP).toString());
+                    }
+                    if (exchange.getProperty(MessageExchangeImpl.SERVICE_ENDPOINT_PROP) != null) {
+                        ServiceEndpoint se = (ServiceEndpoint) exchange.getProperty(MessageExchangeImpl.SERVICE_ENDPOINT_PROP);
+                        message.setStringProperty(JBI_ENDPOINT, "{" + se.getServiceName().toString() + "}" + se.getEndpointName());
+                    }
+                    // TODO: write exchange properties
                 }
-                requestor.send(message);
-            } else {
-                message.setStringProperty(PROPERTY_SENDER_CLUSTER_NAME, name);
-                message.setStringProperty(PROPERTY_SENDER_CORR_ID, null);
-                if (requestor.getMessage() != null) {
-                    message.setStringProperty(ClusterEngine.PROPERTY_CLUSTER_NAME, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CLUSTER_NAME));
-                    message.setStringProperty(ClusterEngine.PROPERTY_CORR_ID, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CORR_ID));
+                message.setBooleanProperty(PROPERTY_ROLLBACK_ON_ERRORS, rollbackOnErrors);
+                boolean expectResponse;
+                if (!rollbackOnErrors) {
+                    expectResponse = true;
+                } else {
+                    switch (exchange.getPattern()) {
+                        case InOnly:
+                            expectResponse = false;
+                            break;
+                        case RobustInOnly:
+                            expectResponse = exchange.getRole() == Role.Provider;
+                            break;
+                        case InOut:
+                            expectResponse = exchange.getRole() == Role.Provider;
+                            break;
+                        default:
+                            // TODO:
+                            expectResponse = true;
+                            break;
+                    }
                 }
-                requestor.send(message);
-                // TODO: send done in the tx synchronization
-                done(exchange);
-            }
-        } else if (exchange.getStatus() == Status.Done) {
-            boolean doSend;
-            if (!rollbackOnErrors) {
-                doSend = true;
-            } else {
-                switch (exchange.getPattern()) {
-                    case InOnly:
-                        // never send done for InOnly
-                        doSend = false;
-                        break;
-                    case RobustInOnly:
-                        // only send done when there is no fault
-                        // which means the exchange has a consumer role
-                        doSend = exchange.getRole() == Role.Consumer;
-                        break;
-                    case InOptionalOut:
-                        // TODO
-                        doSend = true;
-                        break;
-                    case InOut:
-                        // in an InOut mep, the DONE status always come from the JBI consumer
-                        doSend = false;
-                        break;
-                    default:
-                        throw new IllegalStateException("Unsupported MEP: " + exchange.getPattern());
+                if (expectResponse) {
+                    exchanges.put(exchange.getId(), exchange);
+                    message.setStringProperty(PROPERTY_SENDER_CLUSTER_NAME, name);
+                    message.setStringProperty(PROPERTY_SENDER_CORR_ID, exchange.getId());
+                    if (requestor.getMessage() != null) {
+                        message.setStringProperty(ClusterEngine.PROPERTY_CLUSTER_NAME, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CLUSTER_NAME));
+                        message.setStringProperty(ClusterEngine.PROPERTY_CORR_ID, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CORR_ID));
+                    }
+                    requestor.send(message);
+                } else {
+                    message.setStringProperty(PROPERTY_SENDER_CLUSTER_NAME, name);
+                    message.setStringProperty(PROPERTY_SENDER_CORR_ID, null);
+                    if (requestor.getMessage() != null) {
+                        message.setStringProperty(ClusterEngine.PROPERTY_CLUSTER_NAME, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CLUSTER_NAME));
+                        message.setStringProperty(ClusterEngine.PROPERTY_CORR_ID, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CORR_ID));
+                    }
+                    requestor.send(message);
+                    // TODO: send done in the tx synchronization
+                    done(exchange);
+                }
+            } else if (exchange.getStatus() == Status.Done) {
+                boolean doSend;
+                if (!rollbackOnErrors) {
+                    doSend = true;
+                } else {
+                    switch (exchange.getPattern()) {
+                        case InOnly:
+                            // never send done for InOnly
+                            doSend = false;
+                            break;
+                        case RobustInOnly:
+                            // only send done when there is no fault
+                            // which means the exchange has a consumer role
+                            doSend = exchange.getRole() == Role.Consumer;
+                            break;
+                        case InOptionalOut:
+                            // TODO
+                            doSend = true;
+                            break;
+                        case InOut:
+                            // in an InOut mep, the DONE status always come from the JBI consumer
+                            doSend = false;
+                            break;
+                        default:
+                            throw new IllegalStateException("Unsupported MEP: " + exchange.getPattern());
+                    }
                 }
-            }
-            if (doSend) {
-                javax.jms.Message message = requestor.getSession().createMessage();
-                message.setIntProperty(JBI_MESSAGE, JBI_MESSAGE_DONE);
-                message.setStringProperty(PROPERTY_SENDER_CLUSTER_NAME, name);
-                message.setStringProperty(PROPERTY_SENDER_CORR_ID, null);
-                if (requestor.getMessage() != null) {
-                    message.setStringProperty(ClusterEngine.PROPERTY_CLUSTER_NAME, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CLUSTER_NAME));
-                    message.setStringProperty(ClusterEngine.PROPERTY_CORR_ID, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CORR_ID));
+                if (doSend) {
+                    javax.jms.Message message = requestor.getSession().createMessage();
+                    message.setIntProperty(JBI_MESSAGE, JBI_MESSAGE_DONE);
+                    message.setStringProperty(PROPERTY_SENDER_CLUSTER_NAME, name);
+                    message.setStringProperty(PROPERTY_SENDER_CORR_ID, null);
+                    if (requestor.getMessage() != null) {
+                        message.setStringProperty(ClusterEngine.PROPERTY_CLUSTER_NAME, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CLUSTER_NAME));
+                        message.setStringProperty(ClusterEngine.PROPERTY_CORR_ID, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CORR_ID));
+                    }
+                    requestor.send(message);
                 }
-                requestor.send(message);
-            }
-        } else if (exchange.getStatus() == Status.Error) {
-            boolean doSend;
-            if (!rollbackOnErrors) {
-                doSend = true;
-            } else {
-                switch (exchange.getPattern()) {
-                    case InOnly:
-                        // never send errors for InOnly
-                        doSend = false;
-                        break;
-                    case RobustInOnly:
-                        // do not send exchange from the provider back to the consumer
-                        doSend = pool.getTransacted() == Transacted.None || exchange.getRole() != Role.Consumer;
-                        break;
-                    case InOptionalOut:
-                        // TODO
-                        doSend = true;
-                        break;
-                    case InOut:
-                        doSend = pool.getTransacted() == Transacted.None || exchange.getRole() != Role.Consumer;
-                        break;
-                    default:
-                        throw new IllegalStateException("Unsupported MEP: " + exchange.getPattern());
+            } else if (exchange.getStatus() == Status.Error) {
+                boolean doSend;
+                if (!rollbackOnErrors) {
+                    doSend = true;
+                } else {
+                    switch (exchange.getPattern()) {
+                        case InOnly:
+                            // never send errors for InOnly
+                            doSend = false;
+                            break;
+                        case RobustInOnly:
+                            // do not send exchange from the provider back to the consumer
+                            doSend = pool.getTransacted() == Transacted.None || exchange.getRole() != Role.Consumer;
+                            break;
+                        case InOptionalOut:
+                            // TODO
+                            doSend = true;
+                            break;
+                        case InOut:
+                            doSend = pool.getTransacted() == Transacted.None || exchange.getRole() != Role.Consumer;
+                            break;
+                        default:
+                            throw new IllegalStateException("Unsupported MEP: " + exchange.getPattern());
+                    }
                 }
-            }
-            if (doSend) {
-                javax.jms.Message message = requestor.getSession().createObjectMessage(exchange.getError());
-                message.setIntProperty(JBI_MESSAGE, JBI_MESSAGE_ERROR);
-                message.setStringProperty(PROPERTY_SENDER_CLUSTER_NAME, name);
-                message.setStringProperty(PROPERTY_SENDER_CORR_ID, null);
-                if (requestor.getMessage() != null) {
-                    message.setStringProperty(ClusterEngine.PROPERTY_CLUSTER_NAME, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CLUSTER_NAME));
-                    message.setStringProperty(ClusterEngine.PROPERTY_CORR_ID, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CORR_ID));
+                if (doSend) {
+                    javax.jms.Message message = requestor.getSession().createObjectMessage(exchange.getError());
+                    message.setIntProperty(JBI_MESSAGE, JBI_MESSAGE_ERROR);
+                    message.setStringProperty(PROPERTY_SENDER_CLUSTER_NAME, name);
+                    message.setStringProperty(PROPERTY_SENDER_CORR_ID, null);
+                    if (requestor.getMessage() != null) {
+                        message.setStringProperty(ClusterEngine.PROPERTY_CLUSTER_NAME, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CLUSTER_NAME));
+                        message.setStringProperty(ClusterEngine.PROPERTY_CORR_ID, requestor.getMessage().getStringProperty(ClusterEngine.PROPERTY_SENDER_CORR_ID));
+                    }
+                    requestor.send(message);
+                } else {
+                    requestor.setRollbackOnly();
                 }
-                requestor.send(message);
             } else {
-                requestor.setRollbackOnly();
+                throw new IllegalStateException("Unknown exchange status: " + exchange);
             }
-        } else {
-            throw new IllegalStateException("Unknown exchange status: " + exchange);
         }
     }
 

Modified: servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/AbstractPollingRequestorPool.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/AbstractPollingRequestorPool.java?rev=756427&r1=756426&r2=756427&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/AbstractPollingRequestorPool.java (original)
+++ servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/AbstractPollingRequestorPool.java Fri Mar 20 12:08:51 2009
@@ -102,7 +102,9 @@
 
     public JmsRequestor resume(String id) {
         Requestor requestor = parked.remove(id);
-        requestor.resume();
+        synchronized (requestor) {
+            requestor.resume();
+        }
         return requestor;
     }
 
@@ -131,7 +133,7 @@
          * Retrieve the jms session
          * @return the session
          */
-        public Session getSession() throws JmsException {
+        public synchronized Session getSession() throws JmsException {
             try {
                 if (session == null) {
                     Connection conToUse;
@@ -161,7 +163,7 @@
          * @return the JMS message
          * @throws javax.jms.JMSException if an error occurs
          */
-        public Message receive(long timeout) throws JMSException {
+        public synchronized Message receive(long timeout) throws JMSException {
             if (timeout < 0) {
                 message = getConsumer().receive();
             } else {
@@ -188,7 +190,7 @@
          * @param msg the message to send
          * @throws javax.jms.JMSException if an error occurs
          */
-        public void send(Message msg) throws JmsException {
+        public synchronized void send(Message msg) throws JmsException {
             if (logger.isDebugEnabled()) {
                 logger.debug("Sending JMS message: " + msg);
             }
@@ -217,7 +219,7 @@
          * the item is no longer used.  If an item has been parked previously,
          * this call will do nothing.
          */
-        public void close() {
+        public synchronized void close() {
             if (session != null) {
                 if (!suspended) {
                     try {
@@ -274,7 +276,7 @@
          * This should be called when the same session needs to be reused at a later time.
          * @param id the parking id
          */
-        public void suspend(String id) {
+        public synchronized void suspend(String id) {
             if (transacted == Transacted.Xa) {
                 try {
                     if (logger.isDebugEnabled()) {
@@ -317,7 +319,7 @@
          * Mark this item as not parked anymore so that it can later be
          * returned to the pool by a call to {@link #close()}.
          */
-        protected void resume() {
+        protected synchronized void resume() {
             if (transaction != null) {
                 try {
                     if (logger.isDebugEnabled()) {
@@ -340,7 +342,7 @@
          * Internal use only.
          * Prepare this item to be used again.
          */
-        public void begin() throws JmsException {
+        public synchronized void begin() throws JmsException {
             startXaTransaction();
             getSession();
         }
@@ -364,7 +366,7 @@
          * This means that the transaction will be rolled back instead of
          * being committed.
          */
-        public void setRollbackOnly() {
+        public synchronized void setRollbackOnly() {
             rollbackOnly = true;
             if (transacted == Transacted.Xa) {
                 try {

Modified: servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/GenericJmsRequestorPool.java
URL: http://svn.apache.org/viewvc/servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/GenericJmsRequestorPool.java?rev=756427&r1=756426&r2=756427&view=diff
==============================================================================
--- servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/GenericJmsRequestorPool.java (original)
+++ servicemix/smx4/nmr/trunk/jbi/cluster/requestor/src/main/java/org/apache/servicemix/jbi/cluster/requestor/GenericJmsRequestorPool.java Fri Mar 20 12:08:51 2009
@@ -462,29 +462,30 @@
         }
 
         protected boolean invokeListener() throws Exception {
-            Requestor req = null;
             boolean messageReceived = false;
-            try {
-                req = createRequestor(true);
-                req.begin();
-                messageReceived = req.receive(receiveTimeout) != null;
-                if (messageReceived) {
-                    if (logger.isDebugEnabled()) {
-                        logger.debug("Received message of type [" + req.getMessage().getClass() + "] from consumer");
+            Requestor req = createRequestor(true);
+            synchronized (req) {
+                try {
+                    req.begin();
+                    messageReceived = req.receive(receiveTimeout) != null;
+                    if (messageReceived) {
+                        if (logger.isDebugEnabled()) {
+                            logger.debug("Received message of type [" + req.getMessage().getClass() + "] from consumer");
+                        }
+                        messageReceived(this, req.getSession());
+                        listener.onMessage(req);
+                        lastMessageSucceeded = true;
+                    } else {
+                        if (logger.isTraceEnabled()) {
+                            logger.trace("Consumer did not receive a message");
+                        }
+                        noMessageReceived(this, req.getSession());
+                        lastMessageSucceeded = true;
                     }
-                    messageReceived(this, req.getSession());
-                    listener.onMessage(req);
-                    lastMessageSucceeded = true;
-                } else {
-                    if (logger.isTraceEnabled()) {
-                        logger.trace("Consumer did not receive a message");
+                } finally {
+                    if (req != null) {
+                        req.close();
                     }
-                    noMessageReceived(this, req.getSession());
-                    lastMessageSucceeded = true;
-                }
-            } finally {
-                if (req != null) {
-                    req.close();
                 }
             }
             return messageReceived;