You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/10/03 12:15:38 UTC

[camel] branch master updated: CAMEL-15628: Fixed ArrayIndexOutOfBoundsException for concurrent / high throughtput routing with message history enabled.

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 880b7b1  CAMEL-15628: Fixed ArrayIndexOutOfBoundsException for concurrent / high throughtput routing with message history enabled.
880b7b1 is described below

commit 880b7b173ef0edaab37419ae88a43ad88bd0a1ac
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Oct 3 14:15:05 2020 +0200

    CAMEL-15628: Fixed ArrayIndexOutOfBoundsException for concurrent / high throughtput routing with message history enabled.
---
 .../org/apache/camel/impl/engine/DefaultInflightRepository.java     | 5 ++---
 .../java/org/apache/camel/processor/CamelInternalProcessor.java     | 5 +++--
 .../src/main/java/org/apache/camel/processor/Splitter.java          | 6 ++++--
 .../org/apache/camel/processor/interceptor/DefaultDebugger.java     | 4 ++--
 .../src/main/docs/modules/eips/pages/message-history.adoc           | 3 +++
 .../src/main/java/org/apache/camel/support/DefaultExchange.java     | 5 +++--
 .../src/main/java/org/apache/camel/support/ExchangeHelper.java      | 5 +++--
 7 files changed, 20 insertions(+), 13 deletions(-)

diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java
index 7438f00..b2e7db8 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java
@@ -19,7 +19,6 @@ package org.apache.camel.impl.engine;
 import java.util.Collection;
 import java.util.Collections;
 import java.util.Comparator;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.concurrent.ConcurrentHashMap;
 import java.util.concurrent.ConcurrentMap;
@@ -234,13 +233,13 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh
         @SuppressWarnings("unchecked")
         public long getElapsed() {
             // this can only be calculate if message history is enabled
-            LinkedList<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, LinkedList.class);
+            List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class);
             if (list == null || list.isEmpty()) {
                 return 0;
             }
 
             // get latest entry
-            MessageHistory history = list.getLast();
+            MessageHistory history = list.get(list.size() - 1);
             if (history != null) {
                 long elapsed = history.getElapsed();
                 if (elapsed == 0 && history.getTime() > 0) {
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index 1899481..d244cc8 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -17,9 +17,9 @@
 package org.apache.camel.processor;
 
 import java.util.ArrayList;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Objects;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.RejectedExecutionException;
 
 import org.apache.camel.AsyncCallback;
@@ -723,7 +723,8 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
             if (history != null) {
                 List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class);
                 if (list == null) {
-                    list = new LinkedList<>();
+                    // use thread-safe list as message history may be accessed concurrently
+                    list = new CopyOnWriteArrayList<>();
                     exchange.setProperty(Exchange.MESSAGE_HISTORY, list);
                 }
                 list.add(history);
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java b/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java
index a6c8847..8efa0c7 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java
@@ -276,8 +276,10 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
 
     private static Exchange copyAndPrepareSubExchange(Exchange exchange, boolean preserveExchangeId) {
         Exchange answer = ExchangeHelper.createCopy(exchange, preserveExchangeId);
-        // we do not want to copy the message history for splitted sub-messages
-        answer.getProperties().remove(Exchange.MESSAGE_HISTORY);
+        if (exchange.getContext().isMessageHistory()) {
+            // we do not want to copy the message history for splitted sub-messages
+            answer.getProperties().remove(Exchange.MESSAGE_HISTORY);
+        }
         return answer;
     }
 }
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/interceptor/DefaultDebugger.java b/core/camel-base/src/main/java/org/apache/camel/processor/interceptor/DefaultDebugger.java
index 2f54f27..d25123b 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/interceptor/DefaultDebugger.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/interceptor/DefaultDebugger.java
@@ -300,8 +300,8 @@ public class DefaultDebugger extends ServiceSupport implements Debugger, CamelCo
     @SuppressWarnings("unchecked")
     protected void onEvent(Exchange exchange, ExchangeEvent event, Breakpoint breakpoint) {
         // try to get the last known definition
-        LinkedList<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, LinkedList.class);
-        MessageHistory last = list != null ? list.getLast() : null;
+        List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class);
+        MessageHistory last = list != null ? list.get(list.size() - 1) : null;
         NamedNode definition = last != null ? last.getNode() : null;
 
         try {
diff --git a/core/camel-core-engine/src/main/docs/modules/eips/pages/message-history.adoc b/core/camel-core-engine/src/main/docs/modules/eips/pages/message-history.adoc
index 170f59a..43cb853 100644
--- a/core/camel-core-engine/src/main/docs/modules/eips/pages/message-history.adoc
+++ b/core/camel-core-engine/src/main/docs/modules/eips/pages/message-history.adoc
@@ -16,6 +16,9 @@ if needed, such as during development, where Camel can report route stack-traces
 But for production usage, then message history should only be enabled if you have monitoring systems that rely on gather these
 fine grained details.
 
+IMPORTANT: When message history is enabled then there is a slight performance overhead as the history data is stored
+in a `java.util.concurrent.CopyOnWriteArrayList` due to the need of being thread safe.
+
 == Enabling or disabling Message History
 
 The Message History can be enabled or disabled per CamelContext or per route (disabled by default).
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
index d48e6bd..ccadefe 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
@@ -19,11 +19,11 @@ package org.apache.camel.support;
 import java.util.ArrayList;
 import java.util.HashMap;
 import java.util.HashSet;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.Set;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 
 import org.apache.camel.CamelContext;
 import org.apache.camel.CamelExecutionException;
@@ -176,7 +176,8 @@ public final class DefaultExchange implements ExtendedExchange {
             // safe copy message history using a defensive copy
             List<MessageHistory> history = (List<MessageHistory>) target.remove(Exchange.MESSAGE_HISTORY);
             if (history != null) {
-                target.put(Exchange.MESSAGE_HISTORY, new LinkedList<>(history));
+                // use thread-safe list as message history may be accessed concurrently
+                target.put(Exchange.MESSAGE_HISTORY, new CopyOnWriteArrayList<>(history));
             }
         }
     }
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
index 41315fe..b207cf0 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
@@ -21,10 +21,10 @@ import java.io.IOException;
 import java.io.InputStream;
 import java.nio.channels.ReadableByteChannel;
 import java.util.HashMap;
-import java.util.LinkedList;
 import java.util.List;
 import java.util.Map;
 import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
 import java.util.concurrent.ExecutionException;
 import java.util.concurrent.Future;
 import java.util.concurrent.TimeUnit;
@@ -929,7 +929,8 @@ public final class ExchangeHelper {
         // safe copy message history using a defensive copy
         List<MessageHistory> history = (List<MessageHistory>) answer.remove(Exchange.MESSAGE_HISTORY);
         if (history != null) {
-            answer.put(Exchange.MESSAGE_HISTORY, new LinkedList<>(history));
+            // use thread-safe list as message history may be accessed concurrently
+            answer.put(Exchange.MESSAGE_HISTORY, new CopyOnWriteArrayList<>(history));
         }
 
         return answer;