You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/10/09 13:57:30 UTC

svn commit: r1180587 - /camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/

Author: davsclaus
Date: Sun Oct  9 11:57:29 2011
New Revision: 1180587

URL: http://svn.apache.org/viewvc?rev=1180587&view=rev
Log:
CAMEL-4529: Better trace logging for correlationID add/remove to track if something is wrong with that.

Added:
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationTimeoutMap.java   (contents, props changed)
      - copied, changed from r1180554, camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java
Removed:
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java
Modified:
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java

Copied: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationTimeoutMap.java (from r1180554, camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationTimeoutMap.java?p2=camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationTimeoutMap.java&p1=camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java&r1=1180554&r2=1180587&rev=1180587&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationTimeoutMap.java Sun Oct  9 11:57:29 2011
@@ -21,11 +21,15 @@ import java.util.concurrent.ScheduledExe
 import org.apache.camel.support.DefaultTimeoutMap;
 
 /**
+ * A {@link org.apache.camel.TimeoutMap} which is used to track reply messages which
+ * has been timed out, and thus should trigger the waiting {@link org.apache.camel.Exchange} to
+ * timeout as well.
+ *
  * @version 
  */
-public class CorrelationMap extends DefaultTimeoutMap<String, ReplyHandler> {
+public class CorrelationTimeoutMap extends DefaultTimeoutMap<String, ReplyHandler> {
 
-    public CorrelationMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) {
+    public CorrelationTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) {
         super(executor, requestMapPollTimeMillis);
     }
 
@@ -33,6 +37,7 @@ public class CorrelationMap extends Defa
         // trigger timeout
         value.onTimeout(key);
         // return true to remove the element
+        log.trace("Evicted correlationID: {}", key);
         return true;
     }
 
@@ -44,5 +49,14 @@ public class CorrelationMap extends Defa
         } else {
             super.put(key, value, timeoutMillis);
         }
+        log.trace("Added correlationID: {} to timeout after: {} millis", key, timeoutMillis);
     }
+
+    @Override
+    public ReplyHandler remove(String id) {
+        ReplyHandler answer = super.remove(id);
+        log.trace("Removed correlationID: {} -> {}", id, answer != null);
+        return answer;
+    }
+
 }

Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationTimeoutMap.java
------------------------------------------------------------------------------
    svn:eol-style = native

Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationTimeoutMap.java
------------------------------------------------------------------------------
    svn:keywords = Rev Date

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java?rev=1180587&r1=1180586&r2=1180587&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java Sun Oct  9 11:57:29 2011
@@ -19,11 +19,15 @@ package org.apache.camel.component.jms.r
 import java.util.HashMap;
 import java.util.Map;
 
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
 /**
  * A creator which can build the JMS message selector query string to use
  * with a shared persistent reply-to queue, so we can select the correct messages we expect as replies.
  */
 public class MessageSelectorCreator {
+    protected static final Logger LOG = LoggerFactory.getLogger(MessageSelectorCreator.class);
     protected Map<String, String> correlationIds;
     protected boolean dirty = true;
     protected StringBuilder expression;
@@ -34,11 +38,13 @@ public class MessageSelectorCreator {
 
     public synchronized void addCorrelationID(String id) {
         correlationIds.put(id, id);
+        LOG.trace("Added correlationID: {}", id);
         dirty = true;
     }
 
     public synchronized void removeCorrelationID(String id) {
-        correlationIds.remove(id);
+        boolean answer = correlationIds.remove(id) != null;
+        LOG.trace("Removed correlationID: {} -> {}", id, answer);
         dirty = true;
     }
 

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java?rev=1180587&r1=1180586&r2=1180587&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java Sun Oct  9 11:57:29 2011
@@ -38,19 +38,25 @@ public class PersistentQueueReplyHandler
 
     @Override
     public void onReply(String correlationId, Message reply) {
-        if (dynamicMessageSelector != null) {
-            // remove correlation id from message selector
-            dynamicMessageSelector.removeCorrelationID(correlationId);
+        try {
+            if (dynamicMessageSelector != null) {
+                // remove correlation id from message selector
+                dynamicMessageSelector.removeCorrelationID(correlationId);
+            }
+        } finally {
+            super.onReply(correlationId, reply);
         }
-        super.onReply(correlationId, reply);
     }
 
     @Override
     public void onTimeout(String correlationId) {
-        if (dynamicMessageSelector != null) {
-            // remove correlation id from message selector
-            dynamicMessageSelector.removeCorrelationID(correlationId);
+        try {
+            if (dynamicMessageSelector != null) {
+                // remove correlation id from message selector
+                dynamicMessageSelector.removeCorrelationID(correlationId);
+            }
+        } finally {
+            super.onTimeout(correlationId);
         }
-        super.onTimeout(correlationId);
     }
 }

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java?rev=1180587&r1=1180586&r2=1180587&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java Sun Oct  9 11:57:29 2011
@@ -81,10 +81,6 @@ public class PersistentQueueReplyManager
             try {
                 handler.onReply(correlationID, message);
             } finally {
-                if (dynamicMessageSelector != null) {
-                    // also remember to keep the dynamic selector updated with the new correlation id
-                    dynamicMessageSelector.removeCorrelationID(correlationID);
-                }
                 correlation.remove(correlationID);
             }
         } else {

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java?rev=1180587&r1=1180586&r2=1180587&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java Sun Oct  9 11:57:29 2011
@@ -50,7 +50,7 @@ public abstract class ReplyManagerSuppor
     protected AbstractMessageListenerContainer listenerContainer;
     protected final CountDownLatch replyToLatch = new CountDownLatch(1);
     protected final long replyToTimeout = 10000;
-    protected CorrelationMap correlation;
+    protected CorrelationTimeoutMap correlation;
 
     public void setScheduledExecutorService(ScheduledExecutorService executorService) {
         this.executorService = executorService;
@@ -108,38 +108,40 @@ public abstract class ReplyManagerSuppor
 
     public void processReply(ReplyHolder holder) {
         if (holder != null && isRunAllowed()) {
-            Exchange exchange = holder.getExchange();
-            Message message = holder.getMessage();
-
-            boolean timeout = holder.isTimeout();
-            if (timeout) {
-                // no response, so lets set a timed out exception
-                exchange.setException(new ExchangeTimedOutException(exchange, holder.getRequestTimeout()));
-            } else {
-                JmsMessage response = new JmsMessage(message, endpoint.getBinding());
-                Object body = response.getBody();
+            try {
+                Exchange exchange = holder.getExchange();
+                Message message = holder.getMessage();
 
-                if (endpoint.isTransferException() && body instanceof Exception) {
-                    log.debug("Reply received. Setting reply as an Exception: {}", body);
-                    // we got an exception back and endpoint was configured to transfer exception
-                    // therefore set response as exception
-                    exchange.setException((Exception) body);
+                boolean timeout = holder.isTimeout();
+                if (timeout) {
+                    // no response, so lets set a timed out exception
+                    exchange.setException(new ExchangeTimedOutException(exchange, holder.getRequestTimeout()));
                 } else {
-                    log.debug("Reply received. Setting reply as OUT message: {}", body);
-                    // regular response
-                    exchange.setOut(response);
-                }
+                    JmsMessage response = new JmsMessage(message, endpoint.getBinding());
+                    Object body = response.getBody();
 
-                // restore correlation id in case the remote server messed with it
-                if (holder.getOriginalCorrelationId() != null) {
-                    JmsMessageHelper.setCorrelationId(message, holder.getOriginalCorrelationId());
-                    exchange.getOut().setHeader("JMSCorrelationID", holder.getOriginalCorrelationId());
+                    if (endpoint.isTransferException() && body instanceof Exception) {
+                        log.debug("Reply received. Setting reply as an Exception: {}", body);
+                        // we got an exception back and endpoint was configured to transfer exception
+                        // therefore set response as exception
+                        exchange.setException((Exception) body);
+                    } else {
+                        log.debug("Reply received. Setting reply as OUT message: {}", body);
+                        // regular response
+                        exchange.setOut(response);
+                    }
+
+                    // restore correlation id in case the remote server messed with it
+                    if (holder.getOriginalCorrelationId() != null) {
+                        JmsMessageHelper.setCorrelationId(message, holder.getOriginalCorrelationId());
+                        exchange.getOut().setHeader("JMSCorrelationID", holder.getOriginalCorrelationId());
+                    }
                 }
+            } finally {
+                // notify callback
+                AsyncCallback callback = holder.getCallback();
+                callback.done(false);
             }
-
-            // notify callback
-            AsyncCallback callback = holder.getCallback();
-            callback.done(false);
         }
     }
 
@@ -196,7 +198,7 @@ public abstract class ReplyManagerSuppor
         ObjectHelper.notNull(endpoint, "endpoint", this);
 
         // purge for timeout every second
-        correlation = new CorrelationMap(executorService, 1000);
+        correlation = new CorrelationTimeoutMap(executorService, 1000);
         ServiceHelper.startService(correlation);
 
         // create JMS listener and start it

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java?rev=1180587&r1=1180586&r2=1180587&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java Sun Oct  9 11:57:29 2011
@@ -22,6 +22,8 @@ import javax.jms.Message;
 import javax.jms.Session;
 
 import org.apache.camel.component.jms.MessageSentCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
 
 /**
  * Callback to be used when using the option <tt>useMessageIDAsCorrelationID</tt>.
@@ -33,6 +35,7 @@ import org.apache.camel.component.jms.Me
  */
 public class UseMessageIdAsCorrelationIdMessageSentCallback implements MessageSentCallback {
 
+    private static final Logger LOG = LoggerFactory.getLogger(MessageSelectorCreator.class);
     private ReplyManager replyManager;
     private String correlationId;
     private long requestTimeout;
@@ -49,6 +52,7 @@ public class UseMessageIdAsCorrelationId
             newCorrelationID = message.getJMSMessageID();
         } catch (JMSException e) {
             // ignore
+            LOG.warn("Cannot get JMSMessageID from message: " + message, e);
         }
         if (newCorrelationID != null) {
             replyManager.updateCorrelationId(correlationId, newCorrelationID, requestTimeout);