You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/11/09 15:15:07 UTC
svn commit: r1199766 - in /camel/branches/camel-2.8.x: ./
components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/
Author: davsclaus
Date: Wed Nov 9 14:15:06 2011
New Revision: 1199766
URL: http://svn.apache.org/viewvc?rev=1199766&view=rev
Log:
CAMEL-2740: Fixed memory leak when doing request/reply over JMS with a fixed replyTo queue, caused by the correlation IDs in the JMS message selector not being kept in sync with the currently active correlation IDs.
Added:
camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationListener.java
- copied unchanged from r1199739, camel/trunk/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationListener.java
Modified:
camel/branches/camel-2.8.x/ (props changed)
camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java
camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java
camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java
camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov 9 14:15:06 2011
@@ -1 +1 @@
-/camel/trunk:1186106,1186625,1186772,1187221,1187485,1187882,1187893,1188070-1188085,1188642,1188674,1188879,1188881,1189139,1189600,1189681,1189693,1189737,1190212-1190213,1190246,1190303,1195317,1195616,1196210,1197450,1197933,1197948,1198199,1198338,1198340,1199123,1199137,1199654,1199683,1199703
+/camel/trunk:1186106,1186625,1186772,1187221,1187485,1187882,1187893,1188070-1188085,1188642,1188674,1188879,1188881,1189139,1189600,1189681,1189693,1189737,1190212-1190213,1190246,1190303,1195317,1195616,1196210,1197450,1197933,1197948,1198199,1198338,1198340,1199123,1199137,1199654,1199683,1199703,1199739
Propchange: camel/branches/camel-2.8.x/
------------------------------------------------------------------------------
Binary property 'svnmerge-integrated' - no diff available.
Modified: camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java?rev=1199766&r1=1199765&r2=1199766&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java (original)
+++ camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java Wed Nov 9 14:15:06 2011
@@ -21,15 +21,30 @@ import java.util.concurrent.ScheduledExe
import org.apache.camel.util.DefaultTimeoutMap;
/**
- * @version
+ * @version
*/
public class CorrelationMap extends DefaultTimeoutMap<String, ReplyHandler> {
+ private CorrelationListener listener;
+
public CorrelationMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) {
super(executor, requestMapPollTimeMillis);
}
+ public void setListener(CorrelationListener listener) {
+ // there is only one listener needed
+ this.listener = listener;
+ }
+
public boolean onEviction(String key, ReplyHandler value) {
+ try {
+ if (listener != null) {
+ listener.onEviction(key);
+ }
+ } catch (Throwable e) {
+ // ignore
+ }
+
// trigger timeout
value.onTimeout(key);
// return true to remove the element
@@ -38,6 +53,14 @@ public class CorrelationMap extends Defa
@Override
public void put(String key, ReplyHandler value, long timeoutMillis) {
+ try {
+ if (listener != null) {
+ listener.onPut(key);
+ }
+ } catch (Throwable e) {
+ // ignore
+ }
+
if (timeoutMillis <= 0) {
// no timeout (must use Integer.MAX_VALUE)
super.put(key, value, Integer.MAX_VALUE);
@@ -45,4 +68,19 @@ public class CorrelationMap extends Defa
super.put(key, value, timeoutMillis);
}
}
+
+ @Override
+ public ReplyHandler remove(String key) {
+ try {
+ if (listener != null) {
+ listener.onRemove(key);
+ }
+ } catch (Throwable e) {
+ // ignore
+ }
+
+ ReplyHandler answer = super.remove(key);
+ return answer;
+ }
+
}
Modified: camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java?rev=1199766&r1=1199765&r2=1199766&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java (original)
+++ camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java Wed Nov 9 14:15:06 2011
@@ -16,30 +16,25 @@
*/
package org.apache.camel.component.jms.reply;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.LinkedHashSet;
+import java.util.Set;
/**
* A creator which can build the JMS message selector query string to use
* with a shared persistent reply-to queue, so we can select the correct messages we expect as replies.
*/
-public class MessageSelectorCreator {
- protected Map<String, String> correlationIds;
+public class MessageSelectorCreator implements CorrelationListener {
+ protected final CorrelationMap timeoutMap;
+ protected final Set<String> correlationIds;
protected boolean dirty = true;
protected StringBuilder expression;
- public MessageSelectorCreator() {
- correlationIds = new HashMap<String, String>();
- }
-
- public synchronized void addCorrelationID(String id) {
- correlationIds.put(id, id);
- dirty = true;
- }
-
- public synchronized void removeCorrelationID(String id) {
- correlationIds.remove(id);
- dirty = true;
+ public MessageSelectorCreator(CorrelationMap timeoutMap) {
+ this.timeoutMap = timeoutMap;
+ this.timeoutMap.setListener(this);
+ // create local set of correlation ids, as its easier to keep track
+ // using the listener so we can flag the dirty flag upon changes
+ this.correlationIds = new LinkedHashSet<String>();
}
public synchronized String get() {
@@ -49,16 +44,16 @@ public class MessageSelectorCreator {
expression = new StringBuilder("JMSCorrelationID='");
- if (correlationIds.isEmpty()) {
+ if (correlationIds.size() == 0) {
// no id's so use a dummy to select nothing
expression.append("CamelDummyJmsMessageSelector'");
} else {
boolean first = true;
- for (Map.Entry<String, String> entry : correlationIds.entrySet()) {
+ for (String value : correlationIds) {
if (!first) {
expression.append(" OR JMSCorrelationID='");
}
- expression.append(entry.getValue()).append("'");
+ expression.append(value).append("'");
if (first) {
first = false;
}
@@ -69,4 +64,18 @@ public class MessageSelectorCreator {
return expression.toString();
}
+ public void onPut(String key) {
+ dirty = true;
+ correlationIds.add(key);
+ }
+
+ public void onRemove(String key) {
+ dirty = true;
+ correlationIds.remove(key);
+ }
+
+ public void onEviction(String key) {
+ dirty = true;
+ correlationIds.remove(key);
+ }
}
\ No newline at end of file
Modified: camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java?rev=1199766&r1=1199765&r2=1199766&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java (original)
+++ camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java Wed Nov 9 14:15:06 2011
@@ -16,8 +16,6 @@
*/
package org.apache.camel.component.jms.reply;
-import javax.jms.Message;
-
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
@@ -36,21 +34,4 @@ public class PersistentQueueReplyHandler
this.dynamicMessageSelector = dynamicMessageSelector;
}
- @Override
- public void onReply(String correlationId, Message reply) {
- if (dynamicMessageSelector != null) {
- // remove correlation id from message selector
- dynamicMessageSelector.removeCorrelationID(correlationId);
- }
- super.onReply(correlationId, reply);
- }
-
- @Override
- public void onTimeout(String correlationId) {
- if (dynamicMessageSelector != null) {
- // remove correlation id from message selector
- dynamicMessageSelector.removeCorrelationID(correlationId);
- }
- super.onTimeout(correlationId);
- }
}
Modified: camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java?rev=1199766&r1=1199765&r2=1199766&view=diff
==============================================================================
--- camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java (original)
+++ camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java Wed Nov 9 14:15:06 2011
@@ -46,10 +46,6 @@ public class PersistentQueueReplyManager
PersistentQueueReplyHandler handler = new PersistentQueueReplyHandler(replyManager, exchange, callback,
originalCorrelationId, requestTimeout, dynamicMessageSelector);
correlation.put(correlationId, handler, requestTimeout);
- if (dynamicMessageSelector != null) {
- // also remember to keep the dynamic selector updated with the new correlation id
- dynamicMessageSelector.addCorrelationID(correlationId);
- }
return correlationId;
}
@@ -63,14 +59,6 @@ public class PersistentQueueReplyManager
}
correlation.put(newCorrelationId, handler, requestTimeout);
-
- // no not arrived early
- if (dynamicMessageSelector != null) {
- // also remember to keep the dynamic selector updated with the new correlation id
- // at first removing the old correlationID and then add the new correlationID
- dynamicMessageSelector.removeCorrelationID(correlationId);
- dynamicMessageSelector.addCorrelationID(newCorrelationId);
- }
}
protected void handleReplyMessage(String correlationID, Message message) {
@@ -83,10 +71,6 @@ public class PersistentQueueReplyManager
try {
handler.onReply(correlationID, message);
} finally {
- if (dynamicMessageSelector != null) {
- // also remember to keep the dynamic selector updated with the new correlation id
- dynamicMessageSelector.removeCorrelationID(correlationID);
- }
correlation.remove(correlationID);
}
} else {
@@ -147,7 +131,7 @@ public class PersistentQueueReplyManager
log.debug("Using shared queue: " + endpoint.getReplyTo() + " with fixed message selector [" + fixedMessageSelector + "] as reply listener: " + answer);
} else {
// use a dynamic message selector which will select the message we want to receive as reply
- dynamicMessageSelector = new MessageSelectorCreator();
+ dynamicMessageSelector = new MessageSelectorCreator(correlation);
answer = new SharedPersistentQueueMessageListenerContainer(dynamicMessageSelector);
log.debug("Using shared queue: " + endpoint.getReplyTo() + " with dynamic message selector as reply listener: " + answer);
}