You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@camel.apache.org by ts...@apache.org on 2021/03/05 04:15:14 UTC
[camel] branch master updated: camel-netty - Small refactorings
This is an automated email from the ASF dual-hosted git repository.
tsato pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/camel.git
The following commit(s) were added to refs/heads/master by this push:
new 9757aa1 camel-netty - Small refactorings
9757aa1 is described below
commit 9757aa12e8a9c554cd93ebebceecf5dcc08b6a3c
Author: Tadayoshi Sato <sa...@gmail.com>
AuthorDate: Fri Mar 5 13:13:25 2021 +0900
camel-netty - Small refactorings
---
.../netty/http/DefaultNettyHttpBinding.java | 11 ++++-----
.../http/handlers/HttpServerChannelHandler.java | 2 +-
.../netty/handlers/ServerChannelHandler.java | 27 +++++++++-------------
3 files changed, 16 insertions(+), 24 deletions(-)
diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
index 1bef299..c4fbcf8 100644
--- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
+++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/DefaultNettyHttpBinding.java
@@ -309,8 +309,7 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable {
}
@Override
- public Message toCamelMessage(FullHttpResponse response, Exchange exchange, NettyHttpConfiguration configuration)
- throws Exception {
+ public Message toCamelMessage(FullHttpResponse response, Exchange exchange, NettyHttpConfiguration configuration) {
LOG.trace("toCamelMessage: {}", response);
NettyHttpMessage answer = new NettyHttpMessage(exchange.getContext(), null, response);
@@ -336,8 +335,7 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable {
}
@Override
- public Message toCamelMessage(InboundStreamHttpResponse response, Exchange exchange, NettyHttpConfiguration configuration)
- throws Exception {
+ public Message toCamelMessage(InboundStreamHttpResponse response, Exchange exchange, NettyHttpConfiguration configuration) {
LOG.trace("toCamelMessage: {}", response);
NettyHttpMessage answer = new NettyHttpMessage(exchange.getContext(), null, null);
@@ -352,8 +350,7 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable {
@Override
public void populateCamelHeaders(
- HttpResponse response, Map<String, Object> headers, Exchange exchange, NettyHttpConfiguration configuration)
- throws Exception {
+ HttpResponse response, Map<String, Object> headers, Exchange exchange, NettyHttpConfiguration configuration) {
LOG.trace("populateCamelHeaders: {}", response);
headers.put(Exchange.HTTP_RESPONSE_CODE, response.status().code());
@@ -445,7 +442,7 @@ public class DefaultNettyHttpBinding implements NettyHttpBinding, Cloneable {
HttpResponse response = null;
- if (response == null && body instanceof InputStream && configuration.isDisableStreamCache()) {
+ if (body instanceof InputStream && configuration.isDisableStreamCache()) {
response = new OutboundStreamHttpResponse(
(InputStream) body, new DefaultHttpResponse(HttpVersion.HTTP_1_1, HttpResponseStatus.valueOf(code)));
response.headers().set(TRANSFER_ENCODING, CHUNKED);
diff --git a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
index ded58a9..1831d64 100644
--- a/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
+++ b/components/camel-netty-http/src/main/java/org/apache/camel/component/netty/http/handlers/HttpServerChannelHandler.java
@@ -321,7 +321,7 @@ public class HttpServerChannelHandler extends ServerChannelHandler {
}
@Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) {
// only close if we are still allowed to run
if (consumer.isRunAllowed()) {
diff --git a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
index b40e0c6..7c0eaed 100644
--- a/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
+++ b/components/camel-netty/src/main/java/org/apache/camel/component/netty/handlers/ServerChannelHandler.java
@@ -21,7 +21,6 @@ import java.net.SocketAddress;
import io.netty.channel.ChannelFutureListener;
import io.netty.channel.ChannelHandlerContext;
import io.netty.channel.SimpleChannelInboundHandler;
-import org.apache.camel.AsyncCallback;
import org.apache.camel.Exchange;
import org.apache.camel.ExchangePattern;
import org.apache.camel.component.netty.NettyConstants;
@@ -82,9 +81,8 @@ public class ServerChannelHandler extends SimpleChannelInboundHandler<Object> {
@Override
protected void channelRead0(ChannelHandlerContext ctx, Object msg) throws Exception {
- Object in = msg;
if (LOG.isDebugEnabled()) {
- LOG.debug("Channel: {} received body: {}", ctx.channel(), in);
+ LOG.debug("Channel: {} received body: {}", ctx.channel(), msg);
}
// create Exchange and let the consumer process it
@@ -147,20 +145,17 @@ public class ServerChannelHandler extends SimpleChannelInboundHandler<Object> {
}
private void processAsynchronously(final Exchange exchange, final ChannelHandlerContext ctx, final Object message) {
- consumer.getAsyncProcessor().process(exchange, new AsyncCallback() {
- @Override
- public void done(boolean doneSync) {
- // send back response if the communication is synchronous
- try {
- if (consumer.getConfiguration().isSync()) {
- sendResponse(message, ctx, exchange);
- }
- } catch (Throwable e) {
- consumer.getExceptionHandler().handleException(e);
- } finally {
- consumer.doneUoW(exchange);
- consumer.releaseExchange(exchange, false);
+ consumer.getAsyncProcessor().process(exchange, doneSync -> {
+ // send back response if the communication is synchronous
+ try {
+ if (consumer.getConfiguration().isSync()) {
+ sendResponse(message, ctx, exchange);
}
+ } catch (Throwable e) {
+ consumer.getExceptionHandler().handleException(e);
+ } finally {
+ consumer.doneUoW(exchange);
+ consumer.releaseExchange(exchange, false);
}
});
}