You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/02/18 16:08:23 UTC

[camel] branch camel-3.7.x updated (8c4c8a0 -> 1508f25)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a change to branch camel-3.7.x
in repository https://gitbox.apache.org/repos/asf/camel.git.


    from 8c4c8a0  CAMEL-16233: Fix camel-http - Optimize to avoid type convertion that would do deep checking.
     new 864c65b  CAMEL-16178: camel-netty should invoke callback exactly once even when there was no UnitOfWork (#5093)
     new 1508f25  CAMEL-16227: Netty with reuseChannel invokes wrong callback (#5101)

The 2 revisions listed above as "new" are entirely new to this
repository and will be described in separate emails.  The revisions
listed as "add" were already present in the repository and have only
been added to this reference.


Summary of changes:
 components/camel-netty/pom.xml                     |  5 ++
 .../camel/component/netty/NettyCamelState.java     | 16 +++++
 .../camel/component/netty/NettyProducer.java       | 16 ++++-
 .../netty/handlers/ClientChannelHandler.java       | 22 +++---
 .../camel/component/netty/BaseNettyTest.java       |  2 +-
 .../netty/EnrichWithoutRestResponseTest.java       | 81 ++++++++++++++++++++++
 .../netty/ErrorDuringGracefullShutdownTest.java    | 62 +++++++++++++++++
 .../camel/component/netty/LogCaptureAppender.java  | 16 ++++-
 .../camel/component/netty/LogCaptureTest.java      |  6 +-
 ...est.java => NettyReuseChannelCallbackTest.java} | 64 +++++++++++++++--
 .../src/test/resources/log4j2.properties           |  2 +
 11 files changed, 269 insertions(+), 23 deletions(-)
 create mode 100644 components/camel-netty/src/test/java/org/apache/camel/component/netty/EnrichWithoutRestResponseTest.java
 create mode 100644 components/camel-netty/src/test/java/org/apache/camel/component/netty/ErrorDuringGracefullShutdownTest.java
 copy components/camel-netty/src/test/java/org/apache/camel/component/netty/{NettyReuseChannelTest.java => NettyReuseChannelCallbackTest.java} (55%)


[camel] 02/02: CAMEL-16227: Netty with reuseChannel invokes wrong callback (#5101)

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.7.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 1508f25e51302af1cf02434a7ca6c8d61e3d4263
Author: k-jamroz <79...@users.noreply.github.com>
AuthorDate: Thu Feb 18 17:01:40 2021 +0100

    CAMEL-16227: Netty with reuseChannel invokes wrong callback (#5101)
    
    Co-authored-by: Krzysztof Jamróz <kr...@apdu.pl>
---
 .../camel/component/netty/NettyProducer.java       |  16 ++-
 .../netty/NettyReuseChannelCallbackTest.java       | 145 +++++++++++++++++++++
 2 files changed, 160 insertions(+), 1 deletion(-)

diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
index 0b74d89..ee0e0f2 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyProducer.java
@@ -18,6 +18,7 @@ package org.apache.camel.component.netty;
 
 import java.net.ConnectException;
 import java.net.InetSocketAddress;
+import java.util.Optional;
 import java.util.concurrent.RejectedExecutionException;
 import java.util.concurrent.TimeUnit;
 
@@ -36,6 +37,7 @@ import io.netty.channel.group.DefaultChannelGroup;
 import io.netty.channel.socket.nio.NioDatagramChannel;
 import io.netty.channel.socket.nio.NioSocketChannel;
 import io.netty.handler.timeout.ReadTimeoutHandler;
+import io.netty.util.AttributeKey;
 import io.netty.util.ReferenceCountUtil;
 import io.netty.util.concurrent.ImmediateEventExecutor;
 import org.apache.camel.AsyncCallback;
@@ -60,6 +62,9 @@ public class NettyProducer extends DefaultAsyncProducer {
 
     private static final Logger LOG = LoggerFactory.getLogger(NettyProducer.class);
 
+    private static final AttributeKey<NettyCamelStateCorrelationManager> CORRELATION_MANAGER_ATTR
+            = AttributeKey.valueOf("NettyCamelStateCorrelationManager");
+
     private ChannelGroup allChannels;
     private CamelContext context;
     private NettyConfiguration configuration;
@@ -269,6 +274,9 @@ public class NettyProducer extends DefaultAsyncProducer {
         // remember channel so we can reuse it
         final Channel channel = channelFuture.channel();
         if (getConfiguration().isReuseChannel() && exchange.getProperty(NettyConstants.NETTY_CHANNEL) == null) {
+            // remember correlation manager for this channel
+            // for use when sending subsequent messages reusing this channel
+            channel.attr(CORRELATION_MANAGER_ATTR).set(correlationManager);
             exchange.setProperty(NettyConstants.NETTY_CHANNEL, channel);
             // and defer closing the channel until we are done routing the exchange
             exchange.adapt(ExtendedExchange.class).addOnCompletion(new SynchronizationAdapter() {
@@ -298,6 +306,12 @@ public class NettyProducer extends DefaultAsyncProducer {
             });
         }
 
+        // Get appropriate correlation manager.
+        // If we reuse channel then get it from channel. CORRELATION_MANAGER_ATTR should be set at this point.
+        // Otherwise use correlation manager for this producer.
+        final NettyCamelStateCorrelationManager channelCorrelationManager
+                = Optional.ofNullable(channel.attr(CORRELATION_MANAGER_ATTR).get()).orElse(correlationManager);
+
         if (exchange.getIn().getHeader(NettyConstants.NETTY_REQUEST_TIMEOUT) != null) {
             long timeoutInMs = exchange.getIn().getHeader(NettyConstants.NETTY_REQUEST_TIMEOUT, Long.class);
             ChannelHandler oldHandler = channel.pipeline().get("timeout");
@@ -321,7 +335,7 @@ public class NettyProducer extends DefaultAsyncProducer {
         }
 
         // setup state as attachment on the channel, so we can access the state later when needed
-        correlationManager.putState(channel, new NettyCamelState(producerCallback, exchange));
+        channelCorrelationManager.putState(channel, new NettyCamelState(producerCallback, exchange));
         // here we need to setup the remote address information here
         InetSocketAddress remoteAddress = null;
         if (!isTcp()) {
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseChannelCallbackTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseChannelCallbackTest.java
new file mode 100644
index 0000000..412af2e
--- /dev/null
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/NettyReuseChannelCallbackTest.java
@@ -0,0 +1,145 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.util.ArrayList;
+import java.util.List;
+import java.util.concurrent.TimeUnit;
+
+import io.netty.channel.Channel;
+import org.apache.camel.Endpoint;
+import org.apache.camel.Exchange;
+import org.apache.camel.Processor;
+import org.apache.camel.builder.NotifyBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.impl.event.ExchangeSentEvent;
+import org.apache.camel.spi.CamelEvent;
+import org.apache.camel.spi.CamelEvent.ExchangeSendingEvent;
+import org.apache.camel.support.EventNotifierSupport;
+import org.junit.jupiter.api.Test;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import static org.junit.jupiter.api.Assertions.assertEquals;
+import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertSame;
+import static org.junit.jupiter.api.Assertions.assertTrue;
+
+/**
+ * Regression test for CAMEL-16227
+ */
+class NettyReuseChannelCallbackTest extends BaseNettyTest {
+
+    private static final Logger LOG = LoggerFactory.getLogger(NettyReuseChannelCallbackTest.class);
+
+    private final List<Channel> channels = new ArrayList<>();
+
+    @Test
+    void testReuse() throws Exception {
+        final List<Endpoint> eventEndpoints = new ArrayList<>(4);
+        final List<Long> times = new ArrayList<>(2);
+
+        final EventNotifierSupport nettyEventRecorder = new EventNotifierSupport() {
+            @Override
+            public void notify(CamelEvent event) throws Exception {
+                if (event instanceof ExchangeSendingEvent) {
+                    LOG.info("Got event {}", event);
+                    add(((ExchangeSendingEvent) event).getEndpoint());
+                }
+                if (event instanceof ExchangeSentEvent) {
+                    LOG.info("Got event {}", event);
+                    add(((ExchangeSentEvent) event).getEndpoint());
+                    if (((ExchangeSentEvent) event).getEndpoint() instanceof NettyEndpoint) {
+                        times.add(((ExchangeSentEvent) event).getTimeTaken());
+                    }
+                }
+            }
+
+            private void add(Endpoint endpoint) {
+                if (endpoint instanceof NettyEndpoint) {
+                    eventEndpoints.add(endpoint);
+                }
+            }
+        };
+        nettyEventRecorder.start();
+        context.getManagementStrategy().addEventNotifier(nettyEventRecorder);
+
+        NotifyBuilder notify = new NotifyBuilder(context).whenDone(1).create();
+
+        getMockEndpoint("mock:a").expectedBodiesReceived("Hello World");
+        getMockEndpoint("mock:b").expectedBodiesReceived("Hello Hello World");
+        getMockEndpoint("mock:result").expectedBodiesReceived("Hello World", "Hello Hello World");
+
+        template.sendBody("direct:start", "World\n");
+
+        assertMockEndpointsSatisfied(10, TimeUnit.SECONDS);
+
+        assertTrue(notify.matchesWaitTime());
+
+        assertEquals(2, channels.size());
+        assertSame(channels.get(0), channels.get(1), "Should reuse channel");
+        assertFalse(channels.get(0).isOpen(), "And closed when routing done");
+        assertFalse(channels.get(1).isOpen(), "And closed when routing done");
+
+        assertEquals(4, eventEndpoints.size(), "Should get 4 events for netty endpoints");
+        assertSame(eventEndpoints.get(0), eventEndpoints.get(1), "Sending and sent event should contain the same endpoint");
+        assertSame(eventEndpoints.get(2), eventEndpoints.get(3), "Sending and sent event should contain the same endpoint");
+        assertEquals(2, times.size(), "Should get 2 ExchangeSent events");
+        // one side effect of mixing callbacks in wrong time taken reported in event
+        times.forEach(time -> assertTrue(time < 900));
+    }
+
+    @Override
+    protected RouteBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+            @Override
+            public void configure() throws Exception {
+                // Netty URIs are slightly different (different requestTimeout)
+                // so there is different NettyEndpoint instance for each of them.
+                // This makes distinguishing events easier in test.
+                // If they URIs would be the same there would be one NettyEndpoint instance
+                // but still 2 separate NettyProducer instances.
+                from("direct:start")
+                        .to("netty:tcp://localhost:{{port}}?textline=true&sync=true&reuseChannel=true&disconnect=true&requestTimeout=1000")
+                        .process(new Processor() {
+                            @Override
+                            public void process(Exchange exchange) throws Exception {
+                                Channel channel = exchange.getProperty(NettyConstants.NETTY_CHANNEL, Channel.class);
+                                channels.add(channel);
+                                assertTrue(channel.isActive(), "Should be active");
+                            }
+                        })
+                        .to("mock:a")
+                        .to("netty:tcp://localhost:{{port}}?textline=true&sync=true&reuseChannel=true&disconnect=true&requestTimeout=2000")
+                        .process(new Processor() {
+                            @Override
+                            public void process(Exchange exchange) throws Exception {
+                                Channel channel = exchange.getProperty(NettyConstants.NETTY_CHANNEL, Channel.class);
+                                channels.add(channel);
+                                assertTrue(channel.isActive(), "Should be active");
+                            }
+                        })
+                        .to("mock:b");
+
+                from("netty:tcp://localhost:{{port}}?textline=true&sync=true")
+                        .transform(body().prepend("Hello "))
+                        .delay(500)
+                        .to("mock:result");
+            }
+        };
+    }
+}


[camel] 01/02: CAMEL-16178: camel-netty should invoke callback exactly once even when there was no UnitOfWork (#5093)

Posted by da...@apache.org.
This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch camel-3.7.x
in repository https://gitbox.apache.org/repos/asf/camel.git

commit 864c65b8b455b46f6271c5093f6c2dcf635a71db
Author: k-jamroz <79...@users.noreply.github.com>
AuthorDate: Thu Feb 18 17:01:28 2021 +0100

    CAMEL-16178: camel-netty should invoke callback exactly once even when there was no UnitOfWork (#5093)
    
    * CAMEL-9527: Should not log stacktrace when client has received reply - regression test
    
    * CAMEL-16178: camel-netty should invoke callback exactly once even when there was no UnitOfWork
    
    Co-authored-by: Krzysztof Jamróz <kr...@apdu.pl>
---
 components/camel-netty/pom.xml                     |  5 ++
 .../camel/component/netty/NettyCamelState.java     | 16 +++++
 .../netty/handlers/ClientChannelHandler.java       | 22 +++---
 .../camel/component/netty/BaseNettyTest.java       |  2 +-
 .../netty/EnrichWithoutRestResponseTest.java       | 81 ++++++++++++++++++++++
 .../netty/ErrorDuringGracefullShutdownTest.java    | 62 +++++++++++++++++
 .../camel/component/netty/LogCaptureAppender.java  | 16 ++++-
 .../camel/component/netty/LogCaptureTest.java      |  6 +-
 .../src/test/resources/log4j2.properties           |  2 +
 9 files changed, 195 insertions(+), 17 deletions(-)

diff --git a/components/camel-netty/pom.xml b/components/camel-netty/pom.xml
index 6e5fbf7..7583921 100644
--- a/components/camel-netty/pom.xml
+++ b/components/camel-netty/pom.xml
@@ -100,6 +100,11 @@
             <artifactId>junit-jupiter</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
         <!-- logging -->
         <dependency>
             <groupId>org.apache.logging.log4j</groupId>
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java
index 074dab5..389ac99 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.netty;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 
@@ -30,16 +32,30 @@ public final class NettyCamelState {
 
     private final Exchange exchange;
     private final AsyncCallback callback;
+    // It is never a good idea to call the same callback twice
+    private final AtomicBoolean callbackCalled;
 
     public NettyCamelState(AsyncCallback callback, Exchange exchange) {
         this.callback = callback;
         this.exchange = exchange;
+        this.callbackCalled = new AtomicBoolean();
     }
 
     public AsyncCallback getCallback() {
         return callback;
     }
 
+    public boolean isDone() {
+        return callbackCalled.get();
+    }
+
+    public void callbackDoneOnce(boolean doneSync) {
+        if (!callbackCalled.getAndSet(true)) {
+            // this is the first time we call the callback
+            callback.done(doneSync);
+        }
+    }
+
     public Exchange getExchange() {
         return exchange;
     }
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
index ee7a43c..0000818 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
@@ -19,7 +19,6 @@ package org.apache.camel.component.netty.handlers;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
-import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -83,10 +82,9 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
 
         NettyCamelState state = getState(ctx, cause);
         Exchange exchange = state != null ? state.getExchange() : null;
-        AsyncCallback callback = state != null ? state.getCallback() : null;
 
         // the state may not be set
-        if (exchange != null && callback != null) {
+        if (exchange != null) {
             Throwable initialCause = exchange.getException();
             if (initialCause != null && initialCause.getCause() == null) {
                 initialCause.initCause(cause);
@@ -99,7 +97,7 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
             NettyHelper.close(ctx.channel());
 
             // signal callback
-            callback.done(false);
+            state.callbackDoneOnce(false);
         }
     }
 
@@ -111,7 +109,9 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
 
         NettyCamelState state = getState(ctx, null);
         Exchange exchange = state != null ? state.getExchange() : null;
-        AsyncCallback callback = state != null ? state.getCallback() : null;
+        // this channel is maybe closing graceful and the callback could already have been called
+        // and if so we should not trigger an exception nor invoke callback second time
+        boolean doneUoW = state != null ? state.isDone() : false;
 
         // remove state
         producer.getCorrelationManager().removeState(ctx, ctx.channel());
@@ -120,10 +120,6 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
         producer.getAllChannels().remove(ctx.channel());
 
         if (exchange != null && !disconnecting) {
-            // this channel is maybe closing graceful and the exchange is already done
-            // and if so we should not trigger an exception
-            boolean doneUoW = exchange.getUnitOfWork() == null;
-
             NettyConfiguration configuration = producer.getConfiguration();
             if (configuration.isSync() && !doneUoW && !messageReceived && !exceptionHandled) {
                 // To avoid call the callback.done twice
@@ -140,7 +136,7 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
                             new CamelExchangeException("No response received from remote server: " + address, exchange));
                 }
                 // signal callback
-                callback.done(false);
+                state.callbackDoneOnce(false);
             }
         }
 
@@ -171,14 +167,13 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
             // we just ignore the received message as the channel is closed
             return;
         }
-        AsyncCallback callback = state.getCallback();
 
         Message message;
         try {
             message = getResponseMessage(exchange, ctx, msg);
         } catch (Exception e) {
             exchange.setException(e);
-            callback.done(false);
+            state.callbackDoneOnce(false);
             return;
         }
 
@@ -225,8 +220,7 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
                 NettyHelper.close(ctx.channel());
             }
         } finally {
-            // signal callback
-            callback.done(false);
+            state.callbackDoneOnce(false);
         }
     }
 
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java
index 283f9318..b98e501 100644
--- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java
@@ -58,7 +58,7 @@ public class BaseNettyTest extends CamelTestSupport {
         System.gc();
         // Kick leak detection logging
         ByteBufAllocator.DEFAULT.buffer(1).release();
-        Collection<LogEvent> events = LogCaptureAppender.getEvents();
+        Collection<LogEvent> events = LogCaptureAppender.getEvents(ResourceLeakDetector.class);
         if (!events.isEmpty()) {
             String message = "Leaks detected while running tests: " + events;
             // Just write the message into log to help debug
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/EnrichWithoutRestResponseTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/EnrichWithoutRestResponseTest.java
new file mode 100644
index 0000000..5789360
--- /dev/null
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/EnrichWithoutRestResponseTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+
+/**
+ * Regression test for CAMEL-16178
+ */
+class EnrichWithoutRestResponseTest extends BaseNettyTest {
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() throws Exception {
+                // mock server - accepts connection and immediately disconnects without any response
+                from("netty:tcp://0.0.0.0:{{port}}?disconnect=true")
+                        .log("Got request ${body}")
+                        .setBody(constant(null));
+
+                // test routes
+                final String nettyClientUri
+                        = "netty:tcp://127.0.0.1:{{port}}?textline=true&connectTimeout=1000&requestTimeout=1000";
+                from("direct:reqTo")
+                        .to(nettyClientUri);
+                from("direct:reqEnrich")
+                        .enrich(nettyClientUri);
+                from("direct:reqEnrichShareUoW")
+                        .enrich(nettyClientUri, new UseLatestAggregationStrategy(), true, true);
+            }
+        };
+    }
+
+    @Test
+    @Timeout(value = 10, unit = TimeUnit.SECONDS)
+    void toTest() {
+        assertThatExceptionOfType(CamelExecutionException.class)
+                .isThrownBy(() -> template.requestBody("direct:reqTo", ""))
+                .havingCause().withMessageContaining("No response received from remote server");
+    }
+
+    @Test
+    @Timeout(value = 10, unit = TimeUnit.SECONDS)
+    void enrichTest() {
+        assertThatExceptionOfType(CamelExecutionException.class)
+                .isThrownBy(() -> template.requestBody("direct:reqEnrich", ""))
+                .havingCause().withMessageContaining("No response received from remote server");
+    }
+
+    @Test
+    @Timeout(value = 10, unit = TimeUnit.SECONDS)
+    void enrichShareUoWTest() {
+        assertThatExceptionOfType(CamelExecutionException.class)
+                .isThrownBy(() -> template.requestBody("direct:reqEnrichShareUoW", ""))
+                .havingCause().withMessageContaining("No response received from remote server");
+    }
+}
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/ErrorDuringGracefullShutdownTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/ErrorDuringGracefullShutdownTest.java
new file mode 100644
index 0000000..6771cc5
--- /dev/null
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/ErrorDuringGracefullShutdownTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.errorhandler.DefaultErrorHandler;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Regression test for CAMEL-9527
+ */
+class ErrorDuringGracefullShutdownTest extends BaseNettyTest {
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() throws Exception {
+                // mock server
+                from("netty:tcp://0.0.0.0:{{port}}?textline=true&disconnect=false")
+                        .log("Got request ${body}")
+                        .setBody(constant("response"));
+
+                from("direct:req")
+                        .to("netty:tcp://127.0.0.1:{{port}}?textline=true");
+            }
+        };
+    }
+
+    @Test
+    void shouldNotTriggerErrorDuringGracefullShutdown() throws Exception {
+        // given: successful request
+        assertThat(template.requestBody("direct:req", "test", String.class)).isEqualTo("response");
+
+        // when: context is closed
+        context().close();
+        while (context.getStatus() != ServiceStatus.Stopped) {
+            Thread.sleep(1);
+        }
+
+        // then: there should be no entries in log indicating that the callback was called twice
+        assertThat(LogCaptureAppender.hasEventsFor(DefaultErrorHandler.class)).isFalse();
+    }
+}
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureAppender.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureAppender.java
index 89dce73..a5d8bfa 100644
--- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureAppender.java
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureAppender.java
@@ -20,6 +20,7 @@ import java.io.Serializable;
 import java.util.ArrayDeque;
 import java.util.Collection;
 import java.util.Deque;
+import java.util.stream.Collectors;
 
 import org.apache.logging.log4j.core.Filter;
 import org.apache.logging.log4j.core.Layout;
@@ -29,6 +30,7 @@ import org.apache.logging.log4j.core.config.plugins.Plugin;
 import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
 import org.apache.logging.log4j.core.config.plugins.PluginElement;
 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
+import org.apache.logging.log4j.core.impl.MutableLogEvent;
 
 @Plugin(name = "LogCaptureAppender", category = "Core", elementType = "appender", printObject = true)
 public class LogCaptureAppender extends AbstractAppender {
@@ -52,7 +54,11 @@ public class LogCaptureAppender extends AbstractAppender {
 
     @Override
     public void append(LogEvent logEvent) {
-        LOG_EVENTS.add(logEvent);
+        if (logEvent instanceof MutableLogEvent) {
+            LOG_EVENTS.add(((MutableLogEvent) logEvent).createMemento());
+        } else {
+            LOG_EVENTS.add(logEvent);
+        }
     }
 
     public static void reset() {
@@ -62,4 +68,12 @@ public class LogCaptureAppender extends AbstractAppender {
     public static Collection<LogEvent> getEvents() {
         return LOG_EVENTS;
     }
+
+    public static Collection<LogEvent> getEvents(Class<?> cls) {
+        return LOG_EVENTS.stream().filter(e -> e.getLoggerName().equals(cls.getName())).collect(Collectors.toList());
+    }
+
+    public static boolean hasEventsFor(Class<?> cls) {
+        return LOG_EVENTS.stream().anyMatch(e -> e.getLoggerName().equals(cls.getName()));
+    }
 }
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureTest.java
index 8b76fa1..e9656ff 100644
--- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureTest.java
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureTest.java
@@ -18,9 +18,11 @@ package org.apache.camel.component.netty;
 
 import io.netty.util.ResourceLeakDetector;
 import io.netty.util.internal.logging.InternalLoggerFactory;
+import org.apache.camel.processor.errorhandler.DefaultErrorHandler;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * This test ensures LogCaptureAppender is configured properly
@@ -29,7 +31,9 @@ public class LogCaptureTest {
     @Test
     public void testCapture() {
         InternalLoggerFactory.getInstance(ResourceLeakDetector.class).error("testError");
-        assertFalse(LogCaptureAppender.getEvents().isEmpty());
+        assertFalse(LogCaptureAppender.getEvents(ResourceLeakDetector.class).isEmpty());
+        assertTrue(LogCaptureAppender.hasEventsFor(ResourceLeakDetector.class));
+        assertTrue(LogCaptureAppender.getEvents(DefaultErrorHandler.class).isEmpty());
         LogCaptureAppender.reset();
     }
 }
diff --git a/components/camel-netty/src/test/resources/log4j2.properties b/components/camel-netty/src/test/resources/log4j2.properties
index 5005aef..1646981 100644
--- a/components/camel-netty/src/test/resources/log4j2.properties
+++ b/components/camel-netty/src/test/resources/log4j2.properties
@@ -30,5 +30,7 @@ appender.capture.name=capture
 
 logger.leak.name = io.netty.util.ResourceLeakDetector
 logger.leak.appenderRef.capture.ref = capture
+logger.errorHandler.name = org.apache.camel.processor.errorhandler.DefaultErrorHandler
+logger.errorHandler.appenderRef.capture.ref = capture
 rootLogger.level = INFO
 rootLogger.appenderRef.file.ref = file