You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by da...@apache.org on 2021/02/18 16:01:58 UTC
[camel] branch master updated: CAMEL-16178: camel-netty should invoke callback exactly once even when there was no UnitOfWork (#5093)
This is an automated email from the ASF dual-hosted git repository.
davsclaus pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 63d8f64 CAMEL-16178: camel-netty should invoke callback exactly once even when there was no UnitOfWork (#5093)
63d8f64 is described below
commit 63d8f64a1f8db141c2a3865b11fad3d08566852c
Author: k-jamroz <79...@users.noreply.github.com>
AuthorDate: Thu Feb 18 17:01:28 2021 +0100
CAMEL-16178: camel-netty should invoke callback exactly once even when there was no UnitOfWork (#5093)
* CAMEL-9527: Should not log stacktrace when client has received reply - regression test
* CAMEL-16178: camel-netty should invoke callback exactly once even when there was no UnitOfWork
Co-authored-by: Krzysztof Jamróz <kr...@apdu.pl>
---
components/camel-netty/pom.xml | 5 ++
.../camel/component/netty/NettyCamelState.java | 16 +++++
.../netty/handlers/ClientChannelHandler.java | 22 +++---
.../camel/component/netty/BaseNettyTest.java | 2 +-
.../netty/EnrichWithoutRestResponseTest.java | 81 ++++++++++++++++++++++
.../netty/ErrorDuringGracefullShutdownTest.java | 62 +++++++++++++++++
.../camel/component/netty/LogCaptureAppender.java | 16 ++++-
.../camel/component/netty/LogCaptureTest.java | 6 +-
.../src/test/resources/log4j2.properties | 2 +
9 files changed, 195 insertions(+), 17 deletions(-)
diff --git a/components/camel-netty/pom.xml b/components/camel-netty/pom.xml
index 38f9fc7..5245142 100644
--- a/components/camel-netty/pom.xml
+++ b/components/camel-netty/pom.xml
@@ -101,6 +101,11 @@
<artifactId>junit-jupiter</artifactId>
<scope>test</scope>
</dependency>
+ <dependency>
+ <groupId>org.assertj</groupId>
+ <artifactId>assertj-core</artifactId>
+ <scope>test</scope>
+ </dependency>
<!-- logging -->
<dependency>
<groupId>org.apache.logging.log4j</groupId>
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java
index 074dab5..389ac99 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/NettyCamelState.java
@@ -16,6 +16,8 @@
*/
package org.apache.camel.component.netty;
+import java.util.concurrent.atomic.AtomicBoolean;
+
import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
@@ -30,16 +32,30 @@ public final class NettyCamelState {
private final Exchange exchange;
private final AsyncCallback callback;
+ // It is never a good idea to call the same callback twice
+ private final AtomicBoolean callbackCalled;
public NettyCamelState(AsyncCallback callback, Exchange exchange) {
this.callback = callback;
this.exchange = exchange;
+ this.callbackCalled = new AtomicBoolean();
}
public AsyncCallback getCallback() {
return callback;
}
+ public boolean isDone() {
+ return callbackCalled.get();
+ }
+
+ public void callbackDoneOnce(boolean doneSync) {
+ if (!callbackCalled.getAndSet(true)) {
+ // this is the first time we call the callback
+ callback.done(doneSync);
+ }
+ }
+
public Exchange getExchange() {
return exchange;
}
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
index ee7a43c..0000818 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ClientChannelHandler.java
@@ -19,7 +19,6 @@ package org.apache.camel.component.netty.handlers;
import io.netty.channel.ChannelHandler;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
-import org.apache.camel.AsyncCallback;
import org.apache.camel.CamelExchangeException;
import org.apache.camel.Exchange;
import org.apache.camel.Message;
@@ -83,10 +82,9 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
NettyCamelState state = getState(ctx, cause);
Exchange exchange = state != null ? state.getExchange() : null;
- AsyncCallback callback = state != null ? state.getCallback() : null;
// the state may not be set
- if (exchange != null && callback != null) {
+ if (exchange != null) {
Throwable initialCause = exchange.getException();
if (initialCause != null && initialCause.getCause() == null) {
initialCause.initCause(cause);
@@ -99,7 +97,7 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
NettyHelper.close(ctx.channel());
// signal callback
- callback.done(false);
+ state.callbackDoneOnce(false);
}
}
@@ -111,7 +109,9 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
NettyCamelState state = getState(ctx, null);
Exchange exchange = state != null ? state.getExchange() : null;
- AsyncCallback callback = state != null ? state.getCallback() : null;
+ // this channel may be closing gracefully and the callback could already have been called
+ // and if so we should neither trigger an exception nor invoke the callback a second time
+ boolean doneUoW = state != null ? state.isDone() : false;
// remove state
producer.getCorrelationManager().removeState(ctx, ctx.channel());
@@ -120,10 +120,6 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
producer.getAllChannels().remove(ctx.channel());
if (exchange != null && !disconnecting) {
- // this channel is maybe closing graceful and the exchange is already done
- // and if so we should not trigger an exception
- boolean doneUoW = exchange.getUnitOfWork() == null;
-
NettyConfiguration configuration = producer.getConfiguration();
if (configuration.isSync() && !doneUoW && !messageReceived && !exceptionHandled) {
// To avoid call the callback.done twice
@@ -140,7 +136,7 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
new CamelExchangeException("No response received from remote server: " + address, exchange));
}
// signal callback
- callback.done(false);
+ state.callbackDoneOnce(false);
}
}
@@ -171,14 +167,13 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
// we just ignore the received message as the channel is closed
return;
}
- AsyncCallback callback = state.getCallback();
Message message;
try {
message = getResponseMessage(exchange, ctx, msg);
} catch (Exception e) {
exchange.setException(e);
- callback.done(false);
+ state.callbackDoneOnce(false);
return;
}
@@ -225,8 +220,7 @@ public class ClientChannelHandler extends SimpleChannelInboundHandler<Object> {
NettyHelper.close(ctx.channel());
}
} finally {
- // signal callback
- callback.done(false);
+ state.callbackDoneOnce(false);
}
}
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java
index 283f9318..b98e501 100644
--- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/BaseNettyTest.java
@@ -58,7 +58,7 @@ public class BaseNettyTest extends CamelTestSupport {
System.gc();
// Kick leak detection logging
ByteBufAllocator.DEFAULT.buffer(1).release();
- Collection<LogEvent> events = LogCaptureAppender.getEvents();
+ Collection<LogEvent> events = LogCaptureAppender.getEvents(ResourceLeakDetector.class);
if (!events.isEmpty()) {
String message = "Leaks detected while running tests: " + events;
// Just write the message into log to help debug
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/EnrichWithoutRestResponseTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/EnrichWithoutRestResponseTest.java
new file mode 100644
index 0000000..5789360
--- /dev/null
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/EnrichWithoutRestResponseTest.java
@@ -0,0 +1,81 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import java.util.concurrent.TimeUnit;
+
+import org.apache.camel.CamelExecutionException;
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.aggregate.UseLatestAggregationStrategy;
+import org.junit.jupiter.api.Test;
+import org.junit.jupiter.api.Timeout;
+
+import static org.assertj.core.api.Assertions.assertThatExceptionOfType;
+
+/**
+ * Regression test for CAMEL-16178
+ */
+class EnrichWithoutRestResponseTest extends BaseNettyTest {
+ @Override
+ protected RoutesBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+
+ @Override
+ public void configure() throws Exception {
+ // mock server - accepts connection and immediately disconnects without any response
+ from("netty:tcp://0.0.0.0:{{port}}?disconnect=true")
+ .log("Got request ${body}")
+ .setBody(constant(null));
+
+ // test routes
+ final String nettyClientUri
+ = "netty:tcp://127.0.0.1:{{port}}?textline=true&connectTimeout=1000&requestTimeout=1000";
+ from("direct:reqTo")
+ .to(nettyClientUri);
+ from("direct:reqEnrich")
+ .enrich(nettyClientUri);
+ from("direct:reqEnrichShareUoW")
+ .enrich(nettyClientUri, new UseLatestAggregationStrategy(), true, true);
+ }
+ };
+ }
+
+ @Test
+ @Timeout(value = 10, unit = TimeUnit.SECONDS)
+ void toTest() {
+ assertThatExceptionOfType(CamelExecutionException.class)
+ .isThrownBy(() -> template.requestBody("direct:reqTo", ""))
+ .havingCause().withMessageContaining("No response received from remote server");
+ }
+
+ @Test
+ @Timeout(value = 10, unit = TimeUnit.SECONDS)
+ void enrichTest() {
+ assertThatExceptionOfType(CamelExecutionException.class)
+ .isThrownBy(() -> template.requestBody("direct:reqEnrich", ""))
+ .havingCause().withMessageContaining("No response received from remote server");
+ }
+
+ @Test
+ @Timeout(value = 10, unit = TimeUnit.SECONDS)
+ void enrichShareUoWTest() {
+ assertThatExceptionOfType(CamelExecutionException.class)
+ .isThrownBy(() -> template.requestBody("direct:reqEnrichShareUoW", ""))
+ .havingCause().withMessageContaining("No response received from remote server");
+ }
+}
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/ErrorDuringGracefullShutdownTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/ErrorDuringGracefullShutdownTest.java
new file mode 100644
index 0000000..6771cc5
--- /dev/null
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/ErrorDuringGracefullShutdownTest.java
@@ -0,0 +1,62 @@
+/*
+ * Licensed to the Apache Software Foundation (ASF) under one or more
+ * contributor license agreements. See the NOTICE file distributed with
+ * this work for additional information regarding copyright ownership.
+ * The ASF licenses this file to You under the Apache License, Version 2.0
+ * (the "License"); you may not use this file except in compliance with
+ * the License. You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ */
+package org.apache.camel.component.netty;
+
+import org.apache.camel.RoutesBuilder;
+import org.apache.camel.ServiceStatus;
+import org.apache.camel.builder.RouteBuilder;
+import org.apache.camel.processor.errorhandler.DefaultErrorHandler;
+import org.junit.jupiter.api.Test;
+
+import static org.assertj.core.api.Assertions.assertThat;
+
+/**
+ * Regression test for CAMEL-9527
+ */
+class ErrorDuringGracefullShutdownTest extends BaseNettyTest {
+ @Override
+ protected RoutesBuilder createRouteBuilder() throws Exception {
+ return new RouteBuilder() {
+
+ @Override
+ public void configure() throws Exception {
+ // mock server
+ from("netty:tcp://0.0.0.0:{{port}}?textline=true&disconnect=false")
+ .log("Got request ${body}")
+ .setBody(constant("response"));
+
+ from("direct:req")
+ .to("netty:tcp://127.0.0.1:{{port}}?textline=true");
+ }
+ };
+ }
+
+ @Test
+ void shouldNotTriggerErrorDuringGracefullShutdown() throws Exception {
+ // given: successful request
+ assertThat(template.requestBody("direct:req", "test", String.class)).isEqualTo("response");
+
+ // when: context is closed
+ context().close();
+ while (context.getStatus() != ServiceStatus.Stopped) {
+ Thread.sleep(1);
+ }
+
+ // then: there should be no entries in log indicating that the callback was called twice
+ assertThat(LogCaptureAppender.hasEventsFor(DefaultErrorHandler.class)).isFalse();
+ }
+}
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureAppender.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureAppender.java
index 89dce73..a5d8bfa 100644
--- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureAppender.java
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureAppender.java
@@ -20,6 +20,7 @@ import java.io.Serializable;
import java.util.ArrayDeque;
import java.util.Collection;
import java.util.Deque;
+import java.util.stream.Collectors;
import org.apache.logging.log4j.core.Filter;
import org.apache.logging.log4j.core.Layout;
@@ -29,6 +30,7 @@ import org.apache.logging.log4j.core.config.plugins.Plugin;
import org.apache.logging.log4j.core.config.plugins.PluginAttribute;
import org.apache.logging.log4j.core.config.plugins.PluginElement;
import org.apache.logging.log4j.core.config.plugins.PluginFactory;
+import org.apache.logging.log4j.core.impl.MutableLogEvent;
@Plugin(name = "LogCaptureAppender", category = "Core", elementType = "appender", printObject = true)
public class LogCaptureAppender extends AbstractAppender {
@@ -52,7 +54,11 @@ public class LogCaptureAppender extends AbstractAppender {
@Override
public void append(LogEvent logEvent) {
- LOG_EVENTS.add(logEvent);
+ if (logEvent instanceof MutableLogEvent) {
+ LOG_EVENTS.add(((MutableLogEvent) logEvent).createMemento());
+ } else {
+ LOG_EVENTS.add(logEvent);
+ }
}
public static void reset() {
@@ -62,4 +68,12 @@ public class LogCaptureAppender extends AbstractAppender {
public static Collection<LogEvent> getEvents() {
return LOG_EVENTS;
}
+
+ public static Collection<LogEvent> getEvents(Class<?> cls) {
+ return LOG_EVENTS.stream().filter(e -> e.getLoggerName().equals(cls.getName())).collect(Collectors.toList());
+ }
+
+ public static boolean hasEventsFor(Class<?> cls) {
+ return LOG_EVENTS.stream().anyMatch(e -> e.getLoggerName().equals(cls.getName()));
+ }
}
diff --git a/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureTest.java b/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureTest.java
index 8b76fa1..e9656ff 100644
--- a/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureTest.java
+++ b/components/camel-netty/src/test/java/org/apache/camel/component/netty/LogCaptureTest.java
@@ -18,9 +18,11 @@ package org.apache.camel.component.netty;
import io.netty.util.ResourceLeakDetector;
import io.netty.util.internal.logging.InternalLoggerFactory;
+import org.apache.camel.processor.errorhandler.DefaultErrorHandler;
import org.junit.jupiter.api.Test;
import static org.junit.jupiter.api.Assertions.assertFalse;
+import static org.junit.jupiter.api.Assertions.assertTrue;
/**
* This test ensures LogCaptureAppender is configured properly
@@ -29,7 +31,9 @@ public class LogCaptureTest {
@Test
public void testCapture() {
InternalLoggerFactory.getInstance(ResourceLeakDetector.class).error("testError");
- assertFalse(LogCaptureAppender.getEvents().isEmpty());
+ assertFalse(LogCaptureAppender.getEvents(ResourceLeakDetector.class).isEmpty());
+ assertTrue(LogCaptureAppender.hasEventsFor(ResourceLeakDetector.class));
+ assertTrue(LogCaptureAppender.getEvents(DefaultErrorHandler.class).isEmpty());
LogCaptureAppender.reset();
}
}
diff --git a/components/camel-netty/src/test/resources/log4j2.properties b/components/camel-netty/src/test/resources/log4j2.properties
index 5005aef..1646981 100644
--- a/components/camel-netty/src/test/resources/log4j2.properties
+++ b/components/camel-netty/src/test/resources/log4j2.properties
@@ -30,5 +30,7 @@ appender.capture.name=capture
logger.leak.name = io.netty.util.ResourceLeakDetector
logger.leak.appenderRef.capture.ref = capture
+logger.errorHandler.name = org.apache.camel.processor.errorhandler.DefaultErrorHandler
+logger.errorHandler.appenderRef.capture.ref = capture
rootLogger.level = INFO
rootLogger.appenderRef.file.ref = file