You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2011/11/09 15:36:23 UTC

svn commit: r1199774 - in /camel/branches/camel-2.7.x: ./ components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/

Author: davsclaus
Date: Wed Nov  9 14:36:23 2011
New Revision: 1199774

URL: http://svn.apache.org/viewvc?rev=1199774&view=rev
Log:
CAMEL-2740: Fixed memory leak when doing request/reply over JMS with a fixed replyTo queue. The leak was due to the correlation IDs in the JMS message selector not being in sync with the currently active correlation IDs.

Added:
    camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationListener.java
      - copied unchanged from r1199766, camel/branches/camel-2.8.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationListener.java
Modified:
    camel/branches/camel-2.7.x/   (props changed)
    camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java
    camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java
    camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java
    camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java

Propchange: camel/branches/camel-2.7.x/
------------------------------------------------------------------------------
--- svn:mergeinfo (original)
+++ svn:mergeinfo Wed Nov  9 14:36:23 2011
@@ -1,2 +1,2 @@
-/camel/branches/camel-2.8.x:1170965,1171400,1174571,1175323,1176329,1176787,1177397,1177946,1177949,1180598,1187226,1189704,1199138,1199732
-/camel/trunk:1146608,1146903,1147216,1170956,1171396,1174565,1175321,1176274,1176781-1176782,1177394,1177945,1177948,1180597,1187221,1189693,1199137,1199703
+/camel/branches/camel-2.8.x:1170965,1171400,1174571,1175323,1176329,1176787,1177397,1177946,1177949,1180598,1187226,1189704,1199138,1199732,1199766
+/camel/trunk:1146608,1146903,1147216,1170956,1171396,1174565,1175321,1176274,1176781-1176782,1177394,1177945,1177948,1180597,1187221,1189693,1199137,1199703,1199739

Propchange: camel/branches/camel-2.7.x/
------------------------------------------------------------------------------
--- svnmerge-integrated (original)
+++ svnmerge-integrated Wed Nov  9 14:36:23 2011
@@ -1 +1 @@
-/camel/branches/camel-2.8.x:1-1146127,1146608,1146653,1146771,1146903,1147216,1170965-1171083,1171085-1171400,1174571,1175323,1176329,1176787,1177397,1177946,1177949,1180598,1187226,1189704,1199138,1199732
+/camel/branches/camel-2.8.x:1-1146127,1146608,1146653,1146771,1146903,1147216,1170965-1171083,1171085-1171400,1174571,1175323,1176329,1176787,1177397,1177946,1177949,1180598,1187226,1189704,1199138,1199732,1199766

Modified: camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java?rev=1199774&r1=1199773&r2=1199774&view=diff
==============================================================================
--- camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java (original)
+++ camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/CorrelationMap.java Wed Nov  9 14:36:23 2011
@@ -21,15 +21,30 @@ import java.util.concurrent.ScheduledExe
 import org.apache.camel.util.DefaultTimeoutMap;
 
 /**
- * @version 
+ * @version
  */
 public class CorrelationMap extends DefaultTimeoutMap<String, ReplyHandler> {
 
+    private CorrelationListener listener;
+
     public CorrelationMap(ScheduledExecutorService executor, long requestMapPollTimeMillis) {
         super(executor, requestMapPollTimeMillis);
     }
 
+    public void setListener(CorrelationListener listener) {
+        // there is only one listener needed
+        this.listener = listener;
+    }
+
     public boolean onEviction(String key, ReplyHandler value) {
+        try {
+            if (listener != null) {
+                listener.onEviction(key);
+            }
+        } catch (Throwable e) {
+            // ignore
+        }
+
         // trigger timeout
         value.onTimeout(key);
         // return true to remove the element
@@ -38,6 +53,14 @@ public class CorrelationMap extends Defa
 
     @Override
     public void put(String key, ReplyHandler value, long timeoutMillis) {
+        try {
+            if (listener != null) {
+                listener.onPut(key);
+            }
+        } catch (Throwable e) {
+            // ignore
+        }
+
         if (timeoutMillis <= 0) {
             // no timeout (must use Integer.MAX_VALUE)
             super.put(key, value, Integer.MAX_VALUE);
@@ -45,4 +68,19 @@ public class CorrelationMap extends Defa
             super.put(key, value, timeoutMillis);
         }
     }
+
+    @Override
+    public ReplyHandler remove(String key) {
+        try {
+            if (listener != null) {
+                listener.onRemove(key);
+            }
+        } catch (Throwable e) {
+            // ignore
+        }
+
+        ReplyHandler answer = super.remove(key);
+        return answer;
+    }
+
 }

Modified: camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java?rev=1199774&r1=1199773&r2=1199774&view=diff
==============================================================================
--- camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java (original)
+++ camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/MessageSelectorCreator.java Wed Nov  9 14:36:23 2011
@@ -16,30 +16,25 @@
  */
 package org.apache.camel.component.jms.reply;
 
-import java.util.HashMap;
-import java.util.Map;
+import java.util.LinkedHashSet;
+import java.util.Set;
 
 /**
  * A creator which can build the JMS message selector query string to use
  * with a shared persistent reply-to queue, so we can select the correct messages we expect as replies.
  */
-public class MessageSelectorCreator {
-    protected Map<String, String> correlationIds;
+public class MessageSelectorCreator implements CorrelationListener {
+    protected final CorrelationMap timeoutMap;
+    protected final Set<String> correlationIds;
     protected boolean dirty = true;
     protected StringBuilder expression;
 
-    public MessageSelectorCreator() {
-        correlationIds = new HashMap<String, String>();
-    }
-
-    public synchronized void addCorrelationID(String id) {
-        correlationIds.put(id, id);
-        dirty = true;
-    }
-
-    public synchronized void removeCorrelationID(String id) {
-        correlationIds.remove(id);
-        dirty = true;
+    public MessageSelectorCreator(CorrelationMap timeoutMap) {
+        this.timeoutMap = timeoutMap;
+        this.timeoutMap.setListener(this);
+        // create local set of correlation ids, as its easier to keep track
+        // using the listener so we can flag the dirty flag upon changes
+        this.correlationIds = new LinkedHashSet<String>();
     }
 
     public synchronized String get() {
@@ -49,16 +44,16 @@ public class MessageSelectorCreator {
 
         expression = new StringBuilder("JMSCorrelationID='");
 
-        if (correlationIds.isEmpty()) {
+        if (correlationIds.size() == 0) {
             // no id's so use a dummy to select nothing
             expression.append("CamelDummyJmsMessageSelector'");
         } else {
             boolean first = true;
-            for (Map.Entry<String, String> entry : correlationIds.entrySet()) {
+            for (String value : correlationIds) {
                 if (!first) {
                     expression.append(" OR JMSCorrelationID='");
                 }
-                expression.append(entry.getValue()).append("'");
+                expression.append(value).append("'");
                 if (first) {
                     first = false;
                 }
@@ -69,4 +64,18 @@ public class MessageSelectorCreator {
         return expression.toString();
     }
 
+    public void onPut(String key) {
+        dirty = true;
+        correlationIds.add(key);
+    }
+
+    public void onRemove(String key) {
+        dirty = true;
+        correlationIds.remove(key);
+    }
+
+    public void onEviction(String key) {
+        dirty = true;
+        correlationIds.remove(key);
+    }
 }
\ No newline at end of file

Modified: camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java?rev=1199774&r1=1199773&r2=1199774&view=diff
==============================================================================
--- camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java (original)
+++ camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyHandler.java Wed Nov  9 14:36:23 2011
@@ -16,8 +16,6 @@
  */
 package org.apache.camel.component.jms.reply;
 
-import javax.jms.Message;
-
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 
@@ -36,21 +34,4 @@ public class PersistentQueueReplyHandler
         this.dynamicMessageSelector = dynamicMessageSelector;
     }
 
-    @Override
-    public void onReply(String correlationId, Message reply) {
-        if (dynamicMessageSelector != null) {
-            // remove correlation id from message selector
-            dynamicMessageSelector.removeCorrelationID(correlationId);
-        }
-        super.onReply(correlationId, reply);
-    }
-
-    @Override
-    public void onTimeout(String correlationId) {
-        if (dynamicMessageSelector != null) {
-            // remove correlation id from message selector
-            dynamicMessageSelector.removeCorrelationID(correlationId);
-        }
-        super.onTimeout(correlationId);
-    }
 }

Modified: camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java
URL: http://svn.apache.org/viewvc/camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java?rev=1199774&r1=1199773&r2=1199774&view=diff
==============================================================================
--- camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java (original)
+++ camel/branches/camel-2.7.x/components/camel-jms/src/main/java/org/apache/camel/component/jms/reply/PersistentQueueReplyManager.java Wed Nov  9 14:36:23 2011
@@ -45,10 +45,6 @@ public class PersistentQueueReplyManager
         PersistentQueueReplyHandler handler = new PersistentQueueReplyHandler(replyManager, exchange, callback,
                 originalCorrelationId, requestTimeout, dynamicMessageSelector);
         correlation.put(correlationId, handler, requestTimeout);
-        if (dynamicMessageSelector != null) {
-            // also remember to keep the dynamic selector updated with the new correlation id
-            dynamicMessageSelector.addCorrelationID(correlationId);
-        }
         return correlationId;
     }
 
@@ -64,14 +60,6 @@ public class PersistentQueueReplyManager
         }
 
         correlation.put(newCorrelationId, handler, requestTimeout);
-
-        // no not arrived early
-        if (dynamicMessageSelector != null) {
-            // also remember to keep the dynamic selector updated with the new correlation id
-            // at first removing the old correlationID and then add the new correlationID
-            dynamicMessageSelector.removeCorrelationID(correlationId);
-            dynamicMessageSelector.addCorrelationID(newCorrelationId);
-        }
     }
 
     protected void handleReplyMessage(String correlationID, Message message) {
@@ -84,10 +72,6 @@ public class PersistentQueueReplyManager
             try {
                 handler.onReply(correlationID, message);
             } finally {
-                if (dynamicMessageSelector != null) {
-                    // also remember to keep the dynamic selector updated with the new correlation id
-                    dynamicMessageSelector.removeCorrelationID(correlationID);
-                }
                 correlation.remove(correlationID);
             }
         } else {
@@ -167,7 +151,7 @@ public class PersistentQueueReplyManager
             answer = new PersistentQueueMessageListenerContainer(fixedMessageSelector);
         } else {
             // use a dynamic message selector which will select the message we want to receive as reply
-            dynamicMessageSelector = new MessageSelectorCreator();
+            dynamicMessageSelector = new MessageSelectorCreator(correlation);
             answer = new PersistentQueueMessageListenerContainer(dynamicMessageSelector);
         }