You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/12/20 08:42:18 UTC
svn commit: r1221129 - in /camel/trunk:
camel-core/src/main/java/org/apache/camel/
components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/
Author: davsclaus
Date: Tue Dec 20 07:42:17 2011
New Revision: 1221129
URL: http://svn.apache.org/viewvc?rev=1221129&view=rev
Log:
CAMEL-4800: request/reply over JMS include details about correlationID if timeout occurs. Reduce noise in log to only log a WARN and no stacktrace etc.
Removed:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/UnknownReplyMessageException.java
Modified:
camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeTimedOutException.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
Modified: camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeTimedOutException.java
URL: http://svn.apache.org/viewvc/camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeTimedOutException.java?rev=1221129&r1=1221128&r2=1221129&view=diff
==============================================================================
--- camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeTimedOutException.java (original)
+++ camel/trunk/camel-core/src/main/java/org/apache/camel/ExchangeTimedOutException.java Tue Dec 20 07:42:17 2011
@@ -31,6 +31,11 @@ public class ExchangeTimedOutException e
this.timeout = timeout;
}
+ public ExchangeTimedOutException(Exchange exchange, long timeout, String message) {
+ super("The OUT message was not received within: " + timeout + " millis due " + message, exchange);
+ this.timeout = timeout;
+ }
+
/**
* Return the timeout which expired in milliseconds
*/
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java?rev=1221129&r1=1221128&r2=1221129&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java Tue Dec 20 07:42:17 2011
@@ -26,12 +26,9 @@ import org.apache.camel.Exchange;
*/
public class PersistentQueueReplyHandler extends TemporaryQueueReplyHandler {
- private MessageSelectorCreator dynamicMessageSelector;
-
public PersistentQueueReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
- String originalCorrelationId, long timeout, MessageSelectorCreator dynamicMessageSelector) {
- super(replyManager, exchange, callback, originalCorrelationId, timeout);
- this.dynamicMessageSelector = dynamicMessageSelector;
+ String originalCorrelationId, String correlationId, long timeout) {
+ super(replyManager, exchange, callback, originalCorrelationId, correlationId, timeout);
}
}
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java?rev=1221129&r1=1221128&r2=1221129&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java Tue Dec 20 07:42:17 2011
@@ -44,7 +44,7 @@ public class PersistentQueueReplyManager
String originalCorrelationId, String correlationId, long requestTimeout) {
// add to correlation map
PersistentQueueReplyHandler handler = new PersistentQueueReplyHandler(replyManager, exchange, callback,
- originalCorrelationId, requestTimeout, dynamicMessageSelector);
+ originalCorrelationId, correlationId, requestTimeout);
correlation.put(correlationId, handler, requestTimeout);
return correlationId;
}
@@ -76,9 +76,9 @@ public class PersistentQueueReplyManager
} else {
// we could not correlate the received reply message to a matching request and therefore
// we cannot continue routing the unknown message
- String text = "Reply received for unknown correlationID [" + correlationID + "] -> " + message;
+ String text = "Reply received for unknown correlationID [" + correlationID + "]. The message will be ignored: " + message;
+ // log a warn and then ignore the message
log.warn(text);
- throw new UnknownReplyMessageException(text, message, correlationID);
}
}
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java?rev=1221129&r1=1221128&r2=1221129&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyHolder.java Tue Dec 20 07:42:17 2011
@@ -33,23 +33,27 @@ public class ReplyHolder {
private final AsyncCallback callback;
private final Message message;
private final String originalCorrelationId;
+ private final String correlationId;
private long timeout;
/**
* Constructor to use when a reply message was received
*/
- public ReplyHolder(Exchange exchange, AsyncCallback callback, String originalCorrelationId, Message message) {
+ public ReplyHolder(Exchange exchange, AsyncCallback callback, String originalCorrelationId,
+ String correlationId, Message message) {
this.exchange = exchange;
this.callback = callback;
this.originalCorrelationId = originalCorrelationId;
+ this.correlationId = correlationId;
this.message = message;
}
/**
* Constructor to use when a timeout occurred
*/
- public ReplyHolder(Exchange exchange, AsyncCallback callback, String originalCorrelationId, long timeout) {
- this(exchange, callback, originalCorrelationId, null);
+ public ReplyHolder(Exchange exchange, AsyncCallback callback, String originalCorrelationId,
+ String correlationId, long timeout) {
+ this(exchange, callback, originalCorrelationId, correlationId, null);
this.timeout = timeout;
}
@@ -72,6 +76,15 @@ public class ReplyHolder {
}
/**
+ * Gets the correlation id
+ *
+ * @see #getOriginalCorrelationId()
+ */
+ public String getCorrelationId() {
+ return correlationId;
+ }
+
+ /**
* Gets the received message
*
* @return the received message, or <tt>null</tt> if timeout occurred and no message has been received
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java?rev=1221129&r1=1221128&r2=1221129&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java Tue Dec 20 07:42:17 2011
@@ -100,7 +100,7 @@ public abstract class ReplyManagerSuppor
return;
}
- log.debug("Received reply message with correlationID: {} -> {}", correlationID, message);
+ log.debug("Received reply message with correlationID [{}] -> {}", correlationID, message);
// handle the reply message
handleReplyMessage(correlationID, message);
@@ -114,8 +114,14 @@ public abstract class ReplyManagerSuppor
boolean timeout = holder.isTimeout();
if (timeout) {
+ // timeout occurred do a WARN log so its easier to spot in the logs
+ log.warn("Timeout occurred after {} millis waiting for reply message with correlationID [{}]."
+ + " Setting ExchangeTimedOutException on ExchangeId: {} and continue routing.",
+ new Object[]{holder.getRequestTimeout(), holder.getCorrelationId(), exchange.getExchangeId()});
+
// no response, so lets set a timed out exception
- exchange.setException(new ExchangeTimedOutException(exchange, holder.getRequestTimeout()));
+ String msg = "reply message with correlationID: " + holder.getCorrelationId() + " not received";
+ exchange.setException(new ExchangeTimedOutException(exchange, holder.getRequestTimeout(), msg));
} else {
JmsMessage response = new JmsMessage(message, endpoint.getBinding());
Object body = response.getBody();
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java?rev=1221129&r1=1221128&r2=1221129&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyHandler.java Tue Dec 20 07:42:17 2011
@@ -34,27 +34,29 @@ public class TemporaryQueueReplyHandler
protected final AsyncCallback callback;
// remember the original correlation id, in case the server returns back a reply with a messed up correlation id
protected final String originalCorrelationId;
+ protected final String correlationId;
protected final long timeout;
public TemporaryQueueReplyHandler(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
- String originalCorrelationId, long timeout) {
+ String originalCorrelationId, String correlationId, long timeout) {
this.replyManager = replyManager;
this.exchange = exchange;
this.originalCorrelationId = originalCorrelationId;
+ this.correlationId = correlationId;
this.callback = callback;
this.timeout = timeout;
}
public void onReply(String correlationId, Message reply) {
// create holder object with the the reply
- ReplyHolder holder = new ReplyHolder(exchange, callback, originalCorrelationId, reply);
+ ReplyHolder holder = new ReplyHolder(exchange, callback, originalCorrelationId, correlationId, reply);
// process the reply
replyManager.processReply(holder);
}
public void onTimeout(String correlationId) {
// create holder object without the reply which means a timeout occurred
- ReplyHolder holder = new ReplyHolder(exchange, callback, originalCorrelationId, timeout);
+ ReplyHolder holder = new ReplyHolder(exchange, callback, originalCorrelationId, correlationId, timeout);
// process timeout
replyManager.processReply(holder);
}
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java?rev=1221129&r1=1221128&r2=1221129&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/TemporaryQueueReplyManager.java Tue Dec 20 07:42:17 2011
@@ -38,7 +38,7 @@ public class TemporaryQueueReplyManager
public String registerReply(ReplyManager replyManager, Exchange exchange, AsyncCallback callback,
String originalCorrelationId, String correlationId, long requestTimeout) {
// add to correlation map
- TemporaryQueueReplyHandler handler = new TemporaryQueueReplyHandler(this, exchange, callback, originalCorrelationId, requestTimeout);
+ TemporaryQueueReplyHandler handler = new TemporaryQueueReplyHandler(this, exchange, callback, originalCorrelationId, correlationId, requestTimeout);
correlation.put(correlationId, handler, requestTimeout);
return correlationId;
}
@@ -66,9 +66,9 @@ public class TemporaryQueueReplyManager
} else {
// we could not correlate the received reply message to a matching request and therefore
// we cannot continue routing the unknown message
- String text = "Reply received for unknown correlationID [" + correlationID + "] -> " + message;
+ String text = "Reply received for unknown correlationID [" + correlationID + "]. The message will be ignored: " + message;
+ // log a warn and then ignore the message
log.warn(text);
- throw new UnknownReplyMessageException(text, message, correlationID);
}
}