You are viewing a plain text version of this content. The canonical link for it is here.
Posted to commits@mina.apache.org by jv...@apache.org on 2007/09/17 18:24:21 UTC
svn commit: r576502 - in
/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr:
APRConnector.java APRFilterChain.java APRIoProcessor.java APRSession.java
APRSessionImpl.java
Author: jvermillard
Date: Mon Sep 17 09:24:19 2007
New Revision: 576502
URL: http://svn.apache.org/viewvc?rev=576502&view=rev
Log:
trying to match last API changes (not compiling)
Removed:
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRFilterChain.java
Modified:
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSession.java
mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java
Modified: mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java?rev=576502&r1=576501&r2=576502&view=diff
==============================================================================
--- mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java (original)
+++ mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRConnector.java Mon Sep 17 09:24:19 2007
@@ -25,9 +25,9 @@
import java.util.concurrent.Executor;
import org.apache.mina.common.AbstractIoConnector;
-import org.apache.mina.common.AbstractIoFilterChain;
import org.apache.mina.common.ConnectFuture;
import org.apache.mina.common.DefaultConnectFuture;
+import org.apache.mina.common.DefaultIoFilterChain;
import org.apache.mina.common.IoConnector;
import org.apache.mina.common.IoServiceListenerSupport;
import org.apache.mina.common.TransportMetadata;
@@ -147,7 +147,7 @@
// Set the ConnectFuture of the specified session, which will be
// removed and notified by AbstractIoFilterChain eventually.
- session.setAttribute(AbstractIoFilterChain.CONNECT_FUTURE, future);
+ session.setAttribute(DefaultIoFilterChain.CONNECT_FUTURE, future);
// Forward the remaining process to the APRIoProcessor.
// it's will validate the COnnectFuture when the session is in the poll set
Modified: mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java?rev=576502&r1=576501&r2=576502&view=diff
==============================================================================
--- mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java (original)
+++ mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRIoProcessor.java Mon Sep 17 09:24:19 2007
@@ -20,22 +20,19 @@
package org.apache.mina.transport.apr;
import java.util.HashMap;
+import java.util.Iterator;
import java.util.Map;
import java.util.Queue;
import java.util.concurrent.ConcurrentLinkedQueue;
import java.util.concurrent.Executor;
-import org.apache.mina.common.AbstractIoFilterChain;
+import org.apache.mina.common.AbstractIoProcessor;
+import org.apache.mina.common.AbstractIoSession;
import org.apache.mina.common.ByteBuffer;
import org.apache.mina.common.ConnectFuture;
-import org.apache.mina.common.ExceptionMonitor;
-import org.apache.mina.common.IdleStatus;
-import org.apache.mina.common.IoFilterChain;
-import org.apache.mina.common.IoService;
+import org.apache.mina.common.DefaultIoFilterChain;
+import org.apache.mina.common.FileRegion;
import org.apache.mina.common.IoSession;
-import org.apache.mina.common.WriteRequest;
-import org.apache.mina.common.IoServiceListenerSupport;
-import org.apache.mina.util.NamePreservingRunnable;
import org.apache.tomcat.jni.Error;
import org.apache.tomcat.jni.Poll;
import org.apache.tomcat.jni.Pool;
@@ -44,6 +41,8 @@
import org.slf4j.Logger;
import org.slf4j.LoggerFactory;
+import com.sun.org.apache.bcel.internal.generic.GETSTATIC;
+
/**
* The class in charge of processing socket level IO events for the {@link APRConnector}
*
@@ -51,43 +50,29 @@
* @version $Rev: $, $Date: $
*/
-class APRIoProcessor {
+class APRIoProcessor extends AbstractIoProcessor {
private final Logger logger = LoggerFactory.getLogger(getClass());
private final Object lock = new Object();
- private final String threadName;
-
- private final Executor executor;
-
private long pool = 0; // memory pool
private long pollset = 0; // socket poller
- private final Queue<APRSessionImpl> newSessions = new ConcurrentLinkedQueue<APRSessionImpl>();
-
- private final Queue<APRSessionImpl> removingSessions = new ConcurrentLinkedQueue<APRSessionImpl>();
-
- private final Queue<APRSessionImpl> trafficControllingSessions = new ConcurrentLinkedQueue<APRSessionImpl>();
-
private final Map<Long, APRSessionImpl> managedSessions = new HashMap<Long, APRSessionImpl>();
- private long lastIdleCheckTime = System.currentTimeMillis();
-
- private int socketCount = 0;
-
- private Worker worker;
+// private Worker worker;
APRIoProcessor(String threadName, Executor executor) {
- this.threadName = threadName;
- this.executor = executor;
+ super(threadName,executor);
+
// initialize a memory pool for APR functions
pool = Pool.create(APRLibrary.getLibrary().getPool());
try {
- // TODO : optimize/parametrize those values
+ // TODO : optimize/parameterize those values
pollset = Poll
.create(
32,
@@ -97,103 +82,186 @@
} catch (Error e) {
logger.error("APR Error : " + e.getDescription(), e);
- // TODO : send that to the good logger
}
}
- void addNew(APRSessionImpl session) {
- newSessions.offer(session);
- startupWorker();
+ @Override
+ protected Iterator<AbstractIoSession> allSessions() throws Exception {
+ // TODO Auto-generated method stub
+ return null;
}
- void remove(APRSessionImpl session) {
- scheduleRemove(session);
- startupWorker();
+ @Override
+ protected void doAdd(IoSession sess) throws Exception {
+ APRSessionImpl session = (APRSessionImpl) sess;
+ int rv;
+ rv = Poll.add(pollset, session.getAPRSocket(), Poll.APR_POLLIN);
+ if (rv == Status.APR_SUCCESS) {
+ System.out.println("Added session to pollset");
+ managedSessions.put(session.getAPRSocket(), session);
+ } else
+ throw new RuntimeException("APR error while Poll.add(..) : "+Error.strerror(-1*rv)+" ( code : "+rv+")");
+ }
+
+ @Override
+ protected void doRemove(IoSession session) throws Exception {
+ remove(session); // will schedule it
+ }
+
+ @Override
+ protected int read(IoSession sess, ByteBuffer buffer) throws Exception {
+ APRSessionImpl session=(APRSessionImpl)sess;
+
+ byte[] buf = session.getReadBuffer();
+ // FIXME : hardcoded read value for testing
+ int bytes = Socket.recv(session.getAPRSocket(), buf, 0, 1024);
+ if (bytes > 0) {
+ buffer.put(buf);
+ } else if (bytes < 0) {
+ logger.debug("Read {} bytes, need closing ?", bytes);
+ return -1;
+ }
+ return bytes;
}
+
- private void startupWorker() {
- synchronized (lock) {
- if (worker == null) {
- worker = new Worker();
- executor.execute(new NamePreservingRunnable(worker));
+ private long[] pollResult;
+
+ @Override
+ protected boolean select(int timeout) throws Exception {
+ // poll the socket descriptors
+ /* Is this OK? The result array is sized at two slots per managed session. */
+ long[] pollResult = new long[managedSessions.size() * 2];
+
+ int rv = Poll.poll(pollset, 1000 * timeout, pollResult, false);
+ if (rv > 0) {
+ return true;
+ } else if(rv<0) {
+ if(rv!=-100002) { // timeout ( FIXME : can't find the good constant in APR)
+ System.err.println("APR Poll error : "+Error.strerror(-1*rv)+" "+rv);
+ throw new RuntimeException("APR polling error : "+Error.strerror(-1*rv)+" ( code : "+rv+")");
}
}
+ return false;
}
- void flush(APRSessionImpl session) {
- // re-add the session to polling with POLLOUT flag
- pollOutSession(session);
+ @Override
+ protected Iterator<AbstractIoSession> selectedSessions() throws Exception {
+ return new PollSetIterator(pollResult);
}
- private void scheduleRemove(APRSessionImpl session) {
- removingSessions.offer(session);
+ @Override
+ protected SessionState state(IoSession session) {
+ long socket=((APRSessionImpl)session).getAPRSocket();
+ if(socket>0)
+ return SessionState.OPEN;
+ else if(managedSessions.get(socket)!=null)
+ return SessionState.PREPARING; // will occur ?
+ else
+ return SessionState.CLOSED;
}
- // TODO : do something with traffic control
- private void scheduleTrafficControl(APRSessionImpl session) {
- trafficControllingSessions.offer(session);
+ @Override
+ protected long transferFile(IoSession session, FileRegion region)
+ throws Exception {
+ throw new UnsupportedOperationException("Not supposed for APR (TODO)");
}
- private void doAddNew() {
- for (;;) {
- APRSessionImpl session = newSessions.poll();
+ @Override
+ protected void wakeup() {
+ // FIXME : is it possible to interrupt a blocked Poll.poll() call?
+
+ }
- if (session == null) {
- break;
+ @Override
+ protected int write(IoSession session, ByteBuffer buf) throws Exception {
+ // be sure APR_SO_NONBLOCK was set, or it will block
+ int toWrite = buf.remaining();
+
+ int writtenBytes;
+ // APR accepts ByteBuffers only if they are direct ones, due to native code
+ if (buf.isDirect()) {
+ writtenBytes = Socket.sendb( ((APRSessionImpl)session).getAPRSocket(), buf.buf(),
+ 0, toWrite);
+ } else {
+ writtenBytes = Socket.send( ((APRSessionImpl)session).getAPRSocket(), buf.array(),
+ 0, toWrite);
+ // FIXME : kludgy ?
+ buf.position(buf.position() + writtenBytes);
+ }
+ return writtenBytes;
+ }
+
+ @Override
+ protected boolean isOpRead(IoSession sess) throws Exception {
+ APRSessionImpl session=(APRSessionImpl)sess;
+ long[] descriptors=new long[managedSessions.size()*2];
+ Poll.pollset(pollset, descriptors);
+ for(int i=0;i<managedSessions.size();i++) {
+ if(descriptors[i*2+1]== session.getAPRSocket()) {
+ return (descriptors[i*2] & Poll.APR_POLLOUT) >0;
}
+ }
+ return false;
+ }
- // polling the socket for read
- System.err.println("pollset : " + pollset);
- System.err.println("Socket : " + session.getAPRSocket());
- int rv;
- rv = Poll
- .add(pollset, session.getAPRSocket(), Poll.APR_POLLIN/*| Poll.APR_POLLOUT*/);
- if (rv == Status.APR_SUCCESS) {
- ((ConnectFuture) session
- .getAttribute(AbstractIoFilterChain.CONNECT_FUTURE))
- .setSession(session);
- System.out.println("Added worker to pollset");
- managedSessions.put(session.getAPRSocket(), session);
- socketCount++;
- // AbstractIoFilterChain.CONNECT_FUTURE is cleared inside here
- // in AbstractIoFilterChain.fireSessionOpened().
- getServiceListeners(session).fireSessionCreated(session);
- } else {
- // FIXME: find a way to bring the real APR error from returned codes
- session.getFilterChain().fireExceptionCaught(
- session,
- new RuntimeException("APR Error : "
- + Error.strerror(rv)));
+ @Override
+ protected boolean isOpWrite(IoSession sess) throws Exception {
+ APRSessionImpl session=(APRSessionImpl)sess;
+ long[] descriptors=new long[managedSessions.size()*2];
+ Poll.pollset(pollset, descriptors);
+ for(int i=0;i<managedSessions.size();i++) {
+ if(descriptors[i*2+1]== session.getAPRSocket()) {
+ return (descriptors[i*2] & Poll.APR_POLLIN) >0;
}
}
+ return false;
}
- private void doRemove() {
- for (;;) {
- APRSessionImpl session = removingSessions.poll();
-
- if (session == null) {
- break;
+ @Override
+ protected boolean isReadable(IoSession session) throws Exception {
+ long socket=((APRSessionImpl)session).getAPRSocket();
+ for(int i=0;i<pollResult.length/2;i++) {
+ if(pollResult[i+1]==socket) {
+ if( (pollResult[i]&Poll.APR_POLLIN) >0 )
+ return true;
+ else
+ return false;
}
+ }
+ return false;
+ }
- // remove of the pollset
- Poll.remove(pollset, session.getAPRSocket());
-
- // close the socket
- Socket.close(session.getAPRSocket());
- clearWriteRequestQueue(session);
- getServiceListeners(session).fireSessionDestroyed(session);
+ @Override
+ protected boolean isWritable(IoSession session) throws Exception {
+ long socket=((APRSessionImpl)session).getAPRSocket();
+ for(int i=0;i<pollResult.length/2;i++) {
+ if(pollResult[i+1]==socket) {
+ if( (pollResult[i]&Poll.APR_POLLOUT) >0 )
+ return true;
+ else
+ return false;
+ }
}
+ return false;
}
- private void pollOutSession(APRSessionImpl session) {
+ @Override
+ protected void setOpRead(IoSession sess, boolean value) throws Exception {
+ APRSessionImpl session=(APRSessionImpl)sess;
int rv = Poll.remove(pollset, session.getAPRSocket());
if (rv != Status.APR_SUCCESS) {
System.err.println("poll.remove Error : " + Error.strerror(rv));
}
- rv = Poll.add(pollset, session.getAPRSocket(), Poll.APR_POLLIN
- | Poll.APR_POLLOUT);
+ boolean write=isOpWrite(sess);
+
+ if(write)
+ rv = Poll.add(pollset, session.getAPRSocket(), Poll.APR_POLLIN
+ | Poll.APR_POLLOUT);
+ else
+ rv = Poll.add(pollset, session.getAPRSocket(), Poll.APR_POLLIN);
+
if (rv == Status.APR_SUCCESS) {
// ok
} else {
@@ -201,211 +269,133 @@
}
}
- private void read(APRSessionImpl session) {
- byte[] buf = session.getReadBuffer();
- // FIXME : hardcoded read value for testing
- int bytes = Socket.recv(session.getAPRSocket(), buf, 0, 1024);
- if (bytes > 0) {
- ByteBuffer bbuf = ByteBuffer.allocate(bytes);
- bbuf.put(buf, 0, bytes);
- bbuf.flip();
- session.increaseReadBytes(bytes);
- session.getFilterChain().fireMessageReceived(session, bbuf);
- } else if (bytes < 0) {
- logger.debug("Read {} bytes, scheduling for remove", bytes);
- scheduleRemove(session);
+ @Override
+ protected void setOpWrite(IoSession sess, boolean value)
+ throws Exception {
+ APRSessionImpl session=(APRSessionImpl)sess;
+ int rv = Poll.remove(pollset, session.getAPRSocket());
+ if (rv != Status.APR_SUCCESS) {
+ System.err.println("poll.remove Error : " + Error.strerror(rv));
}
+ boolean read=isOpRead(sess);
+
+ if(read)
+ rv = Poll.add(pollset, session.getAPRSocket(), Poll.APR_POLLIN
+ | Poll.APR_POLLOUT);
+ else
+ rv = Poll.add(pollset, session.getAPRSocket(), Poll.APR_POLLOUT);
+
+ if (rv == Status.APR_SUCCESS) {
+ // ok
+ } else {
+ System.err.println("poll.add Error : " + Error.strerror(rv));
+ }
}
- private void write(APRSessionImpl session) {
- if (session.getWriteRequestQueue().size() <= 0)
- return;
- Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
-
- for (;;) {
-
- WriteRequest req;
-
- synchronized (writeRequestQueue) {
- req = writeRequestQueue.peek();
- }
-
- if (req == null) {
- // remove of write polling
- int rv = Poll.remove(pollset, session.getAPRSocket());
- if (rv != Status.APR_SUCCESS) {
- System.err.println("poll.remove Error : "
- + Error.strerror(rv));
- }
- rv = Poll.add(pollset, session.getAPRSocket(), Poll.APR_POLLIN);
- if (rv == Status.APR_SUCCESS) {
- // ok
- } else {
- System.err
- .println("poll.add Error : " + Error.strerror(rv));
- }
- break;
- }
-
- ByteBuffer buf = (ByteBuffer) req.getMessage();
- if (buf.remaining() == 0) {
- synchronized (writeRequestQueue) {
- writeRequestQueue.poll();
- }
- session.increaseWrittenMessages();
- buf.reset();
- session.getFilterChain().fireMessageSent(session, req);
- continue;
- }
- // be sure APR_SO_NONBLOCK was set, or it will block
- int toWrite = buf.remaining();
-
- int writtenBytes;
- // APR accept ByteBuffer, only if they are Direct ones, due to native code
- if (buf.isDirect()) {
- writtenBytes = Socket.sendb(session.getAPRSocket(), buf.buf(),
- 0, toWrite);
- } else {
- writtenBytes = Socket.send(session.getAPRSocket(), buf.array(),
- 0, toWrite);
- // FIXME : kludgy ?
- buf.position(buf.position() + writtenBytes);
- }
- if (writtenBytes > 0) {
- // increase
+ public void remove(IoSession session) {
+ Poll.remove(pollset, ((APRSessionImpl)session).getAPRSocket());
+ Socket.close(((APRSessionImpl)session).getAPRSocket());
+ }
+
+
+
+// private void write(APRSessionImpl session) {
+// if (session.getWriteRequestQueue().size() <= 0)
+// return;
+// Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
+//
+// for (;;) {
+//
+// WriteRequest req;
+//
+// synchronized (writeRequestQueue) {
+// req = writeRequestQueue.peek();
+// }
+//
+// if (req == null) {
+// // remove of write polling
+// int rv = Poll.remove(pollset, session.getAPRSocket());
+// if (rv != Status.APR_SUCCESS) {
+// System.err.println("poll.remove Error : "
+// + Error.strerror(rv));
+// }
+// rv = Poll.add(pollset, session.getAPRSocket(), Poll.APR_POLLIN);
+// if (rv == Status.APR_SUCCESS) {
+// // ok
+// } else {
+// System.err
+// .println("poll.add Error : " + Error.strerror(rv));
+// }
+// break;
+// }
+//
+// ByteBuffer buf = (ByteBuffer) req.getMessage();
+// if (buf.remaining() == 0) {
+// synchronized (writeRequestQueue) {
+// writeRequestQueue.poll();
+// }
+// session.increaseWrittenMessages();
+// buf.reset();
+// session.getFilterChain().fireMessageSent(req);
+// continue;
+// }
+// // be sure APR_SO_NONBLOCK was set, or it will block
+// int toWrite = buf.remaining();
+//
+// int writtenBytes;
+// // APR accept ByteBuffer, only if they are Direct ones, due to native code
+// if (buf.isDirect()) {
+// writtenBytes = Socket.sendb(session.getAPRSocket(), buf.buf(),
+// 0, toWrite);
+// } else {
+// writtenBytes = Socket.send(session.getAPRSocket(), buf.array(),
+// 0, toWrite);
+// // FIXME : kludgy ?
+// buf.position(buf.position() + writtenBytes);
+// }
+// if (writtenBytes > 0) {
+// // increase
+//
+// session.increaseWrittenBytes(writtenBytes);
+// } else {
+// // FIXME : send the exception
+// System.err.println(Error.strerror(writtenBytes * -1));
+// }
+//
+// // kernel buffer full for this socket, wait next polling
+// if (buf.hasRemaining())
+// break;
+// }
+// }
- session.increaseWrittenBytes(writtenBytes);
- } else {
- // FIXME : send the exception
- System.err.println(Error.strerror(writtenBytes * -1));
- }
- // kernel buffer full for this socket, wait next polling
- if (buf.hasRemaining())
- break;
- }
+ @Override
+ protected void finalize() throws Throwable {
+ // TODO : probably necessary; check the APR documentation to confirm
+ Pool.clear(pool);
}
- private void clearWriteRequestQueue(APRSessionImpl session) {
- Queue<WriteRequest> writeRequestQueue = session.getWriteRequestQueue();
- WriteRequest req;
-
- while ((req = writeRequestQueue.poll()) != null) {
- req.getFuture().setWritten(false);
+ protected class PollSetIterator implements Iterator<AbstractIoSession> {
+ private long[] pollResult;
+
+ int index=0;
+ public PollSetIterator(long[] pollResult) {
+ this.pollResult=pollResult;
}
- }
- private void notifyIdleness() {
- // process idle sessions
- long currentTime = System.currentTimeMillis();
- if ((currentTime - lastIdleCheckTime) >= 1000) {
- lastIdleCheckTime = currentTime;
- for (APRSessionImpl session : managedSessions.values()) {
- notifyIdleness(session, currentTime);
- }
+ public boolean hasNext() {
+ return index*2< pollResult.length;
}
- }
- private void notifyIdleness(APRSessionImpl session, long currentTime) {
- notifyIdleness0(session, currentTime, session.getConfig()
- .getIdleTimeInMillis(IdleStatus.BOTH_IDLE),
- IdleStatus.BOTH_IDLE, Math.max(session.getLastIoTime(), session
- .getLastIdleTime(IdleStatus.BOTH_IDLE)));
- notifyIdleness0(session, currentTime, session.getConfig()
- .getIdleTimeInMillis(IdleStatus.READER_IDLE),
- IdleStatus.READER_IDLE, Math.max(session.getLastReadTime(),
- session.getLastIdleTime(IdleStatus.READER_IDLE)));
- notifyIdleness0(session, currentTime, session.getConfig()
- .getIdleTimeInMillis(IdleStatus.WRITER_IDLE),
- IdleStatus.WRITER_IDLE, Math.max(session.getLastWriteTime(),
- session.getLastIdleTime(IdleStatus.WRITER_IDLE)));
-
- notifyWriteTimeout(session, currentTime, session.getConfig()
- .getWriteTimeoutInMillis(), session.getLastWriteTime());
- }
-
- private void notifyIdleness0(APRSessionImpl session, long currentTime,
- long idleTime, IdleStatus status, long lastIoTime) {
- if (idleTime > 0 && lastIoTime != 0
- && (currentTime - lastIoTime) >= idleTime) {
- session.increaseIdleCount(status);
- session.getFilterChain().fireSessionIdle(session, status);
+ public AbstractIoSession next() {
+
+ AbstractIoSession sess=managedSessions.get(pollResult[index*2]);
+ index++;
+ return sess;
}
- }
- private void notifyWriteTimeout(APRSessionImpl session, long currentTime,
- long writeTimeout, long lastIoTime) {
- // TODO : I understand nothing here :)
- }
-
- private IoServiceListenerSupport getServiceListeners(IoSession session) {
- IoService service = session.getService();
- if (service instanceof APRConnector) {
- return ((APRConnector) service).getListeners();
- } else
- return null;
- }
-
- private class Worker implements Runnable {
-
- public void run() {
- Thread.currentThread().setName(APRIoProcessor.this.threadName);
-
- for (;;) {
-
- try {
-
- // pop new sessions
- doAddNew();
-
- if (socketCount < 1) {
- return; // no need to poll an empty pollset
- }
-
- // TODO : doUpdateTrafficMask();
-
- /* is it OK ? : Two times size of the created pollset */
- long[] desc = new long[socketCount * 2];
-
- /* use 100 milliseconds poll timeout, TODO : parameterize for more latency/CPU usage control*/
- int rv = Poll.poll(pollset, 1000000, desc, false);
- if (rv > 0) {
- for (int n = 0; n < rv; n++) {
- long clientSock = desc[n * 2 + 1];
-
- APRSessionImpl session = managedSessions
- .get(clientSock);
-
- if (!session.isConnected()) {
- clearWriteRequestQueue(session);
- continue;
- }
-
- if ((desc[n * 2] & Poll.APR_POLLIN) > 0)
- read(session);
- if ((desc[n * 2] & Poll.APR_POLLOUT) > 0)
- write(session);
- }
- }
- // doFlush();
- notifyIdleness();
- doRemove();
- } catch (Throwable t) {
- ExceptionMonitor.getInstance().exceptionCaught(t);
-
- try {
- Thread.sleep(1000);
- } catch (InterruptedException e1) {
- ExceptionMonitor.getInstance().exceptionCaught(e1);
- }
- }
- }
+ public void remove() {
+ throw new UnsupportedOperationException("remove");
}
- }
-
- @Override
- protected void finalize() throws Throwable {
- // TODO : necessary I think, need to check APR doc
- Pool.clear(pool);
}
}
Modified: mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSession.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSession.java?rev=576502&r1=576501&r2=576502&view=diff
==============================================================================
--- mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSession.java (original)
+++ mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSession.java Mon Sep 17 09:24:19 2007
@@ -31,10 +31,6 @@
*/
public interface APRSession extends IoSession {
APRSessionConfig getConfig();
-
- InetSocketAddress getRemoteAddress();
-
- InetSocketAddress getLocalAddress();
-
+
InetSocketAddress getServiceAddress();
}
Modified: mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java
URL: http://svn.apache.org/viewvc/mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java?rev=576502&r1=576501&r2=576502&view=diff
==============================================================================
--- mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java (original)
+++ mina/sandbox/jvermillard/apr/src/main/java/org/apache/mina/transport/apr/APRSessionImpl.java Mon Sep 17 09:24:19 2007
@@ -24,15 +24,13 @@
import java.util.Queue;
import org.apache.mina.common.AbstractIoSession;
-import org.apache.mina.common.AbstractIoSessionConfig;
import org.apache.mina.common.ByteBuffer;
+import org.apache.mina.common.DefaultIoFilterChain;
import org.apache.mina.common.DefaultTransportMetadata;
import org.apache.mina.common.IoFilterChain;
import org.apache.mina.common.IoHandler;
+import org.apache.mina.common.IoProcessor;
import org.apache.mina.common.IoService;
-import org.apache.mina.common.IoSession;
-import org.apache.mina.common.IoSessionConfig;
-import org.apache.mina.common.RuntimeIOException;
import org.apache.mina.common.TransportMetadata;
import org.apache.mina.common.WriteRequest;
import org.apache.tomcat.jni.Socket;
@@ -52,7 +50,7 @@
private final APRIoProcessor ioProcessor;
- private final APRFilterChain filterChain;
+ private final IoFilterChain filterChain = new DefaultIoFilterChain(this);
private final Queue<WriteRequest> writeRequestQueue;
@@ -75,7 +73,6 @@
InetSocketAddress remoteAddress, InetSocketAddress localAddress) {
this.service = service;
this.ioProcessor = ioProcessor;
- this.filterChain = new APRFilterChain(this);
this.writeRequestQueue = new LinkedList<WriteRequest>();
this.handler = service.getHandler();
this.remoteAddress = remoteAddress;
@@ -87,11 +84,6 @@
return socket;
}
- @Override
- protected void updateTrafficMask() {
- // TODO : this.ioProcessor.updateTrafficMask( this );
- }
-
public APRSessionConfig getConfig() {
return config;
}
@@ -108,10 +100,6 @@
return remoteAddress;
}
- Queue<WriteRequest> getWriteRequestQueue() {
- return writeRequestQueue;
- }
-
public IoFilterChain getFilterChain() {
return filterChain;
@@ -121,33 +109,6 @@
return handler;
}
- public int getScheduledWriteMessages() {
- synchronized (writeRequestQueue) {
- return writeRequestQueue.size();
- }
- }
-
- protected void write0(WriteRequest writeRequest) {
- filterChain.fireFilterWrite(this, writeRequest);
- }
-
- public long getScheduledWriteBytes() {
- int size = 0;
- synchronized (writeRequestQueue) {
- for (Object o : writeRequestQueue) {
- if (o instanceof ByteBuffer) {
- size += ((ByteBuffer) o).remaining();
- }
- }
- }
-
- return size;
- }
-
- @Override
- protected void close0() {
- filterChain.fireFilterClose(this);
- }
public IoService getService() {
return service;
@@ -157,15 +118,15 @@
return ioProcessor;
}
+ public TransportMetadata getTransportMetadata() {
+ return METADATA;
+ }
+
@Override
public InetSocketAddress getServiceAddress() {
return (InetSocketAddress) super.getServiceAddress();
}
- public TransportMetadata getTransportMetadata() {
- return METADATA;
- }
-
private class APRSessionConfigImpl extends AbstractAPRSessionConfig
implements APRSessionConfig {
@@ -260,5 +221,11 @@
Socket.optSet(getAPRSocket(), Socket.APR_SO_RCVBUF, size);
}
+ }
+
+ @Override
+ protected IoProcessor getProcessor() {
+ // TODO Auto-generated method stub
+ return null;
}
}