You are viewing a plain text version of this content. The canonical link for it is here.
Posted to server-dev@james.apache.org by no...@apache.org on 2010/04/11 21:44:49 UTC
svn commit: r932980 - in /james/server/trunk/imapserver: ./
src/main/java/org/apache/james/imapserver/netty/
Author: norman
Date: Sun Apr 11 19:44:48 2010
New Revision: 932980
URL: http://svn.apache.org/viewvc?rev=932980&view=rev
Log:
Start to add code for using netty with imap. Thats more ore less a copy of StreamIoHandler stuff of MINA with a little refactoring to use netty's stuff
Added:
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/BlockingChannelBufferInputStream.java
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ChannelBufferOutputStream.java
james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/StreamChannelUpstreamHandler.java
Modified:
james/server/trunk/imapserver/pom.xml
Modified: james/server/trunk/imapserver/pom.xml
URL: http://svn.apache.org/viewvc/james/server/trunk/imapserver/pom.xml?rev=932980&r1=932979&r2=932980&view=diff
==============================================================================
--- james/server/trunk/imapserver/pom.xml (original)
+++ james/server/trunk/imapserver/pom.xml Sun Apr 11 19:44:48 2010
@@ -63,6 +63,10 @@
<groupId>org.apache.james</groupId>
<artifactId>james-server-mina-socket</artifactId>
</dependency>
+ <dependency>
+ <groupId>org.apache.james</groupId>
+ <artifactId>james-server-netty-socket</artifactId>
+ </dependency>
<dependency>
<groupId>org.apache.james</groupId>
<artifactId>apache-james-imap-api</artifactId>
@@ -110,6 +114,10 @@
<artifactId>mina-core</artifactId>
</dependency>
<dependency>
+ <groupId>org.jboss.netty</groupId>
+ <artifactId>netty</artifactId>
+ </dependency>
+ <dependency>
<groupId>org.apache.db.torque</groupId>
<artifactId>runtime</artifactId>
</dependency>
Added: james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/BlockingChannelBufferInputStream.java
URL: http://svn.apache.org/viewvc/james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/BlockingChannelBufferInputStream.java?rev=932980&view=auto
==============================================================================
--- james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/BlockingChannelBufferInputStream.java (added)
+++ james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/BlockingChannelBufferInputStream.java Sun Apr 11 19:44:48 2010
@@ -0,0 +1,171 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.imapserver.netty;
+
+import java.io.IOException;
+import java.io.InputStream;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+
+public class BlockingChannelBufferInputStream extends InputStream{
+ private final Object mutex = new Object();
+
+ private final ChannelBuffer buf;
+
+ private volatile boolean closed;
+
+ private volatile boolean released;
+
+ private IOException exception;
+
+ public BlockingChannelBufferInputStream() {
+ buf = ChannelBuffers.dynamicBuffer();
+ //buf..setA
+ //buf.setAutoExpand(true);
+ //buf.limit(0);
+ }
+
+ @Override
+ public int available() {
+ if (released) {
+ return 0;
+ }
+
+ synchronized (mutex) {
+ return buf.readableBytes();
+ }
+ }
+
+ @Override
+ public void close() {
+ if (closed) {
+ return;
+ }
+
+ synchronized (mutex) {
+ closed = true;
+ releaseBuffer();
+
+ mutex.notifyAll();
+ }
+ }
+
+ @Override
+ public int read() throws IOException {
+ synchronized (mutex) {
+ if (!waitForData()) {
+ return -1;
+ }
+
+ return buf.readByte() & 0xff;
+ }
+ }
+
+ @Override
+ public int read(byte[] b, int off, int len) throws IOException {
+ synchronized (mutex) {
+ if (!waitForData()) {
+ return -1;
+ }
+
+ int readBytes;
+
+ if (len > buf.readableBytes()) {
+ readBytes = buf.readableBytes();
+ } else {
+ readBytes = len;
+ }
+
+ buf.readBytes(b, off, readBytes);
+
+ return readBytes;
+ }
+ }
+
+ private boolean waitForData() throws IOException {
+ if (released) {
+ return false;
+ }
+
+ synchronized (mutex) {
+ while (!released && buf.readableBytes() == 0 && exception == null) {
+ try {
+ mutex.wait();
+ } catch (InterruptedException e) {
+ IOException ioe = new IOException(
+ "Interrupted while waiting for more data");
+ ioe.initCause(e);
+ throw ioe;
+ }
+ }
+ }
+
+ if (exception != null) {
+ releaseBuffer();
+ throw exception;
+ }
+
+ if (closed && buf.readableBytes() == 0) {
+ releaseBuffer();
+
+ return false;
+ }
+
+ return true;
+ }
+
+ private void releaseBuffer() {
+ if (released) {
+ return;
+ }
+
+ released = true;
+ }
+
+ public void write(ChannelBuffer src) {
+ synchronized (mutex) {
+ if (closed) {
+ return;
+ }
+
+ if (buf.readable()) {
+ this.buf.writeBytes(src);
+ this.buf.readerIndex(0);
+ } else {
+ this.buf.clear();
+ this.buf.writeBytes(src);
+ this.buf.readerIndex(0);
+ mutex.notifyAll();
+ }
+ }
+ }
+
+ public void throwException(IOException e) {
+ synchronized (mutex) {
+ if (exception == null) {
+ exception = e;
+
+ mutex.notifyAll();
+ }
+ }
+ }
+
+
+}
Added: james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ChannelBufferOutputStream.java
URL: http://svn.apache.org/viewvc/james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ChannelBufferOutputStream.java?rev=932980&view=auto
==============================================================================
--- james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ChannelBufferOutputStream.java (added)
+++ james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/ChannelBufferOutputStream.java Sun Apr 11 19:44:48 2010
@@ -0,0 +1,85 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.imapserver.netty;
+
+import java.io.IOException;
+import java.io.OutputStream;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.buffer.ChannelBuffers;
+import org.jboss.netty.channel.Channel;
+import org.jboss.netty.channel.ChannelFuture;
+
+public class ChannelBufferOutputStream extends OutputStream{
+
+ private final Channel channel;
+
+ private ChannelFuture lastChannelFuture;
+
+ public ChannelBufferOutputStream(Channel channel) {
+ this.channel = channel;
+ }
+
+ @Override
+ public void close() throws IOException {
+ try {
+ flush();
+ } finally {
+ channel.close().awaitUninterruptibly();
+ }
+ }
+
+ private void checkClosed() throws IOException {
+ if (!channel.isConnected()) {
+ throw new IOException("The session has been closed.");
+ }
+ }
+
+ private synchronized void write(ChannelBuffer buf) throws IOException {
+ checkClosed();
+ ChannelFuture future = channel.write(buf);
+ lastChannelFuture = future;
+ }
+
+ @Override
+ public void write(byte[] b, int off, int len) throws IOException {
+ write(ChannelBuffers.copiedBuffer(b.clone(), off, len));
+ }
+
+ @Override
+ public void write(int b) throws IOException {
+ ChannelBuffer buf = ChannelBuffers.buffer(1);
+ buf.writeByte((byte) b);
+ write(buf);
+ }
+
+ @Override
+ public synchronized void flush() throws IOException {
+ if (lastChannelFuture == null) {
+ return;
+ }
+
+ lastChannelFuture.awaitUninterruptibly();
+ if (!lastChannelFuture.isSuccess()) {
+ throw new IOException(
+ "The bytes could not be written to the session");
+ }
+ }
+
+}
Added: james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/StreamChannelUpstreamHandler.java
URL: http://svn.apache.org/viewvc/james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/StreamChannelUpstreamHandler.java?rev=932980&view=auto
==============================================================================
--- james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/StreamChannelUpstreamHandler.java (added)
+++ james/server/trunk/imapserver/src/main/java/org/apache/james/imapserver/netty/StreamChannelUpstreamHandler.java Sun Apr 11 19:44:48 2010
@@ -0,0 +1,166 @@
+/****************************************************************
+ * Licensed to the Apache Software Foundation (ASF) under one *
+ * or more contributor license agreements. See the NOTICE file *
+ * distributed with this work for additional information *
+ * regarding copyright ownership. The ASF licenses this file *
+ * to you under the Apache License, Version 2.0 (the *
+ * "License"); you may not use this file except in compliance *
+ * with the License. You may obtain a copy of the License at *
+ * *
+ * http://www.apache.org/licenses/LICENSE-2.0 *
+ * *
+ * Unless required by applicable law or agreed to in writing, *
+ * software distributed under the License is distributed on an *
+ * "AS IS" BASIS, WITHOUT WARRANTIES OR CONDITIONS OF ANY *
+ * KIND, either express or implied. See the License for the *
+ * specific language governing permissions and limitations *
+ * under the License. *
+ ****************************************************************/
+package org.apache.james.imapserver.netty;
+
+import java.io.IOException;
+import java.io.InputStream;
+import java.io.OutputStream;
+import java.util.HashMap;
+import java.util.Map;
+
+import org.jboss.netty.buffer.ChannelBuffer;
+import org.jboss.netty.channel.ChannelHandlerContext;
+import org.jboss.netty.channel.ChannelStateEvent;
+import org.jboss.netty.channel.ExceptionEvent;
+import org.jboss.netty.channel.MessageEvent;
+import org.jboss.netty.channel.SimpleChannelUpstreamHandler;
+
+public abstract class StreamChannelUpstreamHandler extends SimpleChannelUpstreamHandler{
+
+ private static final String KEY_IN = "stream.in";
+ private static final String KEY_OUT = "stream.out";
+
+ private int readTimeout;
+
+ private int writeTimeout;
+
+ protected StreamChannelUpstreamHandler() {
+ // Do nothing
+ }
+
+ /**
+ * Implement this method to execute your stream I/O logic;
+ * <b>please note that you must forward the process request to other
+ * thread or thread pool.</b>
+ */
+ protected abstract void processStreamIo(ChannelHandlerContext ctx, InputStream in,
+ OutputStream out);
+
+ /**
+ * Returns read timeout in seconds.
+ * The default value is <tt>0</tt> (disabled).
+ */
+ public int getReadTimeout() {
+ return readTimeout;
+ }
+
+ /**
+ * Sets read timeout in seconds.
+ * The default value is <tt>0</tt> (disabled).
+ */
+ public void setReadTimeout(int readTimeout) {
+ this.readTimeout = readTimeout;
+ }
+
+ /**
+ * Returns write timeout in seconds.
+ * The default value is <tt>0</tt> (disabled).
+ */
+ public int getWriteTimeout() {
+ return writeTimeout;
+ }
+
+ /**
+ * Sets write timeout in seconds.
+ * The default value is <tt>0</tt> (disabled).
+ */
+ public void setWriteTimeout(int writeTimeout) {
+ this.writeTimeout = writeTimeout;
+ }
+
+ @Override
+ public void channelOpen(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+
+ // Create streams
+ InputStream in = new BlockingChannelBufferInputStream();
+ OutputStream out = new ChannelBufferOutputStream(ctx.getChannel());
+ Map<Object, Object> attachment = getAttachment(ctx);
+ attachment.put(KEY_IN, in);
+ attachment.put(KEY_OUT, out);
+ processStreamIo(ctx, in, out);
+ ctx.setAttachment(attachment);
+ super.channelOpen(ctx, e);
+ }
+
+ @SuppressWarnings("unchecked")
+ protected Map<Object,Object> getAttachment(ChannelHandlerContext ctx) {
+ Map<Object,Object> attachment = (Map<Object, Object>) ctx.getAttachment();
+ if (attachment == null) {
+ attachment = new HashMap<Object, Object>();
+ ctx.setAttachment(attachment);
+ }
+ return attachment;
+ }
+ /**
+ * Closes streams
+ */
+ @Override
+ public void channelClosed(ChannelHandlerContext ctx, ChannelStateEvent e) throws Exception {
+ Map<Object, Object> attachment = getAttachment(ctx);
+
+ final InputStream in = (InputStream) attachment.get(KEY_IN);
+ final OutputStream out = (OutputStream) attachment.get(KEY_OUT);
+ try {
+ in.close();
+ } finally {
+ out.close();
+ }
+ super.channelClosed(ctx, e);
+ }
+
+
+ /**
+ * Forwards read data to input stream.
+ */
+ @Override
+ public void messageReceived(ChannelHandlerContext ctx, MessageEvent e) throws Exception {
+ final BlockingChannelBufferInputStream in = (BlockingChannelBufferInputStream) getAttachment(ctx).get(KEY_IN);
+ in.write((ChannelBuffer) e.getMessage());
+ super.messageReceived(ctx, e);
+ }
+
+ /**
+ * Forwards caught exceptions to input stream.
+ */
+ @Override
+ public void exceptionCaught(ChannelHandlerContext ctx, ExceptionEvent e) throws Exception {
+ final BlockingChannelBufferInputStream in = (BlockingChannelBufferInputStream) getAttachment(ctx).get(KEY_IN);
+ IOException ex = null;
+ if (e.getCause() instanceof StreamIoException) {
+ ex = (IOException) e.getCause().getCause();
+ } else if (e.getCause() instanceof IOException) {
+ ex = (IOException) e.getCause();
+ }
+
+ if (e != null && in != null) {
+ in.throwException(ex);
+ } else {
+ ctx.getChannel().close();
+ }
+ }
+
+ private static class StreamIoException extends RuntimeException {
+ private static final long serialVersionUID = 3976736960742503222L;
+
+ public StreamIoException(IOException cause) {
+ super(cause);
+ }
+ }
+
+}
---------------------------------------------------------------------
To unsubscribe, e-mail: server-dev-unsubscribe@james.apache.org
For additional commands, e-mail: server-dev-help@james.apache.org