You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/11/09 15:36:23 UTC
svn commit: r1199774 - in /camel/branches/camel-2.7.x: ./
components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/
Author: davsclaus
Date: Wed Nov 9 14:36:23 2011
New Revision: 1199774
URL: http://svn.apache.org/viewvc?rev=1199774&view=rev
Log:
CAMEL-2740: Fixed memory leak when doing request/reply over JMS with a fixed replyTo queue. The leak was due to the correlation IDs in the JMS message selector not being kept in sync with the currently active correlation IDs.
Added:
camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationListener.java
- copied unchanged from r1199766, camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationListener.java
Modified:
camel/branches/camel-2.7.x/ (props changed)
camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java
camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java
camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java
camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
Propchange: camel/branches/camel-2.7.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov 9 14:36:23 2011
@@ -1,2 +1,2 @@
-/camel/branches/camel-2.8.x:1170965,1171400,1174571,1175323,1176329,1176787,1177397,1177946,1177949,1180598,1187226,1189704,1199138,1199732
-/camel/trunk:1146608,1146903,1147216,1170956,1171396,1174565,1175321,1176274,1176781-1176782,1177394,1177945,1177948,1180597,1187221,1189693,1199137,1199703
+/camel/branches/camel-2.8.x:1170965,1171400,1174571,1175323,1176329,1176787,1177397,1177946,1177949,1180598,1187226,1189704,1199138,1199732,1199766
+/camel/trunk:1146608,1146903,1147216,1170956,1171396,1174565,1175321,1176274,1176781-1176782,1177394,1177945,1177948,1180597,1187221,1189693,1199137,1199703,1199739
Propchange: camel/branches/camel-2.7.x/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Wed Nov 9 14:36:23 2011
@@ -1 +1 @@
-/camel/branches/camel-2.8.x:1-1146127,1146608,1146653,1146771,1146903,1147216,1170965-1171083,1171085-1171400,1174571,1175323,1176329,1176787,1177397,1177946,1177949,1180598,1187226,1189704,1199138,1199732
+/camel/branches/camel-2.8.x:1-1146127,1146608,1146653,1146771,1146903,1147216,1170965-1171083,1171085-1171400,1174571,1175323,1176329,1176787,1177397,1177946,1177949,1180598,1187226,1189704,1199138,1199732,1199766
Modified: camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java?rev=1199774&r1=1199773&r2=1199774&view=diff
==============================================================================
--- camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java (original)
+++ camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java Wed Nov 9 14:36:23 2011
@@ -21,15 +21,30 @@ import java.util.concurrent.ScheduledExe
import org.apache.camel.util.DefaultTimeoutMap;
/**
- * @version
+ * @version
*/
public class CorrelationMap extends DefaultTimeoutMap<String, ReplyHandler> {
+ private CorrelationListener listener;
+
public CorrelationMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) {
super(executor, requestMapPollTimeMillis);
}
+ public void setListener(CorrelationListener listener) {
+ // there is only one listener needed
+ this.listener = listener;
+ }
+
public boolean onEviction(String key, ReplyHandler value) {
+ try {
+ if (listener != null) {
+ listener.onEviction(key);
+ }
+ } catch (Throwable e) {
+ // ignore
+ }
+
// trigger timeout
value.onTimeout(key);
// return true to remove the element
@@ -38,6 +53,14 @@ public class CorrelationMap extends Defa
@Override
public void put(String key, ReplyHandler value, long timeoutMillis) {
+ try {
+ if (listener != null) {
+ listener.onPut(key);
+ }
+ } catch (Throwable e) {
+ // ignore
+ }
+
if (timeoutMillis <= 0) {
// no timeout (must use Integer.MAX_VALUE)
super.put(key, value, Integer.MAX_VALUE);
@@ -45,4 +68,19 @@ public class CorrelationMap extends Defa
super.put(key, value, timeoutMillis);
}
}
+
+ @Override
+ public ReplyHandler remove(String key) {
+ try {
+ if (listener != null) {
+ listener.onRemove(key);
+ }
+ } catch (Throwable e) {
+ // ignore
+ }
+
+ ReplyHandler answer = super.remove(key);
+ return answer;
+ }
+
}
Modified: camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java?rev=1199774&r1=1199773&r2=1199774&view=diff
==============================================================================
--- camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java (original)
+++ camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java Wed Nov 9 14:36:23 2011
@@ -16,30 +16,25 @@
*/
package org.apache.camel.component.jms.reply;
-import java.util.HashMap;
-import java.util.Map;
+import java.util.LinkedHashSet;
+import java.util.Set;
/**
* A creator which can build the JMS message selector query string to use
* with a shared persistent reply-to queue, so we can select the correct messages we expect as replies.
*/
-public class MessageSelectorCreator {
- protected Map<String, String> correlationIds;
+public class MessageSelectorCreator implements CorrelationListener {
+ protected final CorrelationMap timeoutMap;
+ protected final Set<String> correlationIds;
protected boolean dirty = true;
protected StringBuilder expression;
- public MessageSelectorCreator() {
- correlationIds = new HashMap<String, String>();
- }
-
- public synchronized void addCorrelationID(String id) {
- correlationIds.put(id, id);
- dirty = true;
- }
-
- public synchronized void removeCorrelationID(String id) {
- correlationIds.remove(id);
- dirty = true;
+ public MessageSelectorCreator(CorrelationMap timeoutMap) {
+ this.timeoutMap = timeoutMap;
+ this.timeoutMap.setListener(this);
+ // create local set of correlation ids, as its easier to keep track
+ // using the listener so we can flag the dirty flag upon changes
+ this.correlationIds = new LinkedHashSet<String>();
}
public synchronized String get() {
@@ -49,16 +44,16 @@ public class MessageSelectorCreator {
expression = new StringBuilder("JMSCorrelationID='");
- if (correlationIds.isEmpty()) {
+ if (correlationIds.size() == 0) {
// no id's so use a dummy to select nothing
expression.append("CamelDummyJmsMessageSelector'");
} else {
boolean first = true;
- for (Map.Entry<String, String> entry : correlationIds.entrySet()) {
+ for (String value : correlationIds) {
if (!first) {
expression.append(" OR JMSCorrelationID='");
}
- expression.append(entry.getValue()).append("'");
+ expression.append(value).append("'");
if (first) {
first = false;
}
@@ -69,4 +64,18 @@ public class MessageSelectorCreator {
return expression.toString();
}
+ public void onPut(String key) {
+ dirty = true;
+ correlationIds.add(key);
+ }
+
+ public void onRemove(String key) {
+ dirty = true;
+ correlationIds.remove(key);
+ }
+
+ public void onEviction(String key) {
+ dirty = true;
+ correlationIds.remove(key);
+ }
}
\ No newline at end of file
Modified: camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java?rev=1199774&r1=1199773&r2=1199774&view=diff
==============================================================================
--- camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java (original)
+++ camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java Wed Nov 9 14:36:23 2011
@@ -16,8 +16,6 @@
*/
package org.apache.camel.component.jms.reply;
-import javax.jms.Message;
-
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
@@ -36,21 +34,4 @@ public class PersistentQueueReplyHandler
this.dynamicMessageSelector = dynamicMessageSelector;
}
- @Override
- public void onReply(String correlationId, Message reply) {
- if (dynamicMessageSelector != null) {
- // remove correlation id from message selector
- dynamicMessageSelector.removeCorrelationID(correlationId);
- }
- super.onReply(correlationId, reply);
- }
-
- @Override
- public void onTimeout(String correlationId) {
- if (dynamicMessageSelector != null) {
- // remove correlation id from message selector
- dynamicMessageSelector.removeCorrelationID(correlationId);
- }
- super.onTimeout(correlationId);
- }
}
Modified: camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java?rev=1199774&r1=1199773&r2=1199774&view=diff
==============================================================================
--- camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java (original)
+++ camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java Wed Nov 9 14:36:23 2011
@@ -45,10 +45,6 @@ public class PersistentQueueReplyManager
PersistentQueueReplyHandler handler = new PersistentQueueReplyHandler(replyManager, exchange, callback,
originalCorrelationId, requestTimeout, dynamicMessageSelector);
correlation.put(correlationId, handler, requestTimeout);
- if (dynamicMessageSelector != null) {
- // also remember to keep the dynamic selector updated with the new correlation id
- dynamicMessageSelector.addCorrelationID(correlationId);
- }
return correlationId;
}
@@ -64,14 +60,6 @@ public class PersistentQueueReplyManager
}
correlation.put(newCorrelationId, handler, requestTimeout);
-
- // no not arrived early
- if (dynamicMessageSelector != null) {
- // also remember to keep the dynamic selector updated with the new correlation id
- // at first removing the old correlationID and then add the new correlationID
- dynamicMessageSelector.removeCorrelationID(correlationId);
- dynamicMessageSelector.addCorrelationID(newCorrelationId);
- }
}
protected void handleReplyMessage(String correlationID, Message message) {
@@ -84,10 +72,6 @@ public class PersistentQueueReplyManager
try {
handler.onReply(correlationID, message);
} finally {
- if (dynamicMessageSelector != null) {
- // also remember to keep the dynamic selector updated with the new correlation id
- dynamicMessageSelector.removeCorrelationID(correlationID);
- }
correlation.remove(correlationID);
}
} else {
@@ -167,7 +151,7 @@ public class PersistentQueueReplyManager
answer = new PersistentQueueMessageListenerContainer(fixedMessageSelector);
} else {
// use a dynamic message selector which will select the message we want to receive as reply
- dynamicMessageSelector = new MessageSelectorCreator();
+ dynamicMessageSelector = new MessageSelectorCreator(correlation);
answer = new PersistentQueueMessageListenerContainer(dynamicMessageSelector);
}