You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2017/04/06 09:02:47 UTC
camel git commit: CAMEL-11054 Create SPI for Log EIP to enable other
components to intercept/enrich logged messages
Repository: camel
Updated Branches:
refs/heads/master f096e2608 -> d1ee73ec8
CAMEL-11054 Create SPI for Log EIP to enable other components to intercept/enrich logged messages
Project: http://git-wip-us.apache.org/repos/asf/camel/repo
Commit: http://git-wip-us.apache.org/repos/asf/camel/commit/d1ee73ec
Tree: http://git-wip-us.apache.org/repos/asf/camel/tree/d1ee73ec
Diff: http://git-wip-us.apache.org/repos/asf/camel/diff/d1ee73ec
Branch: refs/heads/master
Commit: d1ee73ec8533cb9295cd198de2f987231e4a9d91
Parents: f096e26
Author: Tomohisa Igarashi <tm...@gmail.com>
Authored: Thu Mar 30 22:56:21 2017 +0900
Committer: Claus Ibsen <da...@apache.org>
Committed: Thu Apr 6 11:00:49 2017 +0200
----------------------------------------------------------------------
.../java/org/apache/camel/CamelContext.java | 12 ++++
.../apache/camel/component/log/LogEndpoint.java | 3 +-
.../apache/camel/impl/DefaultCamelContext.java | 11 ++++
.../org/apache/camel/model/LogDefinition.java | 3 +-
.../camel/processor/CamelLogProcessor.java | 37 ++++++++++-
.../apache/camel/processor/LogProcessor.java | 34 +++++++++-
.../camel/processor/interceptor/Tracer.java | 2 +-
.../java/org/apache/camel/spi/LogListener.java | 45 +++++++++++++
.../camel/component/log/LogListenerTest.java | 66 ++++++++++++++++++++
.../camel/processor/LogEipListenerTest.java | 66 ++++++++++++++++++++
10 files changed, 274 insertions(+), 5 deletions(-)
----------------------------------------------------------------------
http://git-wip-us.apache.org/repos/asf/camel/blob/d1ee73ec/camel-core/src/main/java/org/apache/camel/CamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/CamelContext.java b/camel-core/src/main/java/org/apache/camel/CamelContext.java
index 1d5be88..8aec529 100644
--- a/camel-core/src/main/java/org/apache/camel/CamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/CamelContext.java
@@ -22,6 +22,7 @@ import java.util.Collection;
import java.util.List;
import java.util.Map;
import java.util.Properties;
+import java.util.Set;
import java.util.concurrent.ScheduledExecutorService;
import java.util.concurrent.TimeUnit;
@@ -57,6 +58,7 @@ import org.apache.camel.spi.Injector;
import org.apache.camel.spi.InterceptStrategy;
import org.apache.camel.spi.Language;
import org.apache.camel.spi.LifecycleStrategy;
+import org.apache.camel.spi.LogListener;
import org.apache.camel.spi.ManagementMBeanAssembler;
import org.apache.camel.spi.ManagementNameStrategy;
import org.apache.camel.spi.ManagementStrategy;
@@ -1965,4 +1967,14 @@ public interface CamelContext extends SuspendableService, RuntimeConfiguration {
*/
RuntimeCamelCatalog getRuntimeCamelCatalog();
+ /**
+ * Gets a list of {@link LogListener}.
+ */
+ Set<LogListener> getLogListeners();
+
+ /**
+ * Adds a {@link LogListener}.
+ */
+ void addlogListener(LogListener listener);
+
}
\ No newline at end of file
http://git-wip-us.apache.org/repos/asf/camel/blob/d1ee73ec/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java b/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java
index 05c4e50..29d5b3a 100644
--- a/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java
+++ b/camel-core/src/main/java/org/apache/camel/component/log/LogEndpoint.java
@@ -27,6 +27,7 @@ import org.apache.camel.processor.DefaultExchangeFormatter;
import org.apache.camel.processor.DefaultMaskingFormatter;
import org.apache.camel.processor.ThroughputLogger;
import org.apache.camel.spi.ExchangeFormatter;
+import org.apache.camel.spi.LogListener;
import org.apache.camel.spi.MaskingFormatter;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.UriEndpoint;
@@ -141,7 +142,7 @@ public class LogEndpoint extends ProcessorEndpoint {
Long groupDelay = getGroupDelay();
answer = new ThroughputLogger(camelLogger, this.getCamelContext(), getGroupInterval(), groupDelay, groupActiveOnly);
} else {
- answer = new CamelLogProcessor(camelLogger, localFormatter, getMaskingFormatter());
+ answer = new CamelLogProcessor(camelLogger, localFormatter, getMaskingFormatter(), getCamelContext().getLogListeners());
}
// the logger is the processor
setProcessor(answer);
http://git-wip-us.apache.org/repos/asf/camel/blob/d1ee73ec/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
index 1633616..f16de74 100644
--- a/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
+++ b/camel-core/src/main/java/org/apache/camel/impl/DefaultCamelContext.java
@@ -26,6 +26,7 @@ import java.util.Collections;
import java.util.Comparator;
import java.util.Date;
import java.util.HashMap;
+import java.util.HashSet;
import java.util.Iterator;
import java.util.LinkedHashMap;
import java.util.LinkedHashSet;
@@ -141,6 +142,7 @@ import org.apache.camel.spi.InterceptStrategy;
import org.apache.camel.spi.Language;
import org.apache.camel.spi.LanguageResolver;
import org.apache.camel.spi.LifecycleStrategy;
+import org.apache.camel.spi.LogListener;
import org.apache.camel.spi.ManagementMBeanAssembler;
import org.apache.camel.spi.ManagementNameStrategy;
import org.apache.camel.spi.ManagementStrategy;
@@ -231,6 +233,7 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
private RestRegistry restRegistry = new DefaultRestRegistry();
private List<InterceptStrategy> interceptStrategies = new ArrayList<InterceptStrategy>();
private List<RoutePolicyFactory> routePolicyFactories = new ArrayList<RoutePolicyFactory>();
+ private Set<LogListener> logListeners = new LinkedHashSet<>();
// special flags to control the first startup which can are special
private volatile boolean firstStartDone;
@@ -2684,6 +2687,14 @@ public class DefaultCamelContext extends ServiceSupport implements ModelCamelCon
getRoutePolicyFactories().add(routePolicyFactory);
}
+ public Set<LogListener> getLogListeners() {
+ return logListeners;
+ }
+
+ public void addlogListener(LogListener listener) {
+ logListeners.add(listener);
+ }
+
public void setStreamCaching(Boolean cache) {
this.streamCache = cache;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d1ee73ec/camel-core/src/main/java/org/apache/camel/model/LogDefinition.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/model/LogDefinition.java b/camel-core/src/main/java/org/apache/camel/model/LogDefinition.java
index 3e18d9a..66493e6 100644
--- a/camel-core/src/main/java/org/apache/camel/model/LogDefinition.java
+++ b/camel-core/src/main/java/org/apache/camel/model/LogDefinition.java
@@ -29,6 +29,7 @@ import org.apache.camel.LoggingLevel;
import org.apache.camel.Processor;
import org.apache.camel.processor.DefaultMaskingFormatter;
import org.apache.camel.processor.LogProcessor;
+import org.apache.camel.spi.LogListener;
import org.apache.camel.spi.MaskingFormatter;
import org.apache.camel.spi.Metadata;
import org.apache.camel.spi.RouteContext;
@@ -125,7 +126,7 @@ public class LogDefinition extends NoOutputDefinition<LogDefinition> {
LoggingLevel level = getLoggingLevel() != null ? getLoggingLevel() : LoggingLevel.INFO;
CamelLogger camelLogger = new CamelLogger(logger, level, getMarker());
- return new LogProcessor(exp, camelLogger, getMaskingFormatter(routeContext));
+ return new LogProcessor(exp, camelLogger, getMaskingFormatter(routeContext), routeContext.getCamelContext().getLogListeners());
}
private MaskingFormatter getMaskingFormatter(RouteContext routeContext) {
http://git-wip-us.apache.org/repos/asf/camel/blob/d1ee73ec/camel-core/src/main/java/org/apache/camel/processor/CamelLogProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/CamelLogProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/CamelLogProcessor.java
index e5a3e5c..88a3a9d 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/CamelLogProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/CamelLogProcessor.java
@@ -16,6 +16,10 @@
*/
package org.apache.camel.processor;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
@@ -23,9 +27,12 @@ import org.apache.camel.LoggingLevel;
import org.apache.camel.Processor;
import org.apache.camel.spi.ExchangeFormatter;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.LogListener;
import org.apache.camel.spi.MaskingFormatter;
import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.CamelLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A {@link Processor} which just logs to a {@link CamelLogger} object which can be used
@@ -37,10 +44,13 @@ import org.apache.camel.util.CamelLogger;
* @version
*/
public class CamelLogProcessor implements AsyncProcessor, IdAware {
+
+ private static final Logger LOG = LoggerFactory.getLogger(CamelLogProcessor.class);
private String id;
private CamelLogger log;
private ExchangeFormatter formatter;
private MaskingFormatter maskingFormatter;
+ private Set<LogListener> listeners;
public CamelLogProcessor() {
this(new CamelLogger(CamelLogProcessor.class.getName()));
@@ -51,10 +61,11 @@ public class CamelLogProcessor implements AsyncProcessor, IdAware {
this.log = log;
}
- public CamelLogProcessor(CamelLogger log, ExchangeFormatter formatter, MaskingFormatter maskingFormatter) {
+ public CamelLogProcessor(CamelLogger log, ExchangeFormatter formatter, MaskingFormatter maskingFormatter, Set<LogListener> listeners) {
this(log);
this.formatter = formatter;
this.maskingFormatter = maskingFormatter;
+ this.listeners = listeners;
}
@Override
@@ -80,6 +91,7 @@ public class CamelLogProcessor implements AsyncProcessor, IdAware {
if (maskingFormatter != null) {
output = maskingFormatter.format(output);
}
+ output = fireListeners(exchange, output);
log.log(output);
}
callback.done(true);
@@ -92,6 +104,7 @@ public class CamelLogProcessor implements AsyncProcessor, IdAware {
if (maskingFormatter != null) {
output = maskingFormatter.format(output);
}
+ output = fireListeners(exchange, output);
log.log(output, exception);
}
}
@@ -102,10 +115,32 @@ public class CamelLogProcessor implements AsyncProcessor, IdAware {
if (maskingFormatter != null) {
output = maskingFormatter.format(output);
}
+ output = fireListeners(exchange, output);
log.log(output);
}
}
+ private String fireListeners(Exchange exchange, String message) {
+ if (listeners == null) {
+ return message;
+ }
+ for (LogListener listener : listeners) {
+ if (listener == null) {
+ continue;
+ }
+ try {
+ String output = listener.onLog(exchange, log, message);
+ message = output != null ? output : message;
+ } catch (Throwable t) {
+ LOG.warn("Ignoring an exception thrown by {}: {}", listener.getClass().getName(), t.getMessage());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("", t);
+ }
+ }
+ }
+ return message;
+ }
+
public CamelLogger getLogger() {
return log;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d1ee73ec/camel-core/src/main/java/org/apache/camel/processor/LogProcessor.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/LogProcessor.java b/camel-core/src/main/java/org/apache/camel/processor/LogProcessor.java
index 1d4884c..c8fa263 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/LogProcessor.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/LogProcessor.java
@@ -16,16 +16,23 @@
*/
package org.apache.camel.processor;
+import java.util.HashSet;
+import java.util.LinkedHashSet;
+import java.util.Set;
+
import org.apache.camel.AsyncCallback;
import org.apache.camel.AsyncProcessor;
import org.apache.camel.Exchange;
import org.apache.camel.Expression;
import org.apache.camel.Traceable;
import org.apache.camel.spi.IdAware;
+import org.apache.camel.spi.LogListener;
import org.apache.camel.spi.MaskingFormatter;
import org.apache.camel.support.ServiceSupport;
import org.apache.camel.util.AsyncProcessorHelper;
import org.apache.camel.util.CamelLogger;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
/**
* A processor which evaluates an {@link Expression} and logs it.
@@ -34,15 +41,18 @@ import org.apache.camel.util.CamelLogger;
*/
public class LogProcessor extends ServiceSupport implements AsyncProcessor, Traceable, IdAware {
+ private static final Logger LOG = LoggerFactory.getLogger(LogProcessor.class);
private String id;
private final Expression expression;
private final CamelLogger logger;
private final MaskingFormatter formatter;
+ private final Set<LogListener> listeners;
- public LogProcessor(Expression expression, CamelLogger logger, MaskingFormatter formatter) {
+ public LogProcessor(Expression expression, CamelLogger logger, MaskingFormatter formatter, Set<LogListener> listeners) {
this.expression = expression;
this.logger = logger;
this.formatter = formatter;
+ this.listeners = listeners;
}
public void process(Exchange exchange) throws Exception {
@@ -57,6 +67,7 @@ public class LogProcessor extends ServiceSupport implements AsyncProcessor, Trac
if (formatter != null) {
msg = formatter.format(msg);
}
+ msg = fireListeners(exchange, msg);
logger.doLog(msg);
}
} catch (Exception e) {
@@ -68,6 +79,27 @@ public class LogProcessor extends ServiceSupport implements AsyncProcessor, Trac
return true;
}
+ private String fireListeners(Exchange exchange, String message) {
+ if (listeners == null) {
+ return message;
+ }
+ for (LogListener listener : listeners) {
+ if (listener == null) {
+ continue;
+ }
+ try {
+ String output = listener.onLog(exchange, logger, message);
+ message = output != null ? output : message;
+ } catch (Throwable t) {
+ LOG.warn("Ignoring an exception thrown by {}: {}", listener.getClass().getName(), t.getMessage());
+ if (LOG.isDebugEnabled()) {
+ LOG.debug("", t);
+ }
+ }
+ }
+ return message;
+ }
+
@Override
public String toString() {
return "Log(" + logger.getLog().getName() + ")[" + expression + "]";
http://git-wip-us.apache.org/repos/asf/camel/blob/d1ee73ec/camel-core/src/main/java/org/apache/camel/processor/interceptor/Tracer.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/processor/interceptor/Tracer.java b/camel-core/src/main/java/org/apache/camel/processor/interceptor/Tracer.java
index 5d8747f..5c7a1cc 100644
--- a/camel-core/src/main/java/org/apache/camel/processor/interceptor/Tracer.java
+++ b/camel-core/src/main/java/org/apache/camel/processor/interceptor/Tracer.java
@@ -102,7 +102,7 @@ public class Tracer implements InterceptStrategy, Service {
*/
public synchronized CamelLogProcessor getLogger(ExchangeFormatter formatter) {
if (logger == null) {
- logger = new CamelLogProcessor(new CamelLogger(getLogName(), getLogLevel()), formatter, null);
+ logger = new CamelLogProcessor(new CamelLogger(getLogName(), getLogLevel()), formatter, null, null);
}
return logger;
}
http://git-wip-us.apache.org/repos/asf/camel/blob/d1ee73ec/camel-core/src/main/java/org/apache/camel/spi/LogListener.java
----------------------------------------------------------------------
diff --git a/camel-core/src/main/java/org/apache/camel/spi/LogListener.java b/camel-core/src/main/java/org/apache/camel/spi/LogListener.java
new file mode 100644
index 0000000..6d7151e
--- /dev/null
+++ b/camel-core/src/main/java/org/apache/camel/spi/LogListener.java
@@ -0,0 +1,45 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.spi;
+
+import org.apache.camel.Exchange;
+import org.apache.camel.LoggingLevel;
+import org.apache.camel.util.CamelLogger;
+import org.slf4j.Logger;
+import org.slf4j.Marker;
+
+/**
+ * An event listener SPI for logging. Listeners are registered into {@link LogProcessor} and
+ * {@link CamelLogProcessor} so that the logging events are delivered for both of Log Component and Log EIP.
+ *
+ */
+public interface LogListener {
+
+ /**
+ * Invoked right before Log component or Log EIP logs.
+ * Note that {@link CamelLogger} holds the {@link LoggingLevel} and {@link Marker}.
+ * The listener can check {@link CamelLogger#getLevel()} to see in which log level
+ * this is going to be logged.
+ *
+ * @param exchange camel exchange
+ * @param camelLogger {@link CamelLogger}
+ * @param message log message
+ * @return log message, possibly enriched by the listener
+ */
+ String onLog(Exchange exchange, CamelLogger camelLogger, String message);
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/d1ee73ec/camel-core/src/test/java/org/apache/camel/component/log/LogListenerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/component/log/LogListenerTest.java b/camel-core/src/test/java/org/apache/camel/component/log/LogListenerTest.java
new file mode 100644
index 0000000..dbdaf12
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/component/log/LogListenerTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.log;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.model.Constants;
+import org.apache.camel.util.jndi.JndiTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class LogListenerTest {
+ private static boolean listenerFired;
+
+ @Test
+ public void testLogMask() throws Exception {
+ listenerFired = false;
+ CamelContext context = createCamelContext();
+ MockEndpoint mock = context.getEndpoint("mock:foo", MockEndpoint.class);
+ mock.expectedMessageCount(1);
+ context.addlogListener((exchange, camelLogger, message) -> {
+ Assert.assertEquals("Exchange[ExchangePattern: InOnly, BodyType: String, Body: hello]", message);
+ listenerFired = true;
+ return message + " - modified by listener";
+ });
+ context.start();
+ context.createProducerTemplate().sendBody("direct:foo", "hello");
+ mock.assertIsSatisfied();
+ Assert.assertEquals(true, listenerFired);
+ context.stop();
+ }
+
+ protected CamelContext createCamelContext() throws Exception {
+ JndiRegistry registry = new JndiRegistry(JndiTest.createInitialContext());
+ CamelContext context = new DefaultCamelContext(registry);
+ context.addRoutes(createRouteBuilder());
+ return context;
+ }
+
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:foo").routeId("foo").to("log:foo").to("mock:foo");
+ }
+ };
+ }
+
+}
http://git-wip-us.apache.org/repos/asf/camel/blob/d1ee73ec/camel-core/src/test/java/org/apache/camel/processor/LogEipListenerTest.java
----------------------------------------------------------------------
diff --git a/camel-core/src/test/java/org/apache/camel/processor/LogEipListenerTest.java b/camel-core/src/test/java/org/apache/camel/processor/LogEipListenerTest.java
new file mode 100644
index 0000000..0e52fde
--- /dev/null
+++ b/camel-core/src/test/java/org/apache/camel/processor/LogEipListenerTest.java
@@ -0,0 +1,66 @@
+/**
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.processor;
+
+import org.apache.camel.CamelContext;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.component.mock.MockEndpoint;
+import org.apache.camel.impl.DefaultCamelContext;
+import org.apache.camel.impl.JndiRegistry;
+import org.apache.camel.spi.LogListener;
+import org.apache.camel.util.jndi.JndiTest;
+import org.junit.Assert;
+import org.junit.Test;
+
+public class LogEipListenerTest {
+ private static boolean listenerFired;
+
+ @Test
+ public void testLogListener() throws Exception {
+ listenerFired = false;
+ CamelContext context = createCamelContext();
+ MockEndpoint mock = context.getEndpoint("mock:foo", MockEndpoint.class);
+ mock.expectedMessageCount(1);
+ context.addlogListener((exchange, camelLogger, message) -> {
+ Assert.assertEquals("Got hello", message);
+ listenerFired = true;
+ return message + " - modified by listener";
+ });
+ context.start();
+ context.createProducerTemplate().sendBody("direct:foo", "hello");
+ mock.assertIsSatisfied();
+ Assert.assertEquals(true, listenerFired);
+ context.stop();
+ }
+
+ protected CamelContext createCamelContext() throws Exception {
+ JndiRegistry registry = new JndiRegistry(JndiTest.createInitialContext());
+ CamelContext context = new DefaultCamelContext(registry);
+ context.addRoutes(createRouteBuilder());
+ return context;
+ }
+
+ protected RouteBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+ @Override
+ public void configure() throws Exception {
+ from("direct:foo").routeId("foo").log("Got ${body}").to("mock:foo");
+ }
+ };
+ }
+
+}