You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2015/06/04 11:08:21 UTC

[1/3] camel git commit: CAMEL-8829: Do a defensive copy of the message when creating correlated copy, to eavoid any ConcurrentModificationException such as routing to logs using concurrent threads or if using wire tap EIP etc.

Repository: camel
Updated Branches:
  refs/heads/master a704d0331 -> f6aa990c0


CAMEL-8829: Do a defensive copy of the message when creating correlated copy, to eavoid any ConcurrentModificationException such as routing to logs using concurrent threads or if using wire tap EIP etc.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d13afd1c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d13afd1c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d13afd1c

Branch: refs/heads/master
Commit: d13afd1ce6593303acee2d50cd016dbabe18d912
Parents: a704d03
Author: Claus Ibsen <da...@apache.org>
Authored: Wed Jun 3 18:28:43 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Jun 4 08:55:59 2015 +0200

----------------------------------------------------------------------
 .../main/java/org/apache/camel/Exchange.java    | 11 ++++++++
 .../org/apache/camel/impl/DefaultExchange.java  | 28 +++++++++++++++++---
 .../org/apache/camel/util/ExchangeHelper.java   |  3 ++-
 3 files changed, 37 insertions(+), 5 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/d13afd1c/camel-core/src/main/java/org/apache/camel/Exchange.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/Exchange.java b/camel-core/src/main/java/org/apache/camel/Exchange.java
index fbf6176..d3f841b 100644
--- a/camel-core/src/main/java/org/apache/camel/Exchange.java
+++ b/camel-core/src/main/java/org/apache/camel/Exchange.java
@@ -476,10 +476,21 @@ public interface Exchange {
     /**
      * Creates a copy of the current message exchange so that it can be
      * forwarded to another destination
+     * <p/>
+     * Notice this operation invokes <tt>copy(false)</tt>
      */
     Exchange copy();
 
     /**
+     * Creates a copy of the current message exchange so that it can be
+     * forwarded to another destination
+     *
+     * @param safeCopy whether to copy exchange properties and message headers safely to a new map instance,
+     *                 or allow sharing the same map instances in the returned copy.
+     */
+    Exchange copy(boolean safeCopy);
+
+    /**
      * Returns the endpoint which originated this message exchange if a consumer on an endpoint
      * created the message exchange, otherwise this property will be <tt>null</tt>
      */

http://git-wip-us.apache.org/repos/asf/camel/blob/d13afd1c/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
index eb6db98..d24eed6 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
@@ -83,15 +83,35 @@ public final class DefaultExchange implements Exchange {
     }
 
     public Exchange copy() {
+        // to be backwards compatible as today
+        return copy(false);
+    }
+
+    public Exchange copy(boolean safeCopy) {
         DefaultExchange exchange = new DefaultExchange(this);
 
         if (hasProperties()) {
             exchange.setProperties(safeCopy(getProperties()));
         }
-        
-        exchange.setIn(getIn().copy());
-        if (hasOut()) {
-            exchange.setOut(getOut().copy());
+
+        if (safeCopy) {
+            exchange.getIn().setBody(getIn().getBody());
+            if (getIn().hasHeaders()) {
+                exchange.getIn().setHeaders(safeCopy(getIn().getHeaders()));
+            }
+            if (hasOut()) {
+                exchange.getOut().setBody(getOut().getBody());
+                if (getOut().hasHeaders()) {
+                    exchange.getOut().setHeaders(safeCopy(getOut().getHeaders()));
+                }
+            }
+        } else {
+            // old way of doing copy which is @deprecated
+            // TODO: remove this in Camel 3.0, and always do a safe copy
+            exchange.setIn(getIn().copy());
+            if (hasOut()) {
+                exchange.setOut(getOut().copy());
+            }
         }
         exchange.setException(getException());
         return exchange;

http://git-wip-us.apache.org/repos/asf/camel/blob/d13afd1c/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
index 6ca425d..cb8aa92 100644
--- a/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
+++ b/camel-core/src/main/java/org/apache/camel/util/ExchangeHelper.java
@@ -231,7 +231,8 @@ public final class ExchangeHelper {
     public static Exchange createCorrelatedCopy(Exchange exchange, boolean handover, boolean useSameMessageId) {
         String id = exchange.getExchangeId();
 
-        Exchange copy = exchange.copy();
+        // make sure to do a safe copy as the correlated copy can be routed independently of the source.
+        Exchange copy = exchange.copy(true);
         // do not reuse message id on copy
         if (!useSameMessageId) {
             if (copy.hasOut()) {


[2/3] camel git commit: CAMEL-8829: Do a defensive copy of the message when creating correlated copy, to eavoid any ConcurrentModificationException such as routing to logs using concurrent threads or if using wire tap EIP etc.

Posted by da...@apache.org.
CAMEL-8829: Do a defensive copy of the message when creating correlated copy, to eavoid any ConcurrentModificationException such as routing to logs using concurrent threads or if using wire tap EIP etc.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/6d6d1b73
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/6d6d1b73
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/6d6d1b73

Branch: refs/heads/master
Commit: 6d6d1b736e25b139671ba9a652b6b7ac9e524a3b
Parents: d13afd1
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Jun 4 10:16:39 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Jun 4 10:16:39 2015 +0200

----------------------------------------------------------------------
 .../org/apache/camel/impl/DefaultExchange.java  | 21 ++++++++++++++++----
 1 file changed, 17 insertions(+), 4 deletions(-)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/6d6d1b73/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
index d24eed6..6704cdf 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultExchange.java
@@ -29,6 +29,7 @@ import org.apache.camel.Message;
 import org.apache.camel.MessageHistory;
 import org.apache.camel.spi.Synchronization;
 import org.apache.camel.spi.UnitOfWork;
+import org.apache.camel.util.CaseInsensitiveMap;
 import org.apache.camel.util.EndpointHelper;
 import org.apache.camel.util.ExchangeHelper;
 import org.apache.camel.util.ObjectHelper;
@@ -91,18 +92,18 @@ public final class DefaultExchange implements Exchange {
         DefaultExchange exchange = new DefaultExchange(this);
 
         if (hasProperties()) {
-            exchange.setProperties(safeCopy(getProperties()));
+            exchange.setProperties(safeCopyProperties(getProperties()));
         }
 
         if (safeCopy) {
             exchange.getIn().setBody(getIn().getBody());
             if (getIn().hasHeaders()) {
-                exchange.getIn().setHeaders(safeCopy(getIn().getHeaders()));
+                exchange.getIn().setHeaders(safeCopyHeaders(getIn().getHeaders()));
             }
             if (hasOut()) {
                 exchange.getOut().setBody(getOut().getBody());
                 if (getOut().hasHeaders()) {
-                    exchange.getOut().setHeaders(safeCopy(getOut().getHeaders()));
+                    exchange.getOut().setHeaders(safeCopyHeaders(getOut().getHeaders()));
                 }
             }
         } else {
@@ -118,11 +119,23 @@ public final class DefaultExchange implements Exchange {
     }
 
     @SuppressWarnings("unchecked")
-    private static Map<String, Object> safeCopy(Map<String, Object> properties) {
+    private static Map<String, Object> safeCopyHeaders(Map<String, Object> headers) {
+        if (headers == null) {
+            return null;
+        }
+
+        Map<String, Object> answer = new CaseInsensitiveMap();
+        answer.putAll(headers);
+        return answer;
+    }
+
+    @SuppressWarnings("unchecked")
+    private static Map<String, Object> safeCopyProperties(Map<String, Object> properties) {
         if (properties == null) {
             return null;
         }
 
+        // TODO: properties should use same map kind as headers
         Map<String, Object> answer = new ConcurrentHashMap<String, Object>(properties);
 
         // safe copy message history using a defensive copy


[3/3] camel git commit: CAMEL-8829: Do a defensive copy of the message when creating correlated copy, to eavoid any ConcurrentModificationException such as routing to logs using concurrent threads or if using wire tap EIP etc.

Posted by da...@apache.org.
CAMEL-8829: Do a defensive copy of the message when creating correlated copy, to eavoid any ConcurrentModificationException such as routing to logs using concurrent threads or if using wire tap EIP etc.


Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/f6aa990c
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/f6aa990c
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/f6aa990c

Branch: refs/heads/master
Commit: f6aa990c00cfbe8a70bd36cbad69f7268f891fde
Parents: 6d6d1b7
Author: Claus Ibsen <da...@apache.org>
Authored: Thu Jun 4 11:11:45 2015 +0200
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Jun 4 11:11:45 2015 +0200

----------------------------------------------------------------------
 .../src/main/scala/org/apache/camel/scala/RichExchange.scala       | 2 ++
 1 file changed, 2 insertions(+)
----------------------------------------------------------------------


http://git-wip-us.apache.org/repos/asf/camel/blob/f6aa990c/components/camel-scala/src/main/scala/org/apache/camel/scala/RichExchange.scala
----------------------------------------------------------------------
diff --git a/components/camel-scala/src/main/scala/org/apache/camel/scala/RichExchange.scala b/components/camel-scala/src/main/scala/org/apache/camel/scala/RichExchange.scala
index 3cb9262..e908e39 100644
--- a/components/camel-scala/src/main/scala/org/apache/camel/scala/RichExchange.scala
+++ b/components/camel-scala/src/main/scala/org/apache/camel/scala/RichExchange.scala
@@ -127,6 +127,8 @@ class RichExchange(val exchange : Exchange) extends Exchange {
 
   def copy = new RichExchange(exchange.copy)
 
+  def copy(safeCopy: Boolean) = new RichExchange(exchange.copy(safeCopy))
+
   def addOnCompletion(onCompletion: Synchronization) { exchange.addOnCompletion(onCompletion) }
   
   def containsOnCompletion(onCompletion: Synchronization) = exchange.containsOnCompletion(onCompletion)