You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by tr...@apache.org on 2004/12/04 13:30:01 UTC

svn commit: r109795 - in incubator/directory/seda/branches/trustin: . src/examples src/examples/org src/examples/org/apache src/examples/org/apache/netty src/examples/org/apache/netty/examples src/examples/org/apache/netty/examples/echo src/examples/org/apache/netty/examples/echo/server src/java/org/apache/netty/common/util src/java/org/apache/netty/downstream src/java/org/apache/netty/downstream/impl/tcp src/java/org/apache/netty/registry src/java/org/apache/netty/upstream

Author: trustin
Date: Sat Dec  4 04:30:00 2004
New Revision: 109795

URL: http://svn.apache.org/viewcvs?view=rev&rev=109795
Log:
Basic implementation of downstream TCP layer.
It doesn't provide full functionality yet.
Added:
   incubator/directory/seda/branches/trustin/src/examples/
   incubator/directory/seda/branches/trustin/src/examples/org/
   incubator/directory/seda/branches/trustin/src/examples/org/apache/
   incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/
   incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/
   incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/
   incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/
   incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/EchoServerSessionHandler.java   (contents, props changed)
   incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/Main.java   (contents, props changed)
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/BasicSessionConfig.java   (contents, props changed)
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpIoProcessor.java   (contents, props changed)
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSessionConfig.java   (contents, props changed)
Modified:
   incubator/directory/seda/branches/trustin/maven.xml
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java   (contents, props changed)
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java   (contents, props changed)
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpAcceptor.java   (contents, props changed)
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java   (contents, props changed)
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/registry/Service.java   (props changed)
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/registry/ServiceRegistry.java   (contents, props changed)
   incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java

Modified: incubator/directory/seda/branches/trustin/maven.xml
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/maven.xml?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/maven.xml&r1=109794&p2=incubator/directory/seda/branches/trustin/maven.xml&r2=109795
==============================================================================
--- incubator/directory/seda/branches/trustin/maven.xml	(original)
+++ incubator/directory/seda/branches/trustin/maven.xml	Sat Dec  4 04:30:00 2004
@@ -1,7 +1,6 @@
 <project default="test"
   xmlns:ant="jelly:ant" xmlns:maven="jelly:maven">
 
-  <!--
   <preGoal name="java:compile">
     <ant:path
       id="my.other.src.dir"
@@ -10,6 +9,5 @@
       id="maven.compile.src.set"
       refid="my.other.src.dir"/>
   </preGoal>
-  -->
 
 </project>

Added: incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/EchoServerSessionHandler.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/EchoServerSessionHandler.java?view=auto&rev=109795
==============================================================================
--- (empty file)
+++ incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/EchoServerSessionHandler.java	Sat Dec  4 04:30:00 2004
@@ -0,0 +1,48 @@
+/*
+ * @(#) $Id$
+ */
+package org.apache.netty.examples.echo.server;
+
+import java.nio.ByteBuffer;
+
+import org.apache.netty.common.IdleStatus;
+import org.apache.netty.downstream.Session;
+import org.apache.netty.downstream.SessionHandler;
+
+/**
+ * TODO Document me.
+ * 
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$, 
+ */
+public class EchoServerSessionHandler implements SessionHandler {
+
+	public void sessionOpened(Session session) {
+		System.out.println(session.getRemoteAddress() + ": OPEN");
+	}
+
+	public void sessionClosed(Session session) {
+		System.out.println(session.getRemoteAddress() + ": CLOSED");
+	}
+
+	public void sessionIdle(Session session, IdleStatus status) {
+		System.out.println(session.getRemoteAddress() + ": IDLE");
+	}
+
+	public void exceptionCaught(Session session, Throwable cause) {
+		System.out.println(session.getRemoteAddress() + ": EXCEPTION");
+		cause.printStackTrace(System.out);
+	}
+
+	public void dataRead(Session session, ByteBuffer buf) {
+		System.out.println(session.getRemoteAddress() + ": READ (" + buf.remaining() + " B)");
+		session.getWriteBuffer().put(buf);
+		session.flush();
+	}
+
+	public void markRemoved(Session session, Object mark) {
+	}
+
+	public void writeBufferAvailable(Session session) {
+	}
+}

Added: incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/Main.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/Main.java?view=auto&rev=109795
==============================================================================
--- (empty file)
+++ incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/Main.java	Sat Dec  4 04:30:00 2004
@@ -0,0 +1,22 @@
+/*
+ * @(#) $Id$
+ */
+package org.apache.netty.examples.echo.server;
+
+import java.net.InetSocketAddress;
+
+import org.apache.netty.downstream.Acceptor;
+import org.apache.netty.downstream.impl.tcp.TcpAcceptor;
+
+/**
+ * Entry point of the echo server example: creates a {@link TcpAcceptor} and
+ * binds an {@link EchoServerSessionHandler} to TCP port 8080.
+ * 
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$, 
+ */
+public class Main {
+	/**
+	 * Starts the example.
+	 *
+	 * @param args ignored
+	 * @throws Exception if the acceptor cannot be created or bound
+	 */
+	public static void main(String[] args) throws Exception {
+		Acceptor tcpAcceptor = new TcpAcceptor();
+		InetSocketAddress localAddress = new InetSocketAddress(8080);
+		EchoServerSessionHandler echoHandler = new EchoServerSessionHandler();
+		tcpAcceptor.bind(localAddress, echoHandler);
+	}
+}

Added: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/BasicSessionConfig.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/BasicSessionConfig.java?view=auto&rev=109795
==============================================================================
--- (empty file)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/BasicSessionConfig.java	Sat Dec  4 04:30:00 2004
@@ -0,0 +1,72 @@
+/*
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+/*
+ * @(#) $Id$
+ */
+package org.apache.netty.common.util;
+
+import org.apache.netty.common.IdleStatus;
+import org.apache.netty.common.SessionConfig;
+
+
+/**
+ * TODO Document me.
+ *
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$,
+ */
+public abstract class BasicSessionConfig implements SessionConfig {
+    private int idleTimeForRead;
+    private int idleTimeForWrite;
+    private int idleTimeForBoth;
+
+    protected BasicSessionConfig() {
+    }
+
+    public int getIdleTime(IdleStatus status) {
+        if (status == IdleStatus.BOTH_IDLE)
+            return idleTimeForBoth;
+
+        if (status == IdleStatus.READER_IDLE)
+            return idleTimeForRead;
+
+        if (status == IdleStatus.WRITER_IDLE)
+            return idleTimeForWrite;
+
+        throw new IllegalArgumentException("Unknown idle status: " + status);
+    }
+
+    public long getIdleTimeInMillis(IdleStatus status) {
+        return getIdleTime(status) * 1000L;
+    }
+
+    public void setIdleTime(IdleStatus status, int idleTime) {
+        if (idleTime < 0)
+            throw new IllegalArgumentException("Illegal idle time: " +
+                                               idleTime);
+
+        if (status == IdleStatus.BOTH_IDLE)
+            idleTimeForBoth = idleTime;
+        else if (status == IdleStatus.READER_IDLE)
+            idleTimeForRead = idleTime;
+        else if (status == IdleStatus.WRITER_IDLE)
+            idleTimeForWrite = idleTime;
+        else
+            throw new IllegalArgumentException("Unknown idle status: " +
+                                               status);
+    }
+}

Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java&r1=109794&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java&r2=109795
==============================================================================
--- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java	(original)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java	Sat Dec  4 04:30:00 2004
@@ -22,7 +22,7 @@
 /**
  * TODO Insert type comment.
  *
- * @version $Rev: 47 $, $Date: 2004-11-24 14:58:31 +0900 (Wed, 24 Nov 2004) $
+ * @version $Rev$, $Date$
  * @author Trustin Lee (http://gleamynode.net/dev/)
  */
 public class ByteBufferPool {

Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java&r1=109794&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java&r2=109795
==============================================================================
--- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java	(original)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java	Sat Dec  4 04:30:00 2004
@@ -15,7 +15,7 @@
  *
  */
 /*
- * @(#) $Id: Queue.java 47 2004-11-24 05:58:31Z trustin $
+ * @(#) $Id$
  */
 package org.apache.netty.common.util;
 
@@ -33,7 +33,7 @@
  *         href="http://projects.gleamynode.net/">http://projects.gleamynode.net/
  *         </a>)
  *
- * @version $Rev: 47 $, $Date: 2004-11-24 14:58:31 +0900 (Wed, 24 Nov 2004) $
+ * @version $Rev$, $Date$
  */
 public class Queue implements Serializable {
     private Object[] items;

Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java&r1=109794&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java&r2=109795
==============================================================================
--- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java	(original)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java	Sat Dec  4 04:30:00 2004
@@ -38,12 +38,14 @@
     void setHandler(SessionHandler handler);
 
     void close();
+    
+    ByteBuffer getReadBuffer();
 
     ByteBuffer getWriteBuffer();
 
-    void setMark(Object mark);
-
     void flush();
+    
+    void flush(Object mark);
 
     boolean isConnected();
 

Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java&r1=109794&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java&r2=109795
==============================================================================
--- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java	(original)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java	Sat Dec  4 04:30:00 2004
@@ -31,7 +31,7 @@
  * @version $Rev$, $Date$
  */
 public interface SessionHandler {
-    void sessionEstablished(Session session);
+    void sessionOpened(Session session);
 
     void sessionClosed(Session session);
 
@@ -42,4 +42,6 @@
     void dataRead(Session session, ByteBuffer buf);
 
     void markRemoved(Session session, Object mark);
+    
+    void writeBufferAvailable(Session session);
 }

Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpAcceptor.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpAcceptor.java?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpAcceptor.java&r1=109794&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpAcceptor.java&r2=109795
==============================================================================
--- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpAcceptor.java	(original)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpAcceptor.java	Sat Dec  4 04:30:00 2004
@@ -54,8 +54,6 @@
     /**
      * Creates a new instance.
      * @throws IOException
-     *
-     *
      */
     public TcpAcceptor() throws IOException {
         selector = Selector.open();
@@ -134,7 +132,7 @@
                             continue;
                         
                         TcpSession session = new TcpSession(ch, (SessionHandler) key.attachment());
-                        session.start();
+                        TcpIoProcessor.getInstance().addSession(session);
                     }
                 } catch (IOException e) {
                     log.error("Unexpected exception.", e);

Added: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpIoProcessor.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpIoProcessor.java?view=auto&rev=109795
==============================================================================
--- (empty file)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpIoProcessor.java	Sat Dec  4 04:30:00 2004
@@ -0,0 +1,289 @@
+/*
+ * @(#) $Id$
+ */
+package org.apache.netty.downstream.impl.tcp;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * Single-selector I/O processor for {@link TcpSession}s.
+ * <p>
+ * Callers hand sessions over through {@link #addSession(TcpSession)},
+ * {@link #removeSession(TcpSession)} and {@link #flushSession(TcpSession)};
+ * each request is queued and the selector is woken up. One daemon worker
+ * thread, started lazily on the first <code>addSession</code>, performs the
+ * actual registration, reading, writing and closing, and invokes the
+ * session's handler from that thread.
+ * <p>
+ * Handler callbacks are always invoked without holding any of the internal
+ * queue locks, so a handler may call back into <code>close()</code> or
+ * <code>flush()</code> of a session; such requests are handled on the next
+ * pass of the worker loop.
+ * <p>
+ * TODO Implement idleTime/bufferWritable/
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$, 
+ */
+class TcpIoProcessor {
+	private static final Log log = LogFactory.getLog(TcpIoProcessor.class);
+	private static final Object[] NO_SESSIONS = new Object[0];
+	private static final TcpIoProcessor instance;
+	/** Reason the singleton could not be created, or <code>null</code>. */
+	private static final IOException initFailure;
+	
+	static {
+		TcpIoProcessor tmp;
+		IOException failure;
+		try {
+			tmp = new TcpIoProcessor();
+			failure = null;
+		} catch (IOException e) {
+			tmp = null;
+			failure = e;
+		}
+		
+		instance = tmp;
+		initFailure = failure;
+	}
+	
+	/**
+	 * Returns the shared processor.
+	 *
+	 * @return the singleton instance
+	 * @throws IOException if the selector could not be opened when this class
+	 *         was initialized; the original failure is attached as the cause
+	 */
+	public static TcpIoProcessor getInstance() throws IOException {
+		if (instance == null) {
+			IOException e = new IOException("Failed to open selector.");
+			e.initCause(initFailure);
+			throw e;
+		}
+		return instance;
+	}
+	
+	private final Selector selector;
+	private final List newSessions = new ArrayList();
+	private final List removingSessions = new ArrayList();
+	private final List flushingSessions = new ArrayList();
+	private Worker worker;
+	
+	private TcpIoProcessor() throws IOException {
+		selector = Selector.open();
+	}
+	
+	/**
+	 * Queues a session for registration with the selector and starts the
+	 * worker thread if it is not running yet.
+	 *
+	 * @param session the session to register
+	 */
+	public void addSession(TcpSession session) {
+		startWorkerIfNeeded();
+		
+		synchronized (newSessions) {
+			newSessions.add(session);
+		}
+		selector.wakeup();
+	}
+	
+	/**
+	 * Queues a session to be unregistered, disposed and closed by the worker.
+	 *
+	 * @param session the session to remove
+	 */
+	public void removeSession(TcpSession session) {
+		scheduleRemoval(session);
+		selector.wakeup();
+	}
+	
+	/**
+	 * Queues a session whose write buffer should be written to its channel.
+	 *
+	 * @param session the session to flush
+	 */
+	public void flushSession(TcpSession session) {
+		synchronized (flushingSessions) {
+			flushingSessions.add(session);
+		}
+		selector.wakeup();
+	}
+	
+	/**
+	 * Starts the worker exactly once. The whole check is done under the
+	 * monitor; an unsynchronized pre-check on a non-volatile field is not
+	 * a safe way to publish the worker.
+	 */
+	private synchronized void startWorkerIfNeeded() {
+		if (worker == null) {
+			worker = new Worker();
+			worker.start();
+		}
+	}
+	
+	/** Queues a session for removal unless it is already queued. */
+	private void scheduleRemoval(TcpSession session) {
+		synchronized (removingSessions) {
+			if (!removingSessions.contains(session))
+				removingSessions.add(session);
+		}
+	}
+	
+	/**
+	 * Atomically takes everything out of a request queue.
+	 *
+	 * @param queue one of the request lists; used as its own lock
+	 * @return the queued elements in insertion order, possibly empty
+	 */
+	private static Object[] drain(List queue) {
+		synchronized (queue) {
+			if (queue.isEmpty())
+				return NO_SESSIONS;
+			
+			Object[] snapshot = queue.toArray();
+			queue.clear();
+			return snapshot;
+		}
+	}
+	
+	/** Daemon thread that runs the select loop. */
+	private class Worker extends Thread {
+		public Worker() {
+			super("TcpIoProcessor");
+			setDaemon(true);
+		}
+		
+		public void run() {
+			for (;;) {
+				try {
+					int nKeys = selector.select();
+					addSessions();
+					if (nKeys > 0) {
+						processSessions(selector.selectedKeys());
+					}
+					flushSessions();
+					removeSessions();
+				} catch (IOException e) {
+					log.error("Unexpected exception.", e);
+					try {
+						Thread.sleep(1000);
+					} catch (InterruptedException ignored) {
+						// Deliberately not re-interrupting: the loop never
+						// exits, and an interrupted thread would make every
+						// following select() return immediately.
+					}
+				}
+			}
+		}
+	}
+	
+	/**
+	 * Registers all queued sessions for <code>OP_READ</code> and fires
+	 * <code>sessionOpened</code> for each one that was registered; a failed
+	 * registration is reported through <code>exceptionCaught</code> instead.
+	 */
+	private void addSessions() {
+		Object[] sessions = drain(newSessions);
+		for (int i = 0; i < sessions.length; i++) {
+			TcpSession session = (TcpSession) sessions[i];
+			SocketChannel ch = session.getChannel();
+			try {
+				ch.configureBlocking(false);
+				session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));
+			} catch (IOException e) {
+				fireExceptionCaught(session, e);
+				continue;
+			}
+			
+			fireSessionOpened(session);
+		}
+	}
+
+	/**
+	 * Cancels the key, disposes the buffers and closes the channel of every
+	 * queued session, then fires <code>sessionClosed</code>.
+	 */
+	private void removeSessions() {
+		Object[] sessions = drain(removingSessions);
+		for (int i = 0; i < sessions.length; i++) {
+			TcpSession session = (TcpSession) sessions[i];
+			SocketChannel ch = session.getChannel();
+			SelectionKey key = session.getSelectionKey();
+			// The key is null when the session was never registered.
+			if (key != null)
+				key.cancel();
+			session.dispose();
+			try {
+				ch.close();
+			} catch (IOException e) {
+				fireExceptionCaught(session, e);
+			} finally {
+				fireSessionClosed(session);
+			}
+		}
+	}
+	
+	/**
+	 * Handles the keys reported by the last select. Each key is removed from
+	 * the selected-key set as it is handled; the selector never removes keys
+	 * from that set itself.
+	 */
+	private void processSessions(Set selectedKeys) {
+		Iterator it = selectedKeys.iterator();
+		while (it.hasNext()) {
+			SelectionKey key = (SelectionKey) it.next();
+			it.remove();
+			
+			// isReadable()/isWritable() throw on a cancelled key.
+			if (!key.isValid())
+				continue;
+			
+			TcpSession session = (TcpSession) key.attachment();
+			if (key.isReadable()) {
+				read(session);
+			}
+			if (key.isValid() && key.isWritable()) {
+				scheduleFlush(session);
+			}
+		}
+	}
+	
+	/**
+	 * Reads as much as is available into the session's read buffer and fires
+	 * <code>dataRead</code> with the buffer flipped for reading. Bytes the
+	 * handler leaves unread are kept for the next call. End of stream or a
+	 * read failure schedules the session for removal.
+	 */
+	private void read(TcpSession session) {
+		ByteBuffer readBuf = session.getReadBuffer();
+		SocketChannel ch = session.getChannel();
+		try {
+			int readBytes = 0;
+			int ret;
+			
+			synchronized (readBuf) {
+				// NOTE(review): when the handler keeps the buffer full,
+				// read() returns 0 and no event is fired - confirm this is
+				// covered once writeBufferAvailable is implemented.
+				while ((ret = ch.read(readBuf)) > 0) {
+					readBytes += ret;
+				}
+				
+				if (readBytes > 0) {
+					session.increaseReadBytes(readBytes);
+					readBuf.flip();
+					fireDataRead(session, readBuf);
+					readBuf.compact();
+				}
+			}
+			
+			if (ret < 0) {
+				scheduleRemoval(session);
+			}
+		} catch (IOException e) {
+			fireExceptionCaught(session, e);
+			// A failed read would otherwise be reported again on every
+			// select, because the key stays registered for OP_READ.
+			scheduleRemoval(session);
+		}
+	}
+
+	/**
+	 * Called when a channel became writable again: stops watching
+	 * <code>OP_WRITE</code> and queues the session for flushing.
+	 */
+	private void scheduleFlush(TcpSession session) {
+		session.getSelectionKey().interestOps(SelectionKey.OP_READ);
+		synchronized (flushingSessions) {
+			flushingSessions.add(session);
+		}
+	}
+
+	/** Flushes every queued session that is not closed. */
+	private void flushSessions() {
+		Object[] sessions = drain(flushingSessions);
+		for (int i = 0; i < sessions.length; i++) {
+			TcpSession session = (TcpSession) sessions[i];
+			if (session.isClosed())
+				continue;
+
+			flush(session);
+		}
+	}
+	
+	/**
+	 * Writes the session's write buffer to its channel. If the channel did
+	 * not accept everything, <code>OP_WRITE</code> is added to the key's
+	 * interest set so that the rest is written when the channel is writable.
+	 */
+	private void flush(TcpSession session) {
+		ByteBuffer writeBuf = session.getWriteBuffer();
+		SocketChannel ch = session.getChannel();
+		
+		try {
+			synchronized (writeBuf) {
+				boolean pending;
+				
+				writeBuf.flip();
+				try {
+					int nBytes = ch.write(writeBuf);
+					if (nBytes > 0)
+						session.increaseWrittenBytes(nBytes);
+					
+					// Must be checked before compact(): afterwards
+					// remaining() is the free space, not the unsent data.
+					pending = writeBuf.hasRemaining();
+				} finally {
+					// Always return the buffer to its filling state, even
+					// when write() failed.
+					writeBuf.compact();
+				}
+
+				if (pending) {
+					// Kernel buffer is full
+					SelectionKey key = session.getSelectionKey();
+					if (key != null && key.isValid())
+						key.interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
+				}
+			}
+		} catch (IOException e) {
+			fireExceptionCaught(session, e);
+		}
+	}
+
+	/** Invokes <code>sessionOpened</code>, routing any failure to <code>exceptionCaught</code>. */
+	private void fireSessionOpened(TcpSession session) {
+		try {
+			session.getHandler().sessionOpened(session);
+		} catch (Throwable e) {
+			fireExceptionCaught(session, e);
+		}
+	}
+	
+	/** Invokes <code>sessionClosed</code>, routing any failure to <code>exceptionCaught</code>. */
+	private void fireSessionClosed(TcpSession session) {
+		try {
+			session.getHandler().sessionClosed(session);
+		} catch (Throwable e) {
+			fireExceptionCaught(session, e);
+		}
+	}
+	
+	/** Invokes <code>dataRead</code>, routing any failure to <code>exceptionCaught</code>. */
+	private void fireDataRead(TcpSession session, ByteBuffer readBuf) {
+		try {
+			session.getHandler().dataRead(session, readBuf);
+		} catch (Throwable e) {
+			fireExceptionCaught(session, e);
+		}
+	}
+	
+	/** Invokes <code>exceptionCaught</code>; a failure of the handler itself is only logged. */
+	private void fireExceptionCaught(TcpSession session, Throwable cause) {
+		try {
+			session.getHandler().exceptionCaught(session, cause);
+		} catch (Throwable t) {
+			log.error("Exception from exceptionCaught.", t);
+		}
+	}
+
+}

Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java&r1=109794&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java&r2=109795
==============================================================================
--- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java	(original)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java	Sat Dec  4 04:30:00 2004
@@ -3,12 +3,15 @@
  */
 package org.apache.netty.downstream.impl.tcp;
 
+import java.io.IOException;
 import java.net.SocketAddress;
 import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
 import java.nio.channels.SocketChannel;
 
 import org.apache.commons.lang.Validate;
 import org.apache.netty.common.SessionConfig;
+import org.apache.netty.common.util.ByteBufferPool;
 import org.apache.netty.downstream.Session;
 import org.apache.netty.downstream.SessionHandler;
 
@@ -21,18 +24,44 @@
 class TcpSession implements Session {
 
     private final SocketChannel ch;
-    private final SessionConfig config = new SimpleSessionConfig();
+    private final TcpSessionConfig config;
+    private final ByteBuffer readBuf;
+    private final ByteBuffer writeBuf;
+
+    private SelectionKey key;
     private SessionHandler handler;
+    private long readBytes;
+	private long writtenBytes;
+	private long lastReadTime;
+	private long lastWriteTime;
     
     /**
      * Creates a new instance.
-     * 
-     * 
      */
     TcpSession(SocketChannel ch, SessionHandler defaultHandler) {
         this.ch = ch;
+        this.config = new TcpSessionConfig(ch);
+        this.readBuf = ByteBufferPool.open();
+        this.writeBuf = ByteBufferPool.open();
         this.handler = defaultHandler;
     }
+    
+    SocketChannel getChannel() {
+    	return ch;
+    }
+    
+    SelectionKey getSelectionKey() {
+    	return key;
+    }
+    
+    void setSelectionKey(SelectionKey key) {
+    	this.key = key;
+    }
+    
+    void dispose() {
+    	ByteBufferPool.close(readBuf);
+    	ByteBufferPool.close(writeBuf);
+    }
 
     public SessionHandler getHandler() {
         return handler;
@@ -44,16 +73,35 @@
     }
 
     public void close() {
+    	try {
+			TcpIoProcessor.getInstance().removeSession(this);
+		} catch (IOException e) {
+			// This cannot happen
+		}
     }
-
+    
+    public ByteBuffer getReadBuffer() {
+    	return readBuf;
+    }
+    
     public ByteBuffer getWriteBuffer() {
-        return null;
+        return writeBuf;
     }
-
-    public void setMark(Object mark) {
+    
+    public void flush() {
+    	try {
+			TcpIoProcessor.getInstance().flushSession(this);
+		} catch (IOException e) {
+			// This cannot happen
+		}
     }
 
-    public void flush() {
+    public void flush(Object mark) {
+    	try {
+			TcpIoProcessor.getInstance().flushSession(this);
+		} catch (IOException e) {
+			// This cannot happen
+		}
     }
 
     public boolean isConnected() {
@@ -61,7 +109,7 @@
     }
 
     public boolean isClosed() {
-        return !ch.isConnected();
+        return !isConnected();
     }
 
     public SessionConfig getConfig() {
@@ -83,15 +131,25 @@
     public long getWrittenBytes() {
         return writtenBytes;
     }
+    
+    void increaseReadBytes(int increment) {
+   		readBytes += increment;
+    	lastReadTime = System.currentTimeMillis();
+    }
+    
+    void increaseWrittenBytes(int increment) {
+    	writtenBytes += increment;
+    	lastWriteTime = System.currentTimeMillis();
+    }
 
     public long getLastIoTime() {
-        return Math.max(lastReadtime, lastWriteTime);
+        return Math.max(lastReadTime, lastWriteTime);
     }
 
     public long getLastReadTime() {
         return lastReadTime;
     }
-
+    
     public long getLastWriteTime() {
         return lastWriteTime;
     }

Added: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSessionConfig.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSessionConfig.java?view=auto&rev=109795
==============================================================================
--- (empty file)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSessionConfig.java	Sat Dec  4 04:30:00 2004
@@ -0,0 +1,90 @@
+/*
+ *   Copyright 2004 The Apache Software Foundation
+ *
+ *   Licensed under the Apache License, Version 2.0 (the "License");
+ *   you may not use this file except in compliance with the License.
+ *   You may obtain a copy of the License at
+ *
+ *       http://www.apache.org/licenses/LICENSE-2.0
+ *
+ *   Unless required by applicable law or agreed to in writing, software
+ *   distributed under the License is distributed on an "AS IS" BASIS,
+ *   WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ *   See the License for the specific language governing permissions and
+ *   limitations under the License.
+ *
+ */
+/*
+ * @(#) $Id$
+ */
+package org.apache.netty.downstream.impl.tcp;
+
+import java.net.SocketException;
+
+import java.nio.channels.SocketChannel;
+
+import org.apache.netty.common.util.BasicSessionConfig;
+
+
+/**
+ * Session configuration for a TCP session. In addition to the idle times
+ * inherited from {@link BasicSessionConfig}, it exposes socket options by
+ * delegating every accessor to the socket of the wrapped
+ * {@link SocketChannel}.
+ *
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$,
+ */
+public class TcpSessionConfig extends BasicSessionConfig {
+    private final SocketChannel ch;
+
+    /**
+     * Creates a configuration backed by the given channel.
+     *
+     * @param ch the channel whose socket options are read and written
+     */
+    TcpSessionConfig(SocketChannel ch) {
+        this.ch = ch;
+    }
+
+    /** Looks up the channel's socket; done on every call, nothing is cached. */
+    private java.net.Socket socket() {
+        return ch.socket();
+    }
+
+    /** Delegates to <code>Socket.getKeepAlive()</code>. */
+    public boolean getKeepAlive() throws SocketException {
+        final boolean keepAlive = socket().getKeepAlive();
+        return keepAlive;
+    }
+
+    /** Delegates to <code>Socket.setKeepAlive(boolean)</code>. */
+    public void setKeepAlive(boolean on) throws SocketException {
+        socket().setKeepAlive(on);
+    }
+
+    /** Delegates to <code>Socket.getOOBInline()</code>. */
+    public boolean getOOBInline() throws SocketException {
+        final boolean oobInline = socket().getOOBInline();
+        return oobInline;
+    }
+
+    /** Delegates to <code>Socket.setOOBInline(boolean)</code>. */
+    public void setOOBInline(boolean on) throws SocketException {
+        socket().setOOBInline(on);
+    }
+
+    /** Delegates to <code>Socket.getReuseAddress()</code>. */
+    public boolean getReuseAddress() throws SocketException {
+        final boolean reuseAddress = socket().getReuseAddress();
+        return reuseAddress;
+    }
+
+    /** Delegates to <code>Socket.setReuseAddress(boolean)</code>. */
+    public void setReuseAddress(boolean on) throws SocketException {
+        socket().setReuseAddress(on);
+    }
+
+    /** Delegates to <code>Socket.getSoLinger()</code>. */
+    public int getSoLinger() throws SocketException {
+        final int linger = socket().getSoLinger();
+        return linger;
+    }
+
+    /** Delegates to <code>Socket.setSoLinger(boolean, int)</code>. */
+    public void setSoLinger(boolean on, int linger)
+                     throws SocketException {
+        socket().setSoLinger(on, linger);
+    }
+
+    /** Delegates to <code>Socket.getTcpNoDelay()</code>. */
+    public boolean getTcpNoDelay() throws SocketException {
+        final boolean noDelay = socket().getTcpNoDelay();
+        return noDelay;
+    }
+
+    /** Delegates to <code>Socket.setTcpNoDelay(boolean)</code>. */
+    public void setTcpNoDelay(boolean on) throws SocketException {
+        socket().setTcpNoDelay(on);
+    }
+
+    /** Delegates to <code>Socket.getTrafficClass()</code>. */
+    public int getTrafficClass() throws SocketException {
+        final int trafficClass = socket().getTrafficClass();
+        return trafficClass;
+    }
+
+    /** Delegates to <code>Socket.setTrafficClass(int)</code>. */
+    public void setTrafficClass(int tc) throws SocketException {
+        socket().setTrafficClass(tc);
+    }
+}

Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/registry/ServiceRegistry.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/registry/ServiceRegistry.java?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/registry/ServiceRegistry.java&r1=109794&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/registry/ServiceRegistry.java&r2=109795
==============================================================================
--- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/registry/ServiceRegistry.java	(original)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/registry/ServiceRegistry.java	Sat Dec  4 04:30:00 2004
@@ -30,7 +30,7 @@
  *
  * @author akarasulu@apache.org
  * @author trustin@apache.org
- * @version $Rev: 56478 $, $Date$
+ * @version $Rev$, $Date$
  */
 public interface ServiceRegistry {
     void bind(Service service,

Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java&r1=109794&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java&r2=109795
==============================================================================
--- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java	(original)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java	Sat Dec  4 04:30:00 2004
@@ -29,7 +29,7 @@
  * @version $Rev$, $Date$
  */
 public interface SessionHandler {
-    void sessionEstablished(Session session);
+    void sessionOpened(Session session);
 
     void sessionClosed(Session session);