You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/12/20 08:42:18 UTC

svn commit: r1221129 - in /camel/trunk: camel-core/src/main/java/org/apache/camel/ components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/

Author: davsclaus
Date: Tue Dec 20 07:42:17 2011
New Revision: 1221129

URL: http://svn.apache.org/viewvc?rev=1221129&view=rev
Log:
CAMEL-4800: request/reply over JMS include details about correlationID if timeout occurs. Reduce noise in log to only log a WARN and no stacktrace etc.

Removed:
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/UnknownReplyMessageException.java
Modified:
    camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeTimedOutException.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java
    camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java

Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeTimedOutException.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeTimedOutException.java?rev=1221129&r1=1221128&r2=1221129&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeTimedOutException.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeTimedOutException.java Tue Dec 20 07:42:17 2011
@@ -31,6 +31,11 @@ public class ExchangeTimedOutException e
         this.timeout = timeout;
     }
 
+    public ExchangeTimedOutException(Exchange exchange, long timeout, String message) {
+        super("The OUT message was not received within: " + timeout + " millis due " + message, exchange);
+        this.timeout = timeout;
+    }
+
     /**
      * Return the timeout which expired in milliseconds
      */

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java?rev=1221129&r1=1221128&r2=1221129&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java Tue Dec 20 07:42:17 2011
@@ -26,12 +26,9 @@ import org.apache.camel.Exchange;
  */
 public class PersistentQueueReplyHandler extends TemporaryQueueReplyHandler {
 
-    private MessageSelectorCreator dynamicMessageSelector;
-
     public PersistentQueueReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
-                                       String originalCorrelationId, long timeout, MessageSelectorCreator dynamicMessageSelector) {
-        super(replyManager, exchange, callback, originalCorrelationId, timeout);
-        this.dynamicMessageSelector = dynamicMessageSelector;
+                                       String originalCorrelationId, String correlationId, long timeout) {
+        super(replyManager, exchange, callback, originalCorrelationId, correlationId, timeout);
     }
 
 }

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java?rev=1221129&r1=1221128&r2=1221129&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java Tue Dec 20 07:42:17 2011
@@ -44,7 +44,7 @@ public class PersistentQueueReplyManager
                                 String originalCorrelationId, String correlationId, long requestTimeout) {
         // add to correlation map
         PersistentQueueReplyHandler handler = new PersistentQueueReplyHandler(replyManager, exchange, callback,
-                originalCorrelationId, requestTimeout, dynamicMessageSelector);
+                originalCorrelationId, correlationId, requestTimeout);
         correlation.put(correlationId, handler, requestTimeout);
         return correlationId;
     }
@@ -76,9 +76,9 @@ public class PersistentQueueReplyManager
         } else {
             // we could not correlate the received reply message to a matching request and therefore
             // we cannot continue routing the unknown message
-            String text = "Reply received for unknown correlationID [" + correlationID + "] -> " + message;
+            String text = "Reply received for unknown correlationID [" + correlationID + "]. The message will be ignored: " + message;
+            // log a warn and then ignore the message
             log.warn(text);
-            throw new UnknownReplyMessageException(text, message, correlationID);
         }
     }
 

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java?rev=1221129&r1=1221128&r2=1221129&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java Tue Dec 20 07:42:17 2011
@@ -33,23 +33,27 @@ public class ReplyHolder {
     private final AsyncCallback callback;
     private final Message message;
     private final String originalCorrelationId;
+    private final String correlationId;
     private long timeout;
 
     /**
      * Constructor to use when a reply message was received
      */
-    public ReplyHolder(Exchange exchange, AsyncCallback callback, String originalCorrelationId, Message message) {
+    public ReplyHolder(Exchange exchange, AsyncCallback callback, String originalCorrelationId,
+                       String correlationId, Message message) {
         this.exchange = exchange;
         this.callback = callback;
         this.originalCorrelationId = originalCorrelationId;
+        this.correlationId = correlationId;
         this.message = message;
     }
 
     /**
      * Constructor to use when a timeout occurred
      */
-    public ReplyHolder(Exchange exchange, AsyncCallback callback, String originalCorrelationId, long timeout) {
-        this(exchange, callback, originalCorrelationId, null);
+    public ReplyHolder(Exchange exchange, AsyncCallback callback, String originalCorrelationId,
+                       String correlationId, long timeout) {
+        this(exchange, callback, originalCorrelationId, correlationId, null);
         this.timeout = timeout;
     }
 
@@ -72,6 +76,15 @@ public class ReplyHolder {
     }
 
     /**
+     * Gets the correlation id
+     *
+     * @see #getOriginalCorrelationId()
+     */
+    public String getCorrelationId() {
+        return correlationId;
+    }
+
+    /**
      * Gets the received message
      *
      * @return  the received message, or <tt>null</tt> if timeout occurred and no message has been received

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java?rev=1221129&r1=1221128&r2=1221129&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java Tue Dec 20 07:42:17 2011
@@ -100,7 +100,7 @@ public abstract class ReplyManagerSuppor
             return;
         }
 
-        log.debug("Received reply message with correlationID: {} -> {}", correlationID, message);
+        log.debug("Received reply message with correlationID [{}] -> {}", correlationID, message);
 
         // handle the reply message
         handleReplyMessage(correlationID, message);
@@ -114,8 +114,14 @@ public abstract class ReplyManagerSuppor
 
                 boolean timeout = holder.isTimeout();
                 if (timeout) {
+                    // timeout occurred do a WARN log so its easier to spot in the logs
+                    log.warn("Timeout occurred after {} millis waiting for reply message with correlationID [{}]."
+                            + " Setting ExchangeTimedOutException on ExchangeId: {} and continue routing.",
+                            new Object[]{holder.getRequestTimeout(), holder.getCorrelationId(), exchange.getExchangeId()});
+
                     // no response, so lets set a timed out exception
-                    exchange.setException(new ExchangeTimedOutException(exchange, holder.getRequestTimeout()));
+                    String msg = "reply message with correlationID: " + holder.getCorrelationId() + " not received";
+                    exchange.setException(new ExchangeTimedOutException(exchange, holder.getRequestTimeout(), msg));
                 } else {
                     JmsMessage response = new JmsMessage(message, endpoint.getBinding());
                     Object body = response.getBody();

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java?rev=1221129&r1=1221128&r2=1221129&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java Tue Dec 20 07:42:17 2011
@@ -34,27 +34,29 @@ public class TemporaryQueueReplyHandler 
     protected final AsyncCallback callback;
     // remember the original correlation id, in case the server returns back a reply with a messed up correlation id
     protected final String originalCorrelationId;
+    protected final String correlationId;
     protected final long timeout;
 
     public TemporaryQueueReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
-                                      String originalCorrelationId, long timeout) {
+                                      String originalCorrelationId, String correlationId, long timeout) {
         this.replyManager = replyManager;
         this.exchange = exchange;
         this.originalCorrelationId = originalCorrelationId;
+        this.correlationId = correlationId;
         this.callback = callback;
         this.timeout = timeout;
     }
 
     public void onReply(String correlationId, Message reply) {
         // create holder object with the the reply
-        ReplyHolder holder = new ReplyHolder(exchange, callback, originalCorrelationId, reply);
+        ReplyHolder holder = new ReplyHolder(exchange, callback, originalCorrelationId, correlationId, reply);
         // process the reply
         replyManager.processReply(holder);
     }
 
     public void onTimeout(String correlationId) {
         // create holder object without the reply which means a timeout occurred
-        ReplyHolder holder = new ReplyHolder(exchange, callback, originalCorrelationId, timeout);
+        ReplyHolder holder = new ReplyHolder(exchange, callback, originalCorrelationId, correlationId, timeout);
         // process timeout
         replyManager.processReply(holder);
     }

Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java?rev=1221129&r1=1221128&r2=1221129&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java Tue Dec 20 07:42:17 2011
@@ -38,7 +38,7 @@ public class TemporaryQueueReplyManager 
     public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
                                 String originalCorrelationId, String correlationId, long requestTimeout) {
         // add to correlation map
-        TemporaryQueueReplyHandler handler = new TemporaryQueueReplyHandler(this, exchange, callback, originalCorrelationId, requestTimeout);
+        TemporaryQueueReplyHandler handler = new TemporaryQueueReplyHandler(this, exchange, callback, originalCorrelationId, correlationId, requestTimeout);
         correlation.put(correlationId, handler, requestTimeout);
         return correlationId;
     }
@@ -66,9 +66,9 @@ public class TemporaryQueueReplyManager 
         } else {
             // we could not correlate the received reply message to a matching request and therefore
             // we cannot continue routing the unknown message
-            String text = "Reply received for unknown correlationID [" + correlationID + "] -> " + message;
+            String text = "Reply received for unknown correlationID [" + correlationID + "]. The message will be ignored: " + message;
+            // log a warn and then ignore the message
             log.warn(text);
-            throw new UnknownReplyMessageException(text, message, correlationID);
         }
     }