You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@directory.apache.org by tr...@apache.org on 2004/12/04 13:30:01 UTC
svn commit: r109795 - in incubator/directory/seda/branches/trustin: . src/examples src/examples/org src/examples/org/apache src/examples/org/apache/netty src/examples/org/apache/netty/examples src/examples/org/apache/netty/examples/echo src/examples/org/apache/netty/examples/echo/server src/java/org/apache/netty/common/util src/java/org/apache/netty/downstream src/java/org/apache/netty/downstream/impl/tcp src/java/org/apache/netty/registry src/java/org/apache/netty/upstream
Author: trustin
Date: Sat Dec 4 04:30:00 2004
New Revision: 109795
URL: http://svn.apache.org/viewcvs?view=rev&rev=109795
Log:
Basic implementation of downstream TCP layer.
It doesn't provide full functionality yet.
Added:
incubator/directory/seda/branches/trustin/src/examples/
incubator/directory/seda/branches/trustin/src/examples/org/
incubator/directory/seda/branches/trustin/src/examples/org/apache/
incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/
incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/
incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/
incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/
incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/EchoServerSessionHandler.java (contents, props changed)
incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/Main.java (contents, props changed)
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/BasicSessionConfig.java (contents, props changed)
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpIoProcessor.java (contents, props changed)
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSessionConfig.java (contents, props changed)
Modified:
incubator/directory/seda/branches/trustin/maven.xml
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java (contents, props changed)
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java (contents, props changed)
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpAcceptor.java (contents, props changed)
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java (contents, props changed)
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/registry/Service.java (props changed)
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/registry/ServiceRegistry.java (contents, props changed)
incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java
Modified: incubator/directory/seda/branches/trustin/maven.xml
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/maven.xml?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/maven.xml&r1=109794&p2=incubator/directory/seda/branches/trustin/maven.xml&r2=109795
==============================================================================
--- incubator/directory/seda/branches/trustin/maven.xml (original)
+++ incubator/directory/seda/branches/trustin/maven.xml Sat Dec 4 04:30:00 2004
@@ -1,7 +1,6 @@
<project default="test"
xmlns:ant="jelly:ant" xmlns:maven="jelly:maven">
- <!--
<preGoal name="java:compile">
<ant:path
id="my.other.src.dir"
@@ -10,6 +9,5 @@
id="maven.compile.src.set"
refid="my.other.src.dir"/>
</preGoal>
- -->
</project>
Added: incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/EchoServerSessionHandler.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/EchoServerSessionHandler.java?view=auto&rev=109795
==============================================================================
--- (empty file)
+++ incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/EchoServerSessionHandler.java Sat Dec 4 04:30:00 2004
@@ -0,0 +1,48 @@
+/*
+ * @(#) $Id$
+ */
+package org.apache.netty.examples.echo.server;
+
+import java.nio.ByteBuffer;
+
+import org.apache.netty.common.IdleStatus;
+import org.apache.netty.downstream.Session;
+import org.apache.netty.downstream.SessionHandler;
+
+/**
+ * TODO Document me.
+ *
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$,
+ */
+public class EchoServerSessionHandler implements SessionHandler {
+
+ public void sessionOpened(Session session) {
+ System.out.println(session.getRemoteAddress() + ": OPEN");
+ }
+
+ public void sessionClosed(Session session) {
+ System.out.println(session.getRemoteAddress() + ": CLOSED");
+ }
+
+ public void sessionIdle(Session session, IdleStatus status) {
+ System.out.println(session.getRemoteAddress() + ": IDLE");
+ }
+
+ public void exceptionCaught(Session session, Throwable cause) {
+ System.out.println(session.getRemoteAddress() + ": EXCEPTION");
+ cause.printStackTrace(System.out);
+ }
+
+ public void dataRead(Session session, ByteBuffer buf) {
+ System.out.println(session.getRemoteAddress() + ": READ (" + buf.remaining() + " B)");
+ session.getWriteBuffer().put(buf);
+ session.flush();
+ }
+
+ public void markRemoved(Session session, Object mark) {
+ }
+
+ public void writeBufferAvailable(Session session) {
+ }
+}
Added: incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/Main.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/Main.java?view=auto&rev=109795
==============================================================================
--- (empty file)
+++ incubator/directory/seda/branches/trustin/src/examples/org/apache/netty/examples/echo/server/Main.java Sat Dec 4 04:30:00 2004
@@ -0,0 +1,22 @@
+/*
+ * @(#) $Id$
+ */
+package org.apache.netty.examples.echo.server;
+
+import java.net.InetSocketAddress;
+
+import org.apache.netty.downstream.Acceptor;
+import org.apache.netty.downstream.impl.tcp.TcpAcceptor;
+
+/**
+ * TODO Document me.
+ *
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$,
+ */
+public class Main {
+ public static void main(String[] args) throws Exception {
+ Acceptor acceptor = new TcpAcceptor();
+ acceptor.bind(new InetSocketAddress(8080), new EchoServerSessionHandler());
+ }
+}
Added: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/BasicSessionConfig.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/BasicSessionConfig.java?view=auto&rev=109795
==============================================================================
--- (empty file)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/BasicSessionConfig.java Sat Dec 4 04:30:00 2004
@@ -0,0 +1,72 @@
+/*
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+/*
+ * @(#) $Id$
+ */
+package org.apache.netty.common.util;
+
+import org.apache.netty.common.IdleStatus;
+import org.apache.netty.common.SessionConfig;
+
+
+/**
+ * TODO Document me.
+ *
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$,
+ */
+public abstract class BasicSessionConfig implements SessionConfig {
+ private int idleTimeForRead;
+ private int idleTimeForWrite;
+ private int idleTimeForBoth;
+
+ protected BasicSessionConfig() {
+ }
+
+ public int getIdleTime(IdleStatus status) {
+ if (status == IdleStatus.BOTH_IDLE)
+ return idleTimeForBoth;
+
+ if (status == IdleStatus.READER_IDLE)
+ return idleTimeForRead;
+
+ if (status == IdleStatus.WRITER_IDLE)
+ return idleTimeForWrite;
+
+ throw new IllegalArgumentException("Unknown idle status: " + status);
+ }
+
+ public long getIdleTimeInMillis(IdleStatus status) {
+ return getIdleTime(status) * 1000L;
+ }
+
+ public void setIdleTime(IdleStatus status, int idleTime) {
+ if (idleTime < 0)
+ throw new IllegalArgumentException("Illegal idle time: " +
+ idleTime);
+
+ if (status == IdleStatus.BOTH_IDLE)
+ idleTimeForBoth = idleTime;
+ else if (status == IdleStatus.READER_IDLE)
+ idleTimeForRead = idleTime;
+ else if (status == IdleStatus.WRITER_IDLE)
+ idleTimeForWrite = idleTime;
+ else
+ throw new IllegalArgumentException("Unknown idle status: " +
+ status);
+ }
+}
Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java&r1=109794&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java&r2=109795
==============================================================================
--- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java (original)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/ByteBufferPool.java Sat Dec 4 04:30:00 2004
@@ -22,7 +22,7 @@
/**
* TODO Insert type comment.
*
- * @version $Rev: 47 $, $Date: 2004-11-24 14:58:31 +0900 (Wed, 24 Nov 2004) $
+ * @version $Rev$, $Date$
* @author Trustin Lee (http://gleamynode.net/dev/)
*/
public class ByteBufferPool {
Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java&r1=109794&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java&r2=109795
==============================================================================
--- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java (original)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/common/util/Queue.java Sat Dec 4 04:30:00 2004
@@ -15,7 +15,7 @@
*
*/
/*
- * @(#) $Id: Queue.java 47 2004-11-24 05:58:31Z trustin $
+ * @(#) $Id$
*/
package org.apache.netty.common.util;
@@ -33,7 +33,7 @@
* href="http://projects.gleamynode.net/">http://projects.gleamynode.net/
* </a>)
*
- * @version $Rev: 47 $, $Date: 2004-11-24 14:58:31 +0900 (Wed, 24 Nov 2004) $
+ * @version $Rev$, $Date$
*/
public class Queue implements Serializable {
private Object[] items;
Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java&r1=109794&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java&r2=109795
==============================================================================
--- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java (original)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/Session.java Sat Dec 4 04:30:00 2004
@@ -38,12 +38,14 @@
void setHandler(SessionHandler handler);
void close();
+
+ ByteBuffer getReadBuffer();
ByteBuffer getWriteBuffer();
- void setMark(Object mark);
-
void flush();
+
+ void flush(Object mark);
boolean isConnected();
Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java&r1=109794&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java&r2=109795
==============================================================================
--- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java (original)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/SessionHandler.java Sat Dec 4 04:30:00 2004
@@ -31,7 +31,7 @@
* @version $Rev$, $Date$
*/
public interface SessionHandler {
- void sessionEstablished(Session session);
+ void sessionOpened(Session session);
void sessionClosed(Session session);
@@ -42,4 +42,6 @@
void dataRead(Session session, ByteBuffer buf);
void markRemoved(Session session, Object mark);
+
+ void writeBufferAvailable(Session session);
}
Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpAcceptor.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpAcceptor.java?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpAcceptor.java&r1=109794&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpAcceptor.java&r2=109795
==============================================================================
--- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpAcceptor.java (original)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpAcceptor.java Sat Dec 4 04:30:00 2004
@@ -54,8 +54,6 @@
/**
* Creates a new instance.
* @throws IOException
- *
- *
*/
public TcpAcceptor() throws IOException {
selector = Selector.open();
@@ -134,7 +132,7 @@
continue;
TcpSession session = new TcpSession(ch, (SessionHandler) key.attachment());
- session.start();
+ TcpIoProcessor.getInstance().addSession(session);
}
} catch (IOException e) {
log.error("Unexpected exception.", e);
Added: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpIoProcessor.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpIoProcessor.java?view=auto&rev=109795
==============================================================================
--- (empty file)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpIoProcessor.java Sat Dec 4 04:30:00 2004
@@ -0,0 +1,289 @@
+/*
+ * @(#) $Id$
+ */
+package org.apache.netty.downstream.impl.tcp;
+
+import java.io.IOException;
+import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
+import java.nio.channels.Selector;
+import java.nio.channels.SocketChannel;
+import java.util.ArrayList;
+import java.util.Iterator;
+import java.util.List;
+import java.util.Set;
+
+import org.apache.commons.logging.Log;
+import org.apache.commons.logging.LogFactory;
+
+/**
+ * TODO Document me.
+ * TODO Implement idleTime/bufferWritable/
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$,
+ */
+class TcpIoProcessor {
+ private static final Log log = LogFactory.getLog(TcpIoProcessor.class);
+ private static final TcpIoProcessor instance;
+
+ static {
+ TcpIoProcessor tmp;
+ try {
+ tmp = new TcpIoProcessor();
+ } catch (IOException e) {
+ tmp = null;
+ }
+
+ instance = tmp;
+ }
+
+ public static TcpIoProcessor getInstance() throws IOException {
+ if (instance == null)
+ throw new IOException("Failed to open selector.");
+ return instance;
+ }
+
+ private final Selector selector;
+ private final List newSessions = new ArrayList();
+ private final List removingSessions = new ArrayList();
+ private final List flushingSessions = new ArrayList();
+ private Worker worker;
+
+ private TcpIoProcessor() throws IOException {
+ selector = Selector.open();
+ }
+
+ public void addSession(TcpSession session) {
+ if (worker == null) {
+ synchronized (this) {
+ if (worker == null) {
+ worker = new Worker();
+ worker.start();
+ }
+ }
+ }
+
+ synchronized (newSessions) {
+ newSessions.add(session);
+ }
+ selector.wakeup();
+ }
+
+ public void removeSession(TcpSession session) {
+ synchronized (removingSessions) {
+ removingSessions.add(session);
+ }
+ selector.wakeup();
+ }
+
+ public void flushSession(TcpSession session) {
+ synchronized (flushingSessions) {
+ flushingSessions.add(session);
+ }
+ selector.wakeup();
+ }
+
+ private class Worker extends Thread {
+ public Worker() {
+ super("TcpIoProcessor");
+ setDaemon(true);
+ }
+
+ public void run() {
+ for (;;) {
+ try {
+ int nKeys = selector.select();
+ addSessions();
+ if (nKeys > 0) {
+ processSessions(selector.selectedKeys());
+ }
+ flushSessions();
+ removeSessions();
+ } catch (IOException e) {
+ log.error("Unexpected exception.", e);
+ try {
+ Thread.sleep(1000);
+ } catch (InterruptedException e1) {
+ }
+ }
+ }
+ }
+ }
+
+ private void addSessions() {
+ if (newSessions.size() == 0)
+ return;
+
+ synchronized (newSessions) {
+ Iterator it = newSessions.iterator();
+ while (it.hasNext()) {
+ TcpSession session = (TcpSession) it.next();
+ SocketChannel ch = session.getChannel();
+ boolean registered;
+ try {
+ ch.configureBlocking(false);
+ session.setSelectionKey(ch.register(selector, SelectionKey.OP_READ, session));
+ registered = true;
+ } catch (IOException e) {
+ registered = false;
+ fireExceptionCaught(session, e);
+ }
+
+ if (registered) {
+ fireSessionOpened(session);
+ }
+ }
+
+ newSessions.clear();
+ }
+ }
+
+ private void removeSessions() {
+ if (removingSessions.size() == 0)
+ return;
+
+ synchronized (removingSessions) {
+ Iterator it = removingSessions.iterator();
+ while (it.hasNext()) {
+ TcpSession session = (TcpSession) it.next();
+ SocketChannel ch = session.getChannel();
+ session.getSelectionKey().cancel();
+ session.dispose();
+ try {
+ ch.close();
+ } catch (IOException e) {
+ fireExceptionCaught(session, e);
+ } finally {
+ fireSessionClosed(session);
+ }
+ }
+
+ removingSessions.clear();
+ }
+ }
+
+ private void processSessions(Set selectedKeys) {
+ Iterator it = selectedKeys.iterator();
+ while (it.hasNext()) {
+ SelectionKey key = (SelectionKey) it.next();
+ TcpSession session = (TcpSession) key.attachment();
+ if (key.isReadable()) {
+ read(session);
+ } else if (key.isWritable()) {
+ scheduleFlush(session);
+ }
+ }
+ }
+
+ private void read(TcpSession session) {
+ ByteBuffer readBuf = session.getReadBuffer();
+ SocketChannel ch = session.getChannel();
+ try {
+ int readBytes = 0;
+ int ret;
+
+ synchronized (readBuf) {
+ while ((ret = ch.read(readBuf)) > 0) {
+ readBytes += ret;
+ }
+
+ if (readBytes > 0) {
+ session.increaseReadBytes(readBytes);
+ readBuf.flip();
+ fireDataRead(session, readBuf);
+ readBuf.compact();
+ }
+ }
+
+ if (ret < 0) {
+ synchronized (removingSessions) {
+ removingSessions.add(session);
+ }
+ }
+ } catch (IOException e) {
+ fireExceptionCaught(session, e);
+ }
+ }
+
+ private void scheduleFlush(TcpSession session) {
+ session.getSelectionKey().interestOps(SelectionKey.OP_READ);
+ synchronized (flushingSessions) {
+ flushingSessions.add(session);
+ }
+ }
+
+ private void flushSessions() {
+ if (flushingSessions.size() == 0)
+ return;
+
+ synchronized (flushingSessions) {
+ Iterator it = flushingSessions.iterator();
+ while (it.hasNext()) {
+ TcpSession session = (TcpSession) it.next();
+ if (session.isClosed())
+ continue;
+
+ flush(session);
+ }
+
+ flushingSessions.clear();
+ }
+ }
+
+ private void flush(TcpSession session) {
+ ByteBuffer writeBuf = session.getWriteBuffer();
+ SocketChannel ch = session.getChannel();
+
+ try {
+ synchronized (writeBuf) {
+ writeBuf.flip();
+ int nBytes = ch.write(writeBuf);
+ writeBuf.compact();
+
+ if (nBytes > 0)
+ session.increaseWrittenBytes(nBytes);
+
+ int remaining = writeBuf.remaining();
+ if (remaining > 0){
+ // Kernel buffer is full
+ session.getSelectionKey().interestOps(SelectionKey.OP_READ | SelectionKey.OP_WRITE);
+ }
+ }
+ } catch (IOException e) {
+ fireExceptionCaught(session, e);
+ }
+ }
+
+ private void fireSessionOpened(TcpSession session) {
+ try {
+ session.getHandler().sessionOpened(session);
+ } catch (Throwable e) {
+ fireExceptionCaught(session, e);
+ }
+ }
+
+ private void fireSessionClosed(TcpSession session) {
+ try {
+ session.getHandler().sessionClosed(session);
+ } catch (Throwable e) {
+ fireExceptionCaught(session, e);
+ }
+ }
+
+ private void fireDataRead(TcpSession session, ByteBuffer readBuf) {
+ try {
+ session.getHandler().dataRead(session, readBuf);
+ } catch (Throwable e) {
+ fireExceptionCaught(session, e);
+ }
+ }
+
+ private void fireExceptionCaught(TcpSession session, Throwable cause) {
+ try {
+ session.getHandler().exceptionCaught(session, cause);
+ } catch (Throwable t) {
+ log.error("Exception from excaptionCaught.", t);
+ }
+ }
+
+}
Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java&r1=109794&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java&r2=109795
==============================================================================
--- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java (original)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSession.java Sat Dec 4 04:30:00 2004
@@ -3,12 +3,15 @@
*/
package org.apache.netty.downstream.impl.tcp;
+import java.io.IOException;
import java.net.SocketAddress;
import java.nio.ByteBuffer;
+import java.nio.channels.SelectionKey;
import java.nio.channels.SocketChannel;
import org.apache.commons.lang.Validate;
import org.apache.netty.common.SessionConfig;
+import org.apache.netty.common.util.ByteBufferPool;
import org.apache.netty.downstream.Session;
import org.apache.netty.downstream.SessionHandler;
@@ -21,18 +24,44 @@
class TcpSession implements Session {
private final SocketChannel ch;
- private final SessionConfig config = new SimpleSessionConfig();
+ private final TcpSessionConfig config;
+ private final ByteBuffer readBuf;
+ private final ByteBuffer writeBuf;
+
+ private SelectionKey key;
private SessionHandler handler;
+ private long readBytes;
+ private long writtenBytes;
+ private long lastReadTime;
+ private long lastWriteTime;
/**
* Creates a new instance.
- *
- *
*/
TcpSession(SocketChannel ch, SessionHandler defaultHandler) {
this.ch = ch;
+ this.config = new TcpSessionConfig(ch);
+ this.readBuf = ByteBufferPool.open();
+ this.writeBuf = ByteBufferPool.open();
this.handler = defaultHandler;
}
+
+ SocketChannel getChannel() {
+ return ch;
+ }
+
+ SelectionKey getSelectionKey() {
+ return key;
+ }
+
+ void setSelectionKey(SelectionKey key) {
+ this.key = key;
+ }
+
+ void dispose() {
+ ByteBufferPool.close(readBuf);
+ ByteBufferPool.close(writeBuf);
+ }
public SessionHandler getHandler() {
return handler;
@@ -44,16 +73,35 @@
}
public void close() {
+ try {
+ TcpIoProcessor.getInstance().removeSession(this);
+ } catch (IOException e) {
+ // This cannot happen
+ }
}
-
+
+ public ByteBuffer getReadBuffer() {
+ return readBuf;
+ }
+
public ByteBuffer getWriteBuffer() {
- return null;
+ return writeBuf;
}
-
- public void setMark(Object mark) {
+
+ public void flush() {
+ try {
+ TcpIoProcessor.getInstance().flushSession(this);
+ } catch (IOException e) {
+ // This cannot happen
+ }
}
- public void flush() {
+ public void flush(Object mark) {
+ try {
+ TcpIoProcessor.getInstance().flushSession(this);
+ } catch (IOException e) {
+ // This cannot happen
+ }
}
public boolean isConnected() {
@@ -61,7 +109,7 @@
}
public boolean isClosed() {
- return !ch.isConnected();
+ return !isConnected();
}
public SessionConfig getConfig() {
@@ -83,15 +131,25 @@
public long getWrittenBytes() {
return writtenBytes;
}
+
+ void increaseReadBytes(int increment) {
+ readBytes += increment;
+ lastReadTime = System.currentTimeMillis();
+ }
+
+ void increaseWrittenBytes(int increment) {
+ writtenBytes += increment;
+ lastWriteTime = System.currentTimeMillis();
+ }
public long getLastIoTime() {
- return Math.max(lastReadtime, lastWriteTime);
+ return Math.max(lastReadTime, lastWriteTime);
}
public long getLastReadTime() {
return lastReadTime;
}
-
+
public long getLastWriteTime() {
return lastWriteTime;
}
Added: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSessionConfig.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSessionConfig.java?view=auto&rev=109795
==============================================================================
--- (empty file)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/downstream/impl/tcp/TcpSessionConfig.java Sat Dec 4 04:30:00 2004
@@ -0,0 +1,90 @@
+/*
+ * Copyright 2004 The Apache Software Foundation
+ *
+ * Licensed under the Apache License, Version 2.0 (the "License");
+ * you may not use this file except in compliance with the License.
+ * You may obtain a copy of the License at
+ *
+ * http://www.apache.org/licenses/LICENSE-2.0
+ *
+ * Unless required by applicable law or agreed to in writing, software
+ * distributed under the License is distributed on an "AS IS" BASIS,
+ * WITHOUT WARRANTIES OR CONDITIONS OF ANY KIND, either express or implied.
+ * See the License for the specific language governing permissions and
+ * limitations under the License.
+ *
+ */
+/*
+ * @(#) $Id$
+ */
+package org.apache.netty.downstream.impl.tcp;
+
+import java.net.SocketException;
+
+import java.nio.channels.SocketChannel;
+
+import org.apache.netty.common.util.BasicSessionConfig;
+
+
+/**
+ * TODO Document me.
+ *
+ * @author Trustin Lee (trustin@apache.org)
+ * @version $Rev$, $Date$,
+ */
+public class TcpSessionConfig extends BasicSessionConfig {
+ private final SocketChannel ch;
+
+ TcpSessionConfig(SocketChannel ch) {
+ this.ch = ch;
+ }
+
+ public boolean getKeepAlive() throws SocketException {
+ return ch.socket().getKeepAlive();
+ }
+
+ public void setKeepAlive(boolean on) throws SocketException {
+ ch.socket().setKeepAlive(on);
+ }
+
+ public boolean getOOBInline() throws SocketException {
+ return ch.socket().getOOBInline();
+ }
+
+ public void setOOBInline(boolean on) throws SocketException {
+ ch.socket().setOOBInline(on);
+ }
+
+ public boolean getReuseAddress() throws SocketException {
+ return ch.socket().getReuseAddress();
+ }
+
+ public void setReuseAddress(boolean on) throws SocketException {
+ ch.socket().setReuseAddress(on);
+ }
+
+ public int getSoLinger() throws SocketException {
+ return ch.socket().getSoLinger();
+ }
+
+ public void setSoLinger(boolean on, int linger)
+ throws SocketException {
+ ch.socket().setSoLinger(on, linger);
+ }
+
+ public boolean getTcpNoDelay() throws SocketException {
+ return ch.socket().getTcpNoDelay();
+ }
+
+ public void setTcpNoDelay(boolean on) throws SocketException {
+ ch.socket().setTcpNoDelay(on);
+ }
+
+ public int getTrafficClass() throws SocketException {
+ return ch.socket().getTrafficClass();
+ }
+
+ public void setTrafficClass(int tc) throws SocketException {
+ ch.socket().setTrafficClass(tc);
+ }
+}
Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/registry/ServiceRegistry.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/registry/ServiceRegistry.java?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/registry/ServiceRegistry.java&r1=109794&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/registry/ServiceRegistry.java&r2=109795
==============================================================================
--- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/registry/ServiceRegistry.java (original)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/registry/ServiceRegistry.java Sat Dec 4 04:30:00 2004
@@ -30,7 +30,7 @@
*
* @author akarasulu@apache.org
* @author trustin@apache.org
- * @version $Rev: 56478 $, $Date$
+ * @version $Rev$, $Date$
*/
public interface ServiceRegistry {
void bind(Service service,
Modified: incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java
Url: http://svn.apache.org/viewcvs/incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java?view=diff&rev=109795&p1=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java&r1=109794&p2=incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java&r2=109795
==============================================================================
--- incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java (original)
+++ incubator/directory/seda/branches/trustin/src/java/org/apache/netty/upstream/SessionHandler.java Sat Dec 4 04:30:00 2004
@@ -29,7 +29,7 @@
* @version $Rev$, $Date$
*/
public interface SessionHandler {
- void sessionEstablished(Session session);
+ void sessionOpened(Session session);
void sessionClosed(Session session);