You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/02/18 16:01:58 UTC

[camel] branch master updated: CAMEL-16178: camel-netty should invoke callback exactly once even when there was no UnitOfWork (#5093)

This is an automated email from the ASF dual-hosted git repository.

davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git


The following commit(s) were added to refs/heads/master by this push:
     new 63d8f64  CAMEL-16178: camel-netty should invoke callback exactly once even when there was no UnitOfWork (#5093)
63d8f64 is described below

commit 63d8f64a1f8db141c2a3865b11fad3d08566852c
Author: k-jamroz <79...@users.noreply.github.com>
AuthorDate: Thu Feb 18 17:01:28 2021 +0100

    CAMEL-16178: camel-netty should invoke callback exactly once even when there was no UnitOfWork (#5093)
    
    * CAMEL-9527: Should not log stacktrace when client has received reply - regression test
    
    * CAMEL-16178: camel-netty should invoke callback exactly once even when there was no UnitOfWork
    
    Co-authored-by: Krzysztof Jamróz <kr...@apdu.pl>
---
 components/camel-netty/pom.xml                     |  5 ++
 .../camel/component/netty/NettyCamelState.java     | 16 +++++
 .../netty/handlers/ClientChannelHandler.java       | 22 +++---
 .../camel/component/netty/BaseNettyTest.java       |  2 +-
 .../netty/EnrichWithoutRestResponseTest.java       | 81 ++++++++++++++++++++++
 .../netty/ErrorDuringGracefullShutdownTest.java    | 62 +++++++++++++++++
 .../camel/component/netty/LogCaptureAppender.java  | 16 ++++-
 .../camel/component/netty/LogCaptureTest.java      |  6 +-
 .../src/test/resources/log4j2.properties           |  2 +
 9 files changed, 195 insertions(+), 17 deletions(-)

diff --git a/components/camel-netty/pom.xml b/components/camel-netty/pom.xml
index 38f9fc7..5245142 100644
--- a/components/camel-netty/pom.xml
+++ b/components/camel-netty/pom.xml
@@ -101,6 +101,11 @@
             <artifactId>junit-jupiter</artifactId>
             <scope>test</scope>
         </dependency>
+        <dependency>
+            <groupId>org.assertj</groupId>
+            <artifactId>assertj-core</artifactId>
+            <scope>test</scope>
+        </dependency>
         <!-- logging -->
         <dependency>
             <groupId>org.apache.logging.log4j</groupId>
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java
index 074dab5..389ac99 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java
@@ -16,6 +16,8 @@
  */
 package org.apache.camel.component.netty;
 
+import java.util.concurrent.atomic.AtomicBoolean;
+
 import org.apache.camel.AsyncCallback;
 import org.apache.camel.Exchange;
 
@@ -30,16 +32,30 @@ public final class NettyCamelState {
 
     private final Exchange exchange;
     private final AsyncCallback callback;
+    // It is never a good idea to call the same callback twice
+    private final AtomicBoolean callbackCalled;
 
     public NettyCamelState(AsyncCallback callback, Exchange exchange) {
         this.callback = callback;
         this.exchange = exchange;
+        this.callbackCalled = new AtomicBoolean();
     }
 
     public AsyncCallback getCallback() {
         return callback;
     }
 
+    public boolean isDone() {
+        return callbackCalled.get();
+    }
+
+    public void callbackDoneOnce(boolean doneSync) {
+        if (!callbackCalled.getAndSet(true)) {
+            // this is the first time we call the callback
+            callback.done(doneSync);
+        }
+    }
+
     public Exchange getExchange() {
         return exchange;
     }
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
index ee7a43c..0000818 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
@@ -19,7 +19,6 @@ package org.apache.camel.component.netty.handlers;
 import io.netty.channel.ChannelHandler;
 import io.netty.channel.ChannelHandlerContext;
 import io.netty.channel.SimpleChannelInboundHandler;
-import org.apache.camel.AsyncCallback;
 import org.apache.camel.CamelExchangeException;
 import org.apache.camel.Exchange;
 import org.apache.camel.Message;
@@ -83,10 +82,9 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
 
         NettyCamelState state = getState(ctx, cause);
         Exchange exchange = state != null ? state.getExchange() : null;
-        AsyncCallback callback = state != null ? state.getCallback() : null;
 
         // the state may not be set
-        if (exchange != null && callback != null) {
+        if (exchange != null) {
             Throwable initialCause = exchange.getException();
             if (initialCause != null && initialCause.getCause() == null) {
                 initialCause.initCause(cause);
@@ -99,7 +97,7 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
             NettyHelper.close(ctx.channel());
 
             // signal callback
-            callback.done(false);
+            state.callbackDoneOnce(false);
         }
     }
 
@@ -111,7 +109,9 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
 
         NettyCamelState state = getState(ctx, null);
         Exchange exchange = state != null ? state.getExchange() : null;
-        AsyncCallback callback = state != null ? state.getCallback() : null;
+        // this channel may be closing gracefully and the callback may already have been called,
+        // in which case we should neither trigger an exception nor invoke the callback a second time
+        boolean doneUoW = state != null ? state.isDone() : false;
 
         // remove state
         producer.getCorrelationManager().removeState(ctx, ctx.channel());
@@ -120,10 +120,6 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
         producer.getAllChannels().remove(ctx.channel());
 
         if (exchange != null && !disconnecting) {
-            // this channel is maybe closing graceful and the exchange is already done
-            // and if so we should not trigger an exception
-            boolean doneUoW = exchange.getUnitOfWork() == null;
-
             NettyConfiguration configuration = producer.getConfiguration();
             if (configuration.isSync() && !doneUoW && !messageReceived && !exceptionHandled) {
                 // To avoid call the callback.done twice
@@ -140,7 +136,7 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
                             new CamelExchangeException("No response received from remote server: " + address, exchange));
                 }
                 // signal callback
-                callback.done(false);
+                state.callbackDoneOnce(false);
             }
         }
 
@@ -171,14 +167,13 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
             // we just ignore the received message as the channel is closed
             return;
         }
-        AsyncCallback callback = state.getCallback();
 
         Message message;
         try {
             message = getResponseMessage(exchange, ctx, msg);
         } catch (Exception e) {
             exchange.setException(e);
-            callback.done(false);
+            state.callbackDoneOnce(false);
             return;
         }
 
@@ -225,8 +220,7 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
                 NettyHelper.close(ctx.channel());
             }
         } finally {
-            // signal callback
-            callback.done(false);
+            state.callbackDoneOnce(false);
         }
     }
 
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java
index 283f9318..b98e501 100644
--- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java
@@ -58,7 +58,7 @@ public class BaseNettyTest extends CamelTestSupport {
         System.gc();
         // Kick leak detection logging
         ByteBufAllocator.DEFAULT.buffer(1).release();
-        Collection<LogEvent> events = LogCaptureAppender.getEvents();
+        Collection<LogEvent> events = LogCaptureAppender.getEvents(ResourceLeakDetector.class);
         if (!events.isEmpty()) {
             String message = "Leaks detected while running tests: " + events;
             // Just write the message into log to help debug
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/EnrichWithoutRestResponseTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/EnrichWithoutRestResponseTest.java
new file mode 100644
index 0000000..5789360
--- /dev/null
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/EnrichWithoutRestResponseTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+
+/**
+ * Regression test for CAMEL-16178
+ */
+class EnrichWithoutRestResponseTest extends BaseNettyTest {
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() throws Exception {
+                // mock server - accepts connection and immediately disconnects without any response
+                from("netty:tcp://0.0.0.0:{{port}}?disconnect=true")
+                        .log("Got request ${body}")
+                        .setBody(constant(null));
+
+                // test routes
+                final String nettyClientUri
+                        = "netty:tcp://127.0.0.1:{{port}}?textline=true&connectTimeout=1000&requestTimeout=1000";
+                from("direct:reqTo")
+                        .to(nettyClientUri);
+                from("direct:reqEnrich")
+                        .enrich(nettyClientUri);
+                from("direct:reqEnrichShareUoW")
+                        .enrich(nettyClientUri, new UseLatestAggregationStrategy(), true, true);
+            }
+        };
+    }
+
+    @Test
+    @Timeout(value = 10, unit = TimeUnit.SECONDS)
+    void toTest() {
+        assertThatExceptionOfType(CamelExecutionException.class)
+                .isThrownBy(() -> template.requestBody("direct:reqTo", ""))
+                .havingCause().withMessageContaining("No response received from remote server");
+    }
+
+    @Test
+    @Timeout(value = 10, unit = TimeUnit.SECONDS)
+    void enrichTest() {
+        assertThatExceptionOfType(CamelExecutionException.class)
+                .isThrownBy(() -> template.requestBody("direct:reqEnrich", ""))
+                .havingCause().withMessageContaining("No response received from remote server");
+    }
+
+    @Test
+    @Timeout(value = 10, unit = TimeUnit.SECONDS)
+    void enrichShareUoWTest() {
+        assertThatExceptionOfType(CamelExecutionException.class)
+                .isThrownBy(() -> template.requestBody("direct:reqEnrichShareUoW", ""))
+                .havingCause().withMessageContaining("No response received from remote server");
+    }
+}
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/ErrorDuringGracefullShutdownTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/ErrorDuringGracefullShutdownTest.java
new file mode 100644
index 0000000..6771cc5
--- /dev/null
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/ErrorDuringGracefullShutdownTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements.  See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License.  You may obtain a copy of the License at
+ *
+ *      http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.errorhandler.DefaultErrorHandler;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Regression test for CAMEL-9527
+ */
+class ErrorDuringGracefullShutdownTest extends BaseNettyTest {
+    @Override
+    protected RoutesBuilder createRouteBuilder() throws Exception {
+        return new RouteBuilder() {
+
+            @Override
+            public void configure() throws Exception {
+                // mock server
+                from("netty:tcp://0.0.0.0:{{port}}?textline=true&disconnect=false")
+                        .log("Got request ${body}")
+                        .setBody(constant("response"));
+
+                from("direct:req")
+                        .to("netty:tcp://127.0.0.1:{{port}}?textline=true");
+            }
+        };
+    }
+
+    @Test
+    void shouldNotTriggerErrorDuringGracefullShutdown() throws Exception {
+        // given: successful request
+        assertThat(template.requestBody("direct:req", "test", String.class)).isEqualTo("response");
+
+        // when: context is closed
+        context().close();
+        while (context.getStatus() != ServiceStatus.Stopped) {
+            Thread.sleep(1);
+        }
+
+        // then: there should be no entries in the log indicating that the callback was called twice
+        assertThat(LogCaptureAppender.hasEventsFor(DefaultErrorHandler.class)).isFalse();
+    }
+}
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureAppender.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureAppender.java
index 89dce73..a5d8bfa 100644
--- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureAppender.java
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureAppender.java
@@ -20,6 +20,7 @@ import java.io.Serializable;
 import java.util.ArrayDeque;
 import java.util.Collection;
 import java.util.Deque;
+import java.util.stream.Collectors;
 
 import org.apache.logging.log4j.core.Filter;
 import org.apache.logging.log4j.core.Layout;
@@ -29,6 +30,7 @@ import org.apache.logging.log4j.core.config.plugins.Plugin;
 import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
 import org.apache.logging.log4j.core.config.plugins.PluginElement;
 import org.apache.logging.log4j.core.config.plugins.PluginFactory;
+import org.apache.logging.log4j.core.impl.MutableLogEvent;
 
 @Plugin(name = "LogCaptureAppender", category = "Core", elementType = "appender", printObject = true)
 public class LogCaptureAppender extends AbstractAppender {
@@ -52,7 +54,11 @@ public class LogCaptureAppender extends AbstractAppender {
 
     @Override
     public void append(LogEvent logEvent) {
-        LOG_EVENTS.add(logEvent);
+        if (logEvent instanceof MutableLogEvent) {
+            LOG_EVENTS.add(((MutableLogEvent) logEvent).createMemento());
+        } else {
+            LOG_EVENTS.add(logEvent);
+        }
     }
 
     public static void reset() {
@@ -62,4 +68,12 @@ public class LogCaptureAppender extends AbstractAppender {
     public static Collection<LogEvent> getEvents() {
         return LOG_EVENTS;
     }
+
+    public static Collection<LogEvent> getEvents(Class<?> cls) {
+        return LOG_EVENTS.stream().filter(e -> e.getLoggerName().equals(cls.getName())).collect(Collectors.toList());
+    }
+
+    public static boolean hasEventsFor(Class<?> cls) {
+        return LOG_EVENTS.stream().anyMatch(e -> e.getLoggerName().equals(cls.getName()));
+    }
 }
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureTest.java
index 8b76fa1..e9656ff 100644
--- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureTest.java
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureTest.java
@@ -18,9 +18,11 @@ package org.apache.camel.component.netty;
 
 import io.netty.util.ResourceLeakDetector;
 import io.netty.util.internal.logging.InternalLoggerFactory;
+import org.apache.camel.processor.errorhandler.DefaultErrorHandler;
 import org.junit.jupiter.api.Test;
 
 import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
 
 /**
  * This test ensures LogCaptureAppender is configured properly
@@ -29,7 +31,9 @@ public class LogCaptureTest {
     @Test
     public void testCapture() {
         InternalLoggerFactory.getInstance(ResourceLeakDetector.class).error("testError");
-        assertFalse(LogCaptureAppender.getEvents().isEmpty());
+        assertFalse(LogCaptureAppender.getEvents(ResourceLeakDetector.class).isEmpty());
+        assertTrue(LogCaptureAppender.hasEventsFor(ResourceLeakDetector.class));
+        assertTrue(LogCaptureAppender.getEvents(DefaultErrorHandler.class).isEmpty());
         LogCaptureAppender.reset();
     }
 }
diff --git a/components/camel-netty/src/test/resources/log4j2.properties b/components/camel-netty/src/test/resources/log4j2.properties
index 5005aef..1646981 100644
--- a/components/camel-netty/src/test/resources/log4j2.properties
+++ b/components/camel-netty/src/test/resources/log4j2.properties
@@ -30,5 +30,7 @@ appender.capture.name=capture
 
 logger.leak.name = io.netty.util.ResourceLeakDetector
 logger.leak.appenderRef.capture.ref = capture
+logger.errorHandler.name = org.apache.camel.processor.errorhandler.DefaultErrorHandler
+logger.errorHandler.appenderRef.capture.ref = capture
 rootLogger.level = INFO
 rootLogger.appenderRef.file.ref = file