You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by no...@apache.org on 2010/04/16 16:07:08 UTC
svn commit: r934888 - in /james/server/trunk/imapserver/src/main/java/org:
apache/james/imapserver/netty/ jboss/ jboss/netty/ jboss/netty/handler/
jboss/netty/handler/stream/
Author: norman
Date: Fri Apr 16 14:07:08 2010
New Revision: 934888
URL: http://svn.apache.org/viewvc?rev=934888&view=rev
Log:
Add/Fix Netty version for IMAP.
Added:
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/NioImapServer.java
james/server/trunk/imapserver/src/main/java/org/jboss/
james/server/trunk/imapserver/src/main/java/org/jboss/netty/
james/server/trunk/imapserver/src/main/java/org/jboss/netty/handler/
james/server/trunk/imapserver/src/main/java/org/jboss/netty/handler/stream/
james/server/trunk/imapserver/src/main/java/org/jboss/netty/handler/stream/BlockingChannelBufferInputStream.java
- copied, changed from r934704, james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/BlockingChannelBufferInputStream.java
james/server/trunk/imapserver/src/main/java/org/jboss/netty/handler/stream/ChannelOutputStream.java
- copied, changed from r934704, james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ChannelBufferOutputStream.java
james/server/trunk/imapserver/src/main/java/org/jboss/netty/handler/stream/StreamHandler.java
- copied, changed from r934704, james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/StreamChannelUpstreamHandler.java
Removed:
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/BlockingChannelBufferInputStream.java
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ChannelBufferOutputStream.java
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/StreamChannelUpstreamHandler.java
Added: james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java
URL: http://svn.apache.org/viewvc/james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java?rev=934888&view=auto
==============================================================================
--- james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java (added)
+++ james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ImapChannelUpstreamHandler.java Fri Apr 16 14:07:08 2010
@@ -0,0 +1,110 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.imapserver.netty;
+
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.concurrent.TimeUnit;
+
+import org.apache.commons.logging.Log;
+import org.apache.james.imap.api.ImapConstants;
+import org.apache.james.imap.main.ImapRequestHandler;
+import org.apache.james.imap.main.ImapSessionImpl;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.handler.stream.StreamHandler;
+import org.jboss.netty.util.HashedWheelTimer;
+
+/**
+ * {@link StreamHandler} which handles IMAP
+ *
+ *
+ */
+public class ImapChannelUpstreamHandler extends StreamHandler{
+
+ private final Log logger;
+
+ private final String hello;
+
+ private final ImapRequestHandler handler;
+
+ private final static String IMAP_SESSION = "IMAP_SESSION";
+
+ public ImapChannelUpstreamHandler(final String hello, final ImapRequestHandler handler, final Log logger, final long readTimeout) {
+ super(new HashedWheelTimer(), readTimeout, TimeUnit.SECONDS);
+ this.logger = logger;
+ this.hello = hello;
+ this.handler = handler;
+ }
+
+ @Override
+ protected void processStreamIo(final ChannelHandlerContext ctx, final InputStream in, final OutputStream out) {
+ final ImapSessionImpl imapSession = (ImapSessionImpl) getAttachment(ctx).get(IMAP_SESSION);
+
+ // handle requests in a loop
+ while (handler.handleRequest(in, out, imapSession));
+ if (imapSession != null) imapSession.logout();
+
+ Channel channel = ctx.getChannel();
+ logger.debug("Thread execution complete for session " + channel.getId());
+
+ channel.close();
+ }
+
+
+ @Override
+ public void channelBound(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+
+ // create the imap session and store it in the IoSession for later usage
+ final ImapSessionImpl imapsession = new ImapSessionImpl();
+ imapsession.setLog(logger);
+
+ getAttachment(ctx).put(IMAP_SESSION, imapsession);
+ super.channelBound(ctx, e);
+ }
+
+
+ @Override
+ public void channelConnected(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+ // write hello to client
+ ctx.getChannel().write(ChannelBuffers.copiedBuffer((ImapConstants.UNTAGGED + " OK " + hello +" " + new String(ImapConstants.BYTES_LINE_END)).getBytes()));
+
+ super.channelConnected(ctx, e);
+
+ }
+
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
+ logger.debug("Error while processing imap request" ,e.getCause());
+
+ // logout on error not sure if that is the best way to handle it
+ final ImapSessionImpl imapSession = (ImapSessionImpl) getAttachment(ctx).get(IMAP_SESSION);
+ if (imapSession != null) imapSession.logout();
+
+ // just close the channel now!
+ ctx.getChannel().close();
+
+ super.exceptionCaught(ctx, e);
+ }
+
+
+}
Added: james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/NioImapServer.java
URL: http://svn.apache.org/viewvc/james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/NioImapServer.java?rev=934888&view=auto
==============================================================================
--- james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/NioImapServer.java (added)
+++ james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/NioImapServer.java Fri Apr 16 14:07:08 2010
@@ -0,0 +1,101 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.imapserver.netty;
+
+import static org.jboss.netty.channel.Channels.pipeline;
+
+import javax.annotation.Resource;
+
+import org.apache.commons.configuration.ConfigurationException;
+import org.apache.commons.configuration.HierarchicalConfiguration;
+import org.apache.james.imap.api.ImapConstants;
+import org.apache.james.imap.api.process.ImapProcessor;
+import org.apache.james.imap.decode.ImapDecoder;
+import org.apache.james.imap.encode.ImapEncoder;
+import org.apache.james.imap.main.ImapRequestHandler;
+import org.apache.james.socket.netty.AbstractAsyncServer;
+import org.jboss.netty.channel.ChannelPipeline;
+import org.jboss.netty.channel.ChannelPipelineFactory;
+
+public class NioImapServer extends AbstractAsyncServer implements ImapConstants{
+
+ private static final String softwaretype = "JAMES "+VERSION+" Server "; //+ Constants.SOFTWARE_VERSION;
+
+ private String hello;
+ private ImapProcessor processor;
+ private ImapEncoder encoder;
+
+ private ImapDecoder decoder;
+
+ @Resource(name="imapDecoder")
+ public void setImapDecoder(ImapDecoder decoder) {
+ this.decoder = decoder;
+ }
+
+ @Resource(name="imapEncoder")
+ public void setImapEncoder(ImapEncoder encoder) {
+ this.encoder = encoder;
+ }
+
+ @Resource(name="imapProcessor")
+ public void setImapProcessor(ImapProcessor processor) {
+ this.processor = processor;
+ }
+
+ @Override
+ public void doConfigure( final HierarchicalConfiguration configuration ) throws ConfigurationException {
+ super.doConfigure(configuration);
+ hello = softwaretype + " Server " + getHelloName() + " is ready.";
+ }
+
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.james.socket.mina.AbstractAsyncServer#getDefaultPort()
+ */
+ public int getDefaultPort() {
+ return 143;
+ }
+
+
+ /*
+ * (non-Javadoc)
+ * @see org.apache.james.socket.mina.AbstractAsyncServer#getServiceType()
+ */
+ public String getServiceType() {
+ return "IMAP Service";
+ }
+
+ @Override
+ protected ChannelPipelineFactory createPipelineFactory() {
+ return new ChannelPipelineFactory() {
+
+ public ChannelPipeline getPipeline() throws Exception {
+ ChannelPipeline pipeline = pipeline();
+ final ImapRequestHandler handler = new ImapRequestHandler(decoder, processor, encoder);
+ pipeline.addLast("coreHandler", new ImapChannelUpstreamHandler(hello, handler, getLogger(), NioImapServer.this.getTimeout()));
+ return pipeline;
+ }
+
+ };
+ }
+
+
+
+}
Copied: james/server/trunk/imapserver/src/main/java/org/jboss/netty/handler/stream/BlockingChannelBufferInputStream.java (from r934704, james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/BlockingChannelBufferInputStream.java)
URL: http://svn.apache.org/viewvc/james/server/trunk/imapserver/src/main/java/org/jboss/netty/handler/stream/BlockingChannelBufferInputStream.java?p2=james/server/trunk/imapserver/src/main/java/org/jboss/netty/handler/stream/BlockingChannelBufferInputStream.java&p1=james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/BlockingChannelBufferInputStream.java&r1=934704&r2=934888&rev=934888&view=diff
==============================================================================
--- james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/BlockingChannelBufferInputStream.java (original)
+++ james/server/trunk/imapserver/src/main/java/org/jboss/netty/handler/stream/BlockingChannelBufferInputStream.java Fri Apr 16 14:07:08 2010
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
-package org.apache.james.imapserver.netty;
+package org.jboss.netty.handler.stream;
import java.io.IOException;
import java.io.InputStream;
@@ -24,6 +24,13 @@ import java.io.InputStream;
import org.jboss.netty.buffer.ChannelBuffer;
import org.jboss.netty.buffer.ChannelBuffers;
+/**
+ * {@link InputStream} implementation which can be used to write {@link ChannelBuffer}
+ * objects to and read them
+ *
+ *
+ * @author Norman Maurer
+ */
public class BlockingChannelBufferInputStream extends InputStream{
private final Object mutex = new Object();
@@ -37,9 +44,6 @@ public class BlockingChannelBufferInputS
public BlockingChannelBufferInputStream() {
buf = ChannelBuffers.dynamicBuffer();
- //buf..setA
- //buf.setAutoExpand(true);
- //buf.limit(0);
}
@Override
@@ -139,6 +143,12 @@ public class BlockingChannelBufferInputS
released = true;
}
+ /**
+ * Write the {@link ChannelBuffer} to {@link InputStream} and unblock the
+ * read methods
+ *
+ * @param src buffer
+ */
public void write(ChannelBuffer src) {
synchronized (mutex) {
if (closed) {
@@ -146,8 +156,9 @@ public class BlockingChannelBufferInputS
}
if (buf.readable()) {
+
this.buf.writeBytes(src);
- this.buf.readerIndex(0);
+ //this.buf.readerIndex(0);
} else {
this.buf.clear();
this.buf.writeBytes(src);
@@ -157,6 +168,11 @@ public class BlockingChannelBufferInputS
}
}
+ /**
+ * Throw the given {@link IOException} on the next read call
+ *
+ * @param e
+ */
public void throwException(IOException e) {
synchronized (mutex) {
if (exception == null) {
Copied: james/server/trunk/imapserver/src/main/java/org/jboss/netty/handler/stream/ChannelOutputStream.java (from r934704, james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ChannelBufferOutputStream.java)
URL: http://svn.apache.org/viewvc/james/server/trunk/imapserver/src/main/java/org/jboss/netty/handler/stream/ChannelOutputStream.java?p2=james/server/trunk/imapserver/src/main/java/org/jboss/netty/handler/stream/ChannelOutputStream.java&p1=james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ChannelBufferOutputStream.java&r1=934704&r2=934888&rev=934888&view=diff
==============================================================================
--- james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ChannelBufferOutputStream.java (original)
+++ james/server/trunk/imapserver/src/main/java/org/jboss/netty/handler/stream/ChannelOutputStream.java Fri Apr 16 14:07:08 2010
@@ -16,7 +16,7 @@
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
-package org.apache.james.imapserver.netty;
+package org.jboss.netty.handler.stream;
import java.io.IOException;
import java.io.OutputStream;
@@ -26,13 +26,18 @@ import org.jboss.netty.buffer.ChannelBuf
import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelFuture;
-public class ChannelBufferOutputStream extends OutputStream{
+/**
+ * {@link OutputStream} which write data to the wrapped {@link Channel}
+ *
+ * @author Norman Maurer
+ */
+public class ChannelOutputStream extends OutputStream{
private final Channel channel;
private ChannelFuture lastChannelFuture;
- public ChannelBufferOutputStream(Channel channel) {
+ public ChannelOutputStream(Channel channel) {
this.channel = channel;
}
Copied: james/server/trunk/imapserver/src/main/java/org/jboss/netty/handler/stream/StreamHandler.java (from r934704, james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/StreamChannelUpstreamHandler.java)
URL: http://svn.apache.org/viewvc/james/server/trunk/imapserver/src/main/java/org/jboss/netty/handler/stream/StreamHandler.java?p2=james/server/trunk/imapserver/src/main/java/org/jboss/netty/handler/stream/StreamHandler.java&p1=james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/StreamChannelUpstreamHandler.java&r1=934704&r2=934888&rev=934888&view=diff
==============================================================================
--- james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/StreamChannelUpstreamHandler.java (original)
+++ james/server/trunk/imapserver/src/main/java/org/jboss/netty/handler/stream/StreamHandler.java Fri Apr 16 14:07:08 2010
@@ -16,90 +16,130 @@
* specific language governing permissions and limitations *
* under the License. *
****************************************************************/
-package org.apache.james.imapserver.netty;
+package org.jboss.netty.handler.stream;
import java.io.IOException;
import java.io.InputStream;
import java.io.OutputStream;
+import java.net.SocketTimeoutException;
import java.util.HashMap;
import java.util.Map;
+import java.util.concurrent.ExecutorService;
+import java.util.concurrent.Executors;
+import java.util.concurrent.TimeUnit;
import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.Channel;
import org.jboss.netty.channel.ChannelHandlerContext;
import org.jboss.netty.channel.ChannelStateEvent;
import org.jboss.netty.channel.ExceptionEvent;
import org.jboss.netty.channel.MessageEvent;
-import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+import org.jboss.netty.handler.timeout.ReadTimeoutHandler;
+import org.jboss.netty.util.Timer;
-public abstract class StreamChannelUpstreamHandler extends SimpleChannelUpstreamHandler{
+/**
+ * Abstract base class which could be used if you need to use {@link InputStream} and {@link OutputStream} directly in your Handler.
+ * Because of the blocking nature of {@link InputStream} it will spawn a new Thread on every new connected {@link Channel}
+ *
+ * @author Norman Maurer
+ */
+public abstract class StreamHandler extends ReadTimeoutHandler{
+
+ private final ExecutorService executor;
private static final String KEY_IN = "stream.in";
private static final String KEY_OUT = "stream.out";
-
- private int readTimeout;
-
- private int writeTimeout;
-
- protected StreamChannelUpstreamHandler() {
- // Do nothing
- }
-
+
/**
- * Implement this method to execute your stream I/O logic;
- * <b>please note that you must forward the process request to other
- * thread or thread pool.</b>
+ * Create a new Instance which use a cached ThreadPool with no limit to perform the stream handling
+ *
+ * @param timer
+ * @param readerIdleTimeSeconds
*/
- protected abstract void processStreamIo(ChannelHandlerContext ctx, InputStream in,
- OutputStream out);
+ public StreamHandler(Timer timer, int readerIdleTimeSeconds) {
+ this(timer, readerIdleTimeSeconds, Executors.newCachedThreadPool());
+ }
/**
- * Returns read timeout in seconds.
- * The default value is <tt>0</tt> (disabled).
+ * Create a new Instance which use a cached ThreadPool with no limit to perform the stream handling
+ *
+ * @param timer
+ * @param readerIdleTime
+ * @param unit
*/
- public int getReadTimeout() {
- return readTimeout;
+ public StreamHandler(Timer timer, long readerIdleTime, TimeUnit unit) {
+ this(timer, readerIdleTime, unit, Executors.newCachedThreadPool());
}
-
+
/**
- * Sets read timeout in seconds.
- * The default value is <tt>0</tt> (disabled).
+ * Create a new Instance which use thre give {@link ExecutorService} to perform the stream handling
+ *
+ * @param timer
+ * @param readerIdleTimeSeconds
+ * @param executor
*/
- public void setReadTimeout(int readTimeout) {
- this.readTimeout = readTimeout;
+ public StreamHandler(Timer timer, int readerIdleTimeSeconds, ExecutorService executor) {
+ super(timer, readerIdleTimeSeconds);
+ this.executor = executor;
}
/**
- * Returns write timeout in seconds.
- * The default value is <tt>0</tt> (disabled).
+ * Create a new Instance which use thre give {@link ExecutorService} to perform the stream handling
+ *
+ * @param timer
+ * @param readerIdleTime
+ * @param unit
+ * @param executor
*/
- public int getWriteTimeout() {
- return writeTimeout;
+ public StreamHandler(Timer timer, long readerIdleTime, TimeUnit unit, ExecutorService executor) {
+ super(timer, readerIdleTime, unit);
+ this.executor = executor;
}
+
/**
- * Sets write timeout in seconds.
- * The default value is <tt>0</tt> (disabled).
+ * Implement this method to execute your stream I/O logic
+ *
+ * The method will get executed in a new Thread
+ *
*/
- public void setWriteTimeout(int writeTimeout) {
- this.writeTimeout = writeTimeout;
- }
+ protected abstract void processStreamIo(final ChannelHandlerContext ctx, final InputStream in,
+ OutputStream out);
+
+
+ /**
+ * Fire of the {@link #processStreamIo(ChannelHandlerContext, InputStream, OutputStream)} method
+ */
@Override
- public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+ public void channelConnected(final ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
// Create streams
- InputStream in = new BlockingChannelBufferInputStream();
- OutputStream out = new ChannelBufferOutputStream(ctx.getChannel());
+ final InputStream in = new BlockingChannelBufferInputStream();
+ final OutputStream out = new ChannelOutputStream(ctx.getChannel());
Map<Object, Object> attachment = getAttachment(ctx);
attachment.put(KEY_IN, in);
attachment.put(KEY_OUT, out);
- processStreamIo(ctx, in, out);
+ executor.execute(new Runnable() {
+
+ public void run() {
+ processStreamIo(ctx, in, out);
+ }
+ });
ctx.setAttachment(attachment);
- super.channelOpen(ctx, e);
+ super.channelConnected(ctx, e);
}
+ /**
+ * Return the Map which is used as Attachment to the {@link ChannelHandlerContext}
+ *
+ * You should use this map if you need to store attachments on the {@link ChannelHandlerContext}
+ *
+ * @param ctx
+ * @return attachmentMap
+ */
@SuppressWarnings("unchecked")
- protected Map<Object,Object> getAttachment(ChannelHandlerContext ctx) {
+ protected final Map<Object,Object> getAttachment(ChannelHandlerContext ctx) {
Map<Object,Object> attachment = (Map<Object, Object>) ctx.getAttachment();
if (attachment == null) {
attachment = new HashMap<Object, Object>();
@@ -142,7 +182,7 @@ public abstract class StreamChannelUpstr
public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
final BlockingChannelBufferInputStream in = (BlockingChannelBufferInputStream) getAttachment(ctx).get(KEY_IN);
IOException ex = null;
- if (e.getCause() instanceof StreamIoException) {
+ if (e.getCause() instanceof ReadTimeOutException) {
ex = (IOException) e.getCause().getCause();
} else if (e.getCause() instanceof IOException) {
ex = (IOException) e.getCause();
@@ -155,10 +195,22 @@ public abstract class StreamChannelUpstr
}
}
- private static class StreamIoException extends RuntimeException {
+
+ @Override
+ protected void readTimedOut(ChannelHandlerContext ctx) throws Exception {
+ throw new ReadTimeOutException(new SocketTimeoutException("Read timeout"));
+ }
+
+
+ /**
+ *
+ * Exception thrown on a read timeount
+ *
+ */
+ private static class ReadTimeOutException extends RuntimeException {
private static final long serialVersionUID = 3976736960742503222L;
- public StreamIoException(IOException cause) {
+ public ReadTimeOutException(IOException cause) {
super(cause);
}
}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org