You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/10/09 13:57:30 UTC
svn commit: r1180587 -
/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/
Author: davsclaus
Date: Sun Oct 9 11:57:29 2011
New Revision: 1180587
URL: http://svn.apache.org/viewvc?rev=1180587&view=rev
Log:
CAMEL-4529: Better trace logging for correlationID add/remove to track if something is wrong with that.
Added:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationTimeoutMap.java (contents, props changed)
- copied, changed from r1180554, camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java
Removed:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java
Modified:
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java
Copied: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationTimeoutMap.java (from r1180554, camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java)
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationTimeoutMap.java?p2=camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationTimeoutMap.java&p1=camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java&r1=1180554&r2=1180587&rev=1180587&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationTimeoutMap.java Sun Oct 9 11:57:29 2011
@@ -21,11 +21,15 @@ import java.util.concurrent.ScheduledExe
import org.apache.camel.support.DefaultTimeoutMap;
/**
+ * A {@link org.apache.camel.TimeoutMap} which is used to track reply messages which
+ * has been timed out, and thus should trigger the waiting {@link org.apache.camel.Exchange} to
+ * timeout as well.
+ *
* @version
*/
-public class CorrelationMap extends DefaultTimeoutMap<String, ReplyHandler> {
+public class CorrelationTimeoutMap extends DefaultTimeoutMap<String, ReplyHandler> {
- public CorrelationMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) {
+ public CorrelationTimeoutMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) {
super(executor, requestMapPollTimeMillis);
}
@@ -33,6 +37,7 @@ public class CorrelationMap extends Defa
// trigger timeout
value.onTimeout(key);
// return true to remove the element
+ log.trace("Evicted correlationID: {}", key);
return true;
}
@@ -44,5 +49,14 @@ public class CorrelationMap extends Defa
} else {
super.put(key, value, timeoutMillis);
}
+ log.trace("Added correlationID: {} to timeout after: {} millis", key, timeoutMillis);
}
+
+ @Override
+ public ReplyHandler remove(String id) {
+ ReplyHandler answer = super.remove(id);
+ log.trace("Removed correlationID: {} -> {}", id, answer != null);
+ return answer;
+ }
+
}
Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationTimeoutMap.java
------------------------------------------------------------------------------
svn:eol-style = native
Propchange: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationTimeoutMap.java
------------------------------------------------------------------------------
svn:keywords = Rev Date
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java?rev=1180587&r1=1180586&r2=1180587&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java Sun Oct 9 11:57:29 2011
@@ -19,11 +19,15 @@ package org.apache.camel.component.jms.r
import java.util.HashMap;
import java.util.Map;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
/**
* A creator which can build the JMS message selector query string to use
* with a shared persistent reply-to queue, so we can select the correct messages we expect as replies.
*/
public class MessageSelectorCreator {
+ protected static final Logger LOG = LoggerFactory.getLogger(MessageSelectorCreator.class);
protected Map<String, String> correlationIds;
protected boolean dirty = true;
protected StringBuilder expression;
@@ -34,11 +38,13 @@ public class MessageSelectorCreator {
public synchronized void addCorrelationID(String id) {
correlationIds.put(id, id);
+ LOG.trace("Added correlationID: {}", id);
dirty = true;
}
public synchronized void removeCorrelationID(String id) {
- correlationIds.remove(id);
+ boolean answer = correlationIds.remove(id) != null;
+ LOG.trace("Removed correlationID: {} -> {}", id, answer);
dirty = true;
}
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java?rev=1180587&r1=1180586&r2=1180587&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java Sun Oct 9 11:57:29 2011
@@ -38,19 +38,25 @@ public class PersistentQueueReplyHandler
@Override
public void onReply(String correlationId, Message reply) {
- if (dynamicMessageSelector != null) {
- // remove correlation id from message selector
- dynamicMessageSelector.removeCorrelationID(correlationId);
+ try {
+ if (dynamicMessageSelector != null) {
+ // remove correlation id from message selector
+ dynamicMessageSelector.removeCorrelationID(correlationId);
+ }
+ } finally {
+ super.onReply(correlationId, reply);
}
- super.onReply(correlationId, reply);
}
@Override
public void onTimeout(String correlationId) {
- if (dynamicMessageSelector != null) {
- // remove correlation id from message selector
- dynamicMessageSelector.removeCorrelationID(correlationId);
+ try {
+ if (dynamicMessageSelector != null) {
+ // remove correlation id from message selector
+ dynamicMessageSelector.removeCorrelationID(correlationId);
+ }
+ } finally {
+ super.onTimeout(correlationId);
}
- super.onTimeout(correlationId);
}
}
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java?rev=1180587&r1=1180586&r2=1180587&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java Sun Oct 9 11:57:29 2011
@@ -81,10 +81,6 @@ public class PersistentQueueReplyManager
try {
handler.onReply(correlationID, message);
} finally {
- if (dynamicMessageSelector != null) {
- // also remember to keep the dynamic selector updated with the new correlation id
- dynamicMessageSelector.removeCorrelationID(correlationID);
- }
correlation.remove(correlationID);
}
} else {
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java?rev=1180587&r1=1180586&r2=1180587&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/ReplyManagerSupport.java Sun Oct 9 11:57:29 2011
@@ -50,7 +50,7 @@ public abstract class ReplyManagerSuppor
protected AbstractMessageListenerContainer listenerContainer;
protected final CountDownLatch replyToLatch = new CountDownLatch(1);
protected final long replyToTimeout = 10000;
- protected CorrelationMap correlation;
+ protected CorrelationTimeoutMap correlation;
public void setScheduledExecutorService(ScheduledExecutorService executorService) {
this.executorService = executorService;
@@ -108,38 +108,40 @@ public abstract class ReplyManagerSuppor
public void processReply(ReplyHolder holder) {
if (holder != null && isRunAllowed()) {
- Exchange exchange = holder.getExchange();
- Message message = holder.getMessage();
-
- boolean timeout = holder.isTimeout();
- if (timeout) {
- // no response, so lets set a timed out exception
- exchange.setException(new ExchangeTimedOutException(exchange, holder.getRequestTimeout()));
- } else {
- JmsMessage response = new JmsMessage(message, endpoint.getBinding());
- Object body = response.getBody();
+ try {
+ Exchange exchange = holder.getExchange();
+ Message message = holder.getMessage();
- if (endpoint.isTransferException() && body instanceof Exception) {
- log.debug("Reply received. Setting reply as an Exception: {}", body);
- // we got an exception back and endpoint was configured to transfer exception
- // therefore set response as exception
- exchange.setException((Exception) body);
+ boolean timeout = holder.isTimeout();
+ if (timeout) {
+ // no response, so lets set a timed out exception
+ exchange.setException(new ExchangeTimedOutException(exchange, holder.getRequestTimeout()));
} else {
- log.debug("Reply received. Setting reply as OUT message: {}", body);
- // regular response
- exchange.setOut(response);
- }
+ JmsMessage response = new JmsMessage(message, endpoint.getBinding());
+ Object body = response.getBody();
- // restore correlation id in case the remote server messed with it
- if (holder.getOriginalCorrelationId() != null) {
- JmsMessageHelper.setCorrelationId(message, holder.getOriginalCorrelationId());
- exchange.getOut().setHeader("JMSCorrelationID", holder.getOriginalCorrelationId());
+ if (endpoint.isTransferException() && body instanceof Exception) {
+ log.debug("Reply received. Setting reply as an Exception: {}", body);
+ // we got an exception back and endpoint was configured to transfer exception
+ // therefore set response as exception
+ exchange.setException((Exception) body);
+ } else {
+ log.debug("Reply received. Setting reply as OUT message: {}", body);
+ // regular response
+ exchange.setOut(response);
+ }
+
+ // restore correlation id in case the remote server messed with it
+ if (holder.getOriginalCorrelationId() != null) {
+ JmsMessageHelper.setCorrelationId(message, holder.getOriginalCorrelationId());
+ exchange.getOut().setHeader("JMSCorrelationID", holder.getOriginalCorrelationId());
+ }
}
+ } finally {
+ // notify callback
+ AsyncCallback callback = holder.getCallback();
+ callback.done(false);
}
-
- // notify callback
- AsyncCallback callback = holder.getCallback();
- callback.done(false);
}
}
@@ -196,7 +198,7 @@ public abstract class ReplyManagerSuppor
ObjectHelper.notNull(endpoint, "endpoint", this);
// purge for timeout every second
- correlation = new CorrelationMap(executorService, 1000);
+ correlation = new CorrelationTimeoutMap(executorService, 1000);
ServiceHelper.startService(correlation);
// create JMS listener and start it
Modified: camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java
URL: http://svn.apache.org/viewvc/camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java?rev=1180587&r1=1180586&r2=1180587&view=diff
==============================================================================
--- camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java (original)
+++ camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/UseMessageIdAsCorrelationIdMessageSentCallback.java Sun Oct 9 11:57:29 2011
@@ -22,6 +22,8 @@ import javax.jms.Message;
import javax.jms.Session;
import org.apache.camel.component.jms.MessageSentCallback;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* Callback to be used when using the option <tt>useMessageIDAsCorrelationID</tt>.
@@ -33,6 +35,7 @@ import org.apache.camel.component.jms.Me
*/
public class UseMessageIdAsCorrelationIdMessageSentCallback implements MessageSentCallback {
+ private static final Logger LOG = LoggerFactory.getLogger(MessageSelectorCreator.class);
private ReplyManager replyManager;
private String correlationId;
private long requestTimeout;
@@ -49,6 +52,7 @@ public class UseMessageIdAsCorrelationId
newCorrelationID = message.getJMSMessageID();
} catch (JMSException e) {
// ignore
+ LOG.warn("Cannot get JMSMessageID from message: " + message, e);
}
if (newCorrelationID != null) {
replyManager.updateCorrelationId(correlationId, newCorrelationID, requestTimeout);