You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2020/10/03 12:15:38 UTC
[camel] branch master updated: CAMEL-15628: Fixed
ArrayIndexOutOfBoundsException for concurrent / high throughput routing
with message history enabled.
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 880b7b1 CAMEL-15628: Fixed ArrayIndexOutOfBoundsException for concurrent / high throughput routing with message history enabled.
880b7b1 is described below
commit 880b7b173ef0edaab37419ae88a43ad88bd0a1ac
Author: Claus Ibsen <cl...@gmail.com>
AuthorDate: Sat Oct 3 14:15:05 2020 +0200
CAMEL-15628: Fixed ArrayIndexOutOfBoundsException for concurrent / high throughput routing with message history enabled.
---
.../org/apache/camel/impl/engine/DefaultInflightRepository.java | 5 ++---
.../java/org/apache/camel/processor/CamelInternalProcessor.java | 5 +++--
.../src/main/java/org/apache/camel/processor/Splitter.java | 6 ++++--
.../org/apache/camel/processor/interceptor/DefaultDebugger.java | 4 ++--
.../src/main/docs/modules/eips/pages/message-history.adoc | 3 +++
.../src/main/java/org/apache/camel/support/DefaultExchange.java | 5 +++--
.../src/main/java/org/apache/camel/support/ExchangeHelper.java | 5 +++--
7 files changed, 20 insertions(+), 13 deletions(-)
diff --git a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java
index 7438f00..b2e7db8 100644
--- a/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java
+++ b/core/camel-base/src/main/java/org/apache/camel/impl/engine/DefaultInflightRepository.java
@@ -19,7 +19,6 @@ package org.apache.camel.impl.engine;
import java.util.Collection;
import java.util.Collections;
import java.util.Comparator;
-import java.util.LinkedList;
import java.util.List;
import java.util.concurrent.ConcurrentHashMap;
import java.util.concurrent.ConcurrentMap;
@@ -234,13 +233,13 @@ public class DefaultInflightRepository extends ServiceSupport implements Infligh
@SuppressWarnings("unchecked")
public long getElapsed() {
// this can only be calculate if message history is enabled
- LinkedList<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, LinkedList.class);
+ List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class);
if (list == null || list.isEmpty()) {
return 0;
}
// get latest entry
- MessageHistory history = list.getLast();
+ MessageHistory history = list.get(list.size() - 1);
if (history != null) {
long elapsed = history.getElapsed();
if (elapsed == 0 && history.getTime() > 0) {
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
index 1899481..d244cc8 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/CamelInternalProcessor.java
@@ -17,9 +17,9 @@
package org.apache.camel.processor;
import java.util.ArrayList;
-import java.util.LinkedList;
import java.util.List;
import java.util.Objects;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.RejectedExecutionException;
import org.apache.camel.AsyncCallback;
@@ -723,7 +723,8 @@ public class CamelInternalProcessor extends DelegateAsyncProcessor {
if (history != null) {
List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class);
if (list == null) {
- list = new LinkedList<>();
+ // use thread-safe list as message history may be accessed concurrently
+ list = new CopyOnWriteArrayList<>();
exchange.setProperty(Exchange.MESSAGE_HISTORY, list);
}
list.add(history);
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java b/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java
index a6c8847..8efa0c7 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/Splitter.java
@@ -276,8 +276,10 @@ public class Splitter extends MulticastProcessor implements AsyncProcessor, Trac
private static Exchange copyAndPrepareSubExchange(Exchange exchange, boolean preserveExchangeId) {
Exchange answer = ExchangeHelper.createCopy(exchange, preserveExchangeId);
- // we do not want to copy the message history for splitted sub-messages
- answer.getProperties().remove(Exchange.MESSAGE_HISTORY);
+ if (exchange.getContext().isMessageHistory()) {
+ // we do not want to copy the message history for splitted sub-messages
+ answer.getProperties().remove(Exchange.MESSAGE_HISTORY);
+ }
return answer;
}
}
diff --git a/core/camel-base/src/main/java/org/apache/camel/processor/interceptor/DefaultDebugger.java b/core/camel-base/src/main/java/org/apache/camel/processor/interceptor/DefaultDebugger.java
index 2f54f27..d25123b 100644
--- a/core/camel-base/src/main/java/org/apache/camel/processor/interceptor/DefaultDebugger.java
+++ b/core/camel-base/src/main/java/org/apache/camel/processor/interceptor/DefaultDebugger.java
@@ -300,8 +300,8 @@ public class DefaultDebugger extends ServiceSupport implements Debugger, CamelCo
@SuppressWarnings("unchecked")
protected void onEvent(Exchange exchange, ExchangeEvent event, Breakpoint breakpoint) {
// try to get the last known definition
- LinkedList<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, LinkedList.class);
- MessageHistory last = list != null ? list.getLast() : null;
+ List<MessageHistory> list = exchange.getProperty(Exchange.MESSAGE_HISTORY, List.class);
+ MessageHistory last = list != null ? list.get(list.size() - 1) : null;
NamedNode definition = last != null ? last.getNode() : null;
try {
diff --git a/core/camel-core-engine/src/main/docs/modules/eips/pages/message-history.adoc b/core/camel-core-engine/src/main/docs/modules/eips/pages/message-history.adoc
index 170f59a..43cb853 100644
--- a/core/camel-core-engine/src/main/docs/modules/eips/pages/message-history.adoc
+++ b/core/camel-core-engine/src/main/docs/modules/eips/pages/message-history.adoc
@@ -16,6 +16,9 @@ if needed, such as during development, where Camel can report route stack-traces
But for production usage, then message history should only be enabled if you have monitoring systems that rely on gather these
fine grained details.
+IMPORTANT: When message history is enabled then there is a slight performance overhead as the history data is stored
+in a `java.util.concurrent.CopyOnWriteArrayList` due to the need of being thread safe.
+
== Enabling or disabling Message History
The Message History can be enabled or disabled per CamelContext or per route (disabled by default).
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
index d48e6bd..ccadefe 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/DefaultExchange.java
@@ -19,11 +19,11 @@ package org.apache.camel.support;
import java.util.ArrayList;
import java.util.HashMap;
import java.util.HashSet;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.Set;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import org.apache.camel.CamelContext;
import org.apache.camel.CamelExecutionException;
@@ -176,7 +176,8 @@ public final class DefaultExchange implements ExtendedExchange {
// safe copy message history using a defensive copy
List<MessageHistory> history = (List<MessageHistory>) target.remove(Exchange.MESSAGE_HISTORY);
if (history != null) {
- target.put(Exchange.MESSAGE_HISTORY, new LinkedList<>(history));
+ // use thread-safe list as message history may be accessed concurrently
+ target.put(Exchange.MESSAGE_HISTORY, new CopyOnWriteArrayList<>(history));
}
}
}
diff --git a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
index 41315fe..b207cf0 100644
--- a/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
+++ b/core/camel-support/src/main/java/org/apache/camel/support/ExchangeHelper.java
@@ -21,10 +21,10 @@ import java.io.IOException;
import java.io.InputStream;
import java.nio.channels.ReadableByteChannel;
import java.util.HashMap;
-import java.util.LinkedList;
import java.util.List;
import java.util.Map;
import java.util.concurrent.ConcurrentHashMap;
+import java.util.concurrent.CopyOnWriteArrayList;
import java.util.concurrent.ExecutionException;
import java.util.concurrent.Future;
import java.util.concurrent.TimeUnit;
@@ -929,7 +929,8 @@ public final class ExchangeHelper {
// safe copy message history using a defensive copy
List<MessageHistory> history = (List<MessageHistory>) answer.remove(Exchange.MESSAGE_HISTORY);
if (history != null) {
- answer.put(Exchange.MESSAGE_HISTORY, new LinkedList<>(history));
+ // use thread-safe list as message history may be accessed concurrently
+ answer.put(Exchange.MESSAGE_HISTORY, new CopyOnWriteArrayList<>(history));
}
return answer;