You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by no...@apache.org on 2010/04/11 21:44:49 UTC

svn commit: r932980 - in /james/server/trunk/imapserver: ./ src/main/java/org/apache/james/imapserver/netty/

Author: norman
Date: Sun Apr 11 19:44:48 2010
New Revision: 932980

URL: http://svn.apache.org/viewvc?rev=932980&view=rev
Log:
Start to add code for using Netty with IMAP. That's more or less a copy of the StreamIoHandler stuff of MINA with a little refactoring to use Netty's stuff.

Added:
    james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/
    james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/BlockingChannelBufferInputStream.java
    james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ChannelBufferOutputStream.java
    james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/StreamChannelUpstreamHandler.java
Modified:
    james/server/trunk/imapserver/pom.xml

Modified: james/server/trunk/imapserver/pom.xml
URL: http://svn.apache.org/viewvc/james/server/trunk/imapserver/pom.xml?rev=932980&r1=932979&r2=932980&view=diff
==============================================================================
--- james/server/trunk/imapserver/pom.xml (original)
+++ james/server/trunk/imapserver/pom.xml Sun Apr 11 19:44:48 2010
@@ -63,6 +63,10 @@
       <groupId>org.apache.james</groupId>
       <artifactId>james-server-mina-socket</artifactId>
     </dependency>
+   <dependency>
+      <groupId>org.apache.james</groupId>
+      <artifactId>james-server-netty-socket</artifactId>
+    </dependency>
     <dependency>
       <groupId>org.apache.james</groupId>
       <artifactId>apache-james-imap-api</artifactId>
@@ -110,6 +114,10 @@
       <artifactId>mina-core</artifactId>
     </dependency>
     <dependency>
+      <groupId>org.jboss.netty</groupId>
+      <artifactId>netty</artifactId>
+    </dependency>
+    <dependency>
       <groupId>org.apache.db.torque</groupId>
       <artifactId>runtime</artifactId>
     </dependency>

Added: james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/BlockingChannelBufferInputStream.java
URL: http://svn.apache.org/viewvc/james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/BlockingChannelBufferInputStream.java?rev=932980&view=auto
==============================================================================
--- james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/BlockingChannelBufferInputStream.java (added)
+++ james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/BlockingChannelBufferInputStream.java Sun Apr 11 19:44:48 2010
@@ -0,0 +1,171 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.imapserver.netty;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+/**
+ * An {@link InputStream} that is filled through {@link #write(ChannelBuffer)}.
+ * Readers block until data is buffered, the stream is closed, or an exception
+ * is handed over with {@link #throwException(IOException)}.
+ */
+public class BlockingChannelBufferInputStream extends InputStream{
+    /** Guards {@code buf} and {@code exception}; blocked readers wait on it. */
+    private final Object mutex = new Object();
+
+    private final ChannelBuffer buf;
+
+    private volatile boolean closed;
+
+    /** Set once the stream is finished; afterwards every read returns -1. */
+    private volatile boolean released;
+
+    private IOException exception;
+
+    public BlockingChannelBufferInputStream() {
+        buf = ChannelBuffers.dynamicBuffer();
+    }
+
+    /**
+     * Returns the number of buffered bytes, or 0 once the stream is released.
+     */
+    @Override
+    public int available() {
+        if (released) {
+            return 0;
+        }
+
+        synchronized (mutex) {
+            return buf.readableBytes();
+        }
+    }
+
+    /**
+     * Marks the stream as closed and wakes up every blocked reader.
+     * Calling it more than once has no further effect.
+     */
+    @Override
+    public void close() {
+        if (closed) {
+            return;
+        }
+
+        synchronized (mutex) {
+            closed = true;
+            releaseBuffer();
+
+            mutex.notifyAll();
+        }
+    }
+
+    /**
+     * Blocks until one byte is available.
+     *
+     * @return the byte as a value in the range 0-255, or -1 at end of stream
+     * @throws IOException if an exception was handed over or the wait was
+     *         interrupted
+     */
+    @Override
+    public int read() throws IOException {
+        synchronized (mutex) {
+            if (!waitForData()) {
+                return -1;
+            }
+
+            return buf.readByte() & 0xff;
+        }
+    }
+
+    /**
+     * Blocks until at least one byte is buffered, then copies at most
+     * {@code len} bytes into {@code b} starting at {@code off}.
+     *
+     * @return the number of bytes copied, 0 if {@code len} is 0, or -1 at end
+     *         of stream
+     * @throws IOException if an exception was handed over or the wait was
+     *         interrupted
+     */
+    @Override
+    public int read(byte[] b, int off, int len) throws IOException {
+        if (len == 0) {
+            // InputStream contract: a zero-length read returns 0 without blocking.
+            return 0;
+        }
+
+        synchronized (mutex) {
+            if (!waitForData()) {
+                return -1;
+            }
+
+            int readBytes = Math.min(len, buf.readableBytes());
+            buf.readBytes(b, off, readBytes);
+
+            return readBytes;
+        }
+    }
+
+    /**
+     * Waits until data is readable, the stream is released, or an exception
+     * was handed over.
+     *
+     * @return {@code true} if at least one byte can be read, {@code false} at
+     *         end of stream
+     * @throws IOException the exception given to {@link #throwException}, or a
+     *         new one if the waiting thread was interrupted
+     */
+    private boolean waitForData() throws IOException {
+        if (released) {
+            return false;
+        }
+
+        synchronized (mutex) {
+            while (!released && buf.readableBytes() == 0 && exception == null) {
+                try {
+                    mutex.wait();
+                } catch (InterruptedException e) {
+                    // Restore the flag so the caller can still observe the interrupt.
+                    Thread.currentThread().interrupt();
+                    IOException ioe = new IOException(
+                            "Interrupted while waiting for more data");
+                    ioe.initCause(e);
+                    throw ioe;
+                }
+            }
+        }
+
+        if (exception != null) {
+            releaseBuffer();
+            throw exception;
+        }
+
+        if (closed && buf.readableBytes() == 0) {
+            releaseBuffer();
+
+            return false;
+        }
+
+        return true;
+    }
+
+    private void releaseBuffer() {
+        released = true;
+    }
+
+    /**
+     * Appends the content of {@code src} to the internal buffer and wakes up
+     * waiting readers. Data that arrives after {@link #close()} is dropped.
+     */
+    public void write(ChannelBuffer src) {
+        synchronized (mutex) {
+            if (closed) {
+                return;
+            }
+
+            // Discard the bytes that were already consumed instead of rewinding
+            // readerIndex to 0: rewinding made readers receive old data again.
+            // On an empty buffer this simply resets both indexes.
+            buf.discardReadBytes();
+            buf.writeBytes(src);
+
+            mutex.notifyAll();
+        }
+    }
+
+    /**
+     * Hands {@code e} over to the reading side, which rethrows it from its next
+     * read. Only the first exception is kept.
+     */
+    public void throwException(IOException e) {
+        synchronized (mutex) {
+            if (exception == null) {
+                exception = e;
+
+                mutex.notifyAll();
+            }
+        }
+    }
+
+}

Added: james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ChannelBufferOutputStream.java
URL: http://svn.apache.org/viewvc/james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ChannelBufferOutputStream.java?rev=932980&view=auto
==============================================================================
--- james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ChannelBufferOutputStream.java (added)
+++ james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ChannelBufferOutputStream.java Sun Apr 11 19:44:48 2010
@@ -0,0 +1,85 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.imapserver.netty;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+
+/**
+ * An {@link OutputStream} that passes every write to a Netty {@link Channel}.
+ * {@link #flush()} waits for the most recent write to complete.
+ */
+public class ChannelBufferOutputStream extends OutputStream{
+
+    private final Channel channel;
+
+    /** Future of the most recent write; {@code null} until something is written. */
+    private ChannelFuture lastChannelFuture;
+
+    public ChannelBufferOutputStream(Channel channel) {
+        this.channel = channel;
+    }
+
+    /**
+     * Flushes the last write and then closes the channel, waiting for the
+     * close to finish. The channel is closed even if the flush fails.
+     */
+    @Override
+    public void close() throws IOException {
+        try {
+            flush();
+        } finally {
+            channel.close().awaitUninterruptibly();
+        }
+    }
+
+    private void checkClosed() throws IOException {
+        if (!channel.isConnected()) {
+            throw new IOException("The session has been closed.");
+        }
+    }
+
+    /** Writes {@code buf} to the channel and remembers the resulting future. */
+    private synchronized void write(ChannelBuffer buf) throws IOException {
+        checkClosed();
+        lastChannelFuture = channel.write(buf);
+    }
+
+    @Override
+    public void write(byte[] b, int off, int len) throws IOException {
+        // copiedBuffer already copies the requested range, so cloning the whole
+        // array first only added a second, larger copy.
+        write(ChannelBuffers.copiedBuffer(b, off, len));
+    }
+
+    @Override
+    public void write(int b) throws IOException {
+        ChannelBuffer buf = ChannelBuffers.buffer(1);
+        buf.writeByte((byte) b);
+        write(buf);
+    }
+
+    /**
+     * Waits for the most recent write to finish; returns immediately if
+     * nothing has been written yet.
+     *
+     * @throws IOException if that write did not succeed; the failure reported
+     *         by the future, if any, is attached as the cause
+     */
+    @Override
+    public synchronized void flush() throws IOException {
+        if (lastChannelFuture == null) {
+            return;
+        }
+
+        lastChannelFuture.awaitUninterruptibly();
+        if (!lastChannelFuture.isSuccess()) {
+            IOException ioe = new IOException(
+                    "The bytes could not be written to the session");
+            // getCause() may be null (e.g. cancellation); initCause accepts that.
+            ioe.initCause(lastChannelFuture.getCause());
+            throw ioe;
+        }
+    }
+
+}

Added: james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/StreamChannelUpstreamHandler.java
URL: http://svn.apache.org/viewvc/james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/StreamChannelUpstreamHandler.java?rev=932980&view=auto
==============================================================================
--- james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/StreamChannelUpstreamHandler.java (added)
+++ james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/StreamChannelUpstreamHandler.java Sun Apr 11 19:44:48 2010
@@ -0,0 +1,166 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one   *
+ * or more contributor license agreements.  See the NOTICE file *
+ * distributed with this work for additional information        *
+ * regarding copyright ownership.  The ASF licenses this file   *
+ * to you under the Apache License, Version 2.0 (the            *
+ * "License"); you may not use this file except in compliance   *
+ * with the License.  You may obtain a copy of the License at   *
+ *                                                              *
+ *   http://www.apache.org/licenses/LICENSE-2.0                 *
+ *                                                              *
+ * Unless required by applicable law or agreed to in writing,   *
+ * software distributed under the License is distributed on an  *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY       *
+ * KIND, either express or implied.  See the License for the    *
+ * specific language governing permissions and limitations      *
+ * under the License.                                           *
+ ****************************************************************/
+package org.apache.james.imapserver.netty;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+
+/**
+ * Upstream handler that exposes a channel as a pair of blocking streams.
+ * The streams are created when the channel opens, stored in the context
+ * attachment, and passed to {@link #processStreamIo}.
+ */
+public abstract class StreamChannelUpstreamHandler extends SimpleChannelUpstreamHandler{
+    
+    private static final String KEY_IN = "stream.in";
+    private static final String KEY_OUT = "stream.out";
+
+    // NOTE(review): both timeouts are only stored here; nothing in this class
+    // applies them to the streams.
+    private int readTimeout;
+
+    private int writeTimeout;
+
+    protected StreamChannelUpstreamHandler() {
+        // Do nothing
+    }
+
+    /**
+     * Implement this method to execute your stream I/O logic;
+     * <b>please note that you must forward the process request to other
+     * thread or thread pool.</b>
+     */
+    protected abstract void processStreamIo(ChannelHandlerContext ctx, InputStream in,
+            OutputStream out);
+
+    /**
+     * Returns read timeout in seconds.
+     * The default value is <tt>0</tt> (disabled).
+     */
+    public int getReadTimeout() {
+        return readTimeout;
+    }
+
+    /**
+     * Sets read timeout in seconds.
+     * The default value is <tt>0</tt> (disabled).
+     */
+    public void setReadTimeout(int readTimeout) {
+        this.readTimeout = readTimeout;
+    }
+
+    /**
+     * Returns write timeout in seconds.
+     * The default value is <tt>0</tt> (disabled).
+     */
+    public int getWriteTimeout() {
+        return writeTimeout;
+    }
+
+    /**
+     * Sets write timeout in seconds.
+     * The default value is <tt>0</tt> (disabled).
+     */
+    public void setWriteTimeout(int writeTimeout) {
+        this.writeTimeout = writeTimeout;
+    }
+
+    /**
+     * Creates the stream pair for the channel, stores it in the attachment
+     * and hands it to {@link #processStreamIo}.
+     */
+    @Override
+    public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+        
+        // Create streams
+        InputStream in = new  BlockingChannelBufferInputStream();
+        OutputStream out = new ChannelBufferOutputStream(ctx.getChannel());
+        Map<Object, Object> attachment = getAttachment(ctx);
+        attachment.put(KEY_IN, in);
+        attachment.put(KEY_OUT, out);
+        // Publish the attachment before the I/O logic starts, so the other
+        // callbacks can already find the streams.
+        ctx.setAttachment(attachment);
+        processStreamIo(ctx, in, out);
+        super.channelOpen(ctx, e);
+    }
+
+    /**
+     * Returns the attachment map of the context, creating and attaching an
+     * empty one if the context has none yet.
+     */
+    @SuppressWarnings("unchecked")
+    protected Map<Object,Object> getAttachment(ChannelHandlerContext ctx) {
+        Map<Object,Object> attachment = (Map<Object, Object>) ctx.getAttachment();
+        if (attachment == null) {
+            attachment = new HashMap<Object, Object>();
+            ctx.setAttachment(attachment);
+        }
+        return attachment;
+    }
+
+    /**
+     * Closes streams
+     */
+    @Override
+    public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+        Map<Object, Object> attachment = getAttachment(ctx);
+
+        final InputStream in = (InputStream) attachment.get(KEY_IN);
+        final OutputStream out = (OutputStream) attachment.get(KEY_OUT);
+        // The streams are missing if channelOpen never stored them; skip them
+        // instead of failing with a NullPointerException.
+        try {
+            if (in != null) {
+                in.close();
+            }
+        } finally {
+            if (out != null) {
+                out.close();
+            }
+        }
+        super.channelClosed(ctx, e);
+    }
+    
+
+    /**
+     * Forwards read data to input stream.
+     */
+    @Override
+    public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+        final BlockingChannelBufferInputStream in = (BlockingChannelBufferInputStream) getAttachment(ctx).get(KEY_IN);
+        in.write((ChannelBuffer) e.getMessage());
+        super.messageReceived(ctx, e);
+    }
+
+    /**
+     * Forwards caught exceptions to input stream. An {@link IOException}
+     * (direct, or wrapped in a {@code StreamIoException}) is handed to the
+     * input stream; for any other cause, or when no input stream exists, the
+     * channel is closed.
+     */
+    @Override
+    public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
+        final BlockingChannelBufferInputStream in = (BlockingChannelBufferInputStream) getAttachment(ctx).get(KEY_IN);
+        IOException ex = null;
+        if (e.getCause() instanceof StreamIoException) {
+            // StreamIoException can only be constructed around an IOException.
+            ex = (IOException) e.getCause().getCause();
+        } else if (e.getCause() instanceof IOException) {
+            ex = (IOException) e.getCause();
+        }
+
+        // Test the extracted IOException, not the event: the event is never
+        // null, so the old check passed null on and never closed the channel.
+        if (ex != null && in != null) {
+            in.throwException(ex);
+        } else {
+            ctx.getChannel().close();
+        }
+    }
+    
+    private static class StreamIoException extends RuntimeException {
+        private static final long serialVersionUID = 3976736960742503222L;
+
+        public StreamIoException(IOException cause) {
+            super(cause);
+        }
+    }
+
+}



---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org