You are viewing a plain text version of this content. The canonical link for it is here.
Posted to notifications@james.apache.org by bt...@apache.org on 2022/03/22 02:01:59 UTC
[james-project] 28/29: JAMES-3715 Rename: UpstreamHandler -> InboundHandler
This is an automated email from the ASF dual-hosted git repository.
btellier pushed a commit to branch master
in repository https://gitbox.apache.org/repos/asf/james-project.git
commit 1a5c3a7edacc3c07103be77d6e883d9fcdccb06e
Author: Benoit Tellier <bt...@linagora.com>
AuthorDate: Mon Mar 21 10:44:00 2022 +0700
JAMES-3715 Rename: UpstreamHandler -> InboundHandler
---
...andler.java => BasicChannelInboundHandler.java} | 508 ++++++++++-----------
.../apache/james/protocols/netty/NettyServer.java | 2 +-
.../apache/james/lmtpserver/netty/LMTPServer.java | 4 +-
.../apache/james/pop3server/netty/POP3Server.java | 4 +-
...Handler.java => SMTPChannelInboundHandler.java} | 166 +++----
.../apache/james/smtpserver/netty/SMTPServer.java | 2 +-
6 files changed, 343 insertions(+), 343 deletions(-)
diff --git a/protocols/netty/src/main/java/org/apache/james/protocols/netty/BasicChannelUpstreamHandler.java b/protocols/netty/src/main/java/org/apache/james/protocols/netty/BasicChannelInboundHandler.java
similarity index 94%
rename from protocols/netty/src/main/java/org/apache/james/protocols/netty/BasicChannelUpstreamHandler.java
rename to protocols/netty/src/main/java/org/apache/james/protocols/netty/BasicChannelInboundHandler.java
index 96a9d0d..7fc2fbc 100644
--- a/protocols/netty/src/main/java/org/apache/james/protocols/netty/BasicChannelUpstreamHandler.java
+++ b/protocols/netty/src/main/java/org/apache/james/protocols/netty/BasicChannelInboundHandler.java
@@ -1,254 +1,254 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-package org.apache.james.protocols.netty;
-
-import static org.apache.james.protocols.api.ProtocolSession.State.Connection;
-
-import java.io.Closeable;
-import java.nio.channels.ClosedChannelException;
-import java.util.Deque;
-import java.util.LinkedList;
-import java.util.List;
-import java.util.Optional;
-import java.util.concurrent.ConcurrentLinkedDeque;
-
-import javax.net.ssl.SSLEngine;
-
-import org.apache.james.protocols.api.CommandDetectionSession;
-import org.apache.james.protocols.api.Encryption;
-import org.apache.james.protocols.api.Protocol;
-import org.apache.james.protocols.api.ProtocolSession;
-import org.apache.james.protocols.api.ProtocolSessionImpl;
-import org.apache.james.protocols.api.ProtocolTransport;
-import org.apache.james.protocols.api.Response;
-import org.apache.james.protocols.api.handler.ConnectHandler;
-import org.apache.james.protocols.api.handler.DisconnectHandler;
-import org.apache.james.protocols.api.handler.LineHandler;
-import org.apache.james.protocols.api.handler.ProtocolHandlerChain;
-import org.apache.james.protocols.api.handler.ProtocolHandlerResultHandler;
-import org.apache.james.util.MDCBuilder;
-import org.slf4j.Logger;
-import org.slf4j.LoggerFactory;
-
-import com.google.common.collect.Iterables;
-
-import io.netty.buffer.ByteBuf;
-import io.netty.channel.Channel;
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.channel.ChannelInboundHandlerAdapter;
-import io.netty.handler.codec.TooLongFrameException;
-import io.netty.util.AttributeKey;
-
-/**
- * {@link ChannelInboundHandlerAdapter} which is used by the SMTPServer and other line based protocols
- */
-public class BasicChannelUpstreamHandler extends ChannelInboundHandlerAdapter implements LineHandlerAware {
- private static final Logger LOGGER = LoggerFactory.getLogger(BasicChannelUpstreamHandler.class);
- public static final ProtocolSession.AttachmentKey<MDCBuilder> MDC_ATTRIBUTE_KEY = ProtocolSession.AttachmentKey.of("bound_MDC", MDCBuilder.class);
- public static final AttributeKey<CommandDetectionSession> SESSION_ATTRIBUTE_KEY =
- AttributeKey.valueOf("session");
-
- private final ProtocolMDCContextFactory mdcContextFactory;
- protected final Protocol protocol;
- protected final ProtocolHandlerChain chain;
- protected final Encryption secure;
- private final Deque<ChannelInboundHandlerAdapter> behaviourOverrides = new ConcurrentLinkedDeque<>();
-
- public BasicChannelUpstreamHandler(ProtocolMDCContextFactory mdcContextFactory, Protocol protocol) {
- this(mdcContextFactory, protocol, null);
- }
-
- public BasicChannelUpstreamHandler(ProtocolMDCContextFactory mdcContextFactory, Protocol protocol, Encryption secure) {
- this.mdcContextFactory = mdcContextFactory;
- this.protocol = protocol;
- this.chain = protocol.getProtocolChain();
- this.secure = secure;
- }
-
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- MDCBuilder boundMDC = mdcContextFactory.onBound(protocol, ctx);
- try (Closeable closeable = boundMDC.build()) {
- ProtocolSession session = createSession(ctx);
- session.setAttachment(MDC_ATTRIBUTE_KEY, boundMDC, Connection);
- ctx.channel().attr(SESSION_ATTRIBUTE_KEY).set(session);
-
- List<ConnectHandler> connectHandlers = chain.getHandlers(ConnectHandler.class);
- List<ProtocolHandlerResultHandler> resultHandlers = chain.getHandlers(ProtocolHandlerResultHandler.class);
-
- LOGGER.info("Connection established from {}", session.getRemoteAddress().getAddress().getHostAddress());
- if (connectHandlers != null) {
- for (ConnectHandler cHandler : connectHandlers) {
- long start = System.currentTimeMillis();
- Response response = cHandler.onConnect(session);
- long executionTime = System.currentTimeMillis() - start;
-
- for (ProtocolHandlerResultHandler resultHandler : resultHandlers) {
- resultHandler.onResponse(session, response, executionTime, cHandler);
- }
- if (response != null) {
- // TODO: This kind of sucks but I was able to come up with something more elegant here
- ((ProtocolSessionImpl) session).getProtocolTransport().writeResponse(response, session);
- }
-
- }
- }
-
- super.channelActive(ctx);
- }
- }
-
- private MDCBuilder mdc(ChannelHandlerContext ctx) {
- ProtocolSession session = (ProtocolSession) ctx.channel().attr(SESSION_ATTRIBUTE_KEY).get();
-
- return Optional.ofNullable(session)
- .flatMap(s -> s.getAttachment(MDC_ATTRIBUTE_KEY, Connection))
- .map(mdc -> mdcContextFactory.withContext(session)
- .addToContext(mdc))
- .orElseGet(MDCBuilder::create);
- }
-
- @SuppressWarnings({ "rawtypes", "unchecked" })
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- try (Closeable closeable = mdc(ctx).build()) {
- List<DisconnectHandler> connectHandlers = chain.getHandlers(DisconnectHandler.class);
- ProtocolSession session = (ProtocolSession) ctx.channel().attr(SESSION_ATTRIBUTE_KEY).get();
- if (connectHandlers != null) {
- for (DisconnectHandler connectHandler : connectHandlers) {
- connectHandler.onDisconnect(session);
- }
- }
- LOGGER.info("Connection closed for {}", session.getRemoteAddress().getAddress().getHostAddress());
- cleanup(ctx);
- super.channelInactive(ctx);
- }
- }
-
-
- /**
- * Call the {@link LineHandler}
- */
- @SuppressWarnings({ "unchecked", "rawtypes" })
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- ChannelInboundHandlerAdapter override = Iterables.getFirst(behaviourOverrides, null);
- if (override != null) {
- override.channelRead(ctx, msg);
- return;
- }
-
- try (Closeable closeable = mdc(ctx).build()) {
- ProtocolSession pSession = (ProtocolSession) ctx.channel().attr(SESSION_ATTRIBUTE_KEY).get();
- LinkedList<LineHandler> lineHandlers = chain.getHandlers(LineHandler.class);
- LinkedList<ProtocolHandlerResultHandler> resultHandlers = chain.getHandlers(ProtocolHandlerResultHandler.class);
-
-
- if (lineHandlers.size() > 0) {
-
- ByteBuf buf = (ByteBuf) msg;
- LineHandler lHandler = (LineHandler) lineHandlers.getLast();
- long start = System.currentTimeMillis();
- Response response = lHandler.onLine(pSession, buf.nioBuffer());
- long executionTime = System.currentTimeMillis() - start;
-
- for (ProtocolHandlerResultHandler resultHandler : resultHandlers) {
- response = resultHandler.onResponse(pSession, response, executionTime, lHandler);
- }
- if (response != null) {
- // TODO: This kind of sucks but I was able to come up with something more elegant here
- ((ProtocolSessionImpl) pSession).getProtocolTransport().writeResponse(response, pSession);
- }
-
- }
-
- ((ByteBuf) msg).release();
- super.channelReadComplete(ctx);
- }
- }
-
-
- /**
- * Cleanup the channel
- */
- protected void cleanup(ChannelHandlerContext ctx) {
- ProtocolSession session = (ProtocolSession) ctx.channel().attr(SESSION_ATTRIBUTE_KEY).getAndRemove();
- if (session != null) {
- session.resetState();
- }
- ctx.close();
- }
-
-
-
- protected ProtocolSession createSession(ChannelHandlerContext ctx) throws Exception {
- SSLEngine engine = null;
- if (secure != null) {
- engine = secure.createSSLEngine();
- }
-
- return protocol.newSession(new NettyProtocolTransport(ctx.channel(), engine));
- }
-
- @Override
- public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
- try (Closeable closeable = mdc(ctx).build()) {
- Channel channel = ctx.channel();
- ProtocolSession session = (ProtocolSession) ctx.channel().attr(SESSION_ATTRIBUTE_KEY).get();
- if (cause instanceof TooLongFrameException && session != null) {
- Response r = session.newLineTooLongResponse();
- ProtocolTransport transport = ((ProtocolSessionImpl) session).getProtocolTransport();
- if (r != null) {
- transport.writeResponse(r, session);
- }
- } else {
- if (channel.isActive() && session != null) {
- ProtocolTransport transport = ((ProtocolSessionImpl) session).getProtocolTransport();
-
- Response r = session.newFatalErrorResponse();
- if (r != null) {
- transport.writeResponse(r, session);
- }
- transport.writeResponse(Response.DISCONNECT, session);
- }
- if (cause instanceof ClosedChannelException) {
- LOGGER.info("Channel closed before we could send in flight messages to the users (ClosedChannelException): {}", cause.getMessage());
- } else {
- LOGGER.error("Unable to process request", cause);
- }
- ctx.close();
- }
- }
- }
-
- @Override
- public void pushLineHandler(ChannelInboundHandlerAdapter lineHandlerUpstreamHandler) {
- behaviourOverrides.addFirst(lineHandlerUpstreamHandler);
- }
-
- @Override
- public void popLineHandler() {
- if (!behaviourOverrides.isEmpty()) {
- behaviourOverrides.removeFirst();
- }
- }
-
-}
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.protocols.netty;
+
+import static org.apache.james.protocols.api.ProtocolSession.State.Connection;
+
+import java.io.Closeable;
+import java.nio.channels.ClosedChannelException;
+import java.util.Deque;
+import java.util.LinkedList;
+import java.util.List;
+import java.util.Optional;
+import java.util.concurrent.ConcurrentLinkedDeque;
+
+import javax.net.ssl.SSLEngine;
+
+import org.apache.james.protocols.api.CommandDetectionSession;
+import org.apache.james.protocols.api.Encryption;
+import org.apache.james.protocols.api.Protocol;
+import org.apache.james.protocols.api.ProtocolSession;
+import org.apache.james.protocols.api.ProtocolSessionImpl;
+import org.apache.james.protocols.api.ProtocolTransport;
+import org.apache.james.protocols.api.Response;
+import org.apache.james.protocols.api.handler.ConnectHandler;
+import org.apache.james.protocols.api.handler.DisconnectHandler;
+import org.apache.james.protocols.api.handler.LineHandler;
+import org.apache.james.protocols.api.handler.ProtocolHandlerChain;
+import org.apache.james.protocols.api.handler.ProtocolHandlerResultHandler;
+import org.apache.james.util.MDCBuilder;
+import org.slf4j.Logger;
+import org.slf4j.LoggerFactory;
+
+import com.google.common.collect.Iterables;
+
+import io.netty.buffer.ByteBuf;
+import io.netty.channel.Channel;
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.channel.ChannelInboundHandlerAdapter;
+import io.netty.handler.codec.TooLongFrameException;
+import io.netty.util.AttributeKey;
+
+/**
+ * {@link ChannelInboundHandlerAdapter} which is used by the SMTPServer and other line based protocols
+ */
+public class BasicChannelInboundHandler extends ChannelInboundHandlerAdapter implements LineHandlerAware {
+ private static final Logger LOGGER = LoggerFactory.getLogger(BasicChannelInboundHandler.class);
+ public static final ProtocolSession.AttachmentKey<MDCBuilder> MDC_ATTRIBUTE_KEY = ProtocolSession.AttachmentKey.of("bound_MDC", MDCBuilder.class);
+ public static final AttributeKey<CommandDetectionSession> SESSION_ATTRIBUTE_KEY =
+ AttributeKey.valueOf("session");
+
+ private final ProtocolMDCContextFactory mdcContextFactory;
+ protected final Protocol protocol;
+ protected final ProtocolHandlerChain chain;
+ protected final Encryption secure;
+ private final Deque<ChannelInboundHandlerAdapter> behaviourOverrides = new ConcurrentLinkedDeque<>();
+
+ public BasicChannelInboundHandler(ProtocolMDCContextFactory mdcContextFactory, Protocol protocol) {
+ this(mdcContextFactory, protocol, null);
+ }
+
+ public BasicChannelInboundHandler(ProtocolMDCContextFactory mdcContextFactory, Protocol protocol, Encryption secure) {
+ this.mdcContextFactory = mdcContextFactory;
+ this.protocol = protocol;
+ this.chain = protocol.getProtocolChain();
+ this.secure = secure;
+ }
+
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public void channelActive(ChannelHandlerContext ctx) throws Exception {
+ MDCBuilder boundMDC = mdcContextFactory.onBound(protocol, ctx);
+ try (Closeable closeable = boundMDC.build()) {
+ ProtocolSession session = createSession(ctx);
+ session.setAttachment(MDC_ATTRIBUTE_KEY, boundMDC, Connection);
+ ctx.channel().attr(SESSION_ATTRIBUTE_KEY).set(session);
+
+ List<ConnectHandler> connectHandlers = chain.getHandlers(ConnectHandler.class);
+ List<ProtocolHandlerResultHandler> resultHandlers = chain.getHandlers(ProtocolHandlerResultHandler.class);
+
+ LOGGER.info("Connection established from {}", session.getRemoteAddress().getAddress().getHostAddress());
+ if (connectHandlers != null) {
+ for (ConnectHandler cHandler : connectHandlers) {
+ long start = System.currentTimeMillis();
+ Response response = cHandler.onConnect(session);
+ long executionTime = System.currentTimeMillis() - start;
+
+ for (ProtocolHandlerResultHandler resultHandler : resultHandlers) {
+ resultHandler.onResponse(session, response, executionTime, cHandler);
+ }
+ if (response != null) {
+ // TODO: This kind of sucks but I was able to come up with something more elegant here
+ ((ProtocolSessionImpl) session).getProtocolTransport().writeResponse(response, session);
+ }
+
+ }
+ }
+
+ super.channelActive(ctx);
+ }
+ }
+
+ private MDCBuilder mdc(ChannelHandlerContext ctx) {
+ ProtocolSession session = (ProtocolSession) ctx.channel().attr(SESSION_ATTRIBUTE_KEY).get();
+
+ return Optional.ofNullable(session)
+ .flatMap(s -> s.getAttachment(MDC_ATTRIBUTE_KEY, Connection))
+ .map(mdc -> mdcContextFactory.withContext(session)
+ .addToContext(mdc))
+ .orElseGet(MDCBuilder::create);
+ }
+
+ @SuppressWarnings({ "rawtypes", "unchecked" })
+ @Override
+ public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+ try (Closeable closeable = mdc(ctx).build()) {
+ List<DisconnectHandler> connectHandlers = chain.getHandlers(DisconnectHandler.class);
+ ProtocolSession session = (ProtocolSession) ctx.channel().attr(SESSION_ATTRIBUTE_KEY).get();
+ if (connectHandlers != null) {
+ for (DisconnectHandler connectHandler : connectHandlers) {
+ connectHandler.onDisconnect(session);
+ }
+ }
+ LOGGER.info("Connection closed for {}", session.getRemoteAddress().getAddress().getHostAddress());
+ cleanup(ctx);
+ super.channelInactive(ctx);
+ }
+ }
+
+
+ /**
+ * Call the {@link LineHandler}
+ */
+ @SuppressWarnings({ "unchecked", "rawtypes" })
+ @Override
+ public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+ ChannelInboundHandlerAdapter override = Iterables.getFirst(behaviourOverrides, null);
+ if (override != null) {
+ override.channelRead(ctx, msg);
+ return;
+ }
+
+ try (Closeable closeable = mdc(ctx).build()) {
+ ProtocolSession pSession = (ProtocolSession) ctx.channel().attr(SESSION_ATTRIBUTE_KEY).get();
+ LinkedList<LineHandler> lineHandlers = chain.getHandlers(LineHandler.class);
+ LinkedList<ProtocolHandlerResultHandler> resultHandlers = chain.getHandlers(ProtocolHandlerResultHandler.class);
+
+
+ if (lineHandlers.size() > 0) {
+
+ ByteBuf buf = (ByteBuf) msg;
+ LineHandler lHandler = (LineHandler) lineHandlers.getLast();
+ long start = System.currentTimeMillis();
+ Response response = lHandler.onLine(pSession, buf.nioBuffer());
+ long executionTime = System.currentTimeMillis() - start;
+
+ for (ProtocolHandlerResultHandler resultHandler : resultHandlers) {
+ response = resultHandler.onResponse(pSession, response, executionTime, lHandler);
+ }
+ if (response != null) {
+ // TODO: This kind of sucks but I was able to come up with something more elegant here
+ ((ProtocolSessionImpl) pSession).getProtocolTransport().writeResponse(response, pSession);
+ }
+
+ }
+
+ ((ByteBuf) msg).release();
+ super.channelReadComplete(ctx);
+ }
+ }
+
+
+ /**
+ * Cleanup the channel
+ */
+ protected void cleanup(ChannelHandlerContext ctx) {
+ ProtocolSession session = (ProtocolSession) ctx.channel().attr(SESSION_ATTRIBUTE_KEY).getAndRemove();
+ if (session != null) {
+ session.resetState();
+ }
+ ctx.close();
+ }
+
+
+
+ protected ProtocolSession createSession(ChannelHandlerContext ctx) throws Exception {
+ SSLEngine engine = null;
+ if (secure != null) {
+ engine = secure.createSSLEngine();
+ }
+
+ return protocol.newSession(new NettyProtocolTransport(ctx.channel(), engine));
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, Throwable cause) throws Exception {
+ try (Closeable closeable = mdc(ctx).build()) {
+ Channel channel = ctx.channel();
+ ProtocolSession session = (ProtocolSession) ctx.channel().attr(SESSION_ATTRIBUTE_KEY).get();
+ if (cause instanceof TooLongFrameException && session != null) {
+ Response r = session.newLineTooLongResponse();
+ ProtocolTransport transport = ((ProtocolSessionImpl) session).getProtocolTransport();
+ if (r != null) {
+ transport.writeResponse(r, session);
+ }
+ } else {
+ if (channel.isActive() && session != null) {
+ ProtocolTransport transport = ((ProtocolSessionImpl) session).getProtocolTransport();
+
+ Response r = session.newFatalErrorResponse();
+ if (r != null) {
+ transport.writeResponse(r, session);
+ }
+ transport.writeResponse(Response.DISCONNECT, session);
+ }
+ if (cause instanceof ClosedChannelException) {
+ LOGGER.info("Channel closed before we could send in flight messages to the users (ClosedChannelException): {}", cause.getMessage());
+ } else {
+ LOGGER.error("Unable to process request", cause);
+ }
+ ctx.close();
+ }
+ }
+ }
+
+ @Override
+ public void pushLineHandler(ChannelInboundHandlerAdapter lineHandlerUpstreamHandler) {
+ behaviourOverrides.addFirst(lineHandlerUpstreamHandler);
+ }
+
+ @Override
+ public void popLineHandler() {
+ if (!behaviourOverrides.isEmpty()) {
+ behaviourOverrides.removeFirst();
+ }
+ }
+
+}
diff --git a/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyServer.java b/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyServer.java
index 951cdda..1eda6a5 100644
--- a/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyServer.java
+++ b/protocols/netty/src/main/java/org/apache/james/protocols/netty/NettyServer.java
@@ -97,7 +97,7 @@ public class NettyServer extends AbstractAsyncServer {
}
protected ChannelInboundHandlerAdapter createCoreHandler() {
- return new BasicChannelUpstreamHandler(new ProtocolMDCContextFactory.Standard(), protocol, secure);
+ return new BasicChannelInboundHandler(new ProtocolMDCContextFactory.Standard(), protocol, secure);
}
@Override
diff --git a/server/protocols/protocols-lmtp/src/main/java/org/apache/james/lmtpserver/netty/LMTPServer.java b/server/protocols/protocols-lmtp/src/main/java/org/apache/james/lmtpserver/netty/LMTPServer.java
index 0596813..c51fe75 100644
--- a/server/protocols/protocols-lmtp/src/main/java/org/apache/james/lmtpserver/netty/LMTPServer.java
+++ b/server/protocols/protocols-lmtp/src/main/java/org/apache/james/lmtpserver/netty/LMTPServer.java
@@ -33,7 +33,7 @@ import org.apache.james.protocols.netty.AbstractChannelPipelineFactory;
import org.apache.james.protocols.netty.ChannelHandlerFactory;
import org.apache.james.protocols.netty.LineDelimiterBasedChannelHandlerFactory;
import org.apache.james.protocols.smtp.SMTPProtocol;
-import org.apache.james.smtpserver.netty.SMTPChannelUpstreamHandler;
+import org.apache.james.smtpserver.netty.SMTPChannelInboundHandler;
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
@@ -142,7 +142,7 @@ public class LMTPServer extends AbstractProtocolAsyncServer implements LMTPServe
@Override
protected ChannelInboundHandlerAdapter createCoreHandler() {
SMTPProtocol protocol = new SMTPProtocol(getProtocolHandlerChain(), lmtpConfig);
- return new SMTPChannelUpstreamHandler(protocol, lmtpMetrics, getExecutorGroup());
+ return new SMTPChannelInboundHandler(protocol, lmtpMetrics, getExecutorGroup());
}
@Override
diff --git a/server/protocols/protocols-pop3/src/main/java/org/apache/james/pop3server/netty/POP3Server.java b/server/protocols/protocols-pop3/src/main/java/org/apache/james/pop3server/netty/POP3Server.java
index 8d18599..89725cc 100644
--- a/server/protocols/protocols-pop3/src/main/java/org/apache/james/pop3server/netty/POP3Server.java
+++ b/server/protocols/protocols-pop3/src/main/java/org/apache/james/pop3server/netty/POP3Server.java
@@ -25,7 +25,7 @@ import org.apache.james.protocols.lib.handler.HandlersPackage;
import org.apache.james.protocols.lib.netty.AbstractProtocolAsyncServer;
import org.apache.james.protocols.netty.AbstractChannelPipelineFactory;
import org.apache.james.protocols.netty.AllButStartTlsLineChannelHandlerFactory;
-import org.apache.james.protocols.netty.BasicChannelUpstreamHandler;
+import org.apache.james.protocols.netty.BasicChannelInboundHandler;
import org.apache.james.protocols.netty.ChannelHandlerFactory;
import org.apache.james.protocols.netty.ProtocolMDCContextFactory;
import org.apache.james.protocols.pop3.POP3Protocol;
@@ -86,7 +86,7 @@ public class POP3Server extends AbstractProtocolAsyncServer implements POP3Serve
@Override
protected ChannelInboundHandlerAdapter createCoreHandler() {
- return new BasicChannelUpstreamHandler(new ProtocolMDCContextFactory.Standard(), protocol, getEncryption());
+ return new BasicChannelInboundHandler(new ProtocolMDCContextFactory.Standard(), protocol, getEncryption());
}
@Override
diff --git a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelUpstreamHandler.java b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelInboundHandler.java
similarity index 84%
rename from server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelUpstreamHandler.java
rename to server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelInboundHandler.java
index df984d7..2515226 100644
--- a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelUpstreamHandler.java
+++ b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPChannelInboundHandler.java
@@ -1,83 +1,83 @@
-/****************************************************************
- * Licensed to the Apache Software Foundation (ASF) under one *
- * or more contributor license agreements. See the NOTICE file *
- * distributed with this work for additional information *
- * regarding copyright ownership. The ASF licenses this file *
- * to you under the Apache License, Version 2.0 (the *
- * "License"); you may not use this file except in compliance *
- * with the License. You may obtain a copy of the License at *
- * *
- * http://www.apache.org/licenses/LICENSE-2.0 *
- * *
- * Unless required by applicable law or agreed to in writing, *
- * software distributed under the License is distributed on an *
- * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
- * KIND, either express or implied. See the License for the *
- * specific language governing permissions and limitations *
- * under the License. *
- ****************************************************************/
-package org.apache.james.smtpserver.netty;
-
-import org.apache.james.lifecycle.api.LifecycleUtil;
-import org.apache.james.protocols.api.Encryption;
-import org.apache.james.protocols.api.Protocol;
-import org.apache.james.protocols.api.ProtocolSession.State;
-import org.apache.james.protocols.netty.BasicChannelUpstreamHandler;
-import org.apache.james.protocols.smtp.SMTPSession;
-import org.apache.james.protocols.smtp.core.SMTPMDCContextFactory;
-import org.apache.james.smtpserver.SMTPConstants;
-
-import io.netty.channel.ChannelHandlerContext;
-import io.netty.util.concurrent.EventExecutorGroup;
-
-/**
- * {@link BasicChannelUpstreamHandler} which is used by the SMTPServer
- */
-public class SMTPChannelUpstreamHandler extends BasicChannelUpstreamHandler {
-
- private final SmtpMetrics smtpMetrics;
-
- public SMTPChannelUpstreamHandler(Protocol protocol, Encryption encryption, SmtpMetrics smtpMetrics, EventExecutorGroup eventExecutorGroup) {
- super(new SMTPMDCContextFactory(), protocol, encryption);
- this.smtpMetrics = smtpMetrics;
- }
-
- public SMTPChannelUpstreamHandler(Protocol protocol, SmtpMetrics smtpMetrics, EventExecutorGroup eventExecutorGroup) {
- super(new SMTPMDCContextFactory(), protocol);
- this.smtpMetrics = smtpMetrics;
- }
-
- @Override
- public void channelActive(ChannelHandlerContext ctx) throws Exception {
- super.channelActive(ctx);
- smtpMetrics.getConnectionMetric().increment();
- }
-
- @Override
- public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
- super.channelRead(ctx, msg);
- smtpMetrics.getCommandsMetric().increment();
- }
-
- @Override
- public void channelInactive(ChannelHandlerContext ctx) throws Exception {
- super.channelInactive(ctx);
- smtpMetrics.getConnectionMetric().decrement();
- }
-
- /**
- * Cleanup temporary files
- */
- @Override
- protected void cleanup(ChannelHandlerContext ctx) {
- // Make sure we dispose everything on exit on session close
- SMTPSession smtpSession = ctx.channel().attr(SMTPConstants.SMTP_SESSION_ATTRIBUTE_KEY).get();
-
- if (smtpSession != null) {
- smtpSession.getAttachment(SMTPConstants.MAIL, State.Transaction).ifPresent(LifecycleUtil::dispose);
- smtpSession.getAttachment(SMTPConstants.DATA_MIMEMESSAGE_STREAMSOURCE, State.Transaction).ifPresent(LifecycleUtil::dispose);
- }
-
- super.cleanup(ctx);
- }
-}
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.smtpserver.netty;
+
+import org.apache.james.lifecycle.api.LifecycleUtil;
+import org.apache.james.protocols.api.Encryption;
+import org.apache.james.protocols.api.Protocol;
+import org.apache.james.protocols.api.ProtocolSession.State;
+import org.apache.james.protocols.netty.BasicChannelInboundHandler;
+import org.apache.james.protocols.smtp.SMTPSession;
+import org.apache.james.protocols.smtp.core.SMTPMDCContextFactory;
+import org.apache.james.smtpserver.SMTPConstants;
+
+import io.netty.channel.ChannelHandlerContext;
+import io.netty.util.concurrent.EventExecutorGroup;
+
+/**
+ * {@link BasicChannelInboundHandler} which is used by the SMTPServer
+ *
+ * <p>On top of the base behaviour it updates the {@link SmtpMetrics} (connection and command
+ * metrics) and disposes the mail related resources attached to the SMTP session when the
+ * channel is cleaned up.
+ */
+public class SMTPChannelInboundHandler extends BasicChannelInboundHandler {
+
+    private final SmtpMetrics smtpMetrics;
+
+    /**
+     * @param protocol protocol whose handler chain is served
+     * @param encryption encryption passed to the base handler
+     * @param smtpMetrics metrics updated on connection and command events
+     * @param eventExecutorGroup currently unused by this class; kept for caller compatibility
+     */
+    public SMTPChannelInboundHandler(Protocol protocol, Encryption encryption, SmtpMetrics smtpMetrics, EventExecutorGroup eventExecutorGroup) {
+        super(new SMTPMDCContextFactory(), protocol, encryption);
+        this.smtpMetrics = smtpMetrics;
+    }
+
+    /**
+     * Creates a handler without encryption support.
+     *
+     * @param protocol protocol whose handler chain is served
+     * @param smtpMetrics metrics updated on connection and command events
+     * @param eventExecutorGroup currently unused by this class; kept for caller compatibility
+     */
+    public SMTPChannelInboundHandler(Protocol protocol, SmtpMetrics smtpMetrics, EventExecutorGroup eventExecutorGroup) {
+        super(new SMTPMDCContextFactory(), protocol);
+        this.smtpMetrics = smtpMetrics;
+    }
+
+    /**
+     * Runs the base connection setup and counts the new connection.
+     */
+    @Override
+    public void channelActive(ChannelHandlerContext ctx) throws Exception {
+        try {
+            super.channelActive(ctx);
+        } finally {
+            // Count the connection even when the setup fails: channelInactive always decrements,
+            // so skipping the increment here would leave the connection metric unbalanced.
+            smtpMetrics.getConnectionMetric().increment();
+        }
+    }
+
+    /**
+     * Processes the incoming line and counts it as a command once it was handled without error.
+     */
+    @Override
+    public void channelRead(ChannelHandlerContext ctx, Object msg) throws Exception {
+        super.channelRead(ctx, msg);
+        smtpMetrics.getCommandsMetric().increment();
+    }
+
+    /**
+     * Runs the base disconnection handling and removes the connection from the metric.
+     */
+    @Override
+    public void channelInactive(ChannelHandlerContext ctx) throws Exception {
+        try {
+            super.channelInactive(ctx);
+        } finally {
+            // The connection is gone whatever happened during the disconnection handling.
+            smtpMetrics.getConnectionMetric().decrement();
+        }
+    }
+
+    /**
+     * Cleanup temporary files
+     */
+    @Override
+    protected void cleanup(ChannelHandlerContext ctx) {
+        // Make sure we dispose everything on exit on session close
+        SMTPSession smtpSession = ctx.channel().attr(SMTPConstants.SMTP_SESSION_ATTRIBUTE_KEY).get();
+
+        if (smtpSession != null) {
+            smtpSession.getAttachment(SMTPConstants.MAIL, State.Transaction).ifPresent(LifecycleUtil::dispose);
+            smtpSession.getAttachment(SMTPConstants.DATA_MIMEMESSAGE_STREAMSOURCE, State.Transaction).ifPresent(LifecycleUtil::dispose);
+        }
+
+        super.cleanup(ctx);
+    }
+}
diff --git a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java
index 78dfa59..4f07978 100644
--- a/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java
+++ b/server/protocols/protocols-smtp/src/main/java/org/apache/james/smtpserver/netty/SMTPServer.java
@@ -385,7 +385,7 @@ public class SMTPServer extends AbstractProtocolAsyncServer implements SMTPServe
@Override
protected ChannelInboundHandlerAdapter createCoreHandler() {
- return new SMTPChannelUpstreamHandler(transport, getEncryption(), smtpMetrics, getExecutorGroup());
+ return new SMTPChannelInboundHandler(transport, getEncryption(), smtpMetrics, getExecutorGroup());
}
@Override
---------------------------------------------------------------------
To unsubscribe, e-mail: notifications-unsubscribe@james.apache.org
For additional commands, e-mail: notifications-help@james.apache.org